BitMagic-C++
bmthreadpool.h
Go to the documentation of this file.
1#ifndef BMTPOOL__H__INCLUDED__
2#define BMTPOOL__H__INCLUDED__
3/*
4Copyright(c) 2002-2021 Anatoliy Kuznetsov(anatoliy_kuznetsov at yahoo.com)
5
6Licensed under the Apache License, Version 2.0 (the "License");
7you may not use this file except in compliance with the License.
8You may obtain a copy of the License at
9
10 http://www.apache.org/licenses/LICENSE-2.0
11
12Unless required by applicable law or agreed to in writing, software
13distributed under the License is distributed on an "AS IS" BASIS,
14WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15See the License for the specific language governing permissions and
16limitations under the License.
17
18For more information please visit: http://bitmagic.io
19*/
20
21#include <type_traits>
22#include <queue>
23#include <thread>
24#include <mutex>
25#include <atomic>
26#include <condition_variable>
27
28#include "bmbuffer.h"
29#include "bmtask.h"
30
31namespace bm
32{
33
34
/// Pad 60 bytes so that the final structure occupies 64 bytes (1 cache line)
/// @internal
struct pad60_struct { char c[60]; };
/// Empty padding
/// @internal
struct pad0_struct { };

/**
    Spin-lock with two-phase acquire (relaxed read + CAS).

    The Pad template parameter optionally appends a buffer member after
    the lock word to avoid CPU cache line contention.
    TODO: test if padding really helps in our case

    Generally spin_lock does not have an advantage over std::mutex,
    but in some specific cases (like WebAssembly) it may be preferable
    because all of its operations are declared noexcept.

    @ingroup bmtasks
 */
template<class Pad = pad0_struct>
class spin_lock
{
public:
    spin_lock() noexcept : locked_(0) {}

    /// Lock the lock (spins until the lock is acquired)
    void lock() noexcept
    {
        while(1) // spin loop
        {
            // phase 1: cheap relaxed read, phase 2: CAS only if it looks free
            unsigned locked = locked_.load(std::memory_order_relaxed);
            if (!locked &&
                locked_.compare_exchange_weak(locked, 1u,
                                              std::memory_order_acquire))
                break;
#if defined(BMSSE2OPT) || defined(BMSSE42OPT) || defined(BMAVX2OPT) || defined(BMAVX512OPT)
            _mm_pause();
#endif
        } // while
    }

    /// Try to acquire the lock (single attempt, no spinning)
    /// @return true if the lock was acquired
    ///
    bool try_lock() noexcept
    {
        unsigned locked = locked_.load(std::memory_order_relaxed);
        // strong CAS: there is no retry loop here, so a spurious failure
        // of the weak form would report a free lock as busy
        return !locked &&
               locked_.compare_exchange_strong(locked, 1u,
                                               std::memory_order_acquire);
    }

    /// Unlock the lock
    void unlock() noexcept
    {
        locked_.store(0u, std::memory_order_release);
    }
private:
    spin_lock(const spin_lock&)=delete;
    spin_lock& operator=(const spin_lock&)=delete;

private:
    std::atomic<unsigned> locked_; ///< 0 - free, non-zero - taken
    Pad                   p_;      ///< optional padding
};
101
/// Wait for multiple threads to exit
///
/// Visits every element of the container in order and calls join() on
/// each element that reports joinable().
///
/// @param tcont - container of thread-like objects (joinable() / join())
///
/// @internal
/// @ingroup bmtasks
///
template<typename TCont>
void join_multiple_threads(TCont& tcont)
{
    for (auto& th : tcont)
    {
        if (th.joinable())
            th.join();
    } // for th
}
117
118
119template<typename QValue, typename Lock> class thread_pool;
120
121
122/**
123 Thread-sync queue with MT access protecion
124
125 @ingroup bmtasks
126 @internal
127 */
128template<typename Value, typename Lock>
130{
131public:
132 typedef Value value_type;
133 typedef Lock lock_type;
134
135 /// constructor
136 ///
137 queue_sync() noexcept {}
138
139 /// Push value to the back of the queue
140 /// @param v - value to put in the queue
141 ///
142 /// @sa push_no_lock
143 ///
144 void push(const value_type& v) //noexcept(bm::is_lock_noexcept<lock_type>::value)
145 {
146 {
147 std::lock_guard<lock_type> lg(dq_lock_);
148 data_queue_.push(v);
149 }
150 queue_push_cond_.notify_one(); // noexcept
151 }
152
153 /// Push value to the back of the queue without lock protection
154 /// It is assumed that processing did not start and we are just staging
155 /// the batch
156 ///
157 /// @param v - value to put in the queue
158 /// @sa push
159 ///
161 {
162 data_queue_.push(v);
163 }
164
165
166 /// Extract value
167 /// @param v - [out] value returned
168 /// @return true if extracted
169 ///
171 {
172 std::lock_guard<lock_type> guard(dq_lock_);
173 if (data_queue_.empty())
174 return false;
175 v = data_queue_.front();
176 data_queue_.pop();
177 return true;
178 }
179
180 /// @return true if empty
181 bool empty() const //noexcept(bm::is_lock_noexcept<lock_type>::value)
182 {
183 std::lock_guard<lock_type> guard(dq_lock_);
184 return data_queue_.empty();
185 }
186
187 /// lock the queue access
188 /// @sa push_no_lock, unlock
189 void lock() noexcept(bm::is_lock_noexcept<lock_type>::value)
190 { dq_lock_.lock(); }
191
192 /// Try to lock the queue exclusively
193 ///
194 bool try_lock() noexcept(bm::is_lock_noexcept<lock_type>::value)
195 { return dq_lock_.try_lock(); }
196
197 /// unlock the queue access
198 /// @sa push_no_lock, lock
199 void unlock() noexcept(bm::is_lock_noexcept<lock_type>::value)
200 {
201 dq_lock_.unlock();
202 // lock-unlock is done to protect bulk push, need to wake up
203 // all waiting workers
204 queue_push_cond_.notify_all(); // noexcept
205 }
206
207 template<typename QV, typename L> friend class bm::thread_pool;
208
209protected:
210 typedef std::queue<value_type> queue_type;
211
212private:
213 queue_sync(const queue_sync&) = delete;
214 queue_sync& operator=(const queue_sync&) = delete;
215private:
216 queue_type data_queue_; ///< queue object
217 mutable lock_type dq_lock_; ///< lock for queue
218
219 // signal structure for wait on empty queue
220protected:
221 mutable std::mutex signal_mut_; ///< signal mutex for q submissions
222 std::condition_variable queue_push_cond_; ///< mutex paired conditional
223};
224
225
226/**
227 Thread pool with custom (thread safe) queue
228
229 Thread pool implements a busy-wait task stealing
230 design pattern
231
232 QValue - task queue value parameter
233 Lock - locking protection type (like std::mutex or spinlock)
234
235 @ingroup bmtasks
236*/
237template<typename QValue, typename Lock>
239{
240public:
241 typedef QValue value_type;
242 typedef Lock lock_type;
244
245 /**
246 Stop modes for threads:
247 0 - keep running/waiting for jobs
248 1 - wait for empty task queue then stop threads
249 2 - stop threads now even if there are pending tasks
250 */
252 {
253 no_stop = 0, ///< keep spinning on busy-wait
254 stop_when_done = 1, ///< stop if tsak queue is empty
255 stop_now = 2 ///< stop right now
256 };
257
258public:
260 : stop_flag_(sm)
261 {}
262
263 ~thread_pool();
264
265 /** Setup the criteria for threads shutdown
266 Also notifies all threads on a new directive
267 @param sm - stop mode
268 */
269 void set_stop_mode(stop_mode sm) noexcept;
270
271 /**
272 Request an immediate stop of all threads in the pool
273 */
274 void stop() noexcept { set_stop_mode(stop_now); }
275
276 /**
277 Start thread pool worker threads.
278 @param tcount - number of threads to start
279 */
280 void start(unsigned tcount);
281
282 /**
283 Wait for threads to finish (or stop if stop was requested)
284 */
285 void join();
286
287 /**
288 Conditional spin-wait for the queue to empty
289 (Important note: tasks may still be running, but the queue is empty)
290 */
291 void wait_empty_queue();
292
293 /// Get access to the job submission queue
294 ///
295 queue_type& get_job_queue() noexcept { return job_queue_; }
296
297 /// Return if thread pool is stopped by a request
298 int is_stopped() const noexcept
299 { return stop_flag_.load(std::memory_order_relaxed); }
300
301protected:
302
303 /// Internal worker wrapper with busy-wait spin loop
304 /// making pthread-like call for tasks
305 ///
306 void worker_func();
307
308private:
309 thread_pool(const thread_pool&)=delete;
310 thread_pool& operator=(const thread_pool&)=delete;
311
312private:
313 queue_type job_queue_; ///< queue (thread sync)
314 std::vector<std::thread> thread_vect_; ///< threads servicing queue
315 std::atomic_int stop_flag_{0}; ///< stop flag to all threads
316
317 // notification channel for results wait
318 mutable std::mutex task_done_mut_; ///< signal mutex for task done
319 std::condition_variable task_done_cond_;///< mutex paired conditional
320
321};
322
323/**
324 Utility class to submit task batch to the running thread pool
325 and optionally wait for it getting done
326
327 @ingroup bmtasks
328 */
329template<typename TPool>
// NOTE(review): listing line 330 (the class-head; presumably `class thread_pool_executor`,
// judging by the deleted operator= below) is missing from this extract
331{
332public:
333 typedef TPool thread_pool_type;
// NOTE(review): listing line 334 is missing; the member index of this page lists
// `task_batch_base::size_type size_type` as defined at that line
335
336public:
// NOTE(review): listing line 337 is missing and not recoverable from this extract - confirm against upstream
338
// Submit the tasks of a batch to the pool's job queue.
// tpool          - thread pool to receive the tasks
// tasks          - batch of tasks to submit
// wait_for_batch - when true, wait for the batch to finish before returning
339 static
340 void run(thread_pool_type& tpool,
341 bm::task_batch_base& tasks,
342 bool wait_for_batch);
343
344 /**
345 Check if all batch jobs in the specified interval are done
346 Spin wait if not.
347 */
348 static
// NOTE(review): listing line 349 is missing; per the member index the declaration is
// `void wait_for_batch_done(thread_pool_type &tpool,`
350 bm::task_batch_base& tasks,
// NOTE(review): listing lines 351-352 are missing; per the member index they carry the
// `task_batch_base::size_type from_idx` and `task_batch_base::size_type to_idx` parameters
353private:
// NOTE(review): listing line 354 is missing (presumably the deleted copy constructor - confirm)
355 thread_pool_executor& operator=(const thread_pool_executor&) = delete;
356};
357
358
359// ========================================================================
360// thread_pool<> implementations
361// ========================================================================
362
363
364// -----------------------------------------------------------------------
365
366template<typename QValue, typename Lock>
368{
369 int is_stop = stop_flag_;
370 if (!is_stop) // finish the outstanding jobs and close threads
371 set_stop_mode(stop_when_done);
372 join();
373}
374
375// -----------------------------------------------------------------------
376
377template<typename QValue, typename Lock>
379{
380 stop_flag_ = sm;
381 job_queue_.queue_push_cond_.notify_all(); // this is noexcept
382}
383
384// -----------------------------------------------------------------------
385
386template<typename QValue, typename Lock>
388{
389 int is_stop = stop_flag_.load(std::memory_order_relaxed);
390 if (is_stop == stop_now) // immediate stop requested
391 return;
392 // TODO: consider lock protect of thread_vect_ member
393 for(unsigned i = 0;i < tcount; ++i)
394 {
395 thread_vect_.emplace_back(
396 std::thread(&thread_pool::worker_func,this));
397 } // for
398}
399
400// -----------------------------------------------------------------------
401
402template<typename QValue, typename Lock>
404{
405 bm::join_multiple_threads(thread_vect_);
406 thread_vect_.resize(0);
407}
408
409// -----------------------------------------------------------------------
410
411template<typename QValue, typename Lock>
413{
414 const std::chrono::duration<int, std::milli> wait_duration(20);
415 while(1)
416 {
417 if (job_queue_.empty())
418 break;
419 std::cv_status wait_res;
420 {
421 std::unique_lock<std::mutex> lk(task_done_mut_);
422 wait_res = task_done_cond_.wait_for(lk, wait_duration);
423 }
424 if (wait_res == std::cv_status::timeout)
425 {
426 std::this_thread::yield();
427 int is_stop = is_stopped();
428 if (is_stop == stop_now) // immediate stop requested
429 return;
430 }
431 } // while
432}
433
434// -----------------------------------------------------------------------
435
// Worker thread body: polls the job queue in a loop, runs each popped
// task, and leaves the loop when a stop is requested.
436template<typename QValue, typename Lock>
// NOTE(review): listing line 437 (the function signature; the member index lists
// `void worker_func()` at that line) is missing from this extract
438{
439 const std::chrono::duration<int, std::milli> wait_duration(10);
440 while(1)
441 {
442 int is_stop = is_stopped();
443 if (is_stop == stop_now) // immediate stop requested
444 break;
445
// NOTE(review): listing line 446 is missing; it must declare `task_descr`, which is used below
447 if (job_queue_.try_pop(task_descr))
448 {
// NOTE(review): listing line 449 is missing - confirm against upstream
450 try
451 {
// NOTE(review): listing line 452 (the guarded statement, presumably the task call) is missing
453 }
454 catch (...)
455 {
// any exception from the guarded statement is reported as error code -1
456 task_descr->err_code = -1;
457 }
// mark the task as done, then signal the task-done condition
458 task_descr->done.store(1, std::memory_order_release);
459 task_done_cond_.notify_one();
460 continue;
461 }
462 // queue appears to be empty, check if requested to stop
463 // (any non-zero stop mode ends the worker at this point)
464 is_stop = is_stopped();
465 if (is_stop)
466 return;
467
468 // enter a timed condition wait
469 // notifications are treated as unreliable and are re-verified
470 // by polling the queue again on the next loop iteration
471 std::cv_status wait_res;
472 {
473 std::unique_lock<std::mutex> lk(job_queue_.signal_mut_);
474 wait_res =
475 job_queue_.queue_push_cond_.wait_for(lk, wait_duration);
476 }
477 if (wait_res == std::cv_status::timeout)
478 {
479 is_stop = is_stopped();
480 if (is_stop == stop_now) // immediate stop requested
481 return;
482 std::this_thread::yield();
483 }
484 } // while
485 return;
486}
487
488// ========================================================================
489// thread_pool_executor<> implementations
490// ========================================================================
491
492
493template<typename TPool>
495 thread_pool_type& tpool,
497 bool wait_for_batch)
498{
499 typename thread_pool_type::queue_type& qu = tpool.get_job_queue();
500
502 for (task_batch_base::size_type i = 0; i < batch_size; ++i)
503 {
505 tdescr->argp = tdescr; // restore the self referenece
506 BM_ASSERT(!tdescr->done);
507
508 if (tdescr->flags != bm::task_descr::no_flag) // barrier task ?
509 {
510 if (i) // wait until all previously scheduled tasks are done
511 {
512 tpool.wait_empty_queue();
513 wait_for_batch_done(tpool, task_batch, 0, batch_size - 1);
514 }
515
516 // run the barrier proc on the curent thread
517 tdescr->err_code = tdescr->func(tdescr->argp);
518 tdescr->done.store(1, std::memory_order_release);
519
520 // re-read the batch size, if barrier added more tasks
521 task_batch_base::size_type new_batch_size = task_batch.size();
522 if (new_batch_size != batch_size)
523 batch_size = new_batch_size;
524 continue;
525 }
526
527 qu.push(tdescr); // locked push to the thread queue
528
529 auto is_stop = tpool.is_stopped();
530 if (is_stop == thread_pool_type::stop_now)
531 break; // thread pool stop requested
532
533 } // for
534
535
536 // implicit wait barrier for all tasks
537 if (wait_for_batch && batch_size)
538 {
539 tpool.wait_empty_queue();
540 wait_for_batch_done(tpool, task_batch, 0, batch_size - 1);
541 }
542}
543
544
545// -----------------------------------------------------------------------
546
547template<typename TPool>
549 thread_pool_type& tpool,
550 bm::task_batch_base& tasks,
553{
554 BM_ASSERT(from_idx <= to_idx);
555 BM_ASSERT(to_idx < tasks.size());
556
557 for (task_batch_base::size_type i = from_idx; i <= to_idx; ++i)
558 {
559 const bm::task_descr* tdescr = tasks.get_task(i);
560 auto done = tdescr->done.load(std::memory_order_consume);
561 while (!done)
562 {
563 auto is_stop = tpool.is_stopped();
564 if (is_stop == thread_pool_type::stop_now)
565 return; // thread pool stopped, jobs will not be done
566 std::this_thread::yield();
567 // TODO: subscribe to a conditional wait for job done in tpool
568 done = tdescr->done.load(std::memory_order_acquire);
569 } // while
570 } // for
571
572}
573
574// -----------------------------------------------------------------------
575
576
577} // bm
578
579#endif
#define BM_ASSERT
Definition: bmdef.h:139
Task definitions for parallel programming with BitMagic.
Thread-sync queue with MT access protection.
Definition: bmthreadpool.h:130
std::queue< value_type > queue_type
Definition: bmthreadpool.h:210
bool empty() const
Definition: bmthreadpool.h:181
bool try_pop(value_type &v)
Extract value.
Definition: bmthreadpool.h:170
void push(const value_type &v)
Push value to the back of the queue.
Definition: bmthreadpool.h:144
void lock() noexcept(bm::is_lock_noexcept< lock_type >::value)
lock the queue access
Definition: bmthreadpool.h:189
void push_no_lock(const value_type &v)
Push value to the back of the queue without lock protection It is assumed that processing did not sta...
Definition: bmthreadpool.h:160
std::condition_variable queue_push_cond_
mutex paired conditional
Definition: bmthreadpool.h:222
std::mutex signal_mut_
signal mutex for q submissions
Definition: bmthreadpool.h:221
queue_sync() noexcept
constructor
Definition: bmthreadpool.h:137
bool try_lock() noexcept(bm::is_lock_noexcept< lock_type >::value)
Try to lock the queue exclusively.
Definition: bmthreadpool.h:194
void unlock() noexcept(bm::is_lock_noexcept< lock_type >::value)
unlock the queue access
Definition: bmthreadpool.h:199
Spin-lock with two-phase acquire (read + cas) padding parameter optionally adds a buffer to avoid CPU...
Definition: bmthreadpool.h:56
spin_lock() noexcept
Definition: bmthreadpool.h:58
bool try_lock() noexcept
Try to acquire the lock, return true if successful.
Definition: bmthreadpool.h:78
void unlock() noexcept
Unlock the lock.
Definition: bmthreadpool.h:89
void lock() noexcept
Lock the lock.
Definition: bmthreadpool.h:61
Interface definition (base class) for a group of tasks (batch)
Definition: bmtask.h:118
virtual size_type size() const =0
Return size of batch.
unsigned size_type
Definition: bmtask.h:120
virtual bm::task_descr * get_task(size_type task_idx)=0
Get task by index in the batch.
Basic implementation for collection of tasks for parallel execution.
Definition: bmtask.h:140
virtual size_type size() const BMNOEXCEPT
task_batch_base interface implementation
Definition: bmtask.h:158
virtual bm::task_descr * get_task(size_type task_idx)
Get task by index in the batch.
Definition: bmtask.h:160
Utility class to submit task batch to the running thread pool and optionally wait for it getting done...
Definition: bmthreadpool.h:331
static void wait_for_batch_done(thread_pool_type &tpool, bm::task_batch_base &tasks, task_batch_base::size_type from_idx, task_batch_base::size_type to_idx)
Check if all batch jobs in the specified interval are done Spin wait if not.
Definition: bmthreadpool.h:548
static void run(thread_pool_type &tpool, bm::task_batch_base &tasks, bool wait_for_batch)
Definition: bmthreadpool.h:494
task_batch_base::size_type size_type
Definition: bmthreadpool.h:334
Thread pool with custom (thread safe) queue.
Definition: bmthreadpool.h:239
void stop() noexcept
Request an immediate stop of all threads in the pool.
Definition: bmthreadpool.h:274
queue_type & get_job_queue() noexcept
Get access to the job submission queue.
Definition: bmthreadpool.h:295
bm::queue_sync< QValue, lock_type > queue_type
Definition: bmthreadpool.h:243
void join()
Wait for threads to finish (or stop if stop was requested)
Definition: bmthreadpool.h:403
thread_pool(stop_mode sm=no_stop) noexcept
Definition: bmthreadpool.h:259
void worker_func()
Internal worker wrapper with busy-wait spin loop making pthread-like call for tasks.
Definition: bmthreadpool.h:437
int is_stopped() const noexcept
Return if thread pool is stopped by a request.
Definition: bmthreadpool.h:298
void start(unsigned tcount)
Start thread pool worker threads.
Definition: bmthreadpool.h:387
void wait_empty_queue()
Conditional spin-wait for the queue to empty (Important note: tasks may still be running,...
Definition: bmthreadpool.h:412
stop_mode
Stop modes for threads: 0 - keep running/waiting for jobs 1 - wait for empty task queue then stop thr...
Definition: bmthreadpool.h:252
@ no_stop
keep spinning on busy-wait
Definition: bmthreadpool.h:253
@ stop_when_done
stop if task queue is empty
Definition: bmthreadpool.h:254
@ stop_now
stop right now
Definition: bmthreadpool.h:255
void set_stop_mode(stop_mode sm) noexcept
Setup the criteria for threads shutdown Also notifies all threads on a new directive.
Definition: bmthreadpool.h:378
Definition: bm.h:78
void join_multiple_threads(TCont &tcont)
Wait for multiple threads to exit.
Definition: bmthreadpool.h:108
"noexcept" traits detection for T::lock()
Definition: bmtask.h:213
Empty padding.
Definition: bmthreadpool.h:40
Pad 60 bytes so that the final structure occupies 64 bytes (1 cache line)
Definition: bmthreadpool.h:37
BitMagic task with a captured function.
Definition: bmtask.h:62
void * argp
arg pointer
Definition: bmtask.h:72
std::atomic_bool done
0 - pending
Definition: bmtask.h:77
@ no_flag
no flag specified
Definition: bmtask.h:65
bm::id64_t flags
task flags to designate barriers
Definition: bmtask.h:75
task_function_t func
captured function callback
Definition: bmtask.h:71
int err_code
error code
Definition: bmtask.h:76