cpp-msgpack-rpc 0.2.0
An RPC library implementing MessagePack RPC.
Loading...
Searching...
No Matches
connection.h
Go to the documentation of this file.
1/*
2 * Copyright 2023 MusicScience37 (Kenta Kabashima)
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
20#pragma once
21
22#include <cstddef>
23#include <functional>
24#include <memory>
25#include <optional>
26#include <string>
27#include <string_view>
28#include <system_error>
29#include <utility>
30
31#include <asio/buffer.hpp>
32#include <asio/error.hpp>
33#include <asio/error_code.hpp>
34#include <asio/post.hpp>
35#include <asio/write.hpp>
36#include <fmt/format.h>
37
51
52namespace msgpack_rpc::transport {
53
60template <typename AsioSocketType, typename ConcreteAddressType>
61class Connection : public IConnection,
62 public std::enable_shared_from_this<
63 Connection<AsioSocketType, ConcreteAddressType>> {
64public:
66 using AsioSocket = AsioSocketType;
67
69 using ConcreteAddress = ConcreteAddressType;
70
80 const config::MessageParserConfig& message_parser_config,
81 std::shared_ptr<logging::Logger> logger,
82 const std::shared_ptr<ConnectionList<Connection>>& connection_list =
83 nullptr)
84 : socket_(std::move(socket)),
85 message_parser_(message_parser_config),
86 local_address_(socket_.local_endpoint()),
87 remote_address_(socket_.remote_endpoint()),
88 log_name_(fmt::format("Connection(local={}, remote={})",
90 logger_(std::move(logger)),
91 connection_list_(connection_list) {}
92
93 Connection(const Connection&) = delete;
94 Connection(Connection&&) = delete;
95 Connection& operator=(const Connection&) = delete;
96 Connection& operator=(Connection&&) = delete;
97
101 ~Connection() override {
102 const auto connection_list = connection_list_.lock();
103 if (connection_list) {
104 connection_list->remove(this);
105 }
106 }
107
110 ConnectionClosedCallback on_closed) override {
111 state_machine_.handle_start_request();
112 // Only one thread can enter here.
113
114 on_received_ = std::move(on_received);
115 on_sent_ = std::move(on_sent);
116 on_closed_ = std::move(on_closed);
117
118 state_machine_.handle_processing_started();
119
120 asio::post(socket_.get_executor(),
121 [self = this->shared_from_this()] { self->async_read_next(); });
122 }
123
125 void async_send(const messages::SerializedMessage& message) override {
126 if (!state_machine_.is_processing()) {
127 MSGPACK_RPC_TRACE(logger_, "Not processing now.");
128 return;
129 }
130 asio::post(socket_.get_executor(),
131 [self = this->shared_from_this(), message]() {
132 self->async_send_in_thread(message);
133 });
134 }
135
137 void async_close() override {
138 asio::post(socket_.get_executor(), [self = this->shared_from_this()]() {
139 self->close_in_thread(Status());
140 });
141 }
142
144 [[nodiscard]] const addresses::IAddress& local_address()
145 const noexcept override {
146 return local_address_;
147 }
148
150 [[nodiscard]] const addresses::IAddress& remote_address()
151 const noexcept override {
152 return remote_address_;
153 }
154
155private:
160 const auto buffer = message_parser_.prepare_buffer();
161 socket_.async_read_some(asio::buffer(buffer.data(), buffer.size()),
162 [self = this->shared_from_this()](const asio::error_code& error,
163 std::size_t size) { self->process_read_bytes(error, size); });
164 MSGPACK_RPC_TRACE(logger_, "({}) Reading next bytes.", log_name_);
165 }
166
173 void process_read_bytes(const asio::error_code& error, std::size_t size) {
174 if (error) {
175 if (error == asio::error::operation_aborted) {
176 return;
177 }
178 if (error == asio::error::eof ||
179 error == asio::error::connection_reset ||
180 error == asio::error::broken_pipe) {
182 logger_, "({}) Connection closed by peer.", log_name_);
183 state_machine_.handle_processing_stopped();
185 return;
186 }
187 const auto message = fmt::format(
188 "Error occurred when receiving data: {}", error.message());
189 MSGPACK_RPC_ERROR(logger_, "({}) {}", log_name_, message);
190 throw MsgpackRPCException(StatusCode::UNEXPECTED_ERROR, message);
191 }
192
193 MSGPACK_RPC_TRACE(logger_, "({}) Read {} bytes.", log_name_, size);
194 message_parser_.consumed(size);
195
196 while (true) {
197 std::optional<messages::ParsedMessage> message;
198 try {
199 message = message_parser_.try_parse();
200 } catch (const MsgpackRPCException& e) {
202 logger_, "({}) {}", log_name_, e.status().message());
204 return;
205 }
206 if (message) {
208 logger_, "({}) Received a message.", log_name_, size);
209 on_received_(std::move(*message));
210 message.reset();
211 } else {
213 "({}) More bytes are needed to parse a message.",
214 log_name_);
215 break;
216 }
217 }
218
219 if (!state_machine_.is_processing()) {
220 return;
221 }
223 }
224
231 asio::async_write(socket_,
232 asio::const_buffer(message.data(), message.size()),
233 [self = this->shared_from_this(), message](
234 const asio::error_code& error, std::size_t /*size*/) {
235 self->on_sent(error, message.size());
236 });
238 logger_, "({}) Sending {} bytes.", log_name_, message.size());
239 }
240
247 void on_sent(const asio::error_code& error, std::size_t size) {
248 if (error) {
249 if (error == asio::error::operation_aborted) {
250 return;
251 }
252 if (error == asio::error::eof ||
253 error == asio::error::connection_reset ||
254 error == asio::error::broken_pipe) {
256 logger_, "({}) Connection closed by peer.", log_name_);
257 state_machine_.handle_processing_stopped();
259 return;
260 }
261 const auto message = fmt::format(
262 "Error occurred when sending data: {}", error.message());
263 MSGPACK_RPC_ERROR(logger_, "({}) {}", log_name_, message);
264 throw MsgpackRPCException(StatusCode::UNEXPECTED_ERROR, message);
265 }
266
267 MSGPACK_RPC_TRACE(logger_, "({}) Sent {} bytes.", log_name_, size);
268 on_sent_();
269 }
270
276 void close_in_thread(const Status& status) {
277 if (!state_machine_.handle_stop_requested()) {
278 return;
279 }
280 socket_.cancel();
281 socket_.shutdown(AsioSocket::shutdown_both);
282 socket_.close();
283 on_closed_(status);
284 MSGPACK_RPC_TRACE(logger_, "({}) Closed this connection.", log_name_);
285 }
286
289
292
295
298
301
304
307
309 std::string log_name_;
310
312 std::shared_ptr<logging::Logger> logger_;
313
316
318 std::weak_ptr<ConnectionList<Connection>> connection_list_;
319};
320
321} // namespace msgpack_rpc::transport
Definition of BackgroundTaskStateMachine class.
Definition of BufferView class.
Interface of addresses.
Definition i_address.h:31
Class of exceptions in cpp-msgpack-rpc library.
const Status & status() const noexcept
Get the status.
Class of statuses.
Definition status.h:34
std::string_view message() const noexcept
Get the error message.
Definition status.cpp:76
Class of configuration of parsers of messages.
Class of serialized message data.
const char * data() const noexcept
Get the pointer to the data.
std::size_t size() const noexcept
Get the size of the data.
Class of state machines of background tasks in connections and acceptors.
Class of lists of connections.
void async_send_in_thread(const messages::SerializedMessage &message)
Asynchronously send a message in this thread.
Definition connection.h:230
~Connection() override
Destructor.
Definition connection.h:101
Connection(AsioSocket &&socket, const config::MessageParserConfig &message_parser_config, std::shared_ptr< logging::Logger > logger, const std::shared_ptr< ConnectionList< Connection > > &connection_list=nullptr)
Constructor.
Definition connection.h:79
const addresses::IAddress & local_address() const noexcept override
Get the address of the local endpoint.
Definition connection.h:144
void close_in_thread(const Status &status)
Close this connection in this thread.
Definition connection.h:276
void start(MessageReceivedCallback on_received, MessageSentCallback on_sent, ConnectionClosedCallback on_closed) override
Start process of this connection.
Definition connection.h:109
void on_sent(const asio::error_code &error, std::size_t size)
Definition connection.h:247
void async_send(const messages::SerializedMessage &message) override
Asynchronously send a message.
Definition connection.h:125
const addresses::IAddress & remote_address() const noexcept override
Get the address of the remote endpoint.
Definition connection.h:150
void async_close() override
Asynchronously close this connection.
Definition connection.h:137
std::weak_ptr< ConnectionList< Connection > > connection_list_
Definition connection.h:318
void async_read_next()
Asynchronously read next bytes.
Definition connection.h:159
void process_read_bytes(const asio::error_code &error, std::size_t size)
Process read bytes.
Definition connection.h:173
std::function< void(messages::ParsedMessage)> MessageReceivedCallback
Type of callback functions called when a message is received.
std::function< void(const Status &)> ConnectionClosedCallback
Type of callback functions called when a connection is closed.
std::function< void()> MessageSentCallback
Type of callback functions called when a message is successfully sent.
Definition of ConnectionList class.
Definition of IAddress.
Definition of IConnection class.
Definition of Logger class.
#define MSGPACK_RPC_TRACE(LOGGER_PTR,...)
Write a trace log.
Definition logger.h:174
#define MSGPACK_RPC_ERROR(LOGGER_PTR,...)
Write a error log.
Definition logger.h:222
Definition of MessageParser class.
Definition of MessageParserConfig class.
Definition of MsgpackRPCException class.
Namespace of fmt library.
Definition uri.h:113
Namespace of transport of messages.
Definition backends.h:31
STL namespace.
Definition of ParsedMessage type.
Definition of SerializedMessage class.
Definition of Status class.
Definition of StatusCode enumeration.