cpp-msgpack-rpc 0.2.0
An RPC library implementing MessagePack RPC.
Loading...
Searching...
No Matches
general_executor.cpp
Go to the documentation of this file.
1/*
2 * Copyright 2023 MusicScience37 (Kenta Kabashima)
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
20#include <atomic>
21#include <cstdlib>
22#include <exception>
23#include <functional>
24#include <memory>
25#include <mutex>
26#include <sstream>
27#include <string>
28#include <thread>
29#include <utility>
30#include <vector>
31
32#include <asio/executor_work_guard.hpp>
33
41
42namespace msgpack_rpc::executors {
43
47class GeneralExecutor final : public IAsyncExecutor {
48public:
55 GeneralExecutor(std::shared_ptr<logging::Logger> logger,
57 : transport_context_thread_pairs_(config.num_transport_threads()),
58 callbacks_context_thread_pairs_(config.num_callback_threads()),
59 logger_(std::move(logger)) {}
60
61 GeneralExecutor(const GeneralExecutor&) = delete;
63 GeneralExecutor& operator=(const GeneralExecutor&) = delete;
64 GeneralExecutor& operator=(GeneralExecutor&&) = delete;
65
69 ~GeneralExecutor() override { stop(); }
70
72 void start() override {
73 if (is_started_.exchange(true)) {
74 throw MsgpackRPCException(StatusCode::PRECONDITION_NOT_MET,
75 "An executor must not be run multiple times.");
76 }
77
78 MSGPACK_RPC_TRACE(logger_, "Start an executor.");
80 }
81
83 void stop() override {
84 if (is_stopped_.exchange(true)) {
85 // Already stopped.
86 return;
87 }
88
90
91 // Remove objects
94
95 MSGPACK_RPC_TRACE(logger_, "Executor run stopped.");
96 }
97
99 AsioContextType& context(OperationType type) noexcept override {
100 switch (type) {
105 .context;
110 .context;
111 }
112 // This line won't be executed without a bug.
113 std::abort();
114 }
115
117 [[nodiscard]] std::exception_ptr last_exception() override {
118 std::unique_lock<std::mutex> lock(exception_in_thread_mutex_);
120 }
121
124 std::function<void(std::exception_ptr)> exception_callback) override {
125 std::unique_lock<std::mutex> lock(exception_callbacks_mutex_);
126 exception_callbacks_.push_back(std::move(exception_callback));
127 }
128
130 [[nodiscard]] bool is_running() override {
131 std::unique_lock<std::mutex> lock(exception_in_thread_mutex_);
133 return false;
134 }
135 return is_started_.load(std::memory_order_relaxed) &&
136 !is_stopped_.load(std::memory_order_relaxed);
137 }
138
139private:
144 try {
145 for (auto& context_thread_pair : transport_context_thread_pairs_) {
146 context_thread_pair.thread =
147 std::thread{[this, &context_thread_pair] {
148 run_in_thread(context_thread_pair.context);
149 }};
150 }
151 for (auto& context_thread_pair : callbacks_context_thread_pairs_) {
152 context_thread_pair.thread =
153 std::thread{[this, &context_thread_pair] {
154 run_in_thread(context_thread_pair.context);
155 }};
156 }
157 } catch (...) {
158 stop_threads();
159 throw;
160 }
161 }
162
168 for (auto& context_thread_pair : transport_context_thread_pairs_) {
169 if (context_thread_pair.thread.joinable()) {
170 context_thread_pair.thread.join();
171 }
172 }
173 for (auto& context_thread_pair : callbacks_context_thread_pairs_) {
174 if (context_thread_pair.thread.joinable()) {
175 context_thread_pair.thread.join();
176 }
177 }
178 }
179
184 for (auto& context_thread_pair : transport_context_thread_pairs_) {
185 context_thread_pair.context.stop();
186 }
187 for (auto& context_thread_pair : callbacks_context_thread_pairs_) {
188 context_thread_pair.context.stop();
189 }
190 }
191
196 for (auto& context_thread_pair : transport_context_thread_pairs_) {
197 context_thread_pair.work_guard.reset();
198 }
199 for (auto& context_thread_pair : callbacks_context_thread_pairs_) {
200 context_thread_pair.work_guard.reset();
201 }
202 }
203
210 const auto thread_id = create_thread_id_string();
211 MSGPACK_RPC_TRACE(logger_, "Start an executor thread {}.", thread_id);
212 try {
213 context.run();
214 } catch (const std::exception& e) {
216 "Executor stops due to an exception thrown in thread {}: {}",
217 thread_id, e.what());
218 const std::exception_ptr exception = std::current_exception();
219 {
220 std::unique_lock<std::mutex> lock(exception_in_thread_mutex_);
221 exception_in_thread_ = exception;
222 }
224 {
225 std::unique_lock<std::mutex> lock(exception_callbacks_mutex_);
226 for (const auto& exception_callback : exception_callbacks_) {
227 exception_callback(exception);
228 }
229 }
230 }
231 MSGPACK_RPC_TRACE(logger_, "Finish an executor thread {}.", thread_id);
232 }
233
239 static std::string create_thread_id_string() {
240 std::ostringstream stream;
241 stream << std::this_thread::get_id();
242 return stream.str();
243 }
244
252 static std::size_t get_context_index(
253 std::atomic<std::size_t>& index, std::size_t size) {
254 // TODO Should I consider overflow?
255 return index.fetch_add(1, std::memory_order_relaxed) % size;
256 }
257
260 public:
263
265 asio::executor_work_guard<AsioContextType::executor_type> work_guard;
266
268 std::thread thread;
269
272 : context(1), work_guard(asio::make_work_guard(context)) {}
273 };
274
276 std::vector<ContextThreadPair> transport_context_thread_pairs_;
277
279 std::vector<ContextThreadPair> callbacks_context_thread_pairs_;
280
282 std::atomic<std::size_t> transport_context_index_{0};
283
285 std::atomic<std::size_t> callback_context_index_{0};
286
288 std::atomic<bool> is_started_{false};
289
291 std::atomic<bool> is_stopped_{false};
292
294 std::exception_ptr exception_in_thread_{};
295
298
300 std::vector<std::function<void(std::exception_ptr)>> exception_callbacks_{};
301
304
306 std::shared_ptr<logging::Logger> logger_;
307};
308
309std::shared_ptr<IAsyncExecutor> create_executor(
310 std::shared_ptr<logging::Logger> logger,
312 return std::make_shared<GeneralExecutor>(std::move(logger), config);
313}
314
315} // namespace msgpack_rpc::executors
Definition of AsioContextType class.
Class of exceptions in cpp-msgpack-rpc library.
Class of configuration of executors.
Class of general-purpose executors.
void start() override
Start internal event loops to process asynchronous tasks.
std::mutex exception_callbacks_mutex_
Mutex of exception_callbacks_.
std::atomic< std::size_t > transport_context_index_
Index of context to use for transport.
void on_exception(std::function< void(std::exception_ptr)> exception_callback) override
Register a function called when an exception is thrown in asynchronous tasks.
AsioContextType & context(OperationType type) noexcept override
Get the context in asio library.
std::atomic< std::size_t > callback_context_index_
Index of context to use for callbacks.
static std::size_t get_context_index(std::atomic< std::size_t > &index, std::size_t size)
Get the index of context to use.
std::exception_ptr last_exception() override
Get the last exception thrown in asynchronous tasks.
GeneralExecutor(std::shared_ptr< logging::Logger > logger, const config::ExecutorConfig &config)
Constructor.
void stop() override
Stops operation.
void interrupt_threads()
Notify threads to stop operations.
std::vector< std::function< void(std::exception_ptr)> > exception_callbacks_
Functions called when an exception is thrown.
std::exception_ptr exception_in_thread_
Exception thrown in threads.
std::mutex exception_in_thread_mutex_
Mutex of exception_in_thread_.
std::atomic< bool > is_started_
Whether this executor has been started.
std::atomic< bool > is_stopped_
Whether this executor has been stopped.
std::shared_ptr< logging::Logger > logger_
Logger.
bool is_running() override
Check whether this executor is running.
static std::string create_thread_id_string()
Create a string of the current thread ID.
std::vector< ContextThreadPair > transport_context_thread_pairs_
List of pairs of context and its thread for transport.
std::vector< ContextThreadPair > callbacks_context_thread_pairs_
List of pairs of context and its thread for callbacks.
void async_stop_threads_gently()
Notify threads to stop operations gently.
void run_in_thread(AsioContextType &context)
Run operations in a thread.
Definition of ExecutorConfig class.
Definition of IAsyncExecutor class.
Definition of Logger class.
#define MSGPACK_RPC_CRITICAL(LOGGER_PTR,...)
Write a critical log.
Definition logger.h:234
#define MSGPACK_RPC_TRACE(LOGGER_PTR,...)
Write a trace log.
Definition logger.h:174
Definition of MsgpackRPCException class.
Namespace of configurations.
Namespace of executors to process asynchronous tasks.
OperationType
Enumeration of types of operations.
@ CALLBACK
Execution of callbacks.
std::shared_ptr< IAsyncExecutor > create_executor(std::shared_ptr< logging::Logger > logger, const config::ExecutorConfig &config)
Create an executor.
asio::io_context AsioContextType
Type of context in asio library.
STL namespace.
Definition of OperationType enumeration.
Definition of StatusCode enumeration.
asio::executor_work_guard< AsioContextType::executor_type > work_guard
Work guard.