]> Git Repo - VerusCoin.git/blame - src/asyncrpcqueue.cpp
Merge pull request #557 from jl777/kolo-assets-new
[VerusCoin.git] / src / asyncrpcqueue.cpp
CommitLineData
fc72c078
S
1// Copyright (c) 2016 The Zcash developers
2// Distributed under the MIT software license, see the accompanying
3// file COPYING or http://www.opensource.org/licenses/mit-license.php.
4
5#include "asyncrpcqueue.h"
6
// Source of worker ids: incremented once per spawned worker thread.
static std::atomic<size_t> workerCounter{0};
fc72c078 8
423a63d0
S
9/**
10 * Static method to return the shared/default queue.
11 */
12shared_ptr<AsyncRPCQueue> AsyncRPCQueue::sharedInstance() {
13 // Thread-safe in C+11 and gcc 4.3
14 static shared_ptr<AsyncRPCQueue> q = std::make_shared<AsyncRPCQueue>();
15 return q;
16}
17
9cd71343 18AsyncRPCQueue::AsyncRPCQueue() : closed_(false), finish_(false) {
fc72c078
S
19}
20
fc72c078 21AsyncRPCQueue::~AsyncRPCQueue() {
3b54bf58 22 closeAndWait(); // join on all worker threads
fc72c078
S
23}
24
3b54bf58 25/**
fc72c078
S
26 * A worker will execute this method on a new thread
27 */
3b54bf58 28void AsyncRPCQueue::run(size_t workerId) {
fc72c078 29
9cd71343 30 while (true) {
fc72c078
S
31 AsyncRPCOperationId key;
32 std::shared_ptr<AsyncRPCOperation> operation;
33 {
9bdad434 34 std::unique_lock<std::mutex> guard(lock_);
9cd71343 35 while (operation_id_queue_.empty() && !isClosed() && !isFinishing()) {
3b54bf58 36 this->condition_.wait(guard);
fc72c078
S
37 }
38
9cd71343 39 // Exit if the queue is empty and we are finishing up
61ea2aba 40 if (isFinishing() && operation_id_queue_.empty()) {
9cd71343
S
41 break;
42 }
43
fc72c078 44 // Exit if the queue is closing.
3b54bf58 45 if (isClosed()) {
017b3ede
S
46 while (!operation_id_queue_.empty()) {
47 operation_id_queue_.pop();
48 }
fc72c078 49 break;
3b54bf58 50 }
fc72c078
S
51
52 // Get operation id
3b54bf58
S
53 key = operation_id_queue_.front();
54 operation_id_queue_.pop();
fc72c078
S
55
56 // Search operation map
3b54bf58
S
57 AsyncRPCOperationMap::const_iterator iter = operation_map_.find(key);
58 if (iter != operation_map_.end()) {
fc72c078
S
59 operation = iter->second;
60 }
61 }
62
63 if (!operation) {
64 // cannot find operation in map, may have been removed
65 } else if (operation->isCancelled()) {
66 // skip cancelled operation
67 } else {
68 operation->main();
69 }
70 }
fc72c078
S
71}
72
73
3b54bf58 74/**
fc72c078
S
75 * Add shared_ptr to operation.
76 *
77 * To retain polymorphic behaviour, i.e. main() method of derived classes is invoked,
3b54bf58 78 * caller should create the shared_ptr like this:
fc72c078
S
79 *
80 * std::shared_ptr<AsyncRPCOperation> ptr(new MyCustomAsyncRPCOperation(params));
81 *
82 * Don't use std::make_shared<AsyncRPCOperation>().
83 */
84void AsyncRPCQueue::addOperation(const std::shared_ptr<AsyncRPCOperation> &ptrOperation) {
69a4cb44 85 std::lock_guard<std::mutex> guard(lock_);
fc72c078 86
9cd71343
S
87 // Don't add if queue is closed or finishing
88 if (isClosed() || isFinishing()) {
fc72c078 89 return;
3b54bf58 90 }
fc72c078
S
91
92 AsyncRPCOperationId id = ptrOperation->getId();
3b54bf58
S
93 operation_map_.emplace(id, ptrOperation);
94 operation_id_queue_.push(id);
95 this->condition_.notify_one();
fc72c078
S
96}
97
3b54bf58
S
98/**
99 * Return the operation for a given operation id.
100 */
101std::shared_ptr<AsyncRPCOperation> AsyncRPCQueue::getOperationForId(AsyncRPCOperationId id) const {
fc72c078
S
102 std::shared_ptr<AsyncRPCOperation> ptr;
103
e91048f2 104 std::lock_guard<std::mutex> guard(lock_);
3b54bf58
S
105 AsyncRPCOperationMap::const_iterator iter = operation_map_.find(id);
106 if (iter != operation_map_.end()) {
fc72c078
S
107 ptr = iter->second;
108 }
109 return ptr;
110}
111
3b54bf58
S
112/**
113 * Return the operation for a given operation id and then remove the operation from internal storage.
114 */
fc72c078
S
115std::shared_ptr<AsyncRPCOperation> AsyncRPCQueue::popOperationForId(AsyncRPCOperationId id) {
116 std::shared_ptr<AsyncRPCOperation> ptr = getOperationForId(id);
117 if (ptr) {
e91048f2 118 std::lock_guard<std::mutex> guard(lock_);
fc72c078
S
119 // Note: if the id still exists in the operationIdQueue, when it gets processed by a worker
120 // there will no operation in the map to execute, so nothing will happen.
3b54bf58 121 operation_map_.erase(id);
fc72c078
S
122 }
123 return ptr;
124}
125
3b54bf58
S
126/**
127 * Return true if the queue is closed to new operations.
128 */
129bool AsyncRPCQueue::isClosed() const {
9cd71343 130 return closed_.load();
fc72c078
S
131}
132
3b54bf58
S
133/**
134 * Close the queue and cancel all existing operations
135 */
fc72c078 136void AsyncRPCQueue::close() {
9cd71343 137 closed_.store(true);
fc72c078
S
138 cancelAllOperations();
139}
140
9cd71343
S
141/**
142 * Return true if the queue is finishing up
143 */
144bool AsyncRPCQueue::isFinishing() const {
145 return finish_.load();
146}
147
148/**
149 * Close the queue but finish existing operations. Do not accept new operations.
150 */
151void AsyncRPCQueue::finish() {
152 finish_.store(true);
153}
154
3b54bf58 155/**
fc72c078
S
156 * Call cancel() on all operations
157 */
158void AsyncRPCQueue::cancelAllOperations() {
5e363861 159 std::lock_guard<std::mutex> guard(lock_);
3b54bf58 160 for (auto key : operation_map_) {
fc72c078
S
161 key.second->cancel();
162 }
3b54bf58 163 this->condition_.notify_all();
fc72c078
S
164}
165
3b54bf58
S
166/**
167 * Return the number of operations in the queue
168 */
169size_t AsyncRPCQueue::getOperationCount() const {
5e363861 170 std::lock_guard<std::mutex> guard(lock_);
3b54bf58 171 return operation_id_queue_.size();
fc72c078
S
172}
173
3b54bf58 174/**
fc72c078
S
175 * Spawn a worker thread
176 */
177void AsyncRPCQueue::addWorker() {
5e363861 178 std::lock_guard<std::mutex> guard(lock_);
3b54bf58 179 workers_.emplace_back( std::thread(&AsyncRPCQueue::run, this, ++workerCounter) );
fc72c078
S
180}
181
3b54bf58
S
182/**
183 * Return the number of worker threads spawned by the queue
184 */
185size_t AsyncRPCQueue::getNumberOfWorkers() const {
a50fd5fd 186 std::lock_guard<std::mutex> guard(lock_);
3b54bf58 187 return workers_.size();
fc72c078 188}
861513a0 189
3b54bf58
S
190/**
191 * Return a list of all known operation ids found in internal storage.
192 */
193std::vector<AsyncRPCOperationId> AsyncRPCQueue::getAllOperationIds() const {
5e363861 194 std::lock_guard<std::mutex> guard(lock_);
861513a0 195 std::vector<AsyncRPCOperationId> v;
3b54bf58 196 for(auto & entry: operation_map_) {
861513a0 197 v.push_back(entry.first);
3b54bf58 198 }
861513a0
S
199 return v;
200}
201
3b54bf58
S
202/**
203 * Calling thread will close and wait for worker threads to join.
204 */
205void AsyncRPCQueue::closeAndWait() {
9cd71343
S
206 close();
207 wait_for_worker_threads();
208}
209
210/**
211 * Block current thread until all workers have finished their tasks.
212 */
213void AsyncRPCQueue::finishAndWait() {
214 finish();
215 wait_for_worker_threads();
216}
217
218/**
219 * Block current thread until all operations are finished or the queue has closed.
220 */
221void AsyncRPCQueue::wait_for_worker_threads() {
222 // Notify any workers who are waiting, so they see the updated queue state
223 {
5e363861 224 std::lock_guard<std::mutex> guard(lock_);
9cd71343 225 this->condition_.notify_all();
3b54bf58 226 }
9cd71343 227
3b54bf58
S
228 for (std::thread & t : this->workers_) {
229 if (t.joinable()) {
230 t.join();
231 }
232 }
233}
This page took 0.167989 seconds and 4 git commands to generate.