cpp-msgpack-rpc 0.2.0
An RPC library implementing MessagePack RPC.
Loading...
Searching...
No Matches
server_connection.h
Go to the documentation of this file.
1/*
2 * Copyright 2023 MusicScience37 (Kenta Kabashima)
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
20#pragma once
21
22#include <atomic>
23#include <cassert>
24#include <memory>
25#include <mutex>
26#include <queue>
27#include <string>
28#include <type_traits>
29#include <utility>
30#include <variant>
31
45
46namespace msgpack_rpc::servers {
47
// Server-side handler for one transport connection: received requests and
// notifications are passed to processor_, and serialized responses are queued
// in message_queue_ and sent one at a time over the connection.
//
// NOTE(review): this listing is an extract in which the original line numbers
// are fused onto each line and several source lines are absent (visible as
// gaps in those numbers). The notes below mark each gap; confirm against the
// real header before relying on them.
51class ServerConnection : public std::enable_shared_from_this<ServerConnection> {
52public:
// Constructor.
// Keeps only a weak reference to `connection` (connection_ is a weak_ptr) and
// caches connection->remote_address().to_string() for use in log messages.
// `connection` is dereferenced here without a null check, so it must not be
// null.
61 ServerConnection(const std::shared_ptr<transport::IConnection>& connection,
62 std::weak_ptr<executors::IExecutor> executor,
63 std::shared_ptr<methods::IMethodProcessor> processor,
64 std::shared_ptr<logging::Logger> logger)
65 : connection_(connection),
66 executor_(std::move(executor)),
67 processor_(std::move(processor)),
68 logger_(std::move(logger)),
69 formatted_remote_address_(connection->remote_address().to_string()) {}
70
// Start processing on the connection, if it still exists.
// Passes three callbacks to connection->start(): one for a received message,
// one for a completed send, and one taking a Status that does nothing.
// The first two capture a shared_ptr to this object via shared_from_this(),
// so this object must already be owned by a std::shared_ptr when start() is
// called.
74 void start() {
75 const auto connection = connection_.lock();
76 if (connection) {
77 connection->start(
78 [self = this->shared_from_this()](
// NOTE(review): original line 79 (the lambda's parameter list) is absent from
// this listing; the body below uses a parameter named `message`.
80 self->on_received(std::move(message));
81 },
82 [self = this->shared_from_this()]() { self->on_sent(); },
83 [](const Status& /*status*/) {
84 // No operation when this connection is closed.
85 });
86 }
87 }
88
89private:
// Process a received message.
// NOTE(review): the signature line of this function (around original line 95)
// is absent from this listing; the member index on this page lists it as
// `void on_received(messages::ParsedMessage message)`.
//
// The executor is held weakly; assert() only checks it when assertions are
// enabled, so with NDEBUG defined a null executor is not detected here.
96 const auto executor = executor_.lock();
97 assert(executor);
98
// Dispatch on the concrete alternative held by the ParsedMessage variant.
99 std::visit(
100 [this, executor](auto&& concrete_message) {
101 if constexpr (std::is_same_v<messages::ParsedRequest,
102 std::decay_t<decltype(concrete_message)>>) {
// NOTE(review): original lines 103-104 are absent; presumably the opening of
// an executors::async_invoke(executor, OperationType::CALLBACK, ...) call,
// which the index on this page lists - confirm.
105 [self = this->shared_from_this(),
106 // NOLINTNEXTLINE(bugprone-move-forwarding-reference): This actually always moves rvalue.
107 request = std::move(concrete_message)] {
108 self->on_request(request);
109 });
110 } else if constexpr (std::is_same_v<
// NOTE(review): original line 111 is absent; presumably
// messages::ParsedNotification, since this branch forwards to
// on_notification() - confirm.
112 std::decay_t<
113 decltype(concrete_message)>>) {
// NOTE(review): original lines 114-115 are absent; presumably the same
// async_invoke opening as in the request branch - confirm.
116 [self = this->shared_from_this(),
117 // NOLINTNEXTLINE(bugprone-move-forwarding-reference): This actually always moves rvalue.
118 notification = std::move(concrete_message)] {
119 self->on_notification(notification);
120 });
121 } else {
// Any other alternative is handled as an invalid message, synchronously on
// the calling thread (no capture of `self`, no hand-off to the executor).
122 this->on_invalid_message();
123 }
124 },
125 std::move(message));
126 }
127
// Process a request.
// Logs the request, obtains the serialized response from
// processor_->call(request), logs again, and pushes the response onto
// message_queue_ while holding message_queue_mutex_ (released at the end of
// the inner scope).
133 void on_request(const messages::ParsedRequest& request) {
134 MSGPACK_RPC_DEBUG(logger_, "{} request {} (id: {})",
135 formatted_remote_address_, request.method_name(), request.id());
136
137 auto serialized_response = processor_->call(request);
138
139 MSGPACK_RPC_DEBUG(logger_, "{} respond {} (id: {})",
140 formatted_remote_address_, request.method_name(), request.id());
141
142 {
143 std::unique_lock<std::mutex> lock(message_queue_mutex_);
144 message_queue_.push(std::move(serialized_response));
145 }
146
// NOTE(review): original line 147 is absent; presumably a call to
// send_next_if_exists() to start sending the queued response - confirm.
148 }
149
// Process a notification.
// NOTE(review): the signature and the start of the log statement (original
// lines 150-156) are absent from this listing; the member index lists the
// signature as
// `void on_notification(const messages::ParsedNotification &notification)`.
157 notification.method_name());
158
// Notifications produce no response; the processor is only informed.
159 processor_->notify(notification);
160 }
161
// Handle an invalid message.
// NOTE(review): the signature and the start of the log statement (original
// lines 162-166), and the end of that statement (line 169), are absent; the
// member index lists the signature as `void on_invalid_message()`.
167 "Unexpectedly received a response from {}, so close the "
168 "connection.",
// Ask the connection to close, if it still exists.
170 const auto connection = connection_.lock();
171 if (connection) {
172 connection->async_close();
173 }
174 }
175
// Send the next message if one exists.
// NOTE(review): the signature (original lines 176-179) is absent; the member
// index lists it as `void send_next_if_exists()`.
//
// The mutex guards both the queue and the decision to start a send, so only
// one caller at a time can get past the is_sending_ check.
180 std::unique_lock<std::mutex> lock(message_queue_mutex_);
181 if (is_sending_.load(std::memory_order_acquire)) {
182 MSGPACK_RPC_TRACE(logger_, "Another message is being sent.");
183 return;
184 }
185 if (message_queue_.empty()) {
186 MSGPACK_RPC_TRACE(logger_, "No message to be sent for now.");
187 return;
188 }
189 const auto next_message = std::move(message_queue_.front());
190 message_queue_.pop();
// Mark the send as in progress before releasing the mutex; the flag is
// cleared again in on_sent().
191 is_sending_.store(true, std::memory_order_relaxed);
192 lock.unlock();
193
// The send itself is issued after the mutex has been released.
// NOTE(review): if the connection has already expired, the dequeued message
// is dropped and is_sending_ stays true, because there is no else branch.
194 const auto connection = connection_.lock();
195 if (connection) {
196 MSGPACK_RPC_TRACE(logger_, "Sending next message.");
197 connection->async_send(next_message);
198 }
199 }
200
// Handle the condition that a message is sent: clears is_sending_ so that a
// following send_next_if_exists() call can proceed.
204 void on_sent() {
205 MSGPACK_RPC_TRACE(logger_, "A message has been sent.");
206 is_sending_.store(false, std::memory_order_release);
// NOTE(review): original line 207 is absent; presumably a call to
// send_next_if_exists() to continue with the next queued message - confirm.
208 }
209
// Connection (held weakly; every use locks it and checks for null first).
211 std::weak_ptr<transport::IConnection> connection_;
212
// Executor (held weakly; locked in the received-message handler).
214 std::weak_ptr<executors::IExecutor> executor_;
215
// Processor of methods.
217 std::shared_ptr<methods::IMethodProcessor> processor_;
218
// Logger.
220 std::shared_ptr<logging::Logger> logger_;
221
// NOTE(review): original lines 222-223 are absent; presumably the declaration
// of `std::string formatted_remote_address_` (formatted remote address for
// logging), which the constructor initializes and the index lists - confirm.
224
// Messages to be sent.
226 std::queue<messages::SerializedMessage> message_queue_{};
227
// NOTE(review): original lines 228-229 and 231 are absent; presumably the
// declaration of `std::mutex message_queue_mutex_` (mutex of message_queue_),
// which the index lists - confirm.
230
// Whether this connection is sending a message.
232 std::atomic<bool> is_sending_{false};
233};
234
235} // namespace msgpack_rpc::servers
Definition of async_invoke function.
Class of statuses.
Definition status.h:34
MethodNameView method_name() const noexcept
Get the method name.
MethodNameView method_name() const noexcept
Get the method name.
MessageID id() const noexcept
Get the message ID.
std::shared_ptr< logging::Logger > logger_
Logger.
void on_sent()
Handle the condition that a message is sent.
std::queue< messages::SerializedMessage > message_queue_
Messages to be sent.
std::weak_ptr< transport::IConnection > connection_
Connection.
std::atomic< bool > is_sending_
Whether this connection is sending a message.
void send_next_if_exists()
Send the next message if one exists.
std::weak_ptr< executors::IExecutor > executor_
Executor.
std::string formatted_remote_address_
Formatted remote address for logging.
void on_invalid_message()
Handle an invalid message.
ServerConnection(const std::shared_ptr< transport::IConnection > &connection, std::weak_ptr< executors::IExecutor > executor, std::shared_ptr< methods::IMethodProcessor > processor, std::shared_ptr< logging::Logger > logger)
Constructor.
void on_notification(const messages::ParsedNotification &notification)
Process a notification.
void on_request(const messages::ParsedRequest &request)
Process a request.
void on_received(messages::ParsedMessage message)
Process a received message.
std::mutex message_queue_mutex_
Mutex of message_queue_.
std::shared_ptr< methods::IMethodProcessor > processor_
Processor of methods.
Definition of IAddress.
Definition of IConnection class.
Definition of IExecutor class.
Definition of IMethodProcessor class.
Definition of Logger class.
#define MSGPACK_RPC_DEBUG(LOGGER_PTR,...)
Write a debug log.
Definition logger.h:186
#define MSGPACK_RPC_TRACE(LOGGER_PTR,...)
Write a trace log.
Definition logger.h:174
Definition of MethodNameView class.
@ CALLBACK
Execution of callbacks.
void async_invoke(const std::shared_ptr< IExecutor > &executor, OperationType type, Function &&function)
Asynchronously invoke a function.
std::variant< ParsedRequest, ParsedResponse, ParsedNotification > ParsedMessage
Type of parsed messages.
Namespace of servers.
STL namespace.
Definition of OperationType enumeration.
Definition of ParsedMessage type.
Definition of ParsedNotification class.
Definition of ParsedRequest class.
Definition of SerializedMessage class.
Definition of Status class.