cpp-msgpack-rpc 0.2.0
An RPC library implementing MessagePack RPC.
Loading...
Searching...
No Matches
unix_socket_connector.h
Go to the documentation of this file.
1/*
2 * Copyright 2024 MusicScience37 (Kenta Kabashima)
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
20#pragma once
21
22#include "msgpack_rpc/config.h"
23
24#if MSGPACK_RPC_HAS_UNIX_SOCKETS
25
26#include <functional>
27#include <memory>
28#include <string>
29#include <string_view>
30#include <type_traits>
31#include <utility>
32
33#include <asio/error_code.hpp>
34#include <asio/local/stream_protocol.hpp>
35#include <fmt/format.h>
36#include <fmt/ostream.h>
37
49
50namespace msgpack_rpc::transport::unix_socket {
51
55class UnixSocketConnector final
56 : public IConnector,
57 public std::enable_shared_from_this<UnixSocketConnector> {
58public:
60 using AsioSocket = asio::local::stream_protocol::socket;
61
63 using AsioAddress = asio::local::stream_protocol::endpoint;
64
66 using ConcreteAddress = addresses::UnixSocketAddress;
67
69 using ConnectionType = Connection<AsioSocket, ConcreteAddress>;
70
79 UnixSocketConnector(const std::shared_ptr<executors::IExecutor>& executor,
80 const config::MessageParserConfig& message_parser_config,
81 std::shared_ptr<logging::Logger> logger, std::string_view scheme)
82 : executor_(executor),
83 message_parser_config_(message_parser_config),
84 scheme_(scheme),
85 log_name_(fmt::format("Connector({})", scheme_)),
86 logger_(std::move(logger)) {}
87
89 void async_connect(
90 const addresses::URI& uri, ConnectionCallback on_connected) override {
91 const AsioAddress asio_address{uri.host_or_path()};
92
93 auto socket_ptr = std::make_unique<AsioSocket>(
94 get_executor()->context(executors::OperationType::TRANSPORT));
95 AsioSocket& socket = *socket_ptr;
96 socket.async_connect(asio_address,
97 [self = this->shared_from_this(),
98 socket_ptr_moved = std::move(socket_ptr),
99 on_connected_moved = std::move(on_connected), uri,
100 asio_address](const asio::error_code& error) {
101 self->on_connect(error, std::move(*socket_ptr_moved),
102 on_connected_moved, asio_address, uri);
103 });
104 MSGPACK_RPC_TRACE(logger_, "({}) Connecting to {}.", log_name_, uri);
105 }
106
107private:
117 void on_connect(const asio::error_code& error, AsioSocket&& socket,
118 const ConnectionCallback& on_connected, const AsioAddress& asio_address,
119 const addresses::URI& uri) {
120 if (error) {
121 const auto message = fmt::format(
122 "Failed to connect to {}: {}", uri, error.message());
123 MSGPACK_RPC_WARN(logger_, "({}) {}", log_name_, message);
124 on_connected(
125 Status(StatusCode::CONNECTION_FAILURE, message), nullptr);
126 return;
127 }
128 MSGPACK_RPC_TRACE(logger_, "({}) Connected to {}.", log_name_,
129 fmt::streamed(asio_address));
130
131 auto connection = std::make_shared<ConnectionType>(
132 std::move(socket), message_parser_config_, logger_);
133 on_connected(Status(), std::move(connection));
134 }
135
141 [[nodiscard]] std::shared_ptr<executors::IExecutor> get_executor() {
142 auto executor = executor_.lock();
143 if (!executor) {
144 const auto message = std::string("Executor is not set.");
145 MSGPACK_RPC_CRITICAL(logger_, "({}) {}", log_name_, message);
146 throw MsgpackRPCException(
147 StatusCode::PRECONDITION_NOT_MET, message);
148 }
149 return executor;
150 }
151
153 std::weak_ptr<executors::IExecutor> executor_;
154
156 config::MessageParserConfig message_parser_config_;
157
159 std::string scheme_;
160
162 std::string log_name_;
163
165 std::shared_ptr<logging::Logger> logger_;
166};
167
168} // namespace msgpack_rpc::transport::unix_socket
169
170#endif
Definitions of platform-specific macros.
Definition of Connection class.
Definition of IConnector class.
Definition of IExecutor class.
Definition of Logger class.
#define MSGPACK_RPC_CRITICAL(LOGGER_PTR,...)
Write a critical log.
Definition logger.h:234
#define MSGPACK_RPC_WARN(LOGGER_PTR,...)
Write a warning log.
Definition logger.h:210
#define MSGPACK_RPC_TRACE(LOGGER_PTR,...)
Write a trace log.
Definition logger.h:174
Definition of MessageParserConfig class.
Definition of MsgpackRPCException class.
void async_connect(const BackendList &backends, const std::vector< addresses::URI > &uris, std::function< void(const Status &, std::shared_ptr< IConnection >)> on_connection)
Asynchronously connect to an endpoint.
Definition of OperationType enumeration.
Definition of Status class.
Definition of StatusCode enumeration.
Definition of UnixSocketAddress class.
Definition of URI class.