28#include <system_error>
31#include <asio/buffer.hpp>
32#include <asio/error.hpp>
33#include <asio/error_code.hpp>
34#include <asio/post.hpp>
35#include <asio/write.hpp>
36#include <fmt/format.h>
60template <
typename AsioSocketType,
typename ConcreteAddressType>
62 public std::enable_shared_from_this<
63 Connection<AsioSocketType, ConcreteAddressType>> {
81 std::shared_ptr<logging::Logger> logger,
103 if (connection_list) {
104 connection_list->remove(
this);
120 asio::post(
socket_.get_executor(),
121 [self = this->shared_from_this()] { self->async_read_next(); });
130 asio::post(
socket_.get_executor(),
131 [self = this->shared_from_this(), message]() {
132 self->async_send_in_thread(message);
138 asio::post(
socket_.get_executor(), [self = this->shared_from_this()]() {
139 self->close_in_thread(Status());
145 const noexcept
override {
151 const noexcept
override {
161 socket_.async_read_some(asio::buffer(buffer.data(), buffer.size()),
162 [self = this->shared_from_this()](
const asio::error_code& error,
163 std::size_t size) { self->process_read_bytes(error, size); });
175 if (error == asio::error::operation_aborted) {
178 if (error == asio::error::eof ||
179 error == asio::error::connection_reset ||
180 error == asio::error::broken_pipe) {
187 const auto message = fmt::format(
188 "Error occurred when receiving data: {}", error.message());
197 std::optional<messages::ParsedMessage> message;
213 "({}) More bytes are needed to parse a message.",
232 asio::const_buffer(message.
data(), message.
size()),
233 [self = this->shared_from_this(), message](
234 const asio::error_code& error, std::size_t ) {
235 self->on_sent(error, message.size());
247 void on_sent(
const asio::error_code& error, std::size_t size) {
249 if (error == asio::error::operation_aborted) {
252 if (error == asio::error::eof ||
253 error == asio::error::connection_reset ||
254 error == asio::error::broken_pipe) {
261 const auto message = fmt::format(
262 "Error occurred when sending data: {}", error.message());
281 socket_.shutdown(AsioSocket::shutdown_both);
Definition of BackgroundTaskStateMachine class.
Definition of BufferView class.
Class of exceptions in cpp-msgpack-rpc library.
const Status & status() const noexcept
Get the status.
std::string_view message() const noexcept
Get the error message.
Class of configuration of parsers of messages.
Class of serialized message data.
const char * data() const noexcept
Get the pointer to the data.
std::size_t size() const noexcept
Get the size of the data.
Class of state machines of background tasks in connections and acceptors.
Class of lists of connections.
ConcreteAddress local_address_
void async_send_in_thread(const messages::SerializedMessage &message)
Asynchronously send a message in this thread.
std::shared_ptr< logging::Logger > logger_
~Connection() override
Destructor.
Connection(AsioSocket &&socket, const config::MessageParserConfig &message_parser_config, std::shared_ptr< logging::Logger > logger, const std::shared_ptr< ConnectionList< Connection > > &connection_list=nullptr)
Constructor.
const addresses::IAddress & local_address() const noexcept override
Get the address of the local endpoint.
ConcreteAddress remote_address_
MessageSentCallback on_sent_
void close_in_thread(const Status &status)
Close this connection in this thread.
BackgroundTaskStateMachine state_machine_
void start(MessageReceivedCallback on_received, MessageSentCallback on_sent, ConnectionClosedCallback on_closed) override
Start process of this connection.
messages::MessageParser message_parser_
ConnectionClosedCallback on_closed_
void on_sent(const asio::error_code &error, std::size_t size)
void async_send(const messages::SerializedMessage &message) override
Asynchronously send a message.
const addresses::IAddress & remote_address() const noexcept override
Get the address of the remote endpoint.
ConcreteAddress ConcreteAddress
MessageReceivedCallback on_received_
void async_close() override
Asynchronously close this connection.
std::weak_ptr< ConnectionList< Connection > > connection_list_
void async_read_next()
Asynchronously read next bytes.
void process_read_bytes(const asio::error_code &error, std::size_t size)
Process read bytes.
std::function< void(messages::ParsedMessage)> MessageReceivedCallback
Type of callback functions called when a message is received.
std::function< void(const Status &)> ConnectionClosedCallback
Type of callback functions called when a connection is closed.
std::function< void()> MessageSentCallback
Type of callback functions called when a message is successfully sent.
Definition of ConnectionList class.
Definition of IConnection class.
Definition of Logger class.
#define MSGPACK_RPC_TRACE(LOGGER_PTR,...)
Write a trace log.
#define MSGPACK_RPC_ERROR(LOGGER_PTR,...)
Write a error log.
Definition of MessageParser class.
Definition of MessageParserConfig class.
Definition of MsgpackRPCException class.
Namespace of fmt library.
Namespace of transport of messages.
Definition of ParsedMessage type.
Definition of SerializedMessage class.
Definition of Status class.
Definition of StatusCode enumeration.