Coverage Report

Created: 2025-08-01 03:26

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/builds/MusicScience37Projects/utility-libraries/cpp-msgpack-rpc/src/msgpack_rpc/transport/connection.h
Line
Count
Source
1
/*
2
 * Copyright 2023 MusicScience37 (Kenta Kabashima)
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16
/*!
17
 * \file
18
 * \brief Definition of Connection class.
19
 */
20
#pragma once
21
22
#include <cstddef>
23
#include <functional>
24
#include <memory>
25
#include <optional>
26
#include <string>
27
#include <string_view>
28
#include <system_error>
29
#include <utility>
30
31
#include <asio/buffer.hpp>
32
#include <asio/error.hpp>
33
#include <asio/error_code.hpp>
34
#include <asio/post.hpp>
35
#include <asio/write.hpp>
36
#include <fmt/format.h>
37
38
#include "msgpack_rpc/addresses/i_address.h"
39
#include "msgpack_rpc/common/msgpack_rpc_exception.h"
40
#include "msgpack_rpc/common/status.h"
41
#include "msgpack_rpc/common/status_code.h"
42
#include "msgpack_rpc/config/message_parser_config.h"
43
#include "msgpack_rpc/logging/logger.h"
44
#include "msgpack_rpc/messages/buffer_view.h"
45
#include "msgpack_rpc/messages/message_parser.h"
46
#include "msgpack_rpc/messages/parsed_message.h"
47
#include "msgpack_rpc/messages/serialized_message.h"
48
#include "msgpack_rpc/transport/background_task_state_machine.h"
49
#include "msgpack_rpc/transport/connection_list.h"
50
#include "msgpack_rpc/transport/i_connection.h"
51
52
namespace msgpack_rpc::transport {
53
54
/*!
55
 * \brief Class of connections.
56
 *
57
 * \param AsioSocketType Type of sockets in asio library.
58
 * \param ConcreteAddressType Type of concrete addresses.
59
 */
60
template <typename AsioSocketType, typename ConcreteAddressType>
61
class Connection : public IConnection,
62
                   public std::enable_shared_from_this<
63
                       Connection<AsioSocketType, ConcreteAddressType>> {
64
public:
65
    //! Type of sockets in asio library.
66
    using AsioSocket = AsioSocketType;
67
68
    //! Type of concrete addresses.
69
    using ConcreteAddress = ConcreteAddressType;
70
71
    /*!
72
     * \brief Constructor.
73
     *
74
     * \param[in] socket Socket.
75
     * \param[in] message_parser_config Configuration of the parser of messages.
76
     * \param[in] logger Logger.
77
     * \param[in] connection_list List of connections.
78
     */
79
    Connection(AsioSocket&& socket,
80
        const config::MessageParserConfig& message_parser_config,
81
        std::shared_ptr<logging::Logger> logger,
82
        const std::shared_ptr<ConnectionList<Connection>>& connection_list =
83
            nullptr)
84
118
        : socket_(std::move(socket)),
85
118
          message_parser_(message_parser_config),
86
118
          local_address_(socket_.local_endpoint()),
87
118
          remote_address_(socket_.remote_endpoint()),
88
118
          log_name_(fmt::format("Connection(local={}, remote={})",
89
118
              local_address_, remote_address_)),
90
118
          logger_(std::move(logger)),
91
118
          connection_list_(connection_list) {}
_ZN11msgpack_rpc9transport10ConnectionIN4asio19basic_stream_socketINS2_2ip3tcpENS2_15any_io_executorEEENS_9addresses10TCPAddressEEC2EOS7_RKNS_6config19MessageParserConfigENSt3__110shared_ptrINS_7logging6LoggerEEERKNSH_INS0_14ConnectionListISA_EEEE
Line
Count
Source
84
63
        : socket_(std::move(socket)),
85
63
          message_parser_(message_parser_config),
86
63
          local_address_(socket_.local_endpoint()),
87
63
          remote_address_(socket_.remote_endpoint()),
88
63
          log_name_(fmt::format("Connection(local={}, remote={})",
89
63
              local_address_, remote_address_)),
90
63
          logger_(std::move(logger)),
91
63
          connection_list_(connection_list) {}
_ZN11msgpack_rpc9transport10ConnectionIN4asio19basic_stream_socketINS2_5local15stream_protocolENS2_15any_io_executorEEENS_9addresses17UnixSocketAddressEEC2EOS7_RKNS_6config19MessageParserConfigENSt3__110shared_ptrINS_7logging6LoggerEEERKNSH_INS0_14ConnectionListISA_EEEE
Line
Count
Source
84
55
        : socket_(std::move(socket)),
85
55
          message_parser_(message_parser_config),
86
55
          local_address_(socket_.local_endpoint()),
87
55
          remote_address_(socket_.remote_endpoint()),
88
55
          log_name_(fmt::format("Connection(local={}, remote={})",
89
55
              local_address_, remote_address_)),
90
55
          logger_(std::move(logger)),
91
55
          connection_list_(connection_list) {}
92
93
    Connection(const Connection&) = delete;
94
    Connection(Connection&&) = delete;
95
    Connection& operator=(const Connection&) = delete;
96
    Connection& operator=(Connection&&) = delete;
97
98
    /*!
99
     * \brief Destructor.
100
     */
101
118
    ~Connection() override {
102
118
        const auto connection_list = connection_list_.lock();
103
118
        if (connection_list) {
104
50
            connection_list->remove(this);
105
50
        }
106
118
    }
_ZN11msgpack_rpc9transport10ConnectionIN4asio19basic_stream_socketINS2_2ip3tcpENS2_15any_io_executorEEENS_9addresses10TCPAddressEED2Ev
Line
Count
Source
101
63
    ~Connection() override {
102
63
        const auto connection_list = connection_list_.lock();
103
63
        if (connection_list) {
104
27
            connection_list->remove(this);
105
27
        }
106
63
    }
_ZN11msgpack_rpc9transport10ConnectionIN4asio19basic_stream_socketINS2_5local15stream_protocolENS2_15any_io_executorEEENS_9addresses17UnixSocketAddressEED2Ev
Line
Count
Source
101
55
    ~Connection() override {
102
55
        const auto connection_list = connection_list_.lock();
103
55
        if (connection_list) {
104
23
            connection_list->remove(this);
105
23
        }
106
55
    }
107
108
    //! \copydoc msgpack_rpc::transport::IConnection::start
109
    void start(MessageReceivedCallback on_received, MessageSentCallback on_sent,
110
116
        ConnectionClosedCallback on_closed) override {
111
116
        state_machine_.handle_start_request();
112
        // Only one thread can enter here.
113
114
116
        on_received_ = std::move(on_received);
115
116
        on_sent_ = std::move(on_sent);
116
116
        on_closed_ = std::move(on_closed);
117
118
116
        state_machine_.handle_processing_started();
119
120
116
        asio::post(socket_.get_executor(),
121
116
            [self = this->shared_from_this()] { self->async_read_next(); });
_ZZN11msgpack_rpc9transport10ConnectionIN4asio19basic_stream_socketINS2_2ip3tcpENS2_15any_io_executorEEENS_9addresses10TCPAddressEE5startENSt3__18functionIFvNSB_7variantIJNS_8messages13ParsedRequestENSE_14ParsedResponseENSE_18ParsedNotificationEEEEEEENSC_IFvvEEENSC_IFvRKNS_6common6StatusEEEEENKUlvE_clEv
Line
Count
Source
121
60
            [self = this->shared_from_this()] { self->async_read_next(); });
_ZZN11msgpack_rpc9transport10ConnectionIN4asio19basic_stream_socketINS2_5local15stream_protocolENS2_15any_io_executorEEENS_9addresses17UnixSocketAddressEE5startENSt3__18functionIFvNSB_7variantIJNS_8messages13ParsedRequestENSE_14ParsedResponseENSE_18ParsedNotificationEEEEEEENSC_IFvvEEENSC_IFvRKNS_6common6StatusEEEEENKUlvE_clEv
Line
Count
Source
121
54
            [self = this->shared_from_this()] { self->async_read_next(); });
122
116
    }
_ZN11msgpack_rpc9transport10ConnectionIN4asio19basic_stream_socketINS2_2ip3tcpENS2_15any_io_executorEEENS_9addresses10TCPAddressEE5startENSt3__18functionIFvNSB_7variantIJNS_8messages13ParsedRequestENSE_14ParsedResponseENSE_18ParsedNotificationEEEEEEENSC_IFvvEEENSC_IFvRKNS_6common6StatusEEEE
Line
Count
Source
110
62
        ConnectionClosedCallback on_closed) override {
111
62
        state_machine_.handle_start_request();
112
        // Only one thread can enter here.
113
114
62
        on_received_ = std::move(on_received);
115
62
        on_sent_ = std::move(on_sent);
116
62
        on_closed_ = std::move(on_closed);
117
118
62
        state_machine_.handle_processing_started();
119
120
62
        asio::post(socket_.get_executor(),
121
62
            [self = this->shared_from_this()] { self->async_read_next(); });
122
62
    }
_ZN11msgpack_rpc9transport10ConnectionIN4asio19basic_stream_socketINS2_5local15stream_protocolENS2_15any_io_executorEEENS_9addresses17UnixSocketAddressEE5startENSt3__18functionIFvNSB_7variantIJNS_8messages13ParsedRequestENSE_14ParsedResponseENSE_18ParsedNotificationEEEEEEENSC_IFvvEEENSC_IFvRKNS_6common6StatusEEEE
Line
Count
Source
110
54
        ConnectionClosedCallback on_closed) override {
111
54
        state_machine_.handle_start_request();
112
        // Only one thread can enter here.
113
114
54
        on_received_ = std::move(on_received);
115
54
        on_sent_ = std::move(on_sent);
116
54
        on_closed_ = std::move(on_closed);
117
118
54
        state_machine_.handle_processing_started();
119
120
54
        asio::post(socket_.get_executor(),
121
54
            [self = this->shared_from_this()] { self->async_read_next(); });
122
54
    }
123
124
    //! \copydoc msgpack_rpc::transport::IConnection::async_send
125
501
    void async_send(const messages::SerializedMessage& message) override {
126
501
        if (!state_machine_.is_processing()) {
127
0
            MSGPACK_RPC_TRACE(logger_, "Not processing now.");
128
0
            return;
129
0
        }
130
501
        asio::post(socket_.get_executor(),
131
501
            [self = this->shared_from_this(), message]() {
132
501
                self->async_send_in_thread(message);
133
501
            });
_ZZN11msgpack_rpc9transport10ConnectionIN4asio19basic_stream_socketINS2_2ip3tcpENS2_15any_io_executorEEENS_9addresses10TCPAddressEE10async_sendERKNS_8messages17SerializedMessageEENKUlvE_clEv
Line
Count
Source
131
251
            [self = this->shared_from_this(), message]() {
132
251
                self->async_send_in_thread(message);
133
251
            });
_ZZN11msgpack_rpc9transport10ConnectionIN4asio19basic_stream_socketINS2_5local15stream_protocolENS2_15any_io_executorEEENS_9addresses17UnixSocketAddressEE10async_sendERKNS_8messages17SerializedMessageEENKUlvE_clEv
Line
Count
Source
131
250
            [self = this->shared_from_this(), message]() {
132
250
                self->async_send_in_thread(message);
133
250
            });
134
501
    }
_ZN11msgpack_rpc9transport10ConnectionIN4asio19basic_stream_socketINS2_2ip3tcpENS2_15any_io_executorEEENS_9addresses10TCPAddressEE10async_sendERKNS_8messages17SerializedMessageE
Line
Count
Source
125
251
    void async_send(const messages::SerializedMessage& message) override {
126
251
        if (!state_machine_.is_processing()) {
127
0
            MSGPACK_RPC_TRACE(logger_, "Not processing now.");
128
0
            return;
129
0
        }
130
251
        asio::post(socket_.get_executor(),
131
251
            [self = this->shared_from_this(), message]() {
132
251
                self->async_send_in_thread(message);
133
251
            });
134
251
    }
_ZN11msgpack_rpc9transport10ConnectionIN4asio19basic_stream_socketINS2_5local15stream_protocolENS2_15any_io_executorEEENS_9addresses17UnixSocketAddressEE10async_sendERKNS_8messages17SerializedMessageE
Line
Count
Source
125
250
    void async_send(const messages::SerializedMessage& message) override {
126
250
        if (!state_machine_.is_processing()) {
127
0
            MSGPACK_RPC_TRACE(logger_, "Not processing now.");
128
0
            return;
129
0
        }
130
250
        asio::post(socket_.get_executor(),
131
250
            [self = this->shared_from_this(), message]() {
132
250
                self->async_send_in_thread(message);
133
250
            });
134
250
    }
135
136
    //! \copydoc msgpack_rpc::transport::IConnection::async_close
137
60
    void async_close() override {
138
60
        asio::post(socket_.get_executor(), [self = this->shared_from_this()]() {
139
59
            self->close_in_thread(Status());
140
59
        });
_ZZN11msgpack_rpc9transport10ConnectionIN4asio19basic_stream_socketINS2_2ip3tcpENS2_15any_io_executorEEENS_9addresses10TCPAddressEE11async_closeEvENKUlvE_clEv
Line
Count
Source
138
31
        asio::post(socket_.get_executor(), [self = this->shared_from_this()]() {
139
31
            self->close_in_thread(Status());
140
31
        });
_ZZN11msgpack_rpc9transport10ConnectionIN4asio19basic_stream_socketINS2_5local15stream_protocolENS2_15any_io_executorEEENS_9addresses17UnixSocketAddressEE11async_closeEvENKUlvE_clEv
Line
Count
Source
138
28
        asio::post(socket_.get_executor(), [self = this->shared_from_this()]() {
139
28
            self->close_in_thread(Status());
140
28
        });
141
60
    }
_ZN11msgpack_rpc9transport10ConnectionIN4asio19basic_stream_socketINS2_2ip3tcpENS2_15any_io_executorEEENS_9addresses10TCPAddressEE11async_closeEv
Line
Count
Source
137
32
    void async_close() override {
138
32
        asio::post(socket_.get_executor(), [self = this->shared_from_this()]() {
139
32
            self->close_in_thread(Status());
140
32
        });
141
32
    }
_ZN11msgpack_rpc9transport10ConnectionIN4asio19basic_stream_socketINS2_5local15stream_protocolENS2_15any_io_executorEEENS_9addresses17UnixSocketAddressEE11async_closeEv
Line
Count
Source
137
28
    void async_close() override {
138
28
        asio::post(socket_.get_executor(), [self = this->shared_from_this()]() {
139
28
            self->close_in_thread(Status());
140
28
        });
141
28
    }
142
143
    //! \copydoc msgpack_rpc::transport::IConnection::local_address
144
    [[nodiscard]] const addresses::IAddress& local_address()
145
6
        const noexcept override {
146
6
        return local_address_;
147
6
    }
_ZNK11msgpack_rpc9transport10ConnectionIN4asio19basic_stream_socketINS2_2ip3tcpENS2_15any_io_executorEEENS_9addresses10TCPAddressEE13local_addressEv
Line
Count
Source
145
3
        const noexcept override {
146
3
        return local_address_;
147
3
    }
_ZNK11msgpack_rpc9transport10ConnectionIN4asio19basic_stream_socketINS2_5local15stream_protocolENS2_15any_io_executorEEENS_9addresses17UnixSocketAddressEE13local_addressEv
Line
Count
Source
145
3
        const noexcept override {
146
3
        return local_address_;
147
3
    }
148
149
    //! \copydoc msgpack_rpc::transport::IConnection::remote_address
150
    [[nodiscard]] const addresses::IAddress& remote_address()
151
69
        const noexcept override {
152
69
        return remote_address_;
153
69
    }
_ZNK11msgpack_rpc9transport10ConnectionIN4asio19basic_stream_socketINS2_2ip3tcpENS2_15any_io_executorEEENS_9addresses10TCPAddressEE14remote_addressEv
Line
Count
Source
151
38
        const noexcept override {
152
38
        return remote_address_;
153
38
    }
_ZNK11msgpack_rpc9transport10ConnectionIN4asio19basic_stream_socketINS2_5local15stream_protocolENS2_15any_io_executorEEENS_9addresses17UnixSocketAddressEE14remote_addressEv
Line
Count
Source
151
31
        const noexcept override {
152
31
        return remote_address_;
153
31
    }
154
155
private:
156
    /*!
157
     * \brief Asynchronously read next bytes.
158
     */
159
362
    void async_read_next() {
160
362
        const auto buffer = message_parser_.prepare_buffer();
161
362
        socket_.async_read_some(asio::buffer(buffer.data(), buffer.size()),
162
362
            [self = this->shared_from_this()](const asio::error_code& error,
163
362
                std::size_t size) { self->process_read_bytes(error, size); });
_ZZN11msgpack_rpc9transport10ConnectionIN4asio19basic_stream_socketINS2_2ip3tcpENS2_15any_io_executorEEENS_9addresses10TCPAddressEE15async_read_nextEvENKUlRKNSt3__110error_codeEmE_clESE_m
Line
Count
Source
163
167
                std::size_t size) { self->process_read_bytes(error, size); });
_ZZN11msgpack_rpc9transport10ConnectionIN4asio19basic_stream_socketINS2_5local15stream_protocolENS2_15any_io_executorEEENS_9addresses17UnixSocketAddressEE15async_read_nextEvENKUlRKNSt3__110error_codeEmE_clESE_m
Line
Count
Source
163
194
                std::size_t size) { self->process_read_bytes(error, size); });
164
362
        MSGPACK_RPC_TRACE(logger_, "({}) Reading next bytes.", log_name_);
165
362
    }
_ZN11msgpack_rpc9transport10ConnectionIN4asio19basic_stream_socketINS2_2ip3tcpENS2_15any_io_executorEEENS_9addresses10TCPAddressEE15async_read_nextEv
Line
Count
Source
159
168
    void async_read_next() {
160
168
        const auto buffer = message_parser_.prepare_buffer();
161
168
        socket_.async_read_some(asio::buffer(buffer.data(), buffer.size()),
162
168
            [self = this->shared_from_this()](const asio::error_code& error,
163
168
                std::size_t size) { self->process_read_bytes(error, size); });
164
168
        MSGPACK_RPC_TRACE(logger_, "({}) Reading next bytes.", log_name_);
165
168
    }
_ZN11msgpack_rpc9transport10ConnectionIN4asio19basic_stream_socketINS2_5local15stream_protocolENS2_15any_io_executorEEENS_9addresses17UnixSocketAddressEE15async_read_nextEv
Line
Count
Source
159
194
    void async_read_next() {
160
194
        const auto buffer = message_parser_.prepare_buffer();
161
194
        socket_.async_read_some(asio::buffer(buffer.data(), buffer.size()),
162
194
            [self = this->shared_from_this()](const asio::error_code& error,
163
194
                std::size_t size) { self->process_read_bytes(error, size); });
164
194
        MSGPACK_RPC_TRACE(logger_, "({}) Reading next bytes.", log_name_);
165
194
    }
166
167
    /*!
168
     * \brief Process read bytes.
169
     *
170
     * \param[in] error Error.
171
     * \param[in] size Number of bytes read to the buffer.
172
     */
173
360
    void process_read_bytes(const asio::error_code& error, std::size_t size) {
174
360
        if (error) {
175
112
            if (error == asio::error::operation_aborted) {
176
48
                return;
177
48
            }
178
64
            if (error == asio::error::eof ||
179
64
                error == asio::error::connection_reset ||
180
65
                error == asio::error::broken_pipe) {
181
65
                MSGPACK_RPC_TRACE(
182
65
                    logger_, "({}) Connection closed by peer.", log_name_);
183
65
                state_machine_.handle_processing_stopped();
184
65
                on_closed_(Status());
185
65
                return;
186
65
            }
187
18.4E
            const auto message = fmt::format(
188
18.4E
                "Error occurred when receiving data: {}", error.message());
189
18.4E
            MSGPACK_RPC_ERROR(logger_, "({}) {}", log_name_, message);
190
18.4E
            throw MsgpackRPCException(StatusCode::UNEXPECTED_ERROR, message);
191
64
        }
192
193
248
        MSGPACK_RPC_TRACE(logger_, "({}) Read {} bytes.", log_name_, size);
194
248
        message_parser_.consumed(size);
195
196
749
        while (true) {
197
749
            std::optional<messages::ParsedMessage> message;
198
749
            try {
199
749
                message = message_parser_.try_parse();
200
749
            } catch (const MsgpackRPCException& e) {
201
0
                MSGPACK_RPC_ERROR(
202
0
                    logger_, "({}) {}", log_name_, e.status().message());
203
0
                close_in_thread(e.status());
204
0
                return;
205
0
            }
206
749
            if (message) {
207
501
                MSGPACK_RPC_TRACE(
208
501
                    logger_, "({}) Received a message.", log_name_, size);
209
501
                on_received_(std::move(*message));
210
501
                message.reset();
211
501
            } else {
212
248
                MSGPACK_RPC_TRACE(logger_,
213
248
                    "({}) More bytes are needed to parse a message.",
214
248
                    log_name_);
215
248
                break;
216
248
            }
217
749
        }
218
219
248
        if (!state_machine_.is_processing()) {
220
0
            return;
221
0
        }
222
248
        async_read_next();
223
248
    }
_ZN11msgpack_rpc9transport10ConnectionIN4asio19basic_stream_socketINS2_2ip3tcpENS2_15any_io_executorEEENS_9addresses10TCPAddressEE18process_read_bytesERKNSt3__110error_codeEm
Line
Count
Source
173
167
    void process_read_bytes(const asio::error_code& error, std::size_t size) {
174
167
        if (error) {
175
59
            if (error == asio::error::operation_aborted) {
176
24
                return;
177
24
            }
178
35
            if (error == asio::error::eof ||
179
35
                error == asio::error::connection_reset ||
180
35
                error == asio::error::broken_pipe) {
181
35
                MSGPACK_RPC_TRACE(
182
35
                    logger_, "({}) Connection closed by peer.", log_name_);
183
35
                state_machine_.handle_processing_stopped();
184
35
                on_closed_(Status());
185
35
                return;
186
35
            }
187
0
            const auto message = fmt::format(
188
0
                "Error occurred when receiving data: {}", error.message());
189
0
            MSGPACK_RPC_ERROR(logger_, "({}) {}", log_name_, message);
190
0
            throw MsgpackRPCException(StatusCode::UNEXPECTED_ERROR, message);
191
35
        }
192
193
108
        MSGPACK_RPC_TRACE(logger_, "({}) Read {} bytes.", log_name_, size);
194
108
        message_parser_.consumed(size);
195
196
359
        while (true) {
197
359
            std::optional<messages::ParsedMessage> message;
198
359
            try {
199
359
                message = message_parser_.try_parse();
200
359
            } catch (const MsgpackRPCException& e) {
201
0
                MSGPACK_RPC_ERROR(
202
0
                    logger_, "({}) {}", log_name_, e.status().message());
203
0
                close_in_thread(e.status());
204
0
                return;
205
0
            }
206
359
            if (message) {
207
251
                MSGPACK_RPC_TRACE(
208
251
                    logger_, "({}) Received a message.", log_name_, size);
209
251
                on_received_(std::move(*message));
210
251
                message.reset();
211
251
            } else {
212
108
                MSGPACK_RPC_TRACE(logger_,
213
108
                    "({}) More bytes are needed to parse a message.",
214
108
                    log_name_);
215
108
                break;
216
108
            }
217
359
        }
218
219
108
        if (!state_machine_.is_processing()) {
220
0
            return;
221
0
        }
222
108
        async_read_next();
223
108
    }
_ZN11msgpack_rpc9transport10ConnectionIN4asio19basic_stream_socketINS2_5local15stream_protocolENS2_15any_io_executorEEENS_9addresses17UnixSocketAddressEE18process_read_bytesERKNSt3__110error_codeEm
Line
Count
Source
173
193
    void process_read_bytes(const asio::error_code& error, std::size_t size) {
174
193
        if (error) {
175
53
            if (error == asio::error::operation_aborted) {
176
24
                return;
177
24
            }
178
29
            if (error == asio::error::eof ||
179
29
                error == asio::error::connection_reset ||
180
30
                error == asio::error::broken_pipe) {
181
30
                MSGPACK_RPC_TRACE(
182
30
                    logger_, "({}) Connection closed by peer.", log_name_);
183
30
                state_machine_.handle_processing_stopped();
184
30
                on_closed_(Status());
185
30
                return;
186
30
            }
187
18.4E
            const auto message = fmt::format(
188
18.4E
                "Error occurred when receiving data: {}", error.message());
189
18.4E
            MSGPACK_RPC_ERROR(logger_, "({}) {}", log_name_, message);
190
18.4E
            throw MsgpackRPCException(StatusCode::UNEXPECTED_ERROR, message);
191
29
        }
192
193
140
        MSGPACK_RPC_TRACE(logger_, "({}) Read {} bytes.", log_name_, size);
194
140
        message_parser_.consumed(size);
195
196
390
        while (true) {
197
390
            std::optional<messages::ParsedMessage> message;
198
390
            try {
199
390
                message = message_parser_.try_parse();
200
390
            } catch (const MsgpackRPCException& e) {
201
0
                MSGPACK_RPC_ERROR(
202
0
                    logger_, "({}) {}", log_name_, e.status().message());
203
0
                close_in_thread(e.status());
204
0
                return;
205
0
            }
206
390
            if (message) {
207
250
                MSGPACK_RPC_TRACE(
208
250
                    logger_, "({}) Received a message.", log_name_, size);
209
250
                on_received_(std::move(*message));
210
250
                message.reset();
211
250
            } else {
212
140
                MSGPACK_RPC_TRACE(logger_,
213
140
                    "({}) More bytes are needed to parse a message.",
214
140
                    log_name_);
215
140
                break;
216
140
            }
217
390
        }
218
219
140
        if (!state_machine_.is_processing()) {
220
0
            return;
221
0
        }
222
140
        async_read_next();
223
140
    }
224
225
    /*!
226
     * \brief Asynchronously send a message in this thread.
227
     *
228
     * \param[in] message Message.
229
     */
230
501
    void async_send_in_thread(const messages::SerializedMessage& message) {
231
501
        asio::async_write(socket_,
232
501
            asio::const_buffer(message.data(), message.size()),
233
501
            [self = this->shared_from_this(), message](
234
501
                const asio::error_code& error, std::size_t /*size*/) {
235
501
                self->on_sent(error, message.size());
236
501
            });
_ZZN11msgpack_rpc9transport10ConnectionIN4asio19basic_stream_socketINS2_2ip3tcpENS2_15any_io_executorEEENS_9addresses10TCPAddressEE20async_send_in_threadERKNS_8messages17SerializedMessageEENKUlRKNSt3__110error_codeEmE_clESI_m
Line
Count
Source
234
251
                const asio::error_code& error, std::size_t /*size*/) {
235
251
                self->on_sent(error, message.size());
236
251
            });
_ZZN11msgpack_rpc9transport10ConnectionIN4asio19basic_stream_socketINS2_5local15stream_protocolENS2_15any_io_executorEEENS_9addresses17UnixSocketAddressEE20async_send_in_threadERKNS_8messages17SerializedMessageEENKUlRKNSt3__110error_codeEmE_clESI_m
Line
Count
Source
234
250
                const asio::error_code& error, std::size_t /*size*/) {
235
250
                self->on_sent(error, message.size());
236
250
            });
237
501
        MSGPACK_RPC_TRACE(
238
501
            logger_, "({}) Sending {} bytes.", log_name_, message.size());
239
501
    }
_ZN11msgpack_rpc9transport10ConnectionIN4asio19basic_stream_socketINS2_2ip3tcpENS2_15any_io_executorEEENS_9addresses10TCPAddressEE20async_send_in_threadERKNS_8messages17SerializedMessageE
Line
Count
Source
230
251
    void async_send_in_thread(const messages::SerializedMessage& message) {
231
251
        asio::async_write(socket_,
232
251
            asio::const_buffer(message.data(), message.size()),
233
251
            [self = this->shared_from_this(), message](
234
251
                const asio::error_code& error, std::size_t /*size*/) {
235
251
                self->on_sent(error, message.size());
236
251
            });
237
251
        MSGPACK_RPC_TRACE(
238
251
            logger_, "({}) Sending {} bytes.", log_name_, message.size());
239
251
    }
_ZN11msgpack_rpc9transport10ConnectionIN4asio19basic_stream_socketINS2_5local15stream_protocolENS2_15any_io_executorEEENS_9addresses17UnixSocketAddressEE20async_send_in_threadERKNS_8messages17SerializedMessageE
Line
Count
Source
230
250
    void async_send_in_thread(const messages::SerializedMessage& message) {
231
250
        asio::async_write(socket_,
232
250
            asio::const_buffer(message.data(), message.size()),
233
250
            [self = this->shared_from_this(), message](
234
250
                const asio::error_code& error, std::size_t /*size*/) {
235
250
                self->on_sent(error, message.size());
236
250
            });
237
250
        MSGPACK_RPC_TRACE(
238
250
            logger_, "({}) Sending {} bytes.", log_name_, message.size());
239
250
    }
240
241
    /*!
242
     * \brief Handle the result of send operation.
243
     *
244
     * \param[in] error Error.
245
     * \param[in] size Message size.
246
     */
247
501
    void on_sent(const asio::error_code& error, std::size_t size) {
248
501
        if (error) {
249
0
            if (error == asio::error::operation_aborted) {
250
0
                return;
251
0
            }
252
0
            if (error == asio::error::eof ||
253
0
                error == asio::error::connection_reset ||
254
0
                error == asio::error::broken_pipe) {
255
0
                MSGPACK_RPC_TRACE(
256
0
                    logger_, "({}) Connection closed by peer.", log_name_);
257
0
                state_machine_.handle_processing_stopped();
258
0
                on_closed_(Status());
259
0
                return;
260
0
            }
261
0
            const auto message = fmt::format(
262
0
                "Error occurred when sending data: {}", error.message());
263
0
            MSGPACK_RPC_ERROR(logger_, "({}) {}", log_name_, message);
264
0
            throw MsgpackRPCException(StatusCode::UNEXPECTED_ERROR, message);
265
0
        }
266
267
501
        MSGPACK_RPC_TRACE(logger_, "({}) Sent {} bytes.", log_name_, size);
268
501
        on_sent_();
269
501
    }
_ZN11msgpack_rpc9transport10ConnectionIN4asio19basic_stream_socketINS2_2ip3tcpENS2_15any_io_executorEEENS_9addresses10TCPAddressEE7on_sentERKNSt3__110error_codeEm
Line
Count
Source
247
251
    void on_sent(const asio::error_code& error, std::size_t size) {
248
251
        if (error) {
249
0
            if (error == asio::error::operation_aborted) {
250
0
                return;
251
0
            }
252
0
            if (error == asio::error::eof ||
253
0
                error == asio::error::connection_reset ||
254
0
                error == asio::error::broken_pipe) {
255
0
                MSGPACK_RPC_TRACE(
256
0
                    logger_, "({}) Connection closed by peer.", log_name_);
257
0
                state_machine_.handle_processing_stopped();
258
0
                on_closed_(Status());
259
0
                return;
260
0
            }
261
0
            const auto message = fmt::format(
262
0
                "Error occurred when sending data: {}", error.message());
263
0
            MSGPACK_RPC_ERROR(logger_, "({}) {}", log_name_, message);
264
0
            throw MsgpackRPCException(StatusCode::UNEXPECTED_ERROR, message);
265
0
        }
266
267
251
        MSGPACK_RPC_TRACE(logger_, "({}) Sent {} bytes.", log_name_, size);
268
251
        on_sent_();
269
251
    }
_ZN11msgpack_rpc9transport10ConnectionIN4asio19basic_stream_socketINS2_5local15stream_protocolENS2_15any_io_executorEEENS_9addresses17UnixSocketAddressEE7on_sentERKNSt3__110error_codeEm
Line
Count
Source
247
250
    void on_sent(const asio::error_code& error, std::size_t size) {
248
250
        if (error) {
249
0
            if (error == asio::error::operation_aborted) {
250
0
                return;
251
0
            }
252
0
            if (error == asio::error::eof ||
253
0
                error == asio::error::connection_reset ||
254
0
                error == asio::error::broken_pipe) {
255
0
                MSGPACK_RPC_TRACE(
256
0
                    logger_, "({}) Connection closed by peer.", log_name_);
257
0
                state_machine_.handle_processing_stopped();
258
0
                on_closed_(Status());
259
0
                return;
260
0
            }
261
0
            const auto message = fmt::format(
262
0
                "Error occurred when sending data: {}", error.message());
263
0
            MSGPACK_RPC_ERROR(logger_, "({}) {}", log_name_, message);
264
0
            throw MsgpackRPCException(StatusCode::UNEXPECTED_ERROR, message);
265
0
        }
266
267
250
        MSGPACK_RPC_TRACE(logger_, "({}) Sent {} bytes.", log_name_, size);
268
250
        on_sent_();
269
250
    }
270
271
    /*!
272
     * \brief Close this connection in this thread.
273
     *
274
     * \param[in] status Status.
275
     */
276
59
    void close_in_thread(const Status& status) {
277
59
        if (!state_machine_.handle_stop_requested()) {
278
7
            return;
279
7
        }
280
52
        socket_.cancel();
281
52
        socket_.shutdown(AsioSocket::shutdown_both);
282
52
        socket_.close();
283
52
        on_closed_(status);
284
52
        MSGPACK_RPC_TRACE(logger_, "({}) Closed this connection.", log_name_);
285
52
    }
_ZN11msgpack_rpc9transport10ConnectionIN4asio19basic_stream_socketINS2_2ip3tcpENS2_15any_io_executorEEENS_9addresses10TCPAddressEE15close_in_threadERKNS_6common6StatusE
Line
Count
Source
276
31
    void close_in_thread(const Status& status) {
277
31
        if (!state_machine_.handle_stop_requested()) {
278
5
            return;
279
5
        }
280
26
        socket_.cancel();
281
26
        socket_.shutdown(AsioSocket::shutdown_both);
282
26
        socket_.close();
283
26
        on_closed_(status);
284
26
        MSGPACK_RPC_TRACE(logger_, "({}) Closed this connection.", log_name_);
285
26
    }
_ZN11msgpack_rpc9transport10ConnectionIN4asio19basic_stream_socketINS2_5local15stream_protocolENS2_15any_io_executorEEENS_9addresses17UnixSocketAddressEE15close_in_threadERKNS_6common6StatusE
Line
Count
Source
276
28
    void close_in_thread(const Status& status) {
277
28
        if (!state_machine_.handle_stop_requested()) {
278
2
            return;
279
2
        }
280
26
        socket_.cancel();
281
26
        socket_.shutdown(AsioSocket::shutdown_both);
282
26
        socket_.close();
283
26
        on_closed_(status);
284
26
        MSGPACK_RPC_TRACE(logger_, "({}) Closed this connection.", log_name_);
285
26
    }
286
287
    //! Socket.
288
    AsioSocket socket_;
289
290
    //! Callback function when a message is received.
291
    MessageReceivedCallback on_received_{};
292
293
    //! Callback function when a message is sent.
294
    MessageSentCallback on_sent_{};
295
296
    //! Callback function when this connection is closed.
297
    ConnectionClosedCallback on_closed_{};
298
299
    //! Parser of messages.
300
    messages::MessageParser message_parser_;
301
302
    //! Address of the local endpoint.
303
    ConcreteAddress local_address_;
304
305
    //! Address of the remote endpoint.
306
    ConcreteAddress remote_address_;
307
308
    //! Name of the connection for logs.
309
    std::string log_name_;
310
311
    //! Logger.
312
    std::shared_ptr<logging::Logger> logger_;
313
314
    //! State machine.
315
    BackgroundTaskStateMachine state_machine_{};
316
317
    //! List of connections.
318
    std::weak_ptr<ConnectionList<Connection>> connection_list_;
319
};
320
321
}  // namespace msgpack_rpc::transport