snark
bursty_reader.h
1 // This file is part of snark, a generic and flexible library
2 // for robotics research.
3 //
4 // Copyright (C) 2011 The University of Sydney
5 //
6 // snark is free software; you can redistribute it and/or
7 // modify it under the terms of the GNU Lesser General Public
8 // License as published by the Free Software Foundation; either
9 // version 3 of the License, or (at your option) any later version.
10 //
11 // snark is distributed in the hope that it will be useful, but WITHOUT ANY
12 // WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
13 // FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
14 // for more details.
15 //
16 // You should have received a copy of the GNU Lesser General Public
17 // License along with snark. If not, see <http://www.gnu.org/licenses/>.
18 
19 #ifndef SNARK_TBB_BURSTY_READER_H_
20 #define SNARK_TBB_BURSTY_READER_H_
21 
22 #include <snark/tbb/queue.h>
23 #include <boost/thread.hpp>
24 #include <tbb/pipeline.h>
25 
26 namespace snark{ namespace tbb{
27 
30 template< typename T >
32 {
33  static bool valid( const T& t ) { return true; }
34 };
35 
38 template< typename T >
40 {
41 public:
42  bursty_reader( boost::function0< T > read, unsigned int size = 0 );
43  bursty_reader( boost::function0< T > read, unsigned int size, unsigned int capacity );
45 
46  bool wait();
47  void stop();
48  void join();
49  ::tbb::filter_t< void, T >& filter() { return m_read_filter; }
50 
51 private:
52  T read( ::tbb::flow_control& flow );
53  void push();
54  void push_thread();
55 
56  queue< T > m_queue;
57  unsigned int m_size;
58  bool m_running;
59  boost::scoped_ptr< boost::thread > m_thread;
60  boost::function0< T > m_read;
61  ::tbb::filter_t< void, T > m_read_filter;
62 };
63 
64 
68 template< typename T >
69 bursty_reader< T >::bursty_reader( boost::function0< T > read, unsigned int size ):
70  m_size( size ),
71  m_running( true ),
72  m_read( read ),
73  m_read_filter( ::tbb::filter::serial_in_order, boost::bind( &bursty_reader< T >::read, this, _1 ) )
74 {
75  m_thread.reset( new boost::thread( boost::bind( &bursty_reader< T >::push_thread, this ) ) );
76 }
77 
82 template< typename T >
83 bursty_reader< T >::bursty_reader( boost::function0< T > read, unsigned int size, unsigned int capacity ):
84  m_queue( capacity ),
85  m_size( size ),
86  m_running( true ),
87  m_read( read ),
88  m_read_filter( ::tbb::filter::serial_in_order, boost::bind( &bursty_reader< T >::read, this, _1 ) )
89 {
90  m_thread.reset( new boost::thread( boost::bind( &bursty_reader< T >::push_thread, this ) ) );
91 }
92 
94 template< typename T >
96 {
97  join();
98 }
99 
100 
103 template< typename T >
105 {
106  if( !m_running && m_queue.empty() ) { return false; }
107  m_queue.wait();
108  return !m_queue.empty();
109 }
110 
113 template< typename T >
115 {
116  m_running = false;
117  m_queue.shutdown();
118 }
119 
121 template< typename T >
123 {
124  stop();
125  if( !m_thread ) { return; }
126  m_thread->join();
127  m_thread.reset();
128 }
129 
132 template< typename T >
133 T bursty_reader< T >::read( ::tbb::flow_control& flow )
134 {
135  if( m_queue.empty() )
136  {
137  flow.stop();
138  return T();
139  }
140  if( m_size > 0 )
141  {
142  T t;
143  unsigned int n = 0;
144  while( m_queue.size() > m_size )
145  {
146  m_queue.pop( t );
147  n++;
148  }
149 // if( n > 0 ) TODO how to warn the user that data is discarded ?
150 // {
151 // std::cerr << "warning: discarded " << n << " frame(s)" << std::endl;
152 // }
153  }
154  T t;
155  m_queue.pop( t );
156  if( !bursty_reader_traits< T >::valid( t ) )
157  {
158  flow.stop();
159  return T();
160  }
161  return t;
162 }
163 
164 
166 template< typename T >
167 void bursty_reader< T >::push()
168 {
169  T t = m_read();
170  if( !bursty_reader_traits< T >::valid( t ) )
171  {
172  m_running = false;
173  m_queue.push( T() ); // HACK to signal m_queue.wait
174  }
175  else
176  {
177  m_queue.push( t );
178  }
179 }
180 
182 template< typename T >
183 void bursty_reader< T >::push_thread()
184 {
185  while( m_running )
186  {
187  push();
188  }
189 }
190 
191 
192 } }
193 
194 #endif // SNARK_TBB_BURSTY_READER_H_