/builds/MusicScience37Projects/utility-libraries/cpp-msgpack-rpc/src/msgpack_rpc/transport/connection.h
Line | Count | Source |
1 | | /* |
2 | | * Copyright 2023 MusicScience37 (Kenta Kabashima) |
3 | | * |
4 | | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | | * you may not use this file except in compliance with the License. |
6 | | * You may obtain a copy of the License at |
7 | | * |
8 | | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | | * |
10 | | * Unless required by applicable law or agreed to in writing, software |
11 | | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | | * See the License for the specific language governing permissions and |
14 | | * limitations under the License. |
15 | | */ |
16 | | /*! |
17 | | * \file |
18 | | * \brief Definition of Connection class. |
19 | | */ |
20 | | #pragma once |
21 | | |
22 | | #include <cstddef> |
23 | | #include <functional> |
24 | | #include <memory> |
25 | | #include <optional> |
26 | | #include <string> |
27 | | #include <string_view> |
28 | | #include <system_error> |
29 | | #include <utility> |
30 | | |
31 | | #include <asio/buffer.hpp> |
32 | | #include <asio/error.hpp> |
33 | | #include <asio/error_code.hpp> |
34 | | #include <asio/post.hpp> |
35 | | #include <asio/write.hpp> |
36 | | #include <fmt/format.h> |
37 | | |
38 | | #include "msgpack_rpc/addresses/i_address.h" |
39 | | #include "msgpack_rpc/common/msgpack_rpc_exception.h" |
40 | | #include "msgpack_rpc/common/status.h" |
41 | | #include "msgpack_rpc/common/status_code.h" |
42 | | #include "msgpack_rpc/config/message_parser_config.h" |
43 | | #include "msgpack_rpc/logging/logger.h" |
44 | | #include "msgpack_rpc/messages/buffer_view.h" |
45 | | #include "msgpack_rpc/messages/message_parser.h" |
46 | | #include "msgpack_rpc/messages/parsed_message.h" |
47 | | #include "msgpack_rpc/messages/serialized_message.h" |
48 | | #include "msgpack_rpc/transport/background_task_state_machine.h" |
49 | | #include "msgpack_rpc/transport/connection_list.h" |
50 | | #include "msgpack_rpc/transport/i_connection.h" |
51 | | |
52 | | namespace msgpack_rpc::transport { |
53 | | |
54 | | /*! |
55 | | * \brief Class of connections. |
56 | | * |
57 | | * \param AsioSocketType Type of sockets in asio library. |
58 | | * \param ConcreteAddressType Type of concrete addresses. |
59 | | */ |
60 | | template <typename AsioSocketType, typename ConcreteAddressType> |
61 | | class Connection : public IConnection, |
62 | | public std::enable_shared_from_this< |
63 | | Connection<AsioSocketType, ConcreteAddressType>> { |
64 | | public: |
65 | | //! Type of sockets in asio library. |
66 | | using AsioSocket = AsioSocketType; |
67 | | |
68 | | //! Type of concrete addresses. |
69 | | using ConcreteAddress = ConcreteAddressType; |
70 | | |
71 | | /*! |
72 | | * \brief Constructor. |
73 | | * |
74 | | * \param[in] socket Socket. |
75 | | * \param[in] message_parser_config Configuration of the parser of messages. |
76 | | * \param[in] logger Logger. |
77 | | * \param[in] connection_list List of connections. |
78 | | */ |
79 | | Connection(AsioSocket&& socket, |
80 | | const config::MessageParserConfig& message_parser_config, |
81 | | std::shared_ptr<logging::Logger> logger, |
82 | | const std::shared_ptr<ConnectionList<Connection>>& connection_list = |
83 | | nullptr) |
84 | 118 | : socket_(std::move(socket)), |
85 | 118 | message_parser_(message_parser_config), |
86 | 118 | local_address_(socket_.local_endpoint()), |
87 | 118 | remote_address_(socket_.remote_endpoint()), |
88 | 118 | log_name_(fmt::format("Connection(local={}, remote={})", |
89 | 118 | local_address_, remote_address_)), |
90 | 118 | logger_(std::move(logger)), |
91 | 118 | connection_list_(connection_list) {} _ZN11msgpack_rpc9transport10ConnectionIN4asio19basic_stream_socketINS2_2ip3tcpENS2_15any_io_executorEEENS_9addresses10TCPAddressEEC2EOS7_RKNS_6config19MessageParserConfigENSt3__110shared_ptrINS_7logging6LoggerEEERKNSH_INS0_14ConnectionListISA_EEEE Line | Count | Source | 84 | 63 | : socket_(std::move(socket)), | 85 | 63 | message_parser_(message_parser_config), | 86 | 63 | local_address_(socket_.local_endpoint()), | 87 | 63 | remote_address_(socket_.remote_endpoint()), | 88 | 63 | log_name_(fmt::format("Connection(local={}, remote={})", | 89 | 63 | local_address_, remote_address_)), | 90 | 63 | logger_(std::move(logger)), | 91 | 63 | connection_list_(connection_list) {} |
_ZN11msgpack_rpc9transport10ConnectionIN4asio19basic_stream_socketINS2_5local15stream_protocolENS2_15any_io_executorEEENS_9addresses17UnixSocketAddressEEC2EOS7_RKNS_6config19MessageParserConfigENSt3__110shared_ptrINS_7logging6LoggerEEERKNSH_INS0_14ConnectionListISA_EEEE Line | Count | Source | 84 | 55 | : socket_(std::move(socket)), | 85 | 55 | message_parser_(message_parser_config), | 86 | 55 | local_address_(socket_.local_endpoint()), | 87 | 55 | remote_address_(socket_.remote_endpoint()), | 88 | 55 | log_name_(fmt::format("Connection(local={}, remote={})", | 89 | 55 | local_address_, remote_address_)), | 90 | 55 | logger_(std::move(logger)), | 91 | 55 | connection_list_(connection_list) {} |
|
92 | | |
93 | | Connection(const Connection&) = delete; |
94 | | Connection(Connection&&) = delete; |
95 | | Connection& operator=(const Connection&) = delete; |
96 | | Connection& operator=(Connection&&) = delete; |
97 | | |
98 | | /*! |
99 | | * \brief Destructor. |
100 | | */ |
101 | 118 | ~Connection() override { |
102 | 118 | const auto connection_list = connection_list_.lock(); |
103 | 118 | if (connection_list) { |
104 | 50 | connection_list->remove(this); |
105 | 50 | } |
106 | 118 | } _ZN11msgpack_rpc9transport10ConnectionIN4asio19basic_stream_socketINS2_2ip3tcpENS2_15any_io_executorEEENS_9addresses10TCPAddressEED2Ev Line | Count | Source | 101 | 63 | ~Connection() override { | 102 | 63 | const auto connection_list = connection_list_.lock(); | 103 | 63 | if (connection_list) { | 104 | 27 | connection_list->remove(this); | 105 | 27 | } | 106 | 63 | } |
_ZN11msgpack_rpc9transport10ConnectionIN4asio19basic_stream_socketINS2_5local15stream_protocolENS2_15any_io_executorEEENS_9addresses17UnixSocketAddressEED2Ev Line | Count | Source | 101 | 55 | ~Connection() override { | 102 | 55 | const auto connection_list = connection_list_.lock(); | 103 | 55 | if (connection_list) { | 104 | 23 | connection_list->remove(this); | 105 | 23 | } | 106 | 55 | } |
|
107 | | |
108 | | //! \copydoc msgpack_rpc::transport::IConnection::start |
109 | | void start(MessageReceivedCallback on_received, MessageSentCallback on_sent, |
110 | 116 | ConnectionClosedCallback on_closed) override { |
111 | 116 | state_machine_.handle_start_request(); |
112 | | // Only one thread can enter here. |
113 | | |
114 | 116 | on_received_ = std::move(on_received); |
115 | 116 | on_sent_ = std::move(on_sent); |
116 | 116 | on_closed_ = std::move(on_closed); |
117 | | |
118 | 116 | state_machine_.handle_processing_started(); |
119 | | |
120 | 116 | asio::post(socket_.get_executor(), |
121 | 116 | [self = this->shared_from_this()] { self->async_read_next(); }); _ZZN11msgpack_rpc9transport10ConnectionIN4asio19basic_stream_socketINS2_2ip3tcpENS2_15any_io_executorEEENS_9addresses10TCPAddressEE5startENSt3__18functionIFvNSB_7variantIJNS_8messages13ParsedRequestENSE_14ParsedResponseENSE_18ParsedNotificationEEEEEEENSC_IFvvEEENSC_IFvRKNS_6common6StatusEEEEENKUlvE_clEv Line | Count | Source | 121 | 60 | [self = this->shared_from_this()] { self->async_read_next(); }); |
_ZZN11msgpack_rpc9transport10ConnectionIN4asio19basic_stream_socketINS2_5local15stream_protocolENS2_15any_io_executorEEENS_9addresses17UnixSocketAddressEE5startENSt3__18functionIFvNSB_7variantIJNS_8messages13ParsedRequestENSE_14ParsedResponseENSE_18ParsedNotificationEEEEEEENSC_IFvvEEENSC_IFvRKNS_6common6StatusEEEEENKUlvE_clEv Line | Count | Source | 121 | 54 | [self = this->shared_from_this()] { self->async_read_next(); }); |
|
122 | 116 | } _ZN11msgpack_rpc9transport10ConnectionIN4asio19basic_stream_socketINS2_2ip3tcpENS2_15any_io_executorEEENS_9addresses10TCPAddressEE5startENSt3__18functionIFvNSB_7variantIJNS_8messages13ParsedRequestENSE_14ParsedResponseENSE_18ParsedNotificationEEEEEEENSC_IFvvEEENSC_IFvRKNS_6common6StatusEEEE Line | Count | Source | 110 | 62 | ConnectionClosedCallback on_closed) override { | 111 | 62 | state_machine_.handle_start_request(); | 112 | | // Only one thread can enter here. | 113 | | | 114 | 62 | on_received_ = std::move(on_received); | 115 | 62 | on_sent_ = std::move(on_sent); | 116 | 62 | on_closed_ = std::move(on_closed); | 117 | | | 118 | 62 | state_machine_.handle_processing_started(); | 119 | | | 120 | 62 | asio::post(socket_.get_executor(), | 121 | 62 | [self = this->shared_from_this()] { self->async_read_next(); }); | 122 | 62 | } |
_ZN11msgpack_rpc9transport10ConnectionIN4asio19basic_stream_socketINS2_5local15stream_protocolENS2_15any_io_executorEEENS_9addresses17UnixSocketAddressEE5startENSt3__18functionIFvNSB_7variantIJNS_8messages13ParsedRequestENSE_14ParsedResponseENSE_18ParsedNotificationEEEEEEENSC_IFvvEEENSC_IFvRKNS_6common6StatusEEEE Line | Count | Source | 110 | 54 | ConnectionClosedCallback on_closed) override { | 111 | 54 | state_machine_.handle_start_request(); | 112 | | // Only one thread can enter here. | 113 | | | 114 | 54 | on_received_ = std::move(on_received); | 115 | 54 | on_sent_ = std::move(on_sent); | 116 | 54 | on_closed_ = std::move(on_closed); | 117 | | | 118 | 54 | state_machine_.handle_processing_started(); | 119 | | | 120 | 54 | asio::post(socket_.get_executor(), | 121 | 54 | [self = this->shared_from_this()] { self->async_read_next(); }); | 122 | 54 | } |
|
123 | | |
124 | | //! \copydoc msgpack_rpc::transport::IConnection::async_send |
125 | 501 | void async_send(const messages::SerializedMessage& message) override { |
126 | 501 | if (!state_machine_.is_processing()) { |
127 | 0 | MSGPACK_RPC_TRACE(logger_, "Not processing now."); |
128 | 0 | return; |
129 | 0 | } |
130 | 501 | asio::post(socket_.get_executor(), |
131 | 501 | [self = this->shared_from_this(), message]() { |
132 | 501 | self->async_send_in_thread(message); |
133 | 501 | }); _ZZN11msgpack_rpc9transport10ConnectionIN4asio19basic_stream_socketINS2_2ip3tcpENS2_15any_io_executorEEENS_9addresses10TCPAddressEE10async_sendERKNS_8messages17SerializedMessageEENKUlvE_clEv Line | Count | Source | 131 | 251 | [self = this->shared_from_this(), message]() { | 132 | 251 | self->async_send_in_thread(message); | 133 | 251 | }); |
_ZZN11msgpack_rpc9transport10ConnectionIN4asio19basic_stream_socketINS2_5local15stream_protocolENS2_15any_io_executorEEENS_9addresses17UnixSocketAddressEE10async_sendERKNS_8messages17SerializedMessageEENKUlvE_clEv Line | Count | Source | 131 | 250 | [self = this->shared_from_this(), message]() { | 132 | 250 | self->async_send_in_thread(message); | 133 | 250 | }); |
|
134 | 501 | } _ZN11msgpack_rpc9transport10ConnectionIN4asio19basic_stream_socketINS2_2ip3tcpENS2_15any_io_executorEEENS_9addresses10TCPAddressEE10async_sendERKNS_8messages17SerializedMessageE Line | Count | Source | 125 | 251 | void async_send(const messages::SerializedMessage& message) override { | 126 | 251 | if (!state_machine_.is_processing()) { | 127 | 0 | MSGPACK_RPC_TRACE(logger_, "Not processing now."); | 128 | 0 | return; | 129 | 0 | } | 130 | 251 | asio::post(socket_.get_executor(), | 131 | 251 | [self = this->shared_from_this(), message]() { | 132 | 251 | self->async_send_in_thread(message); | 133 | 251 | }); | 134 | 251 | } |
_ZN11msgpack_rpc9transport10ConnectionIN4asio19basic_stream_socketINS2_5local15stream_protocolENS2_15any_io_executorEEENS_9addresses17UnixSocketAddressEE10async_sendERKNS_8messages17SerializedMessageE Line | Count | Source | 125 | 250 | void async_send(const messages::SerializedMessage& message) override { | 126 | 250 | if (!state_machine_.is_processing()) { | 127 | 0 | MSGPACK_RPC_TRACE(logger_, "Not processing now."); | 128 | 0 | return; | 129 | 0 | } | 130 | 250 | asio::post(socket_.get_executor(), | 131 | 250 | [self = this->shared_from_this(), message]() { | 132 | 250 | self->async_send_in_thread(message); | 133 | 250 | }); | 134 | 250 | } |
|
135 | | |
136 | | //! \copydoc msgpack_rpc::transport::IConnection::async_close |
137 | 60 | void async_close() override { |
138 | 60 | asio::post(socket_.get_executor(), [self = this->shared_from_this()]() { |
139 | 59 | self->close_in_thread(Status()); |
140 | 59 | }); _ZZN11msgpack_rpc9transport10ConnectionIN4asio19basic_stream_socketINS2_2ip3tcpENS2_15any_io_executorEEENS_9addresses10TCPAddressEE11async_closeEvENKUlvE_clEv Line | Count | Source | 138 | 31 | asio::post(socket_.get_executor(), [self = this->shared_from_this()]() { | 139 | 31 | self->close_in_thread(Status()); | 140 | 31 | }); |
_ZZN11msgpack_rpc9transport10ConnectionIN4asio19basic_stream_socketINS2_5local15stream_protocolENS2_15any_io_executorEEENS_9addresses17UnixSocketAddressEE11async_closeEvENKUlvE_clEv Line | Count | Source | 138 | 28 | asio::post(socket_.get_executor(), [self = this->shared_from_this()]() { | 139 | 28 | self->close_in_thread(Status()); | 140 | 28 | }); |
|
141 | 60 | } _ZN11msgpack_rpc9transport10ConnectionIN4asio19basic_stream_socketINS2_2ip3tcpENS2_15any_io_executorEEENS_9addresses10TCPAddressEE11async_closeEv Line | Count | Source | 137 | 32 | void async_close() override { | 138 | 32 | asio::post(socket_.get_executor(), [self = this->shared_from_this()]() { | 139 | 32 | self->close_in_thread(Status()); | 140 | 32 | }); | 141 | 32 | } |
_ZN11msgpack_rpc9transport10ConnectionIN4asio19basic_stream_socketINS2_5local15stream_protocolENS2_15any_io_executorEEENS_9addresses17UnixSocketAddressEE11async_closeEv Line | Count | Source | 137 | 28 | void async_close() override { | 138 | 28 | asio::post(socket_.get_executor(), [self = this->shared_from_this()]() { | 139 | 28 | self->close_in_thread(Status()); | 140 | 28 | }); | 141 | 28 | } |
|
142 | | |
143 | | //! \copydoc msgpack_rpc::transport::IConnection::local_address |
144 | | [[nodiscard]] const addresses::IAddress& local_address() |
145 | 6 | const noexcept override { |
146 | 6 | return local_address_; |
147 | 6 | } _ZNK11msgpack_rpc9transport10ConnectionIN4asio19basic_stream_socketINS2_2ip3tcpENS2_15any_io_executorEEENS_9addresses10TCPAddressEE13local_addressEv Line | Count | Source | 145 | 3 | const noexcept override { | 146 | 3 | return local_address_; | 147 | 3 | } |
_ZNK11msgpack_rpc9transport10ConnectionIN4asio19basic_stream_socketINS2_5local15stream_protocolENS2_15any_io_executorEEENS_9addresses17UnixSocketAddressEE13local_addressEv Line | Count | Source | 145 | 3 | const noexcept override { | 146 | 3 | return local_address_; | 147 | 3 | } |
|
148 | | |
149 | | //! \copydoc msgpack_rpc::transport::IConnection::remote_address |
150 | | [[nodiscard]] const addresses::IAddress& remote_address() |
151 | 69 | const noexcept override { |
152 | 69 | return remote_address_; |
153 | 69 | } _ZNK11msgpack_rpc9transport10ConnectionIN4asio19basic_stream_socketINS2_2ip3tcpENS2_15any_io_executorEEENS_9addresses10TCPAddressEE14remote_addressEv Line | Count | Source | 151 | 38 | const noexcept override { | 152 | 38 | return remote_address_; | 153 | 38 | } |
_ZNK11msgpack_rpc9transport10ConnectionIN4asio19basic_stream_socketINS2_5local15stream_protocolENS2_15any_io_executorEEENS_9addresses17UnixSocketAddressEE14remote_addressEv Line | Count | Source | 151 | 31 | const noexcept override { | 152 | 31 | return remote_address_; | 153 | 31 | } |
|
154 | | |
155 | | private: |
156 | | /*! |
157 | | * \brief Asynchronously read next bytes. |
158 | | */ |
159 | 362 | void async_read_next() { |
160 | 362 | const auto buffer = message_parser_.prepare_buffer(); |
161 | 362 | socket_.async_read_some(asio::buffer(buffer.data(), buffer.size()), |
162 | 362 | [self = this->shared_from_this()](const asio::error_code& error, |
163 | 362 | std::size_t size) { self->process_read_bytes(error, size); }); _ZZN11msgpack_rpc9transport10ConnectionIN4asio19basic_stream_socketINS2_2ip3tcpENS2_15any_io_executorEEENS_9addresses10TCPAddressEE15async_read_nextEvENKUlRKNSt3__110error_codeEmE_clESE_m Line | Count | Source | 163 | 167 | std::size_t size) { self->process_read_bytes(error, size); }); |
_ZZN11msgpack_rpc9transport10ConnectionIN4asio19basic_stream_socketINS2_5local15stream_protocolENS2_15any_io_executorEEENS_9addresses17UnixSocketAddressEE15async_read_nextEvENKUlRKNSt3__110error_codeEmE_clESE_m Line | Count | Source | 163 | 194 | std::size_t size) { self->process_read_bytes(error, size); }); |
|
164 | 362 | MSGPACK_RPC_TRACE(logger_, "({}) Reading next bytes.", log_name_); |
165 | 362 | } _ZN11msgpack_rpc9transport10ConnectionIN4asio19basic_stream_socketINS2_2ip3tcpENS2_15any_io_executorEEENS_9addresses10TCPAddressEE15async_read_nextEv Line | Count | Source | 159 | 168 | void async_read_next() { | 160 | 168 | const auto buffer = message_parser_.prepare_buffer(); | 161 | 168 | socket_.async_read_some(asio::buffer(buffer.data(), buffer.size()), | 162 | 168 | [self = this->shared_from_this()](const asio::error_code& error, | 163 | 168 | std::size_t size) { self->process_read_bytes(error, size); }); | 164 | 168 | MSGPACK_RPC_TRACE(logger_, "({}) Reading next bytes.", log_name_); | 165 | 168 | } |
_ZN11msgpack_rpc9transport10ConnectionIN4asio19basic_stream_socketINS2_5local15stream_protocolENS2_15any_io_executorEEENS_9addresses17UnixSocketAddressEE15async_read_nextEv Line | Count | Source | 159 | 194 | void async_read_next() { | 160 | 194 | const auto buffer = message_parser_.prepare_buffer(); | 161 | 194 | socket_.async_read_some(asio::buffer(buffer.data(), buffer.size()), | 162 | 194 | [self = this->shared_from_this()](const asio::error_code& error, | 163 | 194 | std::size_t size) { self->process_read_bytes(error, size); }); | 164 | 194 | MSGPACK_RPC_TRACE(logger_, "({}) Reading next bytes.", log_name_); | 165 | 194 | } |
|
166 | | |
167 | | /*! |
168 | | * \brief Process read bytes. |
169 | | * |
170 | | * \param[in] error Error. |
171 | | * \param[in] size Number of bytes read to the buffer. |
172 | | */ |
173 | 360 | void process_read_bytes(const asio::error_code& error, std::size_t size) { |
174 | 360 | if (error) { |
175 | 112 | if (error == asio::error::operation_aborted) { |
176 | 48 | return; |
177 | 48 | } |
178 | 64 | if (error == asio::error::eof || |
179 | 64 | error == asio::error::connection_reset || |
180 | 65 | error == asio::error::broken_pipe) { |
181 | 65 | MSGPACK_RPC_TRACE( |
182 | 65 | logger_, "({}) Connection closed by peer.", log_name_); |
183 | 65 | state_machine_.handle_processing_stopped(); |
184 | 65 | on_closed_(Status()); |
185 | 65 | return; |
186 | 65 | } |
187 | 18.4E | const auto message = fmt::format( |
188 | 18.4E | "Error occurred when receiving data: {}", error.message()); |
189 | 18.4E | MSGPACK_RPC_ERROR(logger_, "({}) {}", log_name_, message); |
190 | 18.4E | throw MsgpackRPCException(StatusCode::UNEXPECTED_ERROR, message); |
191 | 64 | } |
192 | | |
193 | 248 | MSGPACK_RPC_TRACE(logger_, "({}) Read {} bytes.", log_name_, size); |
194 | 248 | message_parser_.consumed(size); |
195 | | |
196 | 749 | while (true) { |
197 | 749 | std::optional<messages::ParsedMessage> message; |
198 | 749 | try { |
199 | 749 | message = message_parser_.try_parse(); |
200 | 749 | } catch (const MsgpackRPCException& e) { |
201 | 0 | MSGPACK_RPC_ERROR( |
202 | 0 | logger_, "({}) {}", log_name_, e.status().message()); |
203 | 0 | close_in_thread(e.status()); |
204 | 0 | return; |
205 | 0 | } |
206 | 749 | if (message) { |
207 | 501 | MSGPACK_RPC_TRACE( |
208 | 501 | logger_, "({}) Received a message.", log_name_, size); |
209 | 501 | on_received_(std::move(*message)); |
210 | 501 | message.reset(); |
211 | 501 | } else { |
212 | 248 | MSGPACK_RPC_TRACE(logger_, |
213 | 248 | "({}) More bytes are needed to parse a message.", |
214 | 248 | log_name_); |
215 | 248 | break; |
216 | 248 | } |
217 | 749 | } |
218 | | |
219 | 248 | if (!state_machine_.is_processing()) { |
220 | 0 | return; |
221 | 0 | } |
222 | 248 | async_read_next(); |
223 | 248 | } _ZN11msgpack_rpc9transport10ConnectionIN4asio19basic_stream_socketINS2_2ip3tcpENS2_15any_io_executorEEENS_9addresses10TCPAddressEE18process_read_bytesERKNSt3__110error_codeEm Line | Count | Source | 173 | 167 | void process_read_bytes(const asio::error_code& error, std::size_t size) { | 174 | 167 | if (error) { | 175 | 59 | if (error == asio::error::operation_aborted) { | 176 | 24 | return; | 177 | 24 | } | 178 | 35 | if (error == asio::error::eof || | 179 | 35 | error == asio::error::connection_reset || | 180 | 35 | error == asio::error::broken_pipe) { | 181 | 35 | MSGPACK_RPC_TRACE( | 182 | 35 | logger_, "({}) Connection closed by peer.", log_name_); | 183 | 35 | state_machine_.handle_processing_stopped(); | 184 | 35 | on_closed_(Status()); | 185 | 35 | return; | 186 | 35 | } | 187 | 0 | const auto message = fmt::format( | 188 | 0 | "Error occurred when receiving data: {}", error.message()); | 189 | 0 | MSGPACK_RPC_ERROR(logger_, "({}) {}", log_name_, message); | 190 | 0 | throw MsgpackRPCException(StatusCode::UNEXPECTED_ERROR, message); | 191 | 35 | } | 192 | | | 193 | 108 | MSGPACK_RPC_TRACE(logger_, "({}) Read {} bytes.", log_name_, size); | 194 | 108 | message_parser_.consumed(size); | 195 | | | 196 | 359 | while (true) { | 197 | 359 | std::optional<messages::ParsedMessage> message; | 198 | 359 | try { | 199 | 359 | message = message_parser_.try_parse(); | 200 | 359 | } catch (const MsgpackRPCException& e) { | 201 | 0 | MSGPACK_RPC_ERROR( | 202 | 0 | logger_, "({}) {}", log_name_, e.status().message()); | 203 | 0 | close_in_thread(e.status()); | 204 | 0 | return; | 205 | 0 | } | 206 | 359 | if (message) { | 207 | 251 | MSGPACK_RPC_TRACE( | 208 | 251 | logger_, "({}) Received a message.", log_name_, size); | 209 | 251 | on_received_(std::move(*message)); | 210 | 251 | message.reset(); | 211 | 251 | } else { | 212 | 108 | MSGPACK_RPC_TRACE(logger_, | 213 | 108 | "({}) More bytes are needed to parse a message.", | 214 | 108 | log_name_); | 215 | 108 | break; | 216 | 108 | } | 217 | 359 | } | 218 | | | 219 | 108 | if (!state_machine_.is_processing()) { | 220 | 0 | return; | 221 | 0 | } | 222 | 108 | async_read_next(); | 223 | 108 | } |
_ZN11msgpack_rpc9transport10ConnectionIN4asio19basic_stream_socketINS2_5local15stream_protocolENS2_15any_io_executorEEENS_9addresses17UnixSocketAddressEE18process_read_bytesERKNSt3__110error_codeEm Line | Count | Source | 173 | 193 | void process_read_bytes(const asio::error_code& error, std::size_t size) { | 174 | 193 | if (error) { | 175 | 53 | if (error == asio::error::operation_aborted) { | 176 | 24 | return; | 177 | 24 | } | 178 | 29 | if (error == asio::error::eof || | 179 | 29 | error == asio::error::connection_reset || | 180 | 30 | error == asio::error::broken_pipe) { | 181 | 30 | MSGPACK_RPC_TRACE( | 182 | 30 | logger_, "({}) Connection closed by peer.", log_name_); | 183 | 30 | state_machine_.handle_processing_stopped(); | 184 | 30 | on_closed_(Status()); | 185 | 30 | return; | 186 | 30 | } | 187 | 18.4E | const auto message = fmt::format( | 188 | 18.4E | "Error occurred when receiving data: {}", error.message()); | 189 | 18.4E | MSGPACK_RPC_ERROR(logger_, "({}) {}", log_name_, message); | 190 | 18.4E | throw MsgpackRPCException(StatusCode::UNEXPECTED_ERROR, message); | 191 | 29 | } | 192 | | | 193 | 140 | MSGPACK_RPC_TRACE(logger_, "({}) Read {} bytes.", log_name_, size); | 194 | 140 | message_parser_.consumed(size); | 195 | | | 196 | 390 | while (true) { | 197 | 390 | std::optional<messages::ParsedMessage> message; | 198 | 390 | try { | 199 | 390 | message = message_parser_.try_parse(); | 200 | 390 | } catch (const MsgpackRPCException& e) { | 201 | 0 | MSGPACK_RPC_ERROR( | 202 | 0 | logger_, "({}) {}", log_name_, e.status().message()); | 203 | 0 | close_in_thread(e.status()); | 204 | 0 | return; | 205 | 0 | } | 206 | 390 | if (message) { | 207 | 250 | MSGPACK_RPC_TRACE( | 208 | 250 | logger_, "({}) Received a message.", log_name_, size); | 209 | 250 | on_received_(std::move(*message)); | 210 | 250 | message.reset(); | 211 | 250 | } else { | 212 | 140 | MSGPACK_RPC_TRACE(logger_, | 213 | 140 | "({}) More bytes are needed to parse a message.", | 214 | 140 | log_name_); | 215 | 140 | break; | 216 | 140 | } | 217 | 390 | } | 218 | | | 219 | 140 | if (!state_machine_.is_processing()) { | 220 | 0 | return; | 221 | 0 | } | 222 | 140 | async_read_next(); | 223 | 140 | } |
|
224 | | |
225 | | /*! |
226 | | * \brief Asynchronously send a message in this thread. |
227 | | * |
228 | | * \param[in] message Message. |
229 | | */ |
230 | 501 | void async_send_in_thread(const messages::SerializedMessage& message) { |
231 | 501 | asio::async_write(socket_, |
232 | 501 | asio::const_buffer(message.data(), message.size()), |
233 | 501 | [self = this->shared_from_this(), message]( |
234 | 501 | const asio::error_code& error, std::size_t /*size*/) { |
235 | 501 | self->on_sent(error, message.size()); |
236 | 501 | }); _ZZN11msgpack_rpc9transport10ConnectionIN4asio19basic_stream_socketINS2_2ip3tcpENS2_15any_io_executorEEENS_9addresses10TCPAddressEE20async_send_in_threadERKNS_8messages17SerializedMessageEENKUlRKNSt3__110error_codeEmE_clESI_m Line | Count | Source | 234 | 251 | const asio::error_code& error, std::size_t /*size*/) { | 235 | 251 | self->on_sent(error, message.size()); | 236 | 251 | }); |
_ZZN11msgpack_rpc9transport10ConnectionIN4asio19basic_stream_socketINS2_5local15stream_protocolENS2_15any_io_executorEEENS_9addresses17UnixSocketAddressEE20async_send_in_threadERKNS_8messages17SerializedMessageEENKUlRKNSt3__110error_codeEmE_clESI_m Line | Count | Source | 234 | 250 | const asio::error_code& error, std::size_t /*size*/) { | 235 | 250 | self->on_sent(error, message.size()); | 236 | 250 | }); |
|
237 | 501 | MSGPACK_RPC_TRACE( |
238 | 501 | logger_, "({}) Sending {} bytes.", log_name_, message.size()); |
239 | 501 | } _ZN11msgpack_rpc9transport10ConnectionIN4asio19basic_stream_socketINS2_2ip3tcpENS2_15any_io_executorEEENS_9addresses10TCPAddressEE20async_send_in_threadERKNS_8messages17SerializedMessageE Line | Count | Source | 230 | 251 | void async_send_in_thread(const messages::SerializedMessage& message) { | 231 | 251 | asio::async_write(socket_, | 232 | 251 | asio::const_buffer(message.data(), message.size()), | 233 | 251 | [self = this->shared_from_this(), message]( | 234 | 251 | const asio::error_code& error, std::size_t /*size*/) { | 235 | 251 | self->on_sent(error, message.size()); | 236 | 251 | }); | 237 | 251 | MSGPACK_RPC_TRACE( | 238 | 251 | logger_, "({}) Sending {} bytes.", log_name_, message.size()); | 239 | 251 | } |
_ZN11msgpack_rpc9transport10ConnectionIN4asio19basic_stream_socketINS2_5local15stream_protocolENS2_15any_io_executorEEENS_9addresses17UnixSocketAddressEE20async_send_in_threadERKNS_8messages17SerializedMessageE Line | Count | Source | 230 | 250 | void async_send_in_thread(const messages::SerializedMessage& message) { | 231 | 250 | asio::async_write(socket_, | 232 | 250 | asio::const_buffer(message.data(), message.size()), | 233 | 250 | [self = this->shared_from_this(), message]( | 234 | 250 | const asio::error_code& error, std::size_t /*size*/) { | 235 | 250 | self->on_sent(error, message.size()); | 236 | 250 | }); | 237 | 250 | MSGPACK_RPC_TRACE( | 238 | 250 | logger_, "({}) Sending {} bytes.", log_name_, message.size()); | 239 | 250 | } |
|
240 | | |
241 | | /*! |
242 | | * \brief Handle the result of send operation. |
243 | | * |
244 | | * \param[in] error Error. |
245 | | * \param[in] size Message size. |
246 | | */ |
247 | 501 | void on_sent(const asio::error_code& error, std::size_t size) { |
248 | 501 | if (error) { |
249 | 0 | if (error == asio::error::operation_aborted) { |
250 | 0 | return; |
251 | 0 | } |
252 | 0 | if (error == asio::error::eof || |
253 | 0 | error == asio::error::connection_reset || |
254 | 0 | error == asio::error::broken_pipe) { |
255 | 0 | MSGPACK_RPC_TRACE( |
256 | 0 | logger_, "({}) Connection closed by peer.", log_name_); |
257 | 0 | state_machine_.handle_processing_stopped(); |
258 | 0 | on_closed_(Status()); |
259 | 0 | return; |
260 | 0 | } |
261 | 0 | const auto message = fmt::format( |
262 | 0 | "Error occurred when sending data: {}", error.message()); |
263 | 0 | MSGPACK_RPC_ERROR(logger_, "({}) {}", log_name_, message); |
264 | 0 | throw MsgpackRPCException(StatusCode::UNEXPECTED_ERROR, message); |
265 | 0 | } |
266 | | |
267 | 501 | MSGPACK_RPC_TRACE(logger_, "({}) Sent {} bytes.", log_name_, size); |
268 | 501 | on_sent_(); |
269 | 501 | } _ZN11msgpack_rpc9transport10ConnectionIN4asio19basic_stream_socketINS2_2ip3tcpENS2_15any_io_executorEEENS_9addresses10TCPAddressEE7on_sentERKNSt3__110error_codeEm Line | Count | Source | 247 | 251 | void on_sent(const asio::error_code& error, std::size_t size) { | 248 | 251 | if (error) { | 249 | 0 | if (error == asio::error::operation_aborted) { | 250 | 0 | return; | 251 | 0 | } | 252 | 0 | if (error == asio::error::eof || | 253 | 0 | error == asio::error::connection_reset || | 254 | 0 | error == asio::error::broken_pipe) { | 255 | 0 | MSGPACK_RPC_TRACE( | 256 | 0 | logger_, "({}) Connection closed by peer.", log_name_); | 257 | 0 | state_machine_.handle_processing_stopped(); | 258 | 0 | on_closed_(Status()); | 259 | 0 | return; | 260 | 0 | } | 261 | 0 | const auto message = fmt::format( | 262 | 0 | "Error occurred when sending data: {}", error.message()); | 263 | 0 | MSGPACK_RPC_ERROR(logger_, "({}) {}", log_name_, message); | 264 | 0 | throw MsgpackRPCException(StatusCode::UNEXPECTED_ERROR, message); | 265 | 0 | } | 266 | | | 267 | 251 | MSGPACK_RPC_TRACE(logger_, "({}) Sent {} bytes.", log_name_, size); | 268 | 251 | on_sent_(); | 269 | 251 | } |
_ZN11msgpack_rpc9transport10ConnectionIN4asio19basic_stream_socketINS2_5local15stream_protocolENS2_15any_io_executorEEENS_9addresses17UnixSocketAddressEE7on_sentERKNSt3__110error_codeEm Line | Count | Source | 247 | 250 | void on_sent(const asio::error_code& error, std::size_t size) { | 248 | 250 | if (error) { | 249 | 0 | if (error == asio::error::operation_aborted) { | 250 | 0 | return; | 251 | 0 | } | 252 | 0 | if (error == asio::error::eof || | 253 | 0 | error == asio::error::connection_reset || | 254 | 0 | error == asio::error::broken_pipe) { | 255 | 0 | MSGPACK_RPC_TRACE( | 256 | 0 | logger_, "({}) Connection closed by peer.", log_name_); | 257 | 0 | state_machine_.handle_processing_stopped(); | 258 | 0 | on_closed_(Status()); | 259 | 0 | return; | 260 | 0 | } | 261 | 0 | const auto message = fmt::format( | 262 | 0 | "Error occurred when sending data: {}", error.message()); | 263 | 0 | MSGPACK_RPC_ERROR(logger_, "({}) {}", log_name_, message); | 264 | 0 | throw MsgpackRPCException(StatusCode::UNEXPECTED_ERROR, message); | 265 | 0 | } | 266 | | | 267 | 250 | MSGPACK_RPC_TRACE(logger_, "({}) Sent {} bytes.", log_name_, size); | 268 | 250 | on_sent_(); | 269 | 250 | } |
|
270 | | |
271 | | /*! |
272 | | * \brief Close this connection in this thread. |
273 | | * |
274 | | * \param[in] status Status. |
275 | | */ |
276 | 59 | void close_in_thread(const Status& status) { |
277 | 59 | if (!state_machine_.handle_stop_requested()) { |
278 | 7 | return; |
279 | 7 | } |
280 | 52 | socket_.cancel(); |
281 | 52 | socket_.shutdown(AsioSocket::shutdown_both); |
282 | 52 | socket_.close(); |
283 | 52 | on_closed_(status); |
284 | 52 | MSGPACK_RPC_TRACE(logger_, "({}) Closed this connection.", log_name_); |
285 | 52 | } _ZN11msgpack_rpc9transport10ConnectionIN4asio19basic_stream_socketINS2_2ip3tcpENS2_15any_io_executorEEENS_9addresses10TCPAddressEE15close_in_threadERKNS_6common6StatusE Line | Count | Source | 276 | 31 | void close_in_thread(const Status& status) { | 277 | 31 | if (!state_machine_.handle_stop_requested()) { | 278 | 5 | return; | 279 | 5 | } | 280 | 26 | socket_.cancel(); | 281 | 26 | socket_.shutdown(AsioSocket::shutdown_both); | 282 | 26 | socket_.close(); | 283 | 26 | on_closed_(status); | 284 | 26 | MSGPACK_RPC_TRACE(logger_, "({}) Closed this connection.", log_name_); | 285 | 26 | } |
_ZN11msgpack_rpc9transport10ConnectionIN4asio19basic_stream_socketINS2_5local15stream_protocolENS2_15any_io_executorEEENS_9addresses17UnixSocketAddressEE15close_in_threadERKNS_6common6StatusE Line | Count | Source | 276 | 28 | void close_in_thread(const Status& status) { | 277 | 28 | if (!state_machine_.handle_stop_requested()) { | 278 | 2 | return; | 279 | 2 | } | 280 | 26 | socket_.cancel(); | 281 | 26 | socket_.shutdown(AsioSocket::shutdown_both); | 282 | 26 | socket_.close(); | 283 | 26 | on_closed_(status); | 284 | 26 | MSGPACK_RPC_TRACE(logger_, "({}) Closed this connection.", log_name_); | 285 | 26 | } |
|
286 | | |
287 | | //! Socket. |
288 | | AsioSocket socket_; |
289 | | |
290 | | //! Callback function when a message is received. |
291 | | MessageReceivedCallback on_received_{}; |
292 | | |
293 | | //! Callback function when a message is sent. |
294 | | MessageSentCallback on_sent_{}; |
295 | | |
296 | | //! Callback function when this connection is closed. |
297 | | ConnectionClosedCallback on_closed_{}; |
298 | | |
299 | | //! Parser of messages. |
300 | | messages::MessageParser message_parser_; |
301 | | |
302 | | //! Address of the local endpoint. |
303 | | ConcreteAddress local_address_; |
304 | | |
305 | | //! Address of the remote endpoint. |
306 | | ConcreteAddress remote_address_; |
307 | | |
308 | | //! Name of the connection for logs. |
309 | | std::string log_name_; |
310 | | |
311 | | //! Logger. |
312 | | std::shared_ptr<logging::Logger> logger_; |
313 | | |
314 | | //! State machine. |
315 | | BackgroundTaskStateMachine state_machine_{}; |
316 | | |
317 | | //! List of connections. |
318 | | std::weak_ptr<ConnectionList<Connection>> connection_list_; |
319 | | }; |
320 | | |
321 | | } // namespace msgpack_rpc::transport |