32 #include <asio/executor_work_guard.hpp>
75 "An executor must not be run multiple times.");
124 std::function<void(std::exception_ptr)> exception_callback) override {
135 return is_started_.load(std::memory_order_relaxed) &&
146 context_thread_pair.thread =
147 std::thread{[this, &context_thread_pair] {
152 context_thread_pair.thread =
153 std::thread{[this, &context_thread_pair] {
169 if (context_thread_pair.thread.joinable()) {
170 context_thread_pair.thread.join();
174 if (context_thread_pair.thread.joinable()) {
175 context_thread_pair.thread.join();
185 context_thread_pair.context.stop();
188 context_thread_pair.context.stop();
197 context_thread_pair.work_guard.reset();
200 context_thread_pair.work_guard.reset();
214 } catch (const std::exception& e) {
216 "Executor stops due to an exception thrown in thread {}: {}",
217 thread_id, e.what());
218 const std::exception_ptr exception = std::current_exception();
227 exception_callback(exception);
240 std::ostringstream stream;
241 stream << std::this_thread::get_id();
253 std::atomic<std::size_t>& index, std::size_t size) {
255 return index.fetch_add(1, std::memory_order_relaxed) % size;
265 asio::executor_work_guard<AsioContextType::executor_type> work_guard;
310 std::shared_ptr<logging::Logger> logger,
312 return std::make_shared<GeneralExecutor>(std::move(logger), config);
Definition of AsioContextType class.
Class of exceptions in cpp-msgpack-rpc library.
Class of configuration of executors.
Class of general-purpose executors.
void start() override
Start internal event loops to process asynchronous tasks.
std::mutex exception_callbacks_mutex_
Mutex of exception_callbacks_.
~GeneralExecutor() override
Destructor.
std::atomic< std::size_t > transport_context_index_
Index of context to use for transport.
void on_exception(std::function< void(std::exception_ptr)> exception_callback) override
Register a function called when an exception is thrown in asynchronous tasks.
AsioContextType & context(OperationType type) noexcept override
Get the context in asio library.
std::atomic< std::size_t > callback_context_index_
Index of context to use for callbacks.
static std::size_t get_context_index(std::atomic< std::size_t > &index, std::size_t size)
Get the index of context to use.
std::exception_ptr last_exception() override
Get the last exception thrown in asynchronous tasks.
GeneralExecutor(std::shared_ptr< logging::Logger > logger, const config::ExecutorConfig &config)
Constructor.
void start_threads()
Start threads.
void stop() override
Stops operation.
void interrupt_threads()
Notify threads to stop operations.
std::vector< std::function< void(std::exception_ptr)> > exception_callbacks_
Functions called when an exception is thrown.
std::exception_ptr exception_in_thread_
Exception thrown in threads.
std::mutex exception_in_thread_mutex_
Mutex of exception_in_thread_.
std::atomic< bool > is_started_
Whether this executor has been started.
std::atomic< bool > is_stopped_
Whether this executor has been stopped.
std::shared_ptr< logging::Logger > logger_
Logger.
bool is_running() override
Check whether this executor is running.
static std::string create_thread_id_string()
Create a string of the current thread ID.
void stop_threads()
Stop threads.
std::vector< ContextThreadPair > transport_context_thread_pairs_
List of pairs of context and its thread for transport.
std::vector< ContextThreadPair > callbacks_context_thread_pairs_
List of pairs of context and its thread for callbacks.
void async_stop_threads_gently()
Notify threads to stop operations gently.
void run_in_thread(AsioContextType &context)
Run operations in a thread.
Definition of ExecutorConfig class.
Definition of IAsyncExecutor class.
Definition of Logger class.
#define MSGPACK_RPC_CRITICAL(LOGGER_PTR,...)
Write a critical log.
#define MSGPACK_RPC_TRACE(LOGGER_PTR,...)
Write a trace log.
Definition of MsgpackRPCException class.
Namespace of configurations.
Namespace of executors to process asynchronous tasks.
OperationType
Enumeration of types of operations.
CALLBACK
Execution of callbacks.
std::shared_ptr< IAsyncExecutor > create_executor(std::shared_ptr< logging::Logger > logger, const config::ExecutorConfig &config)
Create an executor.
asio::io_context AsioContextType
Type of context in asio library.
Definition of OperationType enumeration.
Definition of StatusCode enumeration.
ContextThreadPair()
Constructor.
AsioContextType context
Context.
std::thread thread
Thread.
asio::executor_work_guard< AsioContextType::executor_type > work_guard
Work guard.