cpp-msgpack-rpc 0.2.0
An RPC library implementing MessagePack RPC.
Loading...
Searching...
No Matches
call_list.h
Go to the documentation of this file.
1/*
2 * Copyright 2023 MusicScience37 (Kenta Kabashima)
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
20#pragma once
21
22#include <cassert>
23#include <chrono>
24#include <memory>
25#include <mutex>
26#include <tuple>
27#include <unordered_map>
28#include <utility>
29
43
45
49class CallList : public std::enable_shared_from_this<CallList> {
50public:
58 explicit CallList(std::chrono::nanoseconds timeout,
59 std::weak_ptr<executors::IExecutor> executor,
60 std::shared_ptr<logging::Logger> logger)
61 : timeout_(timeout),
62 executor_(std::move(executor)),
63 logger_(std::move(logger)) {}
64
72 [[nodiscard]] std::tuple<messages::MessageID, messages::SerializedMessage,
73 std::shared_ptr<CallFutureImpl>>
75 const IParametersSerializer& parameters) {
76 const auto deadline = std::chrono::steady_clock::now() + timeout_;
77
78 const messages::MessageID request_id = request_id_generator_.generate();
79 const auto serialized_request =
80 parameters.create_serialized_request(method_name, request_id);
81
82 std::unique_lock<std::mutex> lock(mutex_);
83 const auto [iter, is_success] =
84 list_.try_emplace(request_id, executor(), deadline);
85 if (!is_success) {
86 // This won't occur in the ordinary condition.
88 StatusCode::UNEXPECTED_ERROR, "Duplicate request ID.");
89 }
90 // Iterator can be invalidated, but reference won't be invalidated in
91 // std::unordered_map unless the referenced object is destroyed.
92 auto& call = iter->second;
93 lock.unlock();
94
95 call.set_timeout(
96 deadline, [weak_self = this->weak_from_this(), request_id] {
97 const auto self = weak_self.lock();
98 if (self) {
99 self->on_timeout(request_id);
100 }
101 });
102
103 return {request_id, serialized_request, call.future()};
104 }
105
111 void handle(const messages::ParsedResponse& response) {
112 std::unique_lock<std::mutex> lock(mutex_);
113 const auto iter = list_.find(response.id());
114 if (iter == list_.end()) {
116 "Ignored a response with a non-existing request ID {}.",
117 response.id());
118 return;
119 }
120 iter->second.set(response.result());
121 list_.erase(iter);
122 }
123
124private:
130 [[nodiscard]] std::shared_ptr<executors::IExecutor> executor() {
131 auto res = executor_.lock();
132 // This won't occur without a bug.
133 assert(res);
134 return res;
135 }
136
144 logger_, "Timeout of an RPC (request ID: {}).", request_id);
145 std::unique_lock<std::mutex> lock(mutex_);
146 const auto iter = list_.find(request_id);
147 if (iter == list_.end()) {
148 return;
149 }
150 iter->second.set(Status(StatusCode::TIMEOUT,
151 "Result of an RPC couldn't be received within a timeout."));
152 list_.erase(iter);
153 }
154
156 std::unordered_map<messages::MessageID, Call> list_{};
157
160
162 std::mutex mutex_{};
163
165 std::chrono::nanoseconds timeout_;
166
168 std::weak_ptr<executors::IExecutor> executor_;
169
171 std::shared_ptr<logging::Logger> logger_;
172};
173
174} // namespace msgpack_rpc::clients::impl
Definition of Call class.
Definition of CallFutureImpl class.
CallList(std::chrono::nanoseconds timeout, std::weak_ptr< executors::IExecutor > executor, std::shared_ptr< logging::Logger > logger)
Constructor.
Definition call_list.h:58
std::unordered_map< messages::MessageID, Call > list_
List.
Definition call_list.h:156
std::shared_ptr< executors::IExecutor > executor()
Get the executor.
Definition call_list.h:130
std::mutex mutex_
Mutex of data.
Definition call_list.h:162
void handle(const messages::ParsedResponse &response)
Handle a response.
Definition call_list.h:111
std::shared_ptr< logging::Logger > logger_
Logger.
Definition call_list.h:171
std::weak_ptr< executors::IExecutor > executor_
Executor.
Definition call_list.h:168
std::tuple< messages::MessageID, messages::SerializedMessage, std::shared_ptr< CallFutureImpl > > create(messages::MethodNameView method_name, const IParametersSerializer &parameters)
Register an RPC.
Definition call_list.h:74
std::chrono::nanoseconds timeout_
Timeout of RPCs.
Definition call_list.h:165
void on_timeout(messages::MessageID request_id)
Handle timeout of an RPC.
Definition call_list.h:142
RequestIDGenerator request_id_generator_
Generator of message IDs of requests.
Definition call_list.h:159
virtual messages::SerializedMessage create_serialized_request(messages::MethodNameView method_name, messages::MessageID request_id) const =0
Create a serialized request data.
Class to generate message IDs of requests.
Class of exceptions in cpp-msgpack-rpc library.
Class of statuses.
Definition status.h:34
MessageID id() const noexcept
Get the message ID.
const CallResult & result() const noexcept
Get the result.
Class of serialized message data.
Definition of IExecutor class.
Definition of Logger class.
#define MSGPACK_RPC_WARN(LOGGER_PTR,...)
Write a warning log.
Definition logger.h:210
#define MSGPACK_RPC_TRACE(LOGGER_PTR,...)
Write a trace log.
Definition logger.h:174
Definition of MessageID type.
Definition of MethodNameView class.
Definition of MsgpackRPCException class.
Namespace of internal implementations.
std::uint32_t MessageID
Type of message IDs.
Definition message_id.h:27
STL namespace.
Definition of ParametersSerializer class.
Definition of ParsedResponse class.
Definition of RequestIDGenerator class.
Definition of SerializedMessage class.
Definition of Status class.
Definition of StatusCode enumeration.