27#include <unordered_map>
49class CallList :
public std::enable_shared_from_this<CallList> {
58 explicit CallList(std::chrono::nanoseconds timeout,
59 std::weak_ptr<executors::IExecutor>
executor,
60 std::shared_ptr<logging::Logger> logger)
73 std::shared_ptr<CallFutureImpl>>
76 const auto deadline = std::chrono::steady_clock::now() +
timeout_;
79 const auto serialized_request =
82 std::unique_lock<std::mutex> lock(
mutex_);
83 const auto [iter, is_success] =
88 StatusCode::UNEXPECTED_ERROR,
"Duplicate request ID.");
92 auto& call = iter->second;
96 deadline, [weak_self = this->weak_from_this(), request_id] {
97 const auto self = weak_self.lock();
99 self->on_timeout(request_id);
103 return {request_id, serialized_request, call.future()};
112 std::unique_lock<std::mutex> lock(
mutex_);
113 const auto iter =
list_.find(response.
id());
114 if (iter ==
list_.end()) {
116 "Ignored a response with a non-existing request ID {}.",
120 iter->second.set(response.
result());
130 [[nodiscard]] std::shared_ptr<executors::IExecutor>
executor() {
144 logger_,
"Timeout of an RPC (request ID: {}).", request_id);
145 std::unique_lock<std::mutex> lock(
mutex_);
146 const auto iter =
list_.find(request_id);
147 if (iter ==
list_.end()) {
150 iter->second.set(
Status(StatusCode::TIMEOUT,
151 "Result of an RPC couldn't be received within a timeout."));
156 std::unordered_map<messages::MessageID, Call>
list_{};
Definition of Call class.
Definition of CallFutureImpl class.
CallList(std::chrono::nanoseconds timeout, std::weak_ptr< executors::IExecutor > executor, std::shared_ptr< logging::Logger > logger)
Constructor.
std::unordered_map< messages::MessageID, Call > list_
Map from message IDs of requests to their calls.
std::shared_ptr< executors::IExecutor > executor()
Get the executor.
std::mutex mutex_
Mutex protecting list_.
void handle(const messages::ParsedResponse &response)
Handle a response.
std::shared_ptr< logging::Logger > logger_
Logger.
std::weak_ptr< executors::IExecutor > executor_
Executor.
std::tuple< messages::MessageID, messages::SerializedMessage, std::shared_ptr< CallFutureImpl > > create(messages::MethodNameView method_name, const IParametersSerializer &parameters)
Register an RPC.
std::chrono::nanoseconds timeout_
Timeout of RPCs.
void on_timeout(messages::MessageID request_id)
Handle timeout of an RPC.
RequestIDGenerator request_id_generator_
Generator of message IDs of requests.
Class to generate message IDs of requests.
Class of exceptions in cpp-msgpack-rpc library.
Class of parsed responses.
MessageID id() const noexcept
Get the message ID.
const CallResult & result() const noexcept
Get the result.
Class of serialized message data.
Definition of IExecutor class.
Definition of Logger class.
#define MSGPACK_RPC_WARN(LOGGER_PTR,...)
Write a warning log.
#define MSGPACK_RPC_TRACE(LOGGER_PTR,...)
Write a trace log.
Definition of MessageID type.
Definition of MethodNameView class.
Definition of MsgpackRPCException class.
Namespace of internal implementations.
std::uint32_t MessageID
Type of message IDs.
Definition of ParametersSerializer class.
Definition of ParsedResponse class.
Definition of RequestIDGenerator class.
Definition of SerializedMessage class.
Definition of Status class.
Definition of StatusCode enumeration.