cpp-msgpack-rpc 0.2.0
An RPC library implementing MessagePack RPC.
Loading...
Searching...
No Matches
acceptor.h
Go to the documentation of this file.
1/*
2 * Copyright 2023 MusicScience37 (Kenta Kabashima)
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
20#pragma once
21
22#include <cstdio> // IWYU pragma: keep
23#include <memory>
24#include <optional>
25#include <string>
26#include <system_error>
27#include <type_traits> // IWYU pragma: keep
28#include <utility>
29
30#include <asio/error.hpp>
31#include <asio/error_code.hpp>
32#include <asio/post.hpp>
33#include <fmt/format.h>
34#include <fmt/ostream.h>
35
48
49namespace msgpack_rpc::transport {
50
54
55template <typename AsioAcceptorType, typename AsioSocketType,
56 typename ConcreteAddressType>
58 : public IAcceptor,
59 public std::enable_shared_from_this<
60 Acceptor<AsioAcceptorType, AsioSocketType, ConcreteAddressType>> {
61public:
63 using AsioAcceptor = AsioAcceptorType;
64
66 using AsioSocket = AsioSocketType;
67
69 using ConcreteAddress = ConcreteAddressType;
70
73
// Constructor.
// NOTE(review): the opening line (listing line 82) is absent from this extract;
// per the member index on this page it begins
// `Acceptor(const ConcreteAddress &local_address, ...`.
83 const std::shared_ptr<executors::IExecutor>& executor,
84 const config::MessageParserConfig& message_parser_config,
85 std::shared_ptr<logging::Logger> logger)
// Construct the asio acceptor from the executor's TRANSPORT context and the
// asio form of the requested local address.
86 : acceptor_(executor->context(executors::OperationType::TRANSPORT),
87 local_address.asio_address()),
// executor_ is declared as a std::weak_ptr, so only a weak reference is kept.
88 executor_(executor),
// The stored address is read back from the acceptor rather than copied from
// the argument. NOTE(review): presumably so an OS-chosen endpoint is
// reflected - confirm.
89 local_address_(acceptor_.local_endpoint()),
90 message_parser_config_(message_parser_config),
// Prefix used in every log message written by this acceptor.
91 log_name_(fmt::format("Acceptor(local={})", local_address_)),
92 logger_(std::move(logger)),
// NOTE(review): listing lines 93 and 95 are absent from this extract, so the
// remaining member initializers and the arguments of the trace call below are
// not visible here.
94 MSGPACK_RPC_TRACE(logger_, "({}) Created an acceptor to listen {}.",
96 }
97
// Start process of this acceptor.
//
// Stores `on_connection` as the callback invoked for each accepted connection,
// then posts a call to async_accept_next() onto the executor of the underlying
// asio acceptor. The state machine is notified before the callback is stored
// (start request) and after the accept has been scheduled (processing started).
99 void start(ConnectionCallback on_connection) override {
100 state_machine_.handle_start_request();
101 // Only one thread can enter here.
102
103 on_connection_ = std::move(on_connection);
// The posted task captures a shared_ptr to this object, so the acceptor
// cannot be destroyed before the task has run.
104 asio::post(acceptor_.get_executor(),
105 [self = this->shared_from_this()] { self->async_accept_next(); });
106
107 state_machine_.handle_processing_started();
108 }
109
// Stop this acceptor.
//
// Performs no shutdown work itself: it posts stop_in_thread() onto the
// executor of the underlying asio acceptor. The posted task captures a
// shared_ptr to this object, keeping it alive until the task has run.
111 void stop() override {
112 asio::post(acceptor_.get_executor(),
113 [self = this->shared_from_this()]() { self->stop_in_thread(); });
114 }
115
// Return the local address stored in this acceptor (local_address_), exposed
// through the addresses::IAddress interface. The reference refers to a member
// of this object. Declared noexcept.
117 [[nodiscard]] const addresses::IAddress& local_address()
118 const noexcept override {
119 return local_address_;
120 }
121
122private:
// Body of async_accept_next(): asynchronously accept a connection.
// NOTE(review): the signature line (listing line 126) and listing line 129
// (the arguments of socket_.emplace) are absent from this extract.
//
// Discard any socket held from a previous accept and construct a fresh one.
127 socket_.reset();
128 socket_.emplace(
// Start the asynchronous accept into the fresh socket. The completion handler
// captures a shared_ptr to this object and forwards the result to on_accept().
130 acceptor_.async_accept(*socket_,
131 [self = this->shared_from_this()](
132 const asio::error_code& error) { self->on_accept(error); });
133 }
134
// Handle the result of accept operation.
//
// error: error code passed to the accept completion handler.
//   - asio::error::operation_aborted: return without logging.
//   - any other error: write an error log and throw
//     MsgpackRPCException(StatusCode::UNEXPECTED_ERROR, message).
//   - no error: build a ConnectionType from the accepted socket, append it to
//     connection_list_, and hand it to the on_connection_ callback.
//
// NOTE(review): listing lines 155 and 162 are absent from this extract, so the
// remaining make_shared arguments and the statement that follows the final
// is_processing() check are not visible here.
140 void on_accept(const asio::error_code& error) {
141 if (error) {
// NOTE(review): presumably this is the cancellation issued by
// acceptor_.cancel() in stop_in_thread() - confirm.
142 if (error == asio::error::operation_aborted) {
143 return;
144 }
// The same message text is used for both the log entry and the exception.
145 const auto message =
146 fmt::format("Error occurred when accepting a connection: {}",
147 error.message());
148 MSGPACK_RPC_ERROR(logger_, "({}) {}", log_name_, message);
149 throw MsgpackRPCException(StatusCode::UNEXPECTED_ERROR, message);
150 }
151
152 MSGPACK_RPC_TRACE(logger_, "({}) Accepted a connection from {}.",
153 log_name_, fmt::streamed(socket_->remote_endpoint()));
// The accepted socket is moved out of socket_ into the new connection.
154 auto connection = std::make_shared<ConnectionType>(std::move(*socket_),
156 connection_list_->append(connection);
157 on_connection_(std::move(connection));
158
// Stop here when the state machine no longer reports processing.
159 if (!state_machine_.is_processing()) {
160 return;
161 }
163 }
164
// Body of stop_in_thread(): stop this acceptor in this thread.
// NOTE(review): the signature line (listing line 168) is absent from this
// extract.
//
// Return immediately when the state machine rejects the stop request.
169 if (!state_machine_.handle_stop_requested()) {
170 return;
171 }
// Cancel and close the asio acceptor, then request closing of every
// connection registered in connection_list_.
172 acceptor_.cancel();
173 acceptor_.close();
174 connection_list_->async_close_all();
// Compile-time branch: only instantiated for Unix socket addresses.
175 if constexpr (std::is_same_v<ConcreteAddress,
176 addresses::UnixSocketAddress>) {
177 // Without removing the file of Unix socket, the socket cannot be
178 // used next time.
// The result of std::remove is deliberately discarded.
179 (void)std::remove(local_address_.file_path().c_str());
180 }
181 MSGPACK_RPC_TRACE(logger_, "({}) Stopped this acceptor.", log_name_);
182 }
183
// Get the executor.
//
// Locks the weak reference executor_ and returns the resulting shared_ptr.
// When the lock yields an empty pointer, a critical log is written first.
// NOTE(review): listing line 194 is absent from this extract; lines 193-195
// presumably throw an exception carrying StatusCode::PRECONDITION_NOT_MET and
// the message - confirm against the original header.
189 [[nodiscard]] std::shared_ptr<executors::IExecutor> get_executor() {
190 auto executor = executor_.lock();
191 if (!executor) {
192 const auto message = std::string("Executor is not set.");
193 MSGPACK_RPC_CRITICAL(logger_, "({}) {}", log_name_, message);
195 StatusCode::PRECONDITION_NOT_MET, message);
196 }
197 return executor;
198 }
199
202
204 std::optional<AsioSocket> socket_{};
205
207 std::weak_ptr<executors::IExecutor> executor_;
208
211
214
217
219 std::string log_name_;
220
222 std::shared_ptr<logging::Logger> logger_;
223
226
228 std::shared_ptr<ConnectionList<ConnectionType>> connection_list_;
229};
230
231} // namespace msgpack_rpc::transport
Definition of BackgroundTaskStateMachine class.
Interface of addresses.
Definition i_address.h:31
Class of exceptions in cpp-msgpack-rpc library.
Class of configuration of parsers of messages.
void on_accept(const asio::error_code &error)
Handle the result of accept operation.
Definition acceptor.h:140
Acceptor(const ConcreteAddress &local_address, const std::shared_ptr< executors::IExecutor > &executor, const config::MessageParserConfig &message_parser_config, std::shared_ptr< logging::Logger > logger)
Constructor.
Definition acceptor.h:82
void async_accept_next()
Asynchronously accept a connection.
Definition acceptor.h:126
AsioSocketType AsioSocket
Type of sockets in asio library.
Definition acceptor.h:66
Connection< AsioSocket, ConcreteAddress > ConnectionType
Type of connections.
Definition acceptor.h:72
void stop_in_thread()
Stop this acceptor in this thread.
Definition acceptor.h:168
std::shared_ptr< executors::IExecutor > get_executor()
Get the executor.
Definition acceptor.h:189
void start(ConnectionCallback on_connection) override
Start process of this acceptor.
Definition acceptor.h:99
void stop() override
Stop this acceptor.
Definition acceptor.h:111
AsioAcceptorType AsioAcceptor
Type of acceptors in asio library.
Definition acceptor.h:63
ConcreteAddressType ConcreteAddress
Type of concrete addresses.
Definition acceptor.h:69
Class of state machines of background tasks in connections and acceptors.
Class of lists of connections.
std::function< void(std::shared_ptr< IConnection >)> ConnectionCallback
Type of callback functions called when a connection is accepted.
Definition i_acceptor.h:42
Definition of Connection class.
Definition of ConnectionList class.
Definition of IAcceptor class.
Definition of IAddress.
Definition of IExecutor class.
Definition of Logger class.
#define MSGPACK_RPC_CRITICAL(LOGGER_PTR,...)
Write a critical log.
Definition logger.h:234
#define MSGPACK_RPC_TRACE(LOGGER_PTR,...)
Write a trace log.
Definition logger.h:174
#define MSGPACK_RPC_ERROR(LOGGER_PTR,...)
Write an error log.
Definition logger.h:222
Definition of MessageParserConfig class.
Definition of MsgpackRPCException class.
Namespace of fmt library.
Definition uri.h:113
Namespace of executors to process asynchronous tasks.
Namespace of transport of messages.
Definition backends.h:31
STL namespace.
Definition of OperationType enumeration.
Definition of StatusCode enumeration.
Definition of UnixSocketAddress class.