cpp-msgpack-rpc 0.2.0
An RPC library implementing MessagePack RPC.
Loading...
Searching...
No Matches
message_sender.h
Go to the documentation of this file.
1/*
2 * Copyright 2023 MusicScience37 (Kenta Kabashima)
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
20#pragma once
21
22#include <atomic>
23#include <memory>
24#include <optional>
25#include <utility>
26
33
35
40public:
47 MessageSender(std::weak_ptr<ClientConnector> connector,
48 std::shared_ptr<logging::Logger> logger)
49 : connector_(std::move(connector)), logger_(std::move(logger)) {}
50
58 std::optional<messages::MessageID> id = std::nullopt) {
59 sent_messages_.push(std::move(message), id);
60 send_next();
61 }
62
66 void send_next() {
67 if (is_sending_.load(std::memory_order_relaxed)) {
68 MSGPACK_RPC_TRACE(logger_, "Another message is being sent.");
69 return;
70 }
71 const auto connection = get_connection();
72 if (!connection) {
74 logger_, "No connection now, so wait for connection.");
75 return;
76 }
77 const auto [message, request_id] = sent_messages_.next();
78 if (!message) {
79 MSGPACK_RPC_TRACE(logger_, "No message to be sent for now.");
80 return;
81 }
82 if (is_sending_.exchange(true, std::memory_order_acquire)) {
83 MSGPACK_RPC_TRACE(logger_, "Another message is being sent.");
84 return;
85 }
86
87 connection->async_send(*message);
88 MSGPACK_RPC_TRACE(logger_, "Sending next message.");
89 }
90
95 MSGPACK_RPC_TRACE(logger_, "A message has been sent.");
96 sent_messages_.pop();
97 is_sending_.store(false, std::memory_order_release);
98 }
99
104 MSGPACK_RPC_TRACE(logger_, "Connection closed, so reconnecting.");
105 is_sending_.store(false, std::memory_order_release);
106 }
107
108private:
114 [[nodiscard]] std::shared_ptr<transport::IConnection> get_connection() {
115 const auto connector = connector_.lock();
116 if (!connector) {
117 return nullptr;
118 }
119 return connector->connection();
120 }
121
123 std::weak_ptr<ClientConnector> connector_;
124
126 std::shared_ptr<logging::Logger> logger_;
127
130
132 std::atomic<bool> is_sending_{false};
133};
134
135} // namespace msgpack_rpc::clients::impl
SentMessageQueue sent_messages_
Queue of messages to be sent.
void handle_sent_message()
Handle a sent message.
void send_next()
Send the next message if possible.
void handle_disconnection()
Handle disconnection.
void send(messages::SerializedMessage message, std::optional< messages::MessageID > id=std::nullopt)
Send a message.
MessageSender(std::weak_ptr< ClientConnector > connector, std::shared_ptr< logging::Logger > logger)
Constructor.
std::shared_ptr< transport::IConnection > get_connection()
Get the connection.
std::shared_ptr< logging::Logger > logger_
Logger.
std::weak_ptr< ClientConnector > connector_
Connector.
std::atomic< bool > is_sending_
Whether this client is sending a message.
Class of queues of messages to be sent.
Class of serialized message data.
Definition of ClientConnector class.
Definition of IConnection class.
Definition of Logger class.
#define MSGPACK_RPC_TRACE(LOGGER_PTR,...)
Write a trace log.
Definition logger.h:174
Definition of MessageID type.
Namespace of internal implementations.
STL namespace.
Definition of SentMessageQueue class.
Definition of SerializedMessage class.