Commit | Line | Data |
---|---|---|
f9cae832 PW |
1 | // Copyright (c) 2012 The Bitcoin developers |
2 | // Distributed under the MIT/X11 software license, see the accompanying | |
3 | // file COPYING or http://www.opensource.org/licenses/mit-license.php. | |
51ed9ec9 | 4 | |
f9cae832 PW |
5 | #ifndef CHECKQUEUE_H |
6 | #define CHECKQUEUE_H | |
7 | ||
51ed9ec9 BD |
8 | #include <algorithm> |
9 | #include <vector> | |
10 | ||
319b1160 | 11 | #include <boost/foreach.hpp> |
f9cae832 | 12 | #include <boost/thread/condition_variable.hpp> |
51ed9ec9 BD |
13 | #include <boost/thread/locks.hpp> |
14 | #include <boost/thread/mutex.hpp> | |
f9cae832 PW |
15 | |
16 | template<typename T> class CCheckQueueControl; | |
17 | ||
18 | /** Queue for verifications that have to be performed. | |
19 | * The verifications are represented by a type T, which must provide an | |
20 | * operator(), returning a bool. | |
21 | * | |
22 | * One thread (the master) is assumed to push batches of verifications | |
23 | * onto the queue, where they are processed by N-1 worker threads. When | |
24 | * the master is done adding work, it temporarily joins the worker pool | |
25 | * as an N'th worker, until all jobs are done. | |
26 | */ | |
27 | template<typename T> class CCheckQueue { | |
28 | private: | |
29 | // Mutex to protect the inner state | |
30 | boost::mutex mutex; | |
31 | ||
32 | // Worker threads block on this when out of work | |
33 | boost::condition_variable condWorker; | |
34 | ||
35 | // Master thread blocks on this when out of work | |
36 | boost::condition_variable condMaster; | |
37 | ||
f9cae832 PW |
38 | // The queue of elements to be processed. |
39 | // As the order of booleans doesn't matter, it is used as a LIFO (stack) | |
40 | std::vector<T> queue; | |
41 | ||
42 | // The number of workers (including the master) that are idle. | |
43 | int nIdle; | |
44 | ||
45 | // The total number of workers (including the master). | |
46 | int nTotal; | |
47 | ||
48 | // The temporary evaluation result. | |
49 | bool fAllOk; | |
50 | ||
51 | // Number of verifications that haven't completed yet. | |
52 | // This includes elements that are not anymore in queue, but still in | |
53 | // worker's own batches. | |
54 | unsigned int nTodo; | |
55 | ||
56 | // Whether we're shutting down. | |
57 | bool fQuit; | |
58 | ||
59 | // The maximum number of elements to be processed in one batch | |
60 | unsigned int nBatchSize; | |
61 | ||
62 | // Internal function that does bulk of the verification work. | |
63 | bool Loop(bool fMaster = false) { | |
64 | boost::condition_variable &cond = fMaster ? condMaster : condWorker; | |
65 | std::vector<T> vChecks; | |
66 | vChecks.reserve(nBatchSize); | |
67 | unsigned int nNow = 0; | |
68 | bool fOk = true; | |
69 | do { | |
70 | { | |
71 | boost::unique_lock<boost::mutex> lock(mutex); | |
72 | // first do the clean-up of the previous loop run (allowing us to do it in the same critsect) | |
73 | if (nNow) { | |
74 | fAllOk &= fOk; | |
75 | nTodo -= nNow; | |
76 | if (nTodo == 0 && !fMaster) | |
77 | // We processed the last element; inform the master he can exit and return the result | |
78 | condMaster.notify_one(); | |
79 | } else { | |
80 | // first iteration | |
81 | nTotal++; | |
82 | } | |
83 | // logically, the do loop starts here | |
84 | while (queue.empty()) { | |
85 | if ((fMaster || fQuit) && nTodo == 0) { | |
86 | nTotal--; | |
f9cae832 PW |
87 | bool fRet = fAllOk; |
88 | // reset the status for new work later | |
89 | if (fMaster) | |
90 | fAllOk = true; | |
91 | // return the current status | |
92 | return fRet; | |
93 | } | |
94 | nIdle++; | |
95 | cond.wait(lock); // wait | |
96 | nIdle--; | |
97 | } | |
98 | // Decide how many work units to process now. | |
99 | // * Do not try to do everything at once, but aim for increasingly smaller batches so | |
100 | // all workers finish approximately simultaneously. | |
101 | // * Try to account for idle jobs which will instantly start helping. | |
102 | // * Don't do batches smaller than 1 (duh), or larger than nBatchSize. | |
103 | nNow = std::max(1U, std::min(nBatchSize, (unsigned int)queue.size() / (nTotal + nIdle + 1))); | |
104 | vChecks.resize(nNow); | |
105 | for (unsigned int i = 0; i < nNow; i++) { | |
106 | // We want the lock on the mutex to be as short as possible, so swap jobs from the global | |
107 | // queue to the local batch vector instead of copying. | |
108 | vChecks[i].swap(queue.back()); | |
109 | queue.pop_back(); | |
110 | } | |
111 | // Check whether we need to do work at all | |
112 | fOk = fAllOk; | |
113 | } | |
114 | // execute work | |
115 | BOOST_FOREACH(T &check, vChecks) | |
116 | if (fOk) | |
117 | fOk = check(); | |
118 | vChecks.clear(); | |
119 | } while(true); | |
120 | } | |
121 | ||
122 | public: | |
123 | // Create a new check queue | |
124 | CCheckQueue(unsigned int nBatchSizeIn) : | |
125 | nIdle(0), nTotal(0), fAllOk(true), nTodo(0), fQuit(false), nBatchSize(nBatchSizeIn) {} | |
126 | ||
127 | // Worker thread | |
128 | void Thread() { | |
129 | Loop(); | |
130 | } | |
131 | ||
132 | // Wait until execution finishes, and return whether all evaluations where succesful. | |
133 | bool Wait() { | |
134 | return Loop(true); | |
135 | } | |
136 | ||
137 | // Add a batch of checks to the queue | |
138 | void Add(std::vector<T> &vChecks) { | |
139 | boost::unique_lock<boost::mutex> lock(mutex); | |
140 | BOOST_FOREACH(T &check, vChecks) { | |
141 | queue.push_back(T()); | |
142 | check.swap(queue.back()); | |
143 | } | |
144 | nTodo += vChecks.size(); | |
145 | if (vChecks.size() == 1) | |
146 | condWorker.notify_one(); | |
147 | else if (vChecks.size() > 1) | |
148 | condWorker.notify_all(); | |
149 | } | |
150 | ||
f7f3a96b | 151 | ~CCheckQueue() { |
f7f3a96b PW |
152 | } |
153 | ||
f9cae832 PW |
154 | friend class CCheckQueueControl<T>; |
155 | }; | |
156 | ||
157 | /** RAII-style controller object for a CCheckQueue that guarantees the passed | |
158 | * queue is finished before continuing. | |
159 | */ | |
160 | template<typename T> class CCheckQueueControl { | |
161 | private: | |
162 | CCheckQueue<T> *pqueue; | |
163 | bool fDone; | |
164 | ||
165 | public: | |
166 | CCheckQueueControl(CCheckQueue<T> *pqueueIn) : pqueue(pqueueIn), fDone(false) { | |
167 | // passed queue is supposed to be unused, or NULL | |
168 | if (pqueue != NULL) { | |
169 | assert(pqueue->nTotal == pqueue->nIdle); | |
170 | assert(pqueue->nTodo == 0); | |
171 | assert(pqueue->fAllOk == true); | |
172 | } | |
173 | } | |
174 | ||
175 | bool Wait() { | |
176 | if (pqueue == NULL) | |
177 | return true; | |
178 | bool fRet = pqueue->Wait(); | |
179 | fDone = true; | |
180 | return fRet; | |
181 | } | |
182 | ||
183 | void Add(std::vector<T> &vChecks) { | |
184 | if (pqueue != NULL) | |
185 | pqueue->Add(vChecks); | |
186 | } | |
187 | ||
188 | ~CCheckQueueControl() { | |
189 | if (!fDone) | |
190 | Wait(); | |
191 | } | |
192 | }; | |
193 | ||
093303a8 | 194 | #endif // CHECKQUEUE_H |