1#ifndef BMTPOOL__H__INCLUDED__
2#define BMTPOOL__H__INCLUDED__
26#include <condition_variable>
54template<
class Pad = bm::pad0_struct>
65 unsigned locked = locked_.load(std::memory_order_relaxed);
67 locked_.compare_exchange_weak(locked,
true,
68 std::memory_order_acquire))
70#if defined(BMSSE2OPT) || defined(BMSSE42OPT) || defined(BMAVX2OPT) || defined(BMAVX512OPT)
80 unsigned locked = locked_.load(std::memory_order_relaxed);
82 locked_.compare_exchange_weak(locked,
true,
83 std::memory_order_acquire))
91 locked_.store(
false, std::memory_order_release);
98 std::atomic<unsigned> locked_;
107template<
typename TCont>
110 typename TCont::iterator it_end = tcont.end();
111 for (
typename TCont::iterator it = tcont.begin(); it != it_end; ++it)
119template<
typename QValue,
typename Lock>
class thread_pool;
128template<
typename Value,
typename Lock>
147 std::lock_guard<lock_type> lg(dq_lock_);
172 std::lock_guard<lock_type> guard(dq_lock_);
173 if (data_queue_.empty())
175 v = data_queue_.front();
183 std::lock_guard<lock_type> guard(dq_lock_);
184 return data_queue_.empty();
195 {
return dq_lock_.try_lock(); }
237template<
typename QValue,
typename Lock>
280 void start(
unsigned tcount);
299 {
return stop_flag_.load(std::memory_order_relaxed); }
314 std::vector<std::thread> thread_vect_;
315 std::atomic_int stop_flag_{0};
318 mutable std::mutex task_done_mut_;
319 std::condition_variable task_done_cond_;
329template<
typename TPool>
342 bool wait_for_batch);
366template<
typename QValue,
typename Lock>
369 int is_stop = stop_flag_;
371 set_stop_mode(stop_when_done);
377template<
typename QValue,
typename Lock>
381 job_queue_.queue_push_cond_.notify_all();
386template<
typename QValue,
typename Lock>
389 int is_stop = stop_flag_.load(std::memory_order_relaxed);
390 if (is_stop == stop_now)
393 for(
unsigned i = 0;i < tcount; ++i)
395 thread_vect_.emplace_back(
402template<
typename QValue,
typename Lock>
406 thread_vect_.resize(0);
411template<
typename QValue,
typename Lock>
414 const std::chrono::duration<int, std::milli> wait_duration(20);
417 if (job_queue_.empty())
419 std::cv_status wait_res;
421 std::unique_lock<std::mutex> lk(task_done_mut_);
422 wait_res = task_done_cond_.wait_for(lk, wait_duration);
424 if (wait_res == std::cv_status::timeout)
426 std::this_thread::yield();
427 int is_stop = is_stopped();
428 if (is_stop == stop_now)
436template<
typename QValue,
typename Lock>
439 const std::chrono::duration<int, std::milli> wait_duration(10);
442 int is_stop = is_stopped();
443 if (is_stop == stop_now)
459 task_done_cond_.notify_one();
464 is_stop = is_stopped();
471 std::cv_status wait_res;
473 std::unique_lock<std::mutex> lk(job_queue_.signal_mut_);
475 job_queue_.queue_push_cond_.wait_for(lk, wait_duration);
477 if (wait_res == std::cv_status::timeout)
479 is_stop = is_stopped();
480 if (is_stop == stop_now)
482 std::this_thread::yield();
493template<
typename TPool>
499 typename thread_pool_type::queue_type& qu = tpool.get_job_queue();
505 tdescr->
argp = tdescr;
512 tpool.wait_empty_queue();
513 wait_for_batch_done(tpool,
task_batch, 0, batch_size - 1);
518 tdescr->
done.store(1, std::memory_order_release);
522 if (new_batch_size != batch_size)
523 batch_size = new_batch_size;
529 auto is_stop = tpool.is_stopped();
530 if (is_stop == thread_pool_type::stop_now)
537 if (wait_for_batch && batch_size)
539 tpool.wait_empty_queue();
540 wait_for_batch_done(tpool,
task_batch, 0, batch_size - 1);
547template<
typename TPool>
560 auto done = tdescr->
done.load(std::memory_order_consume);
563 auto is_stop = tpool.is_stopped();
564 if (is_stop == thread_pool_type::stop_now)
566 std::this_thread::yield();
568 done = tdescr->
done.load(std::memory_order_acquire);
Task definitions for parallel programming with BitMagic.
Thread-sync queue with MT access protection.
std::queue< value_type > queue_type
bool try_pop(value_type &v)
Extract value.
void push(const value_type &v)
Push value to the back of the queue.
void lock() noexcept(bm::is_lock_noexcept< lock_type >::value)
lock the queue access
void push_no_lock(const value_type &v)
Push value to the back of the queue without lock protection It is assumed that processing did not sta...
std::condition_variable queue_push_cond_
mutex paired conditional
std::mutex signal_mut_
signal mutex for q submissions
queue_sync() noexcept
constructor
bool try_lock() noexcept(bm::is_lock_noexcept< lock_type >::value)
Try to lock the queue exclusively.
void unlock() noexcept(bm::is_lock_noexcept< lock_type >::value)
unlock the queue access
Spin-lock with two-phase acquire (read + cas) padding parameter optionally adds a buffer to avoid CPU...
bool try_lock() noexcept
Try to acquire the lock, return true if successful.
void unlock() noexcept
Unlock the lock.
void lock() noexcept
Lock the lock.
Interface definition (base class) for a group of tasks (batch)
virtual size_type size() const =0
Return size of batch.
virtual bm::task_descr * get_task(size_type task_idx)=0
Get task by index in the batch.
Basic implementation for collection of tasks for parallel execution.
virtual size_type size() const BMNOEXCEPT
task_batch_base interface implementation
virtual bm::task_descr * get_task(size_type task_idx)
Get task by index in the batch.
Utility class to submit task batch to the running thread pool and optionally wait for it getting done...
static void wait_for_batch_done(thread_pool_type &tpool, bm::task_batch_base &tasks, task_batch_base::size_type from_idx, task_batch_base::size_type to_idx)
Check if all batch jobs in the specified interval are done. Spin-wait if not.
static void run(thread_pool_type &tpool, bm::task_batch_base &tasks, bool wait_for_batch)
task_batch_base::size_type size_type
Thread pool with custom (thread safe) queue.
void stop() noexcept
Request an immediate stop of all threads in the pool.
queue_type & get_job_queue() noexcept
Get access to the job submission queue.
bm::queue_sync< QValue, lock_type > queue_type
void join()
Wait for threads to finish (or stop if stop was requested)
thread_pool(stop_mode sm=no_stop) noexcept
void worker_func()
Internal worker wrapper with busy-wait spin loop making pthread-like call for tasks.
int is_stopped() const noexcept
Return if thread pool is stopped by a request.
void start(unsigned tcount)
Start thread pool worker threads.
void wait_empty_queue()
Conditional spin-wait for the queue to empty (Important note: tasks may still be running,...
stop_mode
Stop modes for threads: 0 - keep running/waiting for jobs 1 - wait for empty task queue then stop thr...
@ no_stop
keep spinning on busy-wait
@ stop_when_done
stop if task queue is empty
void set_stop_mode(stop_mode sm) noexcept
Set up the criteria for thread shutdown. Also notifies all threads of the new directive.
void join_multiple_threads(TCont &tcont)
Wait for multiple threads to exit.
"noexcept" traits detection for T::lock()
Pad 60 bytes so that the final structure occupies 64 bytes (1 cache line)
BitMagic task with a captured function.
std::atomic_bool done
0 - pending
@ no_flag
no flag specified
bm::id64_t flags
task flags to designate barriers
task_function_t func
captured function callback