]>
Commit | Line | Data |
---|---|---|
fc72c078 S |
1 | // Copyright (c) 2016 The Zcash developers |
2 | // Distributed under the MIT software license, see the accompanying | |
3 | // file COPYING or http://www.opensource.org/licenses/mit-license.php. | |
4 | ||
5 | #include "asyncrpcqueue.h" | |
6 | ||
// File-local counter; addWorker() increments it and hands the new value to each worker it spawns.
static std::atomic<size_t> workerCounter{0};
fc72c078 | 8 | |
423a63d0 S |
9 | /** |
10 | * Static method to return the shared/default queue. | |
11 | */ | |
12 | shared_ptr<AsyncRPCQueue> AsyncRPCQueue::sharedInstance() { | |
13 | // Thread-safe in C+11 and gcc 4.3 | |
14 | static shared_ptr<AsyncRPCQueue> q = std::make_shared<AsyncRPCQueue>(); | |
15 | return q; | |
16 | } | |
17 | ||
9cd71343 | 18 | AsyncRPCQueue::AsyncRPCQueue() : closed_(false), finish_(false) { |
fc72c078 S |
19 | } |
20 | ||
fc72c078 | 21 | AsyncRPCQueue::~AsyncRPCQueue() { |
3b54bf58 | 22 | closeAndWait(); // join on all worker threads |
fc72c078 S |
23 | } |
24 | ||
3b54bf58 | 25 | /** |
fc72c078 S |
26 | * A worker will execute this method on a new thread |
27 | */ | |
3b54bf58 | 28 | void AsyncRPCQueue::run(size_t workerId) { |
fc72c078 | 29 | |
9cd71343 | 30 | while (true) { |
fc72c078 S |
31 | AsyncRPCOperationId key; |
32 | std::shared_ptr<AsyncRPCOperation> operation; | |
33 | { | |
9bdad434 | 34 | std::unique_lock<std::mutex> guard(lock_); |
9cd71343 | 35 | while (operation_id_queue_.empty() && !isClosed() && !isFinishing()) { |
3b54bf58 | 36 | this->condition_.wait(guard); |
fc72c078 S |
37 | } |
38 | ||
9cd71343 | 39 | // Exit if the queue is empty and we are finishing up |
61ea2aba | 40 | if (isFinishing() && operation_id_queue_.empty()) { |
9cd71343 S |
41 | break; |
42 | } | |
43 | ||
fc72c078 | 44 | // Exit if the queue is closing. |
3b54bf58 | 45 | if (isClosed()) { |
017b3ede S |
46 | while (!operation_id_queue_.empty()) { |
47 | operation_id_queue_.pop(); | |
48 | } | |
fc72c078 | 49 | break; |
3b54bf58 | 50 | } |
fc72c078 S |
51 | |
52 | // Get operation id | |
3b54bf58 S |
53 | key = operation_id_queue_.front(); |
54 | operation_id_queue_.pop(); | |
fc72c078 S |
55 | |
56 | // Search operation map | |
3b54bf58 S |
57 | AsyncRPCOperationMap::const_iterator iter = operation_map_.find(key); |
58 | if (iter != operation_map_.end()) { | |
fc72c078 S |
59 | operation = iter->second; |
60 | } | |
61 | } | |
62 | ||
63 | if (!operation) { | |
64 | // cannot find operation in map, may have been removed | |
65 | } else if (operation->isCancelled()) { | |
66 | // skip cancelled operation | |
67 | } else { | |
68 | operation->main(); | |
69 | } | |
70 | } | |
fc72c078 S |
71 | } |
72 | ||
73 | ||
3b54bf58 | 74 | /** |
fc72c078 S |
75 | * Add shared_ptr to operation. |
76 | * | |
77 | * To retain polymorphic behaviour, i.e. main() method of derived classes is invoked, | |
3b54bf58 | 78 | * caller should create the shared_ptr like this: |
fc72c078 S |
79 | * |
80 | * std::shared_ptr<AsyncRPCOperation> ptr(new MyCustomAsyncRPCOperation(params)); | |
81 | * | |
82 | * Don't use std::make_shared<AsyncRPCOperation>(). | |
83 | */ | |
84 | void AsyncRPCQueue::addOperation(const std::shared_ptr<AsyncRPCOperation> &ptrOperation) { | |
69a4cb44 | 85 | std::lock_guard<std::mutex> guard(lock_); |
fc72c078 | 86 | |
9cd71343 S |
87 | // Don't add if queue is closed or finishing |
88 | if (isClosed() || isFinishing()) { | |
fc72c078 | 89 | return; |
3b54bf58 | 90 | } |
fc72c078 S |
91 | |
92 | AsyncRPCOperationId id = ptrOperation->getId(); | |
3b54bf58 S |
93 | operation_map_.emplace(id, ptrOperation); |
94 | operation_id_queue_.push(id); | |
95 | this->condition_.notify_one(); | |
fc72c078 S |
96 | } |
97 | ||
3b54bf58 S |
98 | /** |
99 | * Return the operation for a given operation id. | |
100 | */ | |
101 | std::shared_ptr<AsyncRPCOperation> AsyncRPCQueue::getOperationForId(AsyncRPCOperationId id) const { | |
fc72c078 S |
102 | std::shared_ptr<AsyncRPCOperation> ptr; |
103 | ||
e91048f2 | 104 | std::lock_guard<std::mutex> guard(lock_); |
3b54bf58 S |
105 | AsyncRPCOperationMap::const_iterator iter = operation_map_.find(id); |
106 | if (iter != operation_map_.end()) { | |
fc72c078 S |
107 | ptr = iter->second; |
108 | } | |
109 | return ptr; | |
110 | } | |
111 | ||
3b54bf58 S |
112 | /** |
113 | * Return the operation for a given operation id and then remove the operation from internal storage. | |
114 | */ | |
fc72c078 S |
115 | std::shared_ptr<AsyncRPCOperation> AsyncRPCQueue::popOperationForId(AsyncRPCOperationId id) { |
116 | std::shared_ptr<AsyncRPCOperation> ptr = getOperationForId(id); | |
117 | if (ptr) { | |
e91048f2 | 118 | std::lock_guard<std::mutex> guard(lock_); |
fc72c078 S |
119 | // Note: if the id still exists in the operationIdQueue, when it gets processed by a worker |
120 | // there will no operation in the map to execute, so nothing will happen. | |
3b54bf58 | 121 | operation_map_.erase(id); |
fc72c078 S |
122 | } |
123 | return ptr; | |
124 | } | |
125 | ||
3b54bf58 S |
126 | /** |
127 | * Return true if the queue is closed to new operations. | |
128 | */ | |
129 | bool AsyncRPCQueue::isClosed() const { | |
9cd71343 | 130 | return closed_.load(); |
fc72c078 S |
131 | } |
132 | ||
3b54bf58 S |
133 | /** |
134 | * Close the queue and cancel all existing operations | |
135 | */ | |
fc72c078 | 136 | void AsyncRPCQueue::close() { |
9cd71343 | 137 | closed_.store(true); |
fc72c078 S |
138 | cancelAllOperations(); |
139 | } | |
140 | ||
9cd71343 S |
141 | /** |
142 | * Return true if the queue is finishing up | |
143 | */ | |
144 | bool AsyncRPCQueue::isFinishing() const { | |
145 | return finish_.load(); | |
146 | } | |
147 | ||
148 | /** | |
149 | * Close the queue but finish existing operations. Do not accept new operations. | |
150 | */ | |
151 | void AsyncRPCQueue::finish() { | |
152 | finish_.store(true); | |
153 | } | |
154 | ||
3b54bf58 | 155 | /** |
fc72c078 S |
156 | * Call cancel() on all operations |
157 | */ | |
158 | void AsyncRPCQueue::cancelAllOperations() { | |
5e363861 | 159 | std::lock_guard<std::mutex> guard(lock_); |
3b54bf58 | 160 | for (auto key : operation_map_) { |
fc72c078 S |
161 | key.second->cancel(); |
162 | } | |
3b54bf58 | 163 | this->condition_.notify_all(); |
fc72c078 S |
164 | } |
165 | ||
3b54bf58 S |
166 | /** |
167 | * Return the number of operations in the queue | |
168 | */ | |
169 | size_t AsyncRPCQueue::getOperationCount() const { | |
5e363861 | 170 | std::lock_guard<std::mutex> guard(lock_); |
3b54bf58 | 171 | return operation_id_queue_.size(); |
fc72c078 S |
172 | } |
173 | ||
3b54bf58 | 174 | /** |
fc72c078 S |
175 | * Spawn a worker thread |
176 | */ | |
177 | void AsyncRPCQueue::addWorker() { | |
5e363861 | 178 | std::lock_guard<std::mutex> guard(lock_); |
3b54bf58 | 179 | workers_.emplace_back( std::thread(&AsyncRPCQueue::run, this, ++workerCounter) ); |
fc72c078 S |
180 | } |
181 | ||
3b54bf58 S |
182 | /** |
183 | * Return the number of worker threads spawned by the queue | |
184 | */ | |
185 | size_t AsyncRPCQueue::getNumberOfWorkers() const { | |
a50fd5fd | 186 | std::lock_guard<std::mutex> guard(lock_); |
3b54bf58 | 187 | return workers_.size(); |
fc72c078 | 188 | } |
861513a0 | 189 | |
3b54bf58 S |
190 | /** |
191 | * Return a list of all known operation ids found in internal storage. | |
192 | */ | |
193 | std::vector<AsyncRPCOperationId> AsyncRPCQueue::getAllOperationIds() const { | |
5e363861 | 194 | std::lock_guard<std::mutex> guard(lock_); |
861513a0 | 195 | std::vector<AsyncRPCOperationId> v; |
3b54bf58 | 196 | for(auto & entry: operation_map_) { |
861513a0 | 197 | v.push_back(entry.first); |
3b54bf58 | 198 | } |
861513a0 S |
199 | return v; |
200 | } | |
201 | ||
3b54bf58 S |
202 | /** |
203 | * Calling thread will close and wait for worker threads to join. | |
204 | */ | |
205 | void AsyncRPCQueue::closeAndWait() { | |
9cd71343 S |
206 | close(); |
207 | wait_for_worker_threads(); | |
208 | } | |
209 | ||
210 | /** | |
211 | * Block current thread until all workers have finished their tasks. | |
212 | */ | |
213 | void AsyncRPCQueue::finishAndWait() { | |
214 | finish(); | |
215 | wait_for_worker_threads(); | |
216 | } | |
217 | ||
218 | /** | |
219 | * Block current thread until all operations are finished or the queue has closed. | |
220 | */ | |
221 | void AsyncRPCQueue::wait_for_worker_threads() { | |
222 | // Notify any workers who are waiting, so they see the updated queue state | |
223 | { | |
5e363861 | 224 | std::lock_guard<std::mutex> guard(lock_); |
9cd71343 | 225 | this->condition_.notify_all(); |
3b54bf58 | 226 | } |
9cd71343 | 227 | |
3b54bf58 S |
228 | for (std::thread & t : this->workers_) { |
229 | if (t.joinable()) { | |
230 | t.join(); | |
231 | } | |
232 | } | |
233 | } |