snark
queue.h
1 // This file is part of snark, a generic and flexible library
2 // for robotics research.
3 //
4 // Copyright (C) 2011 The University of Sydney
5 //
6 // snark is free software; you can redistribute it and/or
7 // modify it under the terms of the GNU Lesser General Public
8 // License as published by the Free Software Foundation; either
9 // version 3 of the License, or (at your option) any later version.
10 //
11 // snark is distributed in the hope that it will be useful, but WITHOUT ANY
12 // WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
13 // FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
14 // for more details.
15 //
16 // You should have received a copy of the GNU Lesser General Public
17 // License along with snark. If not, see <http://www.gnu.org/licenses/>.
18 
19 #ifndef SNARK_TBB_QUEUE_H_
20 #define SNARK_TBB_QUEUE_H_
21 
22 #include <boost/thread/condition.hpp>
23 #include <tbb/concurrent_queue.h>
24 
25 namespace snark{ namespace tbb{
26 
27 class counter
28 {
29  public:
31  counter() : value_( 0 ), shutdown_( false ) {}
32 
34  unsigned int operator++()
35  {
36  boost::lock_guard< boost::mutex > lock( mutex_ );
37  ++value_;
38  condition_.notify_all();
39  return value_;
40  }
41 
43  unsigned int operator--()
44  {
45  boost::unique_lock<boost::mutex> lock( mutex_ );
46  while( value_ == 0 && !shutdown_ ) { condition_.timed_wait( lock, boost::posix_time::milliseconds( 100 ) ); }
47  if( value_ > 0 ) { --value_; }
48  return value_;
49  }
50 
52  unsigned int wait_until_non_zero()
53  {
54  boost::unique_lock<boost::mutex> lock( mutex_ );
55  while( value_ == 0 && !shutdown_ ) { condition_.timed_wait( lock, boost::posix_time::milliseconds( 100 ) ); }
56  return value_;
57  }
58 
60  void shutdown() { shutdown_ = true; condition_.notify_all(); }
61 
62  private:
63  boost::condition_variable condition_;
64  boost::mutex mutex_;
65  unsigned int value_;
66  bool shutdown_;
67 
68 };
69 
74 template< typename T >
75 class queue
76 {
77 public:
78  queue() {}
79  queue( unsigned int capacity ) { m_queue.set_capacity( capacity ); }
80  ~queue() {}
81 
82  void push( const T& t ) { m_queue.push( t ); ++counter_; }
83  void pop( T& t ) { m_queue.pop( t ); --counter_; }
84  unsigned int size() const { return m_queue.size(); }
85  bool empty() const { return m_queue.empty(); }
86  void wait() { counter_.wait_until_non_zero(); }
87  void shutdown() { counter_.shutdown(); }
88 
89 private:
90  ::tbb::concurrent_bounded_queue< T > m_queue;
91  counter counter_;
92 
93 };
94 
95 } }
96 
97 #endif // SNARK_TBB_QUEUE_H_