24#if MSGPACK_RPC_HAS_UNIX_SOCKETS
33#include <asio/error_code.hpp>
34#include <asio/local/stream_protocol.hpp>
35#include <fmt/format.h>
36#include <fmt/ostream.h>
50namespace msgpack_rpc::transport::unix_socket {
// Connector for Unix domain stream sockets, built on
// asio::local::stream_protocol.
// NOTE(review): this listing skips some original source lines (the embedded
// line numbers are not contiguous), so the base-class list shown below may be
// incomplete.
55class UnixSocketConnector final
// Allows async_connect to hand a shared_ptr to this object to the completion
// handler via shared_from_this().
57 public std::enable_shared_from_this<UnixSocketConnector> {
// Asio socket type used for connections.
60 using AsioSocket = asio::local::stream_protocol::socket;
// Asio endpoint type matching AsioSocket.
63 using AsioAddress = asio::local::stream_protocol::endpoint;
// Address type of this library for Unix sockets.
66 using ConcreteAddress = addresses::UnixSocketAddress;
// Connection type created by on_connect after a successful connect.
69 using ConnectionType = Connection<AsioSocket, ConcreteAddress>;
// Constructor.
//
// executor: stored in executor_, which is a std::weak_ptr, so this connector
//           does not extend the lifetime of the executor.
// message_parser_config: copied into message_parser_config_.
// logger: moved into logger_.
// scheme: NOTE(review): the initializer that consumes this parameter is not
//         visible in this listing (source line 84 is skipped) - presumably
//         scheme_(scheme); confirm.
79 UnixSocketConnector(
const std::shared_ptr<executors::IExecutor>& executor,
80 const config::MessageParserConfig& message_parser_config,
81 std::shared_ptr<logging::Logger> logger, std::string_view scheme)
82 : executor_(executor),
83 message_parser_config_(message_parser_config),
// log_name_ is formatted from the member scheme_, not from the parameter, so
// scheme_ must be declared (and therefore initialized) before log_name_.
// NOTE(review): the declaration of scheme_ is not visible here - verify order.
85 log_name_(fmt::format(
"Connector({})", scheme_)),
86 logger_(std::move(logger)) {}
// Start an asynchronous connect to the Unix socket named by the URI.
// NOTE(review): the line carrying the return type and function name is not
// shown in this listing; the visible text starts inside the parameter list.
//
// uri: the endpoint is built from uri.host_or_path().
// on_connected: callback handed on to on_connect when the connect completes.
90 const addresses::URI& uri, ConnectionCallback on_connected)
override {
91 const AsioAddress asio_address{uri.host_or_path()};
// The socket is heap-allocated so that ownership can be moved into the
// completion handler below. get_executor() may throw MsgpackRPCException.
93 auto socket_ptr = std::make_unique<AsioSocket>(
94 get_executor()->context(executors::OperationType::TRANSPORT));
// Take the reference before socket_ptr is moved into the lambda capture.
95 AsioSocket& socket = *socket_ptr;
96 socket.async_connect(asio_address,
// self holds a shared_ptr to this connector for as long as the handler exists.
97 [self = this->shared_from_this(),
98 socket_ptr_moved = std::move(socket_ptr),
// uri and asio_address are captured by copy for use in the handler.
99 on_connected_moved = std::move(on_connected), uri,
100 asio_address](
const asio::error_code& error) {
// Forward the result to on_connect, moving the socket out of its holder.
101 self->on_connect(error, std::move(*socket_ptr_moved),
102 on_connected_moved, asio_address, uri);
// Handle the completion of the connect started in async_connect.
//
// error: error code reported by asio for the connect.
// socket: socket on which the connect was attempted; moved into the new
//         connection on the success path.
// on_connected: callback receiving a Status and a connection pointer.
// asio_address: endpoint that was used; formatted via fmt::streamed below.
// uri: URI that was used; appears in the failure message.
//
// NOTE(review): several original lines are missing from this listing (the
// branch condition, the heads of the log calls and of the failure callback
// call), so the control flow is only partly visible here.
117 void on_connect(
const asio::error_code& error, AsioSocket&& socket,
118 const ConnectionCallback& on_connected,
const AsioAddress& asio_address,
119 const addresses::URI& uri) {
// Failure message; presumably built only when error is set - the guarding
// condition is not visible here.
121 const auto message = fmt::format(
122 "Failed to connect to {}: {}", uri, error.message());
// Arguments of a call whose head is not visible - presumably on_connected:
// a CONNECTION_FAILURE status carrying the message, and no connection.
125 Status(StatusCode::CONNECTION_FAILURE, message),
nullptr);
// Tail of a statement whose head is not visible; it formats the endpoint
// through its stream output operator.
129 fmt::streamed(asio_address));
// Wrap the connected socket in a connection and report it with a
// default-constructed Status.
131 auto connection = std::make_shared<ConnectionType>(
132 std::move(socket), message_parser_config_, logger_);
133 on_connected(Status(), std::move(connection));
// Get a shared_ptr to the executor from the weak reference executor_.
//
// Throws MsgpackRPCException with StatusCode::PRECONDITION_NOT_MET and the
// message "Executor is not set.".
// NOTE(review): the condition guarding the throw and the return statement are
// not visible in this listing - presumably the throw happens when lock()
// yields an empty pointer, and the locked pointer is returned otherwise.
141 [[nodiscard]] std::shared_ptr<executors::IExecutor> get_executor() {
142 auto executor = executor_.lock();
144 const auto message = std::string(
"Executor is not set.");
146 throw MsgpackRPCException(
147 StatusCode::PRECONDITION_NOT_MET, message);
// Executor, held weakly; get_executor() locks it on each use.
153 std::weak_ptr<executors::IExecutor> executor_;
// Message parser configuration passed to every connection that is created.
156 config::MessageParserConfig message_parser_config_;
// Name of this connector, formatted as "Connector(<scheme>)" by the constructor.
162 std::string log_name_;
// Logger; also passed to every connection that is created.
165 std::shared_ptr<logging::Logger> logger_;
Definitions of platform-specific macros.
Definition of Connection class.
Definition of IConnector class.
Definition of IExecutor class.
Definition of Logger class.
#define MSGPACK_RPC_CRITICAL(LOGGER_PTR,...)
Write a critical log.
#define MSGPACK_RPC_WARN(LOGGER_PTR,...)
Write a warning log.
#define MSGPACK_RPC_TRACE(LOGGER_PTR,...)
Write a trace log.
Definition of MessageParserConfig class.
Definition of MsgpackRPCException class.
void async_connect(const BackendList &backends, const std::vector< addresses::URI > &uris, std::function< void(const Status &, std::shared_ptr< IConnection >)> on_connection)
Asynchronously connect to an endpoint.
Definition of OperationType enumeration.
Definition of Status class.
Definition of StatusCode enumeration.
Definition of UnixSocketAddress class.