cpp-msgpack-rpc 0.2.0
An RPC library implementing MessagePack RPC.
Loading...
Searching...
No Matches
client_connector.h
Go to the documentation of this file.
1/*
2 * Copyright 2023 MusicScience37 (Kenta Kabashima)
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
20#pragma once
21
22#include <atomic>
23#include <functional>
24#include <memory>
25#include <mutex>
26#include <utility>
27#include <vector>
28
39
41
45class ClientConnector : public std::enable_shared_from_this<ClientConnector> {
46public:
48 using ConnectionCallback = std::function<void()>;
49
53
56
58 using ConnectionClosedCallback = std::function<void()>;
59
69 ClientConnector(const std::shared_ptr<executors::IExecutor>& executor,
71 std::vector<addresses::URI> server_uris,
72 const config::ReconnectionConfig& reconnection_config,
73 const std::shared_ptr<logging::Logger>& logger)
74 : backends_(std::move(backends)),
75 server_uris_(std::move(server_uris)),
76 retry_timer_(executor, logger, reconnection_config),
77 logger_(logger) {}
78
92 ConnectionClosedCallback on_closed) {
93 on_connection_ = std::move(on_connection);
94 on_received_ = std::move(on_received);
95 on_sent_ = std::move(on_sent);
96 on_closed_ = std::move(on_closed);
98 }
99
103 void stop() {
104 if (is_stopped_.exchange(true, std::memory_order_acquire)) {
105 return;
106 }
107
108 std::unique_lock<std::mutex> lock(connection_mutex_);
109 if (connection_) {
110 connection_->async_close();
111 connection_.reset();
112 }
113 lock.unlock();
114 retry_timer_.cancel();
115 }
116
122 [[nodiscard]] std::shared_ptr<transport::IConnection> connection() {
123 std::unique_lock<std::mutex> lock(connection_mutex_);
124 return connection_;
125 }
126
127private:
133 [self = this->shared_from_this()](const Status& status,
134 const std::shared_ptr<transport::IConnection>& connection) {
135 if (status.code() == StatusCode::SUCCESS) {
136 self->on_connection(connection);
137 } else {
138 self->on_connection_failure();
139 }
140 });
141 }
142
149 const std::shared_ptr<transport::IConnection>& connection) {
150 if (is_stopped_.load(std::memory_order_relaxed)) {
151 return;
152 }
153 std::unique_lock<std::mutex> lock(connection_mutex_);
156 [weak_self = weak_from_this()](const Status& /*status*/) {
157 const auto self = weak_self.lock();
158 if (self) {
159 self->on_connection_closed();
160 }
161 });
162 retry_timer_.reset();
163 lock.unlock();
164
166 }
167
172 if (is_stopped_.load(std::memory_order_relaxed)) {
173 return;
174 }
175 retry_timer_.async_wait(
176 [self = this->shared_from_this()] { self->async_connect(); });
177 }
178
183 if (is_stopped_.load(std::memory_order_relaxed)) {
184 return;
185 }
186 std::unique_lock<std::mutex> lock(connection_mutex_);
187 connection_.reset();
188 lock.unlock();
189 MSGPACK_RPC_TRACE(logger_, "Connection closed, so reconnect now.");
190 on_closed_();
192 }
193
196
198 std::vector<addresses::URI> server_uris_;
199
201 std::shared_ptr<transport::IConnection> connection_{};
202
204 std::mutex connection_mutex_{};
205
207 std::atomic<bool> is_stopped_{false};
208
211
214
217
220
223
225 std::shared_ptr<logging::Logger> logger_;
226};
227
228} // namespace msgpack_rpc::clients::impl
Definition of async_connect function.
Definition of BackendList class.
ClientConnector(const std::shared_ptr< executors::IExecutor > &executor, transport::BackendList backends, std::vector< addresses::URI > server_uris, const config::ReconnectionConfig &reconnection_config, const std::shared_ptr< logging::Logger > &logger)
Constructor.
void on_connection(const std::shared_ptr< transport::IConnection > &connection)
Handle connection established.
std::function< void()> ConnectionCallback
Type of callback functions called when a connection is established.
transport::IConnection::MessageSentCallback MessageSentCallback
Type of callback functions called when a message is successfully sent.
std::mutex connection_mutex_
Mutex of connection_ and is_connecting_.
ConnectionCallback on_connection_
Callback function called when a connection is established.
void on_connection_closed()
Handle connection closed.
std::shared_ptr< transport::IConnection > connection_
Connection.
void on_connection_failure()
Handle failure to connect.
std::atomic< bool > is_stopped_
Whether this connector is stopped.
transport::BackendList backends_
Backends.
void async_connect()
Asynchronously connect to a server.
std::function< void()> ConnectionClosedCallback
Type of callback functions called when a connection is closed.
transport::IConnection::MessageReceivedCallback MessageReceivedCallback
Type of callback functions called when a message is received.
ConnectionClosedCallback on_closed_
Callback function called when a connection is closed.
MessageReceivedCallback on_received_
Callback function called when a message is received.
MessageSentCallback on_sent_
Callback function called when a message is sent.
std::shared_ptr< logging::Logger > logger_
Logger.
void start(ConnectionCallback on_connection, MessageReceivedCallback on_received, MessageSentCallback on_sent, ConnectionClosedCallback on_closed)
Start processing.
std::vector< addresses::URI > server_uris_
URIs of servers.
std::shared_ptr< transport::IConnection > connection()
Get the connection.
ReconnectionTimer retry_timer_
Timer to sleep until next retry.
Class of timer to sleep before reconnecting.
Class of statuses.
Definition status.h:34
StatusCode code() const noexcept
Get the status code.
Definition status.cpp:69
Class of configurations of reconnection.
Class of lists of backends of protocols.
std::function< void(messages::ParsedMessage)> MessageReceivedCallback
Type of callback functions called when a message is received.
std::function< void()> MessageSentCallback
Type of callback functions called when a message is successfully sent.
Definition of IConnection class.
Definition of IExecutor class.
Definition of Logger class.
#define MSGPACK_RPC_TRACE(LOGGER_PTR,...)
Write a trace log.
Definition logger.h:174
Namespace of internal implementations.
void async_connect(const BackendList &backends, const std::vector< addresses::URI > &uris, std::function< void(const Status &, std::shared_ptr< IConnection >)> on_connection)
Asynchronously connect to an endpoint.
STL namespace.
Definition of ReconnectionConfig class.
Class of ReconnectionTimer class.
Definition of Status class.
Definition of StatusCode enumeration.
Definition of URI class.