Robot Raconteur Core C++ Library
ThreadPool.h
Go to the documentation of this file.
1 
24 #include <boost/bind/placeholders.hpp>
26 
27 #ifndef ROBOTRACONTEUR_EMSCRIPTEN
28 #include <boost/asio.hpp>
29 #endif
30 
31 #pragma once
32 
33 namespace RobotRaconteur
34 {
// Forward declaration only: this header refers to RobotRaconteurNode solely through
// RR_SHARED_PTR / RR_WEAK_PTR handles, so the full definition is not needed here.
35 class ROBOTRACONTEUR_CORE_API RobotRaconteurNode;
36 
// Thread pool for Robot Raconteur nodes.
//
// Instances are shared-pointer managed (RR_ENABLE_SHARED_FROM_THIS) and non-copyable.
// All operations are virtual so that alternative pools (see IOContextThreadPool) can
// replace the behavior. The member definitions are not part of this header; notes
// marked "presumably" below are inferences to confirm against the implementation file.
62 class ROBOTRACONTEUR_CORE_API ThreadPool : public RR_ENABLE_SHARED_FROM_THIS<ThreadPool>, private boost::noncopyable
63 {
64 
65  protected:
    // Handles to boost::thread objects — presumably the pool's worker threads.
66  std::vector<RR_SHARED_PTR<boost::thread> > threads;
67 
    // NOTE(review): what this mutex guards is not visible in this header — confirm in the .cpp.
68  boost::mutex queue_mutex;
69 
    // The io_context owned by this pool (held by value).
70  RR_BOOST_ASIO_IO_CONTEXT _io_context;
71 
    // Presumably the desired thread count reported by GetThreadPoolCount() — TODO confirm.
72  size_t thread_count;
73 
    // Run flag and its dedicated mutex — presumably cleared by Shutdown(); verify in the .cpp.
74  bool keepgoing;
75  boost::mutex keepgoing_lock;
76 
    // Asio "work" handle. The type depends on the Boost.Asio version: io_context::work
    // before BOOST_ASIO_VERSION 101200, executor_work_guard from that version on.
77 #if BOOST_ASIO_VERSION < 101200
78  RR_SHARED_PTR<RR_BOOST_ASIO_IO_CONTEXT::work> _work;
79 #else
80  RR_SHARED_PTR<boost::asio::executor_work_guard<RR_BOOST_ASIO_IO_CONTEXT::executor_type> > _work;
81 #endif
82 
    // Weak (non-owning) handle to a RobotRaconteurNode — presumably the node given to the constructor.
83  RR_WEAK_PTR<RobotRaconteurNode> node;
84 
85  public:
    // Construct a new ThreadPool.
    // node: the RobotRaconteurNode this pool is created for.
97  ThreadPool(const RR_SHARED_PTR<RobotRaconteurNode>& node);
98  virtual ~ThreadPool();
99 
    // Returns a shared pointer to a RobotRaconteurNode — presumably obtained from the weak `node` member.
100  RR_SHARED_PTR<RobotRaconteurNode> GetNode();
101 
    // Get the number of threads in the thread pool.
107  virtual size_t GetThreadPoolCount();
108 
    // Set the desired number of threads in the thread pool.
116  virtual void SetThreadPoolCount(size_t count);
117 
    // Post a function to be executed by the thread pool in a worker thread and return immediately.
123  virtual void Post(boost::function<void()> function);
124 
    // Try posting a function to be executed by the thread pool in a worker thread and
    // return immediately. NOTE(review): presumably returns false when the function
    // could not be posted — confirm the exact condition in the implementation.
132  virtual bool TryPost(boost::function<void()> function);
133 
    // Shut down the pool. NOTE(review): exact semantics (joining threads, dropping
    // pending work) are not visible from this header.
134  virtual void Shutdown();
135 
    // Returns a reference to an io_context — presumably `_io_context` for this base class.
144  virtual RR_BOOST_ASIO_IO_CONTEXT& get_io_context();
145 
146  protected:
    // Overridable hooks; judging by the names, one spawns a worker thread and the other
    // is the worker's entry point — TODO confirm.
147  virtual void start_new_thread();
148 
149  virtual void thread_function();
150 };
151 
// ThreadPool factory for use with RobotRaconteurNode.
//
// Non-copyable. NewThreadPool is virtual, so a subclass can return a different
// ThreadPool implementation.
160 class ROBOTRACONTEUR_CORE_API ThreadPoolFactory : private boost::noncopyable
161 {
162  public:
    // Construct and return a new threadpool.
    // node: forwarded to the ThreadPool constructor.
    // The default implementation creates a plain ThreadPool via RR_MAKE_SHARED.
169  virtual RR_SHARED_PTR<ThreadPool> NewThreadPool(const RR_SHARED_PTR<RobotRaconteurNode>& node)
170  {
171  return RR_MAKE_SHARED<ThreadPool>(node);
172  }
    // Virtual destructor; declared here, defined outside this header.
173  virtual ~ThreadPoolFactory();
174 };
175 
// Thread pool for use with an external boost::asio::io_context and thread pool.
//
// Overrides every public virtual operation of ThreadPool. The overriding definitions
// are not in this header.
189 class ROBOTRACONTEUR_CORE_API IOContextThreadPool : public ThreadPool
190 {
191 
192  protected:
    // Held by reference, not owned: the referenced io_context has to outlive this object.
193  RR_BOOST_ASIO_IO_CONTEXT& _external_io_context;
    // Presumably the `multithreaded` constructor argument — TODO confirm.
194  bool _multithreaded;
195 
196  public:
    // Construct an IOContextThreadPool.
    // node:                the RobotRaconteurNode this pool is created for
    // external_io_context: io_context supplied by the caller
    // multithreaded:       NOTE(review): presumably whether external_io_context is
    //                      serviced by more than one thread — confirm
206  IOContextThreadPool(const RR_SHARED_PTR<RobotRaconteurNode>& node, RR_BOOST_ASIO_IO_CONTEXT& external_io_context,
207  bool multithreaded);
208  RR_OVIRTUAL ~IOContextThreadPool() RR_OVERRIDE;
209 
    // Override of ThreadPool::GetThreadPoolCount().
215  RR_OVIRTUAL size_t GetThreadPoolCount() RR_OVERRIDE;
216 
    // Override of ThreadPool::SetThreadPoolCount().
224  RR_OVIRTUAL void SetThreadPoolCount(size_t count) RR_OVERRIDE;
225 
    // Overrides of ThreadPool::Post() / ThreadPool::TryPost().
226  RR_OVIRTUAL void Post(boost::function<void()> function) RR_OVERRIDE;
227  RR_OVIRTUAL bool TryPost(boost::function<void()> function) RR_OVERRIDE;
228 
    // Override of ThreadPool::Shutdown().
229  RR_OVIRTUAL void Shutdown() RR_OVERRIDE;
230 
    // Override of ThreadPool::get_io_context() — presumably returns `_external_io_context`.
231  RR_OVIRTUAL RR_BOOST_ASIO_IO_CONTEXT& get_io_context() RR_OVERRIDE;
232 };
233 
// Implementation details; not part of the public interface.
234 namespace detail
235 {
// Predicate evaluated by the ROBOTRACONTEUR_ASSERT_MULTITHREADED macro later in this header.
// Presumably true when the node's thread pool may run work on multiple threads — confirm in the .cpp.
236 ROBOTRACONTEUR_CORE_API bool ThreadPool_IsNodeMultithreaded(RR_WEAK_PTR<RobotRaconteurNode> node);
237 }
238 
// Support types and helpers for IOContextThreadPool_AsyncResultAdapter.
239 namespace detail
240 {
// Maps the adapter's template argument T to the type used to store the result.
// Primary template: the result is stored as T itself.
241 template <typename T>
242 struct IOContextThreadPool_AsyncResultAdapter_traits
243 {
244  typedef T result_type;
245 };
246 
// void cannot be stored, so the void specialization substitutes int32_t as a placeholder.
247 template <>
248 struct IOContextThreadPool_AsyncResultAdapter_traits<void>
249 {
250  typedef int32_t result_type;
251 };
252 
// State record of one adapter: the result slot, the exception slot and the completion flag.
// _result and _complete are wrapped in boost::initialized so they start value-initialized.
253 template <typename T>
254 struct IOContextThreadPool_AsyncResultAdapter_data
255 {
256  typedef typename IOContextThreadPool_AsyncResultAdapter_traits<T>::result_type result_type;
257  boost::initialized<result_type> _result;
258  RR_SHARED_PTR<RobotRaconteurException> _exp;
259  boost::initialized<bool> _complete;
260 };
261 
// Node used by the adapter constructor that takes no node argument (the singleton
// RobotRaconteurNode, per that constructor's documentation).
262 ROBOTRACONTEUR_CORE_API RR_SHARED_PTR<RobotRaconteurNode> IOContextThreadPool_RobotRaconteurNode_sp();
263 
// Called by the adapter's GetResult() when an exception was stored. Judging by the name,
// it rethrows `exp` as its concrete exception type — confirm in the implementation.
264 ROBOTRACONTEUR_CORE_API void IOContextThreadPool_RobotRaconteurNode_DownCastAndThrowException(
265  const RR_SHARED_PTR<RobotRaconteurNode>& node, RobotRaconteurException& exp);
266 } // namespace detail
267 
287 template <typename T>
289 {
290  private:
291  RR_SHARED_PTR<RobotRaconteurNode> _node;
292  RR_BOOST_ASIO_IO_CONTEXT& _io_context;
293  RR_SHARED_PTR<detail::IOContextThreadPool_AsyncResultAdapter_data<T> > _data;
294 
295  public:
296  typedef typename detail::IOContextThreadPool_AsyncResultAdapter_traits<T>::result_type result_type;
297 
306  IOContextThreadPool_AsyncResultAdapter(RR_SHARED_PTR<RobotRaconteurNode>& node,
307  RR_BOOST_ASIO_IO_CONTEXT& io_context)
308  : _node(node), _io_context(io_context),
309  _data(RR_MAKE_SHARED<detail::IOContextThreadPool_AsyncResultAdapter_data<T> >())
310  {}
318  IOContextThreadPool_AsyncResultAdapter(RR_BOOST_ASIO_IO_CONTEXT& io_context)
319  : _node(detail::IOContextThreadPool_RobotRaconteurNode_sp()), _io_context(io_context),
320  _data(RR_MAKE_SHARED<detail::IOContextThreadPool_AsyncResultAdapter_data<T> >())
321  {}
322 
323  void operator()(result_type res, const RR_SHARED_PTR<RobotRaconteurException>& exp)
324  {
325  _data->_complete.data() = true;
326  _data->_result.data() = res;
327  _data->_exp = exp;
328  }
329 
330  void operator()(const RR_SHARED_PTR<RobotRaconteurException>& exp)
331  {
332  _data->_complete.data() = true;
333  _data->_exp = exp;
334  }
335 
336  void operator()(result_type res)
337  {
338  _data->_complete.data() = true;
339  _data->_result.data() = res;
340  }
341 
342  void operator()() { _data->_complete.data() = true; }
343 
355  result_type GetResult()
356  {
357  while (!_data->_complete.data())
358  {
359  _io_context.run_one();
360  }
361 
362  result_type res;
363  RR_SHARED_PTR<RobotRaconteurException> exp;
364  RR_SWAP(res, _data->_result.data());
365  exp = _data->_exp;
366  _data->_exp.reset();
367  if (exp)
368  {
369  RobotRaconteurException* exp1 = exp.get();
370  detail::IOContextThreadPool_RobotRaconteurNode_DownCastAndThrowException(_node, *exp1);
371  }
372  return res;
373  }
374 
389  bool PollResult(result_type& ret, RR_SHARED_PTR<RobotRaconteurException>& exp)
390  {
391  if (!_data->_complete.data())
392  {
393  return false;
394  }
395 
396  RR_SWAP(ret, _data->_result.data());
397  RR_SWAP(exp, _data->_exp);
398  return true;
399  }
400 };
401 
// Debug assertion that `node` supports multithreaded operation.
// node: anything accepted by detail::ThreadPool_IsNodeMultithreaded (a weak pointer to the node).
// The helper is named with its fully qualified namespace so that the macro also expands
// correctly when it is used outside of namespace RobotRaconteur.
402 #define ROBOTRACONTEUR_ASSERT_MULTITHREADED(node) \
403  BOOST_ASSERT_MSG(::RobotRaconteur::detail::ThreadPool_IsNodeMultithreaded(node), "multithreading required for requested operation")
404 
// Shared-pointer convenience aliases; omitted when the build defines
// ROBOTRACONTEUR_NO_CXX11_TEMPLATE_ALIASES.
405 #ifndef ROBOTRACONTEUR_NO_CXX11_TEMPLATE_ALIASES
// Convenience alias for ThreadPool shared_ptr.
407 using ThreadPoolPtr = RR_SHARED_PTR<ThreadPool>;
// Convenience alias for ThreadPoolFactory shared_ptr.
409 using ThreadPoolFactoryPtr = RR_SHARED_PTR<ThreadPoolFactory>;
// Convenience alias for IOContextThreadPool shared_ptr.
411 using IOContextThreadPoolPtr = RR_SHARED_PTR<IOContextThreadPool>;
412 #endif
413 } // namespace RobotRaconteur
boost::shared_ptr< ThreadPoolFactory > ThreadPoolFactoryPtr
Convenience alias for ThreadPoolFactory shared_ptr.
Definition: ThreadPool.h:409
boost::shared_ptr< IOContextThreadPool > IOContextThreadPoolPtr
Convenience alias for IOContextThreadPool shared_ptr.
Definition: ThreadPool.h:411
boost::shared_ptr< ThreadPool > ThreadPoolPtr
Convenience alias for ThreadPool shared_ptr.
Definition: ThreadPool.h:407
Adapter for asynchronous operations for use with IOContextThreadPool.
Definition: ThreadPool.h:289
bool PollResult(result_type &ret, boost::shared_ptr< RobotRaconteurException > &exp)
Polls for a result, nonblocking.
Definition: ThreadPool.h:389
IOContextThreadPool_AsyncResultAdapter(boost::shared_ptr< RobotRaconteurNode > &node, boost::asio::io_context &io_context)
Construct an IOContextThreadPool_AsyncResultAdapter.
Definition: ThreadPool.h:306
result_type GetResult()
Get the result of the asynchronous operation.
Definition: ThreadPool.h:355
IOContextThreadPool_AsyncResultAdapter(boost::asio::io_context &io_context)
Construct an IOContextThreadPool_AsyncResultAdapter for the singleton RobotRaconteurNode.
Definition: ThreadPool.h:318
Thread pool for use with an external boost::asio::io_context and thread pool.
Definition: ThreadPool.h:190
IOContextThreadPool(const boost::shared_ptr< RobotRaconteurNode > &node, boost::asio::io_context &external_io_context, bool multithreaded)
Construct an IOContextThreadPool.
Base class for Robot Raconteur exceptions.
Definition: Error.h:50
ThreadPool factory for use with RobotRaconteurNode.
Definition: ThreadPool.h:161
virtual boost::shared_ptr< ThreadPool > NewThreadPool(const boost::shared_ptr< RobotRaconteurNode > &node)
Construct and return a new threadpool.
Definition: ThreadPool.h:169
Thread pool for Robot Raconteur nodes.
Definition: ThreadPool.h:63
virtual size_t GetThreadPoolCount()
Get the number of threads in the thread pool.
virtual bool TryPost(boost::function< void()> function)
Try posting a function to be executed by the thread pool in a worker thread and return immediately.
ThreadPool(const boost::shared_ptr< RobotRaconteurNode > &node)
Construct a new ThreadPool.
virtual void SetThreadPoolCount(size_t count)
Set the desired number of threads in the thread pool.
virtual void Post(boost::function< void()> function)
Post a function to be executed by the thread pool in a worker thread and return immediately.