cpp-msgpack-rpc 0.2.0
An RPC library implementing MessagePack RPC.
Loading...
Searching...
No Matches
tcp_connector.h
Go to the documentation of this file.
1/*
2 * Copyright 2023 MusicScience37 (Kenta Kabashima)
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
20#pragma once
21
22#include <cstdint>
23#include <functional>
24#include <memory>
25#include <optional>
26#include <string>
27#include <string_view>
28#include <type_traits>
29#include <utility>
30
31#include <asio/connect.hpp>
32#include <asio/error_code.hpp>
33#include <asio/ip/basic_resolver_entry.hpp>
34#include <asio/ip/basic_resolver_iterator.hpp>
35#include <asio/ip/tcp.hpp>
36#include <fmt/format.h>
37#include <fmt/ostream.h>
38
51
53
//! Connector for the "tcp" scheme: resolves a URI to TCP endpoints and
//! asynchronously connects a socket to one of them, reporting the result
//! through a ConnectionCallback.
//!
//! NOTE(review): this listing is an extraction of a generated documentation
//! page. Each code line carries its original listing line number, and some
//! listing lines are absent (gaps are pointed out below). Confirm against the
//! real header before relying on this text as source code.
57class TCPConnector : public IConnector,
58 public std::enable_shared_from_this<TCPConnector> {
59public:
//! Type of resolvers in asio library.
61 using AsioResolver = asio::ip::tcp::resolver;
62
//! Type of sockets in asio library.
64 using AsioSocket = asio::ip::tcp::socket;
65
//! Type of addresses in asio library.
67 using AsioAddress = asio::ip::tcp::endpoint;
68
// NOTE(review): the aliases ConcreteAddress and ConnectionType (used by
// on_connect below) are not visible in this listing; presumably they were
// declared on the absent listing lines 70 and 73 - confirm.
71
74
//! Constructor.
//!
//! The executor is kept only as a weak reference (executor_ is a
//! std::weak_ptr), while the resolver is created immediately from the
//! executor's TRANSPORT context. The scheme is fixed to "tcp" and the log
//! name is derived from it as "Connector(tcp)".
//!
//! \param executor Executor; dereferenced here, so it must not be null.
//! \param message_parser_config Configuration of parsers of messages.
//! \param logger Logger.
82 TCPConnector(const std::shared_ptr<executors::IExecutor>& executor,
83 const config::MessageParserConfig& message_parser_config,
84 std::shared_ptr<logging::Logger> logger)
85 : executor_(executor),
86 message_parser_config_(message_parser_config),
87 resolver_(executor->context(executors::OperationType::TRANSPORT)),
88 scheme_("tcp"),
89 log_name_(fmt::format("Connector({})", scheme_)),
90 logger_(std::move(logger)) {}
91
//! Asynchronously connect to an endpoint.
//!
//! NOTE(review): the first line of this definition (listing line 93) is
//! absent here; per the symbol index the signature is
//! `void async_connect(const addresses::URI &uri, ConnectionCallback on_connected) override`.
//!
//! Name resolution is performed by resolve() before the connect operation
//! is started, so exceptions thrown by resolve() propagate to the caller
//! instead of being delivered through on_connected.
//!
//! \param uri URI of the endpoint to connect to.
//! \param on_connected Callback invoked from on_connect() with the result.
94 const addresses::URI& uri, ConnectionCallback on_connected) override {
95 const auto resolved_endpoints = resolve(uri);
96
// NOTE(review): the constructor argument of the socket (listing line 98) is
// absent here; presumably an execution context obtained via get_executor()
// - confirm.
97 auto socket_ptr = std::make_unique<AsioSocket>(
99 AsioSocket& socket = *socket_ptr;
// The completion handler owns everything it needs: a shared_ptr to this
// connector, the socket (moved into the closure), the user callback, and a
// copy of the URI. The reference `socket` was taken before the unique_ptr is
// moved and still refers to the same heap-allocated socket.
100 asio::async_connect(socket, resolved_endpoints,
101 [self = this->shared_from_this(),
102 socket_ptr_moved = std::move(socket_ptr),
103 on_connected_moved = std::move(on_connected),
104 uri](const asio::error_code& error,
105 const AsioAddress& asio_address) {
106 self->on_connect(error, std::move(*socket_ptr_moved),
107 on_connected_moved, asio_address, uri);
108 });
109 MSGPACK_RPC_TRACE(logger_, "({}) Connecting to {}.", log_name_, uri);
110 }
111
112private:
//! Resolve a URI.
//!
//! \param uri URI to resolve; its scheme must equal scheme_.
//! \return Resolver results for the host and port of the URI.
//! \throws MsgpackRPCException with StatusCode::INVALID_ARGUMENT when the
//! scheme differs, or StatusCode::HOST_UNRESOLVED when the resolver reports
//! an error.
119 [[nodiscard]] AsioResolver::results_type resolve(
120 const addresses::URI& uri) {
121 if (uri.scheme() != scheme_) {
122 throw MsgpackRPCException(StatusCode::INVALID_ARGUMENT,
123 fmt::format("Scheme is different with the resolver: "
124 "expected={}, actual={}",
125 scheme_, uri.scheme()));
126 }
127
128 MSGPACK_RPC_TRACE(logger_, "({}) Resolve {}.", log_name_, uri);
129
// The resolver takes the service as text; a URI without a port number is
// resolved with "0".
130 const std::string service = fmt::format(
131 "{}", uri.port_number().value_or(static_cast<std::uint16_t>(0)));
// Error-code overload: failures are reported through `error` and converted
// to an exception below, after being logged.
132 asio::error_code error;
133 auto results = resolver_.resolve(uri.host_or_path(), service, error);
134 if (error) {
135 const auto message =
136 fmt::format("Failed to resolve {}: {}", uri, error.message());
137 MSGPACK_RPC_ERROR(logger_, "({}) {}", log_name_, message);
138 throw MsgpackRPCException(StatusCode::HOST_UNRESOLVED, message);
139 }
140
// Iterate over the results only when the logger's output level is TRACE or
// lower, so the loop is skipped entirely otherwise.
141 if (logger_->output_log_level() <= logging::LogLevel::TRACE) {
142 for (const auto& result : results) {
143 MSGPACK_RPC_TRACE(logger_, "({}) Result of resolving {}: {}.",
144 log_name_, uri, fmt::streamed(result.endpoint()));
145 }
146 }
147
148 return results;
149 }
150
//! Handle the result of connect operation.
//!
//! On failure, a warning is logged and on_connected receives a
//! CONNECTION_FAILURE status with a null connection. On success, the socket
//! is moved into a new ConnectionType and passed to on_connected together
//! with a default-constructed Status.
//!
//! \param error Error code reported by the connect operation.
//! \param socket Socket used for the connect operation.
//! \param on_connected Callback to report the result to.
//! \param asio_address Endpoint reported by the connect operation (used for
//! logging only).
//! \param uri URI that was requested (used for the failure message only).
160 void on_connect(const asio::error_code& error, AsioSocket&& socket,
161 const ConnectionCallback& on_connected, const AsioAddress& asio_address,
162 const addresses::URI& uri) {
163 if (error) {
164 const auto message = fmt::format(
165 "Failed to connect to {}: {}", uri, error.message());
166 MSGPACK_RPC_WARN(logger_, "({}) {}", log_name_, message);
167 on_connected(
168 Status(StatusCode::CONNECTION_FAILURE, message), nullptr);
169 return;
170 }
171 MSGPACK_RPC_TRACE(logger_, "({}) Connected to {}.", log_name_,
172 fmt::streamed(asio_address));
173
174 auto connection = std::make_shared<ConnectionType>(
175 std::move(socket), message_parser_config_, logger_);
176 on_connected(Status(), std::move(connection));
177 }
178
//! Get the executor.
//!
//! Locks the weak reference stored at construction. When the executor no
//! longer exists, a critical log is written.
//!
//! NOTE(review): the line that begins the statement using
//! StatusCode::PRECONDITION_NOT_MET (listing line 189) is absent here;
//! presumably it throws MsgpackRPCException as resolve() does - confirm.
//!
//! \return Executor (non-null when returned normally, assuming the absent
//! line throws).
184 [[nodiscard]] std::shared_ptr<executors::IExecutor> get_executor() {
185 auto executor = executor_.lock();
186 if (!executor) {
187 const auto message = std::string("Executor is not set.");
188 MSGPACK_RPC_CRITICAL(logger_, "({}) {}", log_name_, message);
190 StatusCode::PRECONDITION_NOT_MET, message);
191 }
192 return executor;
193 }
194
//! Executor. Held weakly; see get_executor().
196 std::weak_ptr<executors::IExecutor> executor_;
197
// NOTE(review): the declarations of message_parser_config_ and resolver_
// (both initialized by the constructor) are not visible in this listing;
// presumably they were on the absent listing lines 199 and 202 - confirm.
200
203
//! Scheme accepted by this connector ("tcp", set in the constructor).
205 std::string scheme_;
206
//! Name of the connection for logs.
208 std::string log_name_;
209
//! Logger.
211 std::shared_ptr<logging::Logger> logger_;
212};
213
214} // namespace msgpack_rpc::transport::tcp
Class of addresses of TCP.
Definition tcp_address.h:44
Class of URIs (Uniform Resource Identifiers) to specify endpoints in this library.
Definition uri.h:38
std::string_view scheme() const noexcept
Get the scheme.
Definition uri.cpp:38
std::optional< std::uint16_t > port_number() const noexcept
Get the port number.
Definition uri.cpp:42
std::string_view host_or_path() const noexcept
Get the host name or file path.
Definition uri.cpp:40
Class of exceptions in cpp-msgpack-rpc library.
Class of statuses.
Definition status.h:34
Class of configuration of parsers of messages.
std::function< void(const Status &, std::shared_ptr< IConnection >)> ConnectionCallback
Type of callback functions called when the connecting process has finished (including on failure).
Definition i_connector.h:45
asio::ip::tcp::resolver AsioResolver
Type of resolvers in asio library.
std::string log_name_
Name of the connection for logs.
config::MessageParserConfig message_parser_config_
Configuration of parsers of messages.
void on_connect(const asio::error_code &error, AsioSocket &&socket, const ConnectionCallback &on_connected, const AsioAddress &asio_address, const addresses::URI &uri)
Handle the result of connect operation.
Connection< AsioSocket, ConcreteAddress > ConnectionType
Type of connections.
void async_connect(const addresses::URI &uri, ConnectionCallback on_connected) override
Asynchronously connect to an endpoint.
std::weak_ptr< executors::IExecutor > executor_
Executor.
std::shared_ptr< logging::Logger > logger_
Logger.
TCPConnector(const std::shared_ptr< executors::IExecutor > &executor, const config::MessageParserConfig &message_parser_config, std::shared_ptr< logging::Logger > logger)
Constructor.
addresses::TCPAddress ConcreteAddress
Type of concrete addresses.
std::shared_ptr< executors::IExecutor > get_executor()
Get the executor.
asio::ip::tcp::socket AsioSocket
Type of sockets in asio library.
asio::ip::tcp::endpoint AsioAddress
Type of addresses in asio library.
AsioResolver::results_type resolve(const addresses::URI &uri)
Resolve a URI.
Definition of Connection class.
Definition of IConnector class.
Definition of IExecutor class.
Definition of LogLevel enumeration.
Definition of Logger class.
#define MSGPACK_RPC_CRITICAL(LOGGER_PTR,...)
Write a critical log.
Definition logger.h:234
#define MSGPACK_RPC_WARN(LOGGER_PTR,...)
Write a warning log.
Definition logger.h:210
#define MSGPACK_RPC_TRACE(LOGGER_PTR,...)
Write a trace log.
Definition logger.h:174
#define MSGPACK_RPC_ERROR(LOGGER_PTR,...)
Write an error log.
Definition logger.h:222
Definition of MessageParserConfig class.
Definition of MsgpackRPCException class.
Namespace of fmt library.
Definition uri.h:113
Namespace of executors to process asynchronous tasks.
@ TRACE
Trace. (Internal operations to send and receive messages.)
Definition log_level.h:31
Namespace of transport of messages via TCP.
STL namespace.
Definition of OperationType enumeration.
Definition of Status class.
Definition of StatusCode enumeration.
Definition of TCPAddress.
Definition of URI class.