24 #include <boost/bind/placeholders.hpp>
27 #ifndef ROBOTRACONTEUR_EMSCRIPTEN
28 #include <boost/asio.hpp>
33 namespace RobotRaconteur
35 class ROBOTRACONTEUR_CORE_API RobotRaconteurNode;
62 class ROBOTRACONTEUR_CORE_API
ThreadPool :
public RR_ENABLE_SHARED_FROM_THIS<ThreadPool>,
private boost::noncopyable
66 std::vector<RR_SHARED_PTR<boost::thread> > threads;
68 boost::mutex queue_mutex;
70 RR_BOOST_ASIO_IO_CONTEXT _io_context;
75 boost::mutex keepgoing_lock;
77 #if BOOST_ASIO_VERSION < 101200
78 RR_SHARED_PTR<RR_BOOST_ASIO_IO_CONTEXT::work> _work;
80 RR_SHARED_PTR<boost::asio::executor_work_guard<RR_BOOST_ASIO_IO_CONTEXT::executor_type> > _work;
83 RR_WEAK_PTR<RobotRaconteurNode> node;
97 ThreadPool(
const RR_SHARED_PTR<RobotRaconteurNode>& node);
100 RR_SHARED_PTR<RobotRaconteurNode> GetNode();
123 virtual void Post(boost::function<
void()>
function);
132 virtual bool TryPost(boost::function<
void()>
function);
134 virtual void Shutdown();
144 virtual RR_BOOST_ASIO_IO_CONTEXT& get_io_context();
147 virtual void start_new_thread();
149 virtual void thread_function();
169 virtual RR_SHARED_PTR<ThreadPool>
NewThreadPool(
const RR_SHARED_PTR<RobotRaconteurNode>& node)
171 return RR_MAKE_SHARED<ThreadPool>(node);
193 RR_BOOST_ASIO_IO_CONTEXT& _external_io_context;
206 IOContextThreadPool(
const RR_SHARED_PTR<RobotRaconteurNode>& node, RR_BOOST_ASIO_IO_CONTEXT& external_io_context,
215 RR_OVIRTUAL
size_t GetThreadPoolCount() RR_OVERRIDE;
224 RR_OVIRTUAL
void SetThreadPoolCount(
size_t count) RR_OVERRIDE;
226 RR_OVIRTUAL
void Post(boost::function<
void()> function) RR_OVERRIDE;
227 RR_OVIRTUAL
bool TryPost(boost::function<
void()> function) RR_OVERRIDE;
229 RR_OVIRTUAL
void Shutdown() RR_OVERRIDE;
231 RR_OVIRTUAL RR_BOOST_ASIO_IO_CONTEXT& get_io_context() RR_OVERRIDE;
236 ROBOTRACONTEUR_CORE_API
bool ThreadPool_IsNodeMultithreaded(RR_WEAK_PTR<RobotRaconteurNode> node);
241 template <
typename T>
242 struct IOContextThreadPool_AsyncResultAdapter_traits
244 typedef T result_type;
248 struct IOContextThreadPool_AsyncResultAdapter_traits<void>
250 typedef int32_t result_type;
253 template <
typename T>
254 struct IOContextThreadPool_AsyncResultAdapter_data
256 typedef typename IOContextThreadPool_AsyncResultAdapter_traits<T>::result_type result_type;
257 boost::initialized<result_type> _result;
258 RR_SHARED_PTR<RobotRaconteurException> _exp;
259 boost::initialized<bool> _complete;
262 ROBOTRACONTEUR_CORE_API RR_SHARED_PTR<RobotRaconteurNode> IOContextThreadPool_RobotRaconteurNode_sp();
264 ROBOTRACONTEUR_CORE_API
void IOContextThreadPool_RobotRaconteurNode_DownCastAndThrowException(
265 const RR_SHARED_PTR<RobotRaconteurNode>& node, RobotRaconteurException& exp);
287 template <
typename T>
291 RR_SHARED_PTR<RobotRaconteurNode> _node;
292 RR_BOOST_ASIO_IO_CONTEXT& _io_context;
293 RR_SHARED_PTR<detail::IOContextThreadPool_AsyncResultAdapter_data<T> > _data;
296 typedef typename detail::IOContextThreadPool_AsyncResultAdapter_traits<T>::result_type result_type;
307 RR_BOOST_ASIO_IO_CONTEXT& io_context)
308 : _node(node), _io_context(io_context),
309 _data(RR_MAKE_SHARED<detail::IOContextThreadPool_AsyncResultAdapter_data<T> >())
319 : _node(detail::IOContextThreadPool_RobotRaconteurNode_sp()), _io_context(io_context),
320 _data(RR_MAKE_SHARED<detail::IOContextThreadPool_AsyncResultAdapter_data<T> >())
323 void operator()(result_type res,
const RR_SHARED_PTR<RobotRaconteurException>& exp)
325 _data->_complete.data() =
true;
326 _data->_result.data() = res;
330 void operator()(
const RR_SHARED_PTR<RobotRaconteurException>& exp)
332 _data->_complete.data() =
true;
336 void operator()(result_type res)
338 _data->_complete.data() =
true;
339 _data->_result.data() = res;
342 void operator()() { _data->_complete.data() =
true; }
357 while (!_data->_complete.data())
359 _io_context.run_one();
363 RR_SHARED_PTR<RobotRaconteurException> exp;
364 RR_SWAP(res, _data->_result.data());
370 detail::IOContextThreadPool_RobotRaconteurNode_DownCastAndThrowException(_node, *exp1);
389 bool PollResult(result_type& ret, RR_SHARED_PTR<RobotRaconteurException>& exp)
391 if (!_data->_complete.data())
396 RR_SWAP(ret, _data->_result.data());
397 RR_SWAP(exp, _data->_exp);
402 #define ROBOTRACONTEUR_ASSERT_MULTITHREADED(node) \
403 BOOST_ASSERT_MSG(detail::ThreadPool_IsNodeMultithreaded(node), "multithreading required for requested operation")
405 #ifndef ROBOTRACONTEUR_NO_CXX11_TEMPLATE_ALIASES
boost::shared_ptr< ThreadPoolFactory > ThreadPoolFactoryPtr
Convenience alias for ThreadPoolFactory shared_ptr.
Definition: ThreadPool.h:409
boost::shared_ptr< IOContextThreadPool > IOContextThreadPoolPtr
Convenience alias for IOContextThreadPool shared_ptr.
Definition: ThreadPool.h:411
boost::shared_ptr< ThreadPool > ThreadPoolPtr
Convenience alias for ThreadPool shared_ptr.
Definition: ThreadPool.h:407
Adapter for asynchronous operations for use with IOContextThreadPool.
Definition: ThreadPool.h:289
bool PollResult(result_type &ret, boost::shared_ptr< RobotRaconteurException > &exp)
Poll for a result without blocking.
Definition: ThreadPool.h:389
IOContextThreadPool_AsyncResultAdapter(boost::shared_ptr< RobotRaconteurNode > &node, boost::asio::io_context &io_context)
Construct an IOContextThreadPool_AsyncResultAdapter.
Definition: ThreadPool.h:306
result_type GetResult()
Get the result of the asynchronous operation.
Definition: ThreadPool.h:355
IOContextThreadPool_AsyncResultAdapter(boost::asio::io_context &io_context)
Construct an IOContextThreadPool_AsyncResultAdapter for the singleton RobotRaconteurNode.
Definition: ThreadPool.h:318
Thread pool for use with an external boost::asio::io_context and an external thread pool.
Definition: ThreadPool.h:190
IOContextThreadPool(const boost::shared_ptr< RobotRaconteurNode > &node, boost::asio::io_context &external_io_context, bool multithreaded)
Construct an IOContextThreadPool.
Base class for Robot Raconteur exceptions.
Definition: Error.h:50
ThreadPool factory for use with RobotRaconteurNode.
Definition: ThreadPool.h:161
virtual boost::shared_ptr< ThreadPool > NewThreadPool(const boost::shared_ptr< RobotRaconteurNode > &node)
Construct and return a new thread pool.
Definition: ThreadPool.h:169
Thread pool for Robot Raconteur nodes.
Definition: ThreadPool.h:63
virtual size_t GetThreadPoolCount()
Get the number of threads in the thread pool.
virtual bool TryPost(boost::function< void()> function)
Try posting a function to be executed by the thread pool in a worker thread and return immediately.
ThreadPool(const boost::shared_ptr< RobotRaconteurNode > &node)
Construct a new ThreadPool.
virtual void SetThreadPoolCount(size_t count)
Set the desired number of threads in the thread pool.
virtual void Post(boost::function< void()> function)
Post a function to be executed by the thread pool in a worker thread and return immediately.