19 #ifndef SNARK_TBB_QUEUE_H_
20 #define SNARK_TBB_QUEUE_H_
22 #include <boost/thread/condition.hpp>
23 #include <tbb/concurrent_queue.h>
25 namespace snark{
namespace tbb{
31 counter() : value_( 0 ), shutdown_( false ) {}
34 unsigned int operator++()
36 boost::lock_guard< boost::mutex > lock( mutex_ );
38 condition_.notify_all();
43 unsigned int operator--()
45 boost::unique_lock<boost::mutex> lock( mutex_ );
46 while( value_ == 0 && !shutdown_ ) { condition_.timed_wait( lock, boost::posix_time::milliseconds( 100 ) ); }
47 if( value_ > 0 ) { --value_; }
52 unsigned int wait_until_non_zero()
54 boost::unique_lock<boost::mutex> lock( mutex_ );
55 while( value_ == 0 && !shutdown_ ) { condition_.timed_wait( lock, boost::posix_time::milliseconds( 100 ) ); }
60 void shutdown() { shutdown_ =
true; condition_.notify_all(); }
63 boost::condition_variable condition_;
74 template<
typename T >
79 queue(
unsigned int capacity ) { m_queue.set_capacity( capacity ); }
82 void push(
const T& t ) { m_queue.push( t ); ++counter_; }
83 void pop( T& t ) { m_queue.pop( t ); --counter_; }
84 unsigned int size()
const {
return m_queue.size(); }
85 bool empty()
const {
return m_queue.empty(); }
86 void wait() { counter_.wait_until_non_zero(); }
87 void shutdown() { counter_.shutdown(); }
90 ::tbb::concurrent_bounded_queue< T > m_queue;
97 #endif // SNARK_TBB_QUEUE_H_