cpp-msgpack-rpc 0.2.0
An RPC library implementing MessagePack RPC.
Loading...
Searching...
No Matches
server_impl.h
Go to the documentation of this file.
1/*
2 * Copyright 2023 MusicScience37 (Kenta Kabashima)
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
20#pragma once
21
22#include <atomic>
23#include <exception>
24#include <functional>
25#include <memory>
26#include <string>
27#include <utility>
28#include <vector>
29
43
45
/*!
 * \brief Class of internal implementation of servers.
 *
 * NOTE(review): this listing looks like it was extracted from generated
 * documentation. Every code line carries its original line number as a
 * prefix and some original lines are absent. The gaps are marked with
 * NOTE(review) comments below; confirm them against the real header.
 */
49class ServerImpl final : public IServerImpl {
50public:
/*!
 * \brief Constructor.
 *
 * \param[in] acceptors Acceptors.
 * \param[in] processor Processor of methods. Received as a unique pointer
 * and stored in a shared pointer member (processor_), which is later copied
 * into the callback given to each acceptor.
 * \param[in] executor Executor.
 * \param[in] logger Logger.
 */
59 ServerImpl(std::vector<std::shared_ptr<transport::IAcceptor>> acceptors,
60 std::unique_ptr<methods::IMethodProcessor> processor,
61 std::shared_ptr<executors::IAsyncExecutor> executor,
62 std::shared_ptr<logging::Logger> logger)
63 : acceptors_(std::move(acceptors)),
64 processor_(std::move(processor)),
65 executor_(std::move(executor)),
66 logger_(std::move(logger)),
// NOTE(review): the end of the initializer list and the constructor body
// (original lines 67-68) are not visible in this listing. Presumably
// stop_signal_handler_ is created here - confirm against the real header.
68
/*!
 * \brief Destructor.
 *
 * Calls stop(). An exception derived from std::exception is caught here and
 * does not leave the destructor; other exception types are not caught.
 */
70 ~ServerImpl() override {
71 try {
72 stop();
73 } catch (const std::exception& e) {
// NOTE(review): the line opening this logging call (original line 74) is
// missing from the listing; presumably MSGPACK_RPC_CRITICAL(logger_, ...)
// - confirm.
75 "An exception was thrown when destructing a server but "
76 "ignored: "
77 "{}",
78 e.what());
79 }
80 }
81
// Copying and moving are disabled.
82 ServerImpl(const ServerImpl&) = delete;
83 ServerImpl(ServerImpl&&) = delete;
84 ServerImpl& operator=(const ServerImpl&) = delete;
85 ServerImpl& operator=(ServerImpl&&) = delete;
86
/*!
 * \brief Start processing of this server.
 *
 * \throws MsgpackRPCException with StatusCode::PRECONDITION_NOT_MET if this
 * server has already been started.
 */
90 void start() {
// exchange(true) sets the flag and returns the previous value, so only the
// first call gets past this check.
91 if (is_started_.exchange(true)) {
92 throw MsgpackRPCException(StatusCode::PRECONDITION_NOT_MET,
93 "This server has already been started.");
94 }
// NOTE(review): original line 95 is missing from the listing; presumably a
// call to start_acceptors() - confirm.
96
// Register a callback that stops the signal handler. Presumably the executor
// invokes it when an exception occurs, which would let run_until_signal()
// return - confirm against IAsyncExecutor. The handler is captured as a
// weak_ptr, so the callback does nothing once the handler has been destroyed.
97 executor_->on_exception(
98 [weak_signal_handler =
99 std::weak_ptr<StopSignalHandler>(stop_signal_handler_)](
100 const std::exception_ptr& /*exception*/) {
101 const auto signal_handler = weak_signal_handler.lock();
102 if (signal_handler) {
103 signal_handler->stop();
104 }
105 });
106
107 executor_->start();
108 }
109
/*!
 * \brief Stop processing of this server.
 *
 * Does nothing if this server has not been started or has already been
 * stopped.
 */
111 void stop() override {
112 if (!is_started_.load()) {
113 return;
114 }
// Only the first caller that sees is_stopped_ == false continues.
115 if (is_stopped_.exchange(true)) {
116 return;
117 }
// NOTE(review): original line 118 is missing from the listing; presumably a
// call to stop_acceptors() - confirm.
// Release the acceptors and the processor, then stop and release the
// executor. After this point acceptors_ is empty and executor_ is empty.
119 acceptors_.clear();
120 processor_.reset();
121 executor_->stop();
122 executor_.reset();
123 }
124
/*!
 * \brief Run processing of this server until SIGINT or SIGTERM is received.
 *
 * Waits on the stop signal handler, stops this server, and rethrows the
 * last exception reported by the executor if there is one.
 *
 * NOTE(review): stop() resets executor_, and this function dereferences
 * executor_ after the wait. Calling this after stop() has completed looks
 * like it would dereference an empty pointer - confirm whether callers can
 * do that.
 */
126 void run_until_signal() override {
127 stop_signal_handler_->wait();
128
// Read the exception before stop(), because stop() resets executor_.
129 const auto last_exception = executor_->last_exception();
130
131 stop();
132
133 if (last_exception) {
134 std::rethrow_exception(last_exception);
135 }
136 }
137
/*!
 * \brief Get the URIs of the local endpoints in this server.
 *
 * \return One URI per element of acceptors_, in the same order. Empty once
 * stop() has cleared acceptors_.
 */
139 [[nodiscard]] std::vector<addresses::URI> local_endpoint_uris() override {
140 std::vector<addresses::URI> uris;
141 uris.reserve(acceptors_.size());
142 for (const auto& acceptor : acceptors_) {
143 uris.push_back(acceptor->local_address().to_uri());
144 }
145 return uris;
146 }
147
/*!
 * \brief Get the executor.
 *
 * \return The value of executor_, which is an empty pointer after stop()
 * has reset it.
 */
149 [[nodiscard]] std::shared_ptr<executors::IExecutor> executor() override {
150 return executor_;
151 }
152
153private:
/*!
 * \brief Start processing of acceptors.
 *
 * NOTE(review): the function header (original line 157) is missing from
 * the listing; presumably "void start_acceptors() {" - confirm.
 */
// Each acceptor is started with a callback that receives a connection,
// wraps it in a ServerConnection and starts that handler. The callback
// holds the executor weakly and holds copies of the shared pointers to the
// processor and the logger.
158 for (const auto& acceptor : acceptors_) {
159 acceptor->start(
160 [executor = std::weak_ptr<executors::IExecutor>(executor_),
161 processor = processor_, logger = logger_](
162 const std::shared_ptr<transport::IConnection>& connection) {
163 const auto handler = std::make_shared<ServerConnection>(
164 connection, executor, processor, logger);
165 handler->start();
166 });
167 MSGPACK_RPC_DEBUG(logger_, "Listening to {}.",
168 acceptor->local_address().to_string());
169 }
170 }
171
/*!
 * \brief Stop processing of acceptors.
 *
 * NOTE(review): the function header (original line 175) is missing from
 * the listing; presumably "void stop_acceptors() {" - confirm.
 */
176 for (const auto& acceptor : acceptors_) {
177 acceptor->stop();
178 }
179 }
180
//! Acceptors.
182 std::vector<std::shared_ptr<transport::IAcceptor>> acceptors_;
183
//! Processor of methods.
185 std::shared_ptr<methods::IMethodProcessor> processor_;
186
//! Executor.
188 std::shared_ptr<executors::IAsyncExecutor> executor_;
189
//! Logger.
191 std::shared_ptr<logging::Logger> logger_;
192
//! Handler of signals to stop processing.
194 std::shared_ptr<StopSignalHandler> stop_signal_handler_;
195
//! Whether this server has been started.
197 std::atomic<bool> is_started_{false};
198
//! Whether this server has been stopped.
200 std::atomic<bool> is_stopped_{false};
201};
202
203} // namespace msgpack_rpc::servers::impl
Class of exceptions in cpp-msgpack-rpc library.
Class to handle Linux signals SIGINT and SIGTERM to stop a server.
Class of internal implementation of servers.
Definition server_impl.h:49
std::shared_ptr< logging::Logger > logger_
Logger.
std::vector< addresses::URI > local_endpoint_uris() override
Get the URIs of the local endpoints in this server.
void stop_acceptors()
Stop processing of acceptors.
void start_acceptors()
Start processing of acceptors.
std::shared_ptr< methods::IMethodProcessor > processor_
Processor of methods.
std::shared_ptr< executors::IAsyncExecutor > executor_
Executor.
std::atomic< bool > is_stopped_
Whether this server has been stopped.
void run_until_signal() override
Run processing of this server until SIGINT or SIGTERM is received.
std::shared_ptr< StopSignalHandler > stop_signal_handler_
Handler of signals to stop processing.
void stop() override
Stop processing of this server.
ServerImpl(std::vector< std::shared_ptr< transport::IAcceptor > > acceptors, std::unique_ptr< methods::IMethodProcessor > processor, std::shared_ptr< executors::IAsyncExecutor > executor, std::shared_ptr< logging::Logger > logger)
Constructor.
Definition server_impl.h:59
std::shared_ptr< executors::IExecutor > executor() override
Get the executor.
void start()
Start processing of this server.
Definition server_impl.h:90
std::vector< std::shared_ptr< transport::IAcceptor > > acceptors_
Acceptors.
std::atomic< bool > is_started_
Whether this server has been started.
Definition of IAcceptor class.
Definition of IAddress.
Definition of IAsyncExecutor class.
Definition of IConnection class.
Definition of IExecutor class.
Definition of IMethodProcessor class.
Definition of IServerImpl class.
Definition of Logger class.
#define MSGPACK_RPC_CRITICAL(LOGGER_PTR,...)
Write a critical log.
Definition logger.h:234
#define MSGPACK_RPC_DEBUG(LOGGER_PTR,...)
Write a debug log.
Definition logger.h:186
Definition of MsgpackRPCException class.
Namespace of internal implementation.
STL namespace.
Definition of Server class.
Definition of StatusCode enumeration.
Definition of StopSignalHandler class.
Definition of URI class.