Robot Raconteur Core C++ Library
AsyncUtils.h
Go to the documentation of this file.
1 
24 #include <boost/bind/placeholders.hpp>
25 #include <boost/scope_exit.hpp>
26 #include <boost/system/error_code.hpp>
27 
28 #include "RobotRaconteur/Error.h"
31 #include "RobotRaconteur/Timer.h"
33 
34 #pragma once
35 
36 namespace RobotRaconteur
37 {
38 class ROBOTRACONTEUR_CORE_API RobotRaconteurNode;
39 
40 namespace detail
41 {
42 template <typename T>
43 class sync_async_handler : private boost::noncopyable
44 {
45  public:
46  RR_SHARED_PTR<AutoResetEvent> ev;
47  RR_SHARED_PTR<RobotRaconteurException> err;
48  RR_SHARED_PTR<T> data;
49  boost::mutex data_lock;
50 
51  sync_async_handler() { ev = RR_MAKE_SHARED<AutoResetEvent>(); }
52 
53  sync_async_handler(const RR_SHARED_PTR<RobotRaconteurException>& err)
54  {
55  ev = RR_MAKE_SHARED<AutoResetEvent>();
56  this->err = err;
57  }
58 
59  void operator()() { ev->Set(); }
60 
61  void operator()(const RR_SHARED_PTR<RobotRaconteurException>& err)
62  {
63  boost::mutex::scoped_lock lock(data_lock);
64  this->err = err;
65  ev->Set();
66  }
67 
68  void operator()(const RR_SHARED_PTR<T>& data, const RR_SHARED_PTR<RobotRaconteurException>& err)
69  {
70  boost::mutex::scoped_lock lock(data_lock);
71  this->err = err;
72  this->data = data;
73  ev->Set();
74  }
75 
76  RR_SHARED_PTR<T> end()
77  {
78  ev->WaitOne();
79 
80  boost::mutex::scoped_lock lock(data_lock);
81  if (err)
82  {
83  RobotRaconteurExceptionUtil::DownCastAndThrowException(err);
84  }
85 
86  if (!data)
87  throw InternalErrorException("Internal async error");
88 
89  return data;
90  }
91 
92  void end_void()
93  {
94  ev->WaitOne();
95 
96  boost::mutex::scoped_lock lock(data_lock);
97  if (err)
98  {
99  RobotRaconteurExceptionUtil::DownCastAndThrowException(err);
100  }
101  }
102 
103  bool try_end(RR_SHARED_PTR<T>& res, RR_SHARED_PTR<RobotRaconteurException>& err_out)
104  {
105  ev->WaitOne();
106 
107  boost::mutex::scoped_lock lock(data_lock);
108  if (err)
109  {
110  err_out = err;
111  return false;
112  }
113 
114  if (!data)
115  {
116  err_out = RR_MAKE_SHARED<InternalErrorException>("Internal async error");
117  return false;
118  }
119 
120  res = data;
121  return true;
122  }
123 
124  bool try_end_void(RR_SHARED_PTR<RobotRaconteurException>& err_out)
125  {
126  ev->WaitOne();
127 
128  boost::mutex::scoped_lock lock(data_lock);
129  if (err)
130  {
131  err_out = err;
132  return false;
133  }
134 
135  return true;
136  }
137 };
138 
139 RR_SHARED_PTR<Timer> async_timeout_wrapper_CreateTimer(const RR_SHARED_PTR<RobotRaconteurNode>& node,
140  const boost::posix_time::time_duration& period,
141  RR_MOVE_ARG(boost::function<void(const TimerEvent&)>) handler,
142  bool oneshot);
143 
144 template <typename T>
145 void async_timeout_wrapper_closer(const RR_SHARED_PTR<T>& d)
146 {
147  try
148  {
149  d->Close();
150  }
151  catch (std::exception&)
152  {}
153 }
154 
155 template <typename T, typename T2>
156 void async_timeout_wrapper_closer(const RR_SHARED_PTR<T>& d)
157 {
158  try
159  {
160  RR_SHARED_PTR<T2> t2 = RR_DYNAMIC_POINTER_CAST<T2>(d);
161  if (!t2)
162  return;
163  t2->Close();
164  }
165  catch (std::exception&)
166  {}
167 }
168 
169 template <typename T>
170 class async_timeout_wrapper : public RR_ENABLE_SHARED_FROM_THIS<async_timeout_wrapper<T> >, private boost::noncopyable
171 {
172  private:
173  boost::function<void(const RR_SHARED_PTR<T>&, const RR_SHARED_PTR<RobotRaconteurException>&)> handler_;
174  RR_SHARED_PTR<Timer> timeout_timer_;
175  boost::mutex handled_lock;
176  bool handled;
177  RR_SHARED_PTR<RobotRaconteurException> timeout_exception_;
178  boost::function<void(const RR_SHARED_PTR<T>&)> deleter_;
179  RR_WEAK_PTR<RobotRaconteurNode> node;
180 
181  public:
182  async_timeout_wrapper(
183  const RR_SHARED_PTR<RobotRaconteurNode>& node,
184  boost::function<void(const RR_SHARED_PTR<T>&, const RR_SHARED_PTR<RobotRaconteurException>&)> handler,
185  RR_MOVE_ARG(boost::function<void(const RR_SHARED_PTR<T>&)>) deleter = 0)
186  : handler_(handler), handled(false), node(node)
187  {
188  deleter_.swap(deleter);
189  }
190 
191  void start_timer(int32_t timeout, const RR_SHARED_PTR<RobotRaconteurException>& timeout_exception =
192  RR_MAKE_SHARED<ConnectionException>("Timeout during operation"));
193 
194  void operator()(const RR_SHARED_PTR<T>& data, const RR_SHARED_PTR<RobotRaconteurException>& err)
195  {
196  {
197  boost::mutex::scoped_lock lock(handled_lock);
198  if (handled)
199  {
200  if (data && deleter_)
201  deleter_(data);
202  return;
203  }
204  handled = true;
205 
206  if (timeout_timer_)
207  timeout_timer_->TryStop();
208 
209  timeout_timer_.reset();
210  }
211 
212  handler_(data, err);
213  }
214 
215  void handle_error(const RR_SHARED_PTR<RobotRaconteurException>& err)
216  {
217  {
218  boost::mutex::scoped_lock lock(handled_lock);
219  if (handled)
220  return;
221  handled = true;
222 
223  {
224 
225  if (timeout_timer_)
226  timeout_timer_->TryStop();
227 
228  timeout_timer_.reset();
229  }
230  }
231 
232  handler_(RR_SHARED_PTR<T>(), err);
233  }
234 
235  void handle_error(const boost::system::error_code& err)
236  {
237  if (err.value() == boost::system::errc::timed_out)
238  handle_error(timeout_exception_);
239  handle_error(RR_MAKE_SHARED<ConnectionException>(err.message()));
240  }
241 
242  private:
243  void timeout_handler(const TimerEvent& /*e*/)
244  {
245  {
246  boost::mutex::scoped_lock lock(handled_lock);
247  if (handled)
248  return;
249  handled = true;
250  // timeout_timer_.reset();
251 
252  timeout_timer_.reset();
253  }
254 
255  handler_(RR_SHARED_PTR<T>(), timeout_exception_);
256  }
257 };
258 
259 template <typename T>
260 void async_timeout_wrapper<T>::start_timer(int32_t timeout,
261  const RR_SHARED_PTR<RobotRaconteurException>& timeout_exception)
262 {
263  RR_SHARED_PTR<RobotRaconteurNode> n = node.lock();
264  if (!n)
265  throw InvalidOperationException("Node has been released");
266 
267  boost::mutex::scoped_lock lock(handled_lock);
268 
269  if (handled)
270  return;
271 
272  if (timeout != RR_TIMEOUT_INFINITE)
273  {
274  timeout_timer_ =
275  async_timeout_wrapper_CreateTimer(n, boost::posix_time::milliseconds(timeout),
276  boost::bind(&async_timeout_wrapper<T>::timeout_handler,
277  this->shared_from_this(), RR_BOOST_PLACEHOLDERS(_1)),
278  true);
279  timeout_timer_->Start();
280  timeout_exception_ = RR_MOVE(timeout_exception);
281  }
282 }
283 
284 /*template<typename Handler>
285 class handler_move_wrapper
286 {
287 public:
288  handler_move_wrapper(Handler& handler)
289  : handler_(RR_MOVE(handler))
290  {}
291 
292 
293  Handler&& operator()
294  {
295  return RR_MOVE(handler_);
296  }
297 
298 protected:
299  Handler handler_;
300 };
301 
302 template<typename Handler>
303 handler_move_wrapper<Handler> make_handler_move_wrapper(Handler& handler)
304 {
305  return handler_move_wrapper<Handler>(handler);
306 }*/
307 
308 class ROBOTRACONTEUR_CORE_API async_signal_semaphore : private boost::noncopyable
309 {
310  protected:
311  boost::mutex this_lock;
312  boost::condition_variable next_wait;
313  boost::initialized<bool> running;
314  boost::initialized<bool> next;
315  boost::initialized<uint64_t> next_id;
316 
317  public:
318  template <typename F>
319  bool try_fire_next(BOOST_ASIO_MOVE_ARG(F) h)
320  {
321  boost::mutex::scoped_lock lock(this_lock);
322 
323  if (running || next)
324  {
325  uint64_t my_id = ++next_id.data();
326 
327  if (next.data())
328  {
329  next_wait.notify_all();
330  }
331  else
332  {
333  next.data() = true;
334  }
335 
336  while (running)
337  {
338  next_wait.wait(lock);
339  if (my_id != next_id)
340  return false;
341  }
342 
343  next.data() = false;
344  }
345 
346  running.data() = true;
347 
348  BOOST_SCOPE_EXIT_TPL(this_)
349  {
350  boost::mutex::scoped_lock lock2(this_->this_lock);
351  this_->running.data() = false;
352  this_->next_wait.notify_all();
353  }
354  BOOST_SCOPE_EXIT_END;
355 
356  lock.unlock();
357 
358  h();
359 
360  return true;
361  }
362 };
363 
364 class ROBOTRACONTEUR_CORE_API async_signal_pool_semaphore
365  : public RR_ENABLE_SHARED_FROM_THIS<async_signal_pool_semaphore>,
366  private boost::noncopyable
367 {
368  protected:
369  boost::mutex this_lock;
370  boost::initialized<bool> running;
371  boost::function<void()> next;
372  RR_WEAK_PTR<RobotRaconteurNode> node;
373 
374  public:
375  async_signal_pool_semaphore(const RR_SHARED_PTR<RobotRaconteurNode>& node) : node(node) {}
376 
377  template <typename F>
378  void try_fire_next(BOOST_ASIO_MOVE_ARG(F) h)
379  {
380  boost::mutex::scoped_lock lock(this_lock);
381 
382  if (!running)
383  {
384  RR_SHARED_PTR<RobotRaconteurNode> node1 = this->node.lock();
385  if (!node1)
386  return;
387  do_post(node1, h);
388  }
389  else
390  {
391  next = h;
392  }
393  }
394 
395  protected:
396  void do_fire_next(const boost::function<void()>& h)
397  {
398  try
399  {
400  h();
401  }
402  catch (std::exception& exp)
403  {
404  handle_exception(&exp);
405  }
406 
407  BOOST_SCOPE_EXIT(this_)
408  {
409  boost::mutex::scoped_lock lock2(this_->this_lock);
410 
411  boost::function<void()> h2;
412  h2.swap(this_->next);
413  this_->next.clear();
414  this_->running.data() = false;
415  if (!h2)
416  return;
417  RR_SHARED_PTR<RobotRaconteurNode> node = this_->node.lock();
418  if (!node)
419  return;
420  try
421  {
422  this_->do_post(node,
423  boost::bind(&async_signal_pool_semaphore::do_fire_next, this_->shared_from_this(), h2));
424  }
425  catch (std::exception&)
426  {}
427  this_->running.data() = true;
428  }
429  BOOST_SCOPE_EXIT_END;
430  }
431 
432  void do_post(const RR_SHARED_PTR<RobotRaconteurNode>& node1, RR_MOVE_ARG(boost::function<void()>) h);
433 
434  void handle_exception(std::exception* exp);
435 };
436 
437 ROBOTRACONTEUR_CORE_API void InvokeHandler_HandleException(RR_WEAK_PTR<RobotRaconteurNode> node, std::exception& exp);
438 
439 ROBOTRACONTEUR_CORE_API void InvokeHandler_DoPost(RR_WEAK_PTR<RobotRaconteurNode> node,
440  const boost::function<void()>& h, bool shutdown_op = false,
441  bool throw_on_released = true);
442 
443 ROBOTRACONTEUR_CORE_API void InvokeHandler(RR_WEAK_PTR<RobotRaconteurNode> node,
444  const boost::function<void()>& handler);
445 
446 template <typename T>
447 void InvokeHandler(RR_WEAK_PTR<RobotRaconteurNode> node, const typename boost::function<void(const T&)>& handler,
448  const T& value)
449 {
450  try
451  {
452  handler(value);
453  }
454  catch (std::exception& exp)
455  {
456  InvokeHandler_HandleException(RR_MOVE(node), exp);
457  }
458 }
459 
460 ROBOTRACONTEUR_CORE_API void InvokeHandler(
461  RR_WEAK_PTR<RobotRaconteurNode> node,
462  const boost::function<void(const RR_SHARED_PTR<RobotRaconteurException>&)>& handler);
463 
464 ROBOTRACONTEUR_CORE_API void InvokeHandlerWithException(
465  RR_WEAK_PTR<RobotRaconteurNode> node,
466  const boost::function<void(const RR_SHARED_PTR<RobotRaconteurException>&)>& handler,
467  const RR_SHARED_PTR<RobotRaconteurException>& exp);
468 
469 ROBOTRACONTEUR_CORE_API void InvokeHandlerWithException(
470  RR_WEAK_PTR<RobotRaconteurNode> node,
471  const boost::function<void(const RR_SHARED_PTR<RobotRaconteurException>&)>& handler, std::exception& exp,
472  MessageErrorType default_err = MessageErrorType_UnknownError);
473 
474 template <typename T>
475 void InvokeHandler(
476  RR_WEAK_PTR<RobotRaconteurNode> node,
477  const typename boost::function<void(const T&, const RR_SHARED_PTR<RobotRaconteurException>&)>& handler,
478  const T& value)
479 {
480  try
481  {
482  handler(value, RR_SHARED_PTR<RobotRaconteurException>());
483  }
484  catch (std::exception& exp)
485  {
486  InvokeHandler_HandleException(RR_MOVE(node), exp);
487  }
488 }
489 
490 template <typename T>
491 void InvokeHandlerWithException(
492  RR_WEAK_PTR<RobotRaconteurNode> node,
493  const typename boost::function<void(T, const RR_SHARED_PTR<RobotRaconteurException>&)>& handler,
494  const RR_SHARED_PTR<RobotRaconteurException>& exp)
495 {
496  typename boost::initialized<typename boost::remove_reference<T>::type> default_value;
497  try
498  {
499  handler(default_value, exp);
500  }
501  catch (std::exception& exp)
502  {
503  InvokeHandler_HandleException(RR_MOVE(node), exp);
504  }
505 }
506 
507 template <typename T>
508 void InvokeHandlerWithException(
509  RR_WEAK_PTR<RobotRaconteurNode> node,
510  const typename boost::function<void(T, const RR_SHARED_PTR<RobotRaconteurException>&)>& handler,
511  std::exception& exp, MessageErrorType default_err = MessageErrorType_UnknownError)
512 {
513  typename boost::initialized<typename boost::remove_const<typename boost::remove_reference<T>::type>::type>
514  default_value;
515  try
516  {
517  RR_SHARED_PTR<RobotRaconteurException> err =
518  RobotRaconteurExceptionUtil::ExceptionToSharedPtr(exp, default_err);
519  handler(default_value, err);
520  }
521  catch (std::exception& exp)
522  {
523  InvokeHandler_HandleException(RR_MOVE(node), exp);
524  }
525 }
526 
527 ROBOTRACONTEUR_CORE_API void PostHandler(RR_WEAK_PTR<RobotRaconteurNode> node, const boost::function<void()>& handler,
528  bool shutdown_op = false, bool throw_on_released = true);
529 
530 template <typename T>
531 void PostHandler(RR_WEAK_PTR<RobotRaconteurNode> node, const typename boost::function<void(const T&)>& handler,
532  const T& value, bool shutdown_op = false, bool throw_on_released = true)
533 {
534  boost::function<void()> h = boost::bind(handler, value);
535  InvokeHandler_DoPost(RR_MOVE(node), h, shutdown_op, throw_on_released);
536 }
537 
538 ROBOTRACONTEUR_CORE_API void PostHandler(
539  RR_WEAK_PTR<RobotRaconteurNode> node,
540  const boost::function<void(const RR_SHARED_PTR<RobotRaconteurException>&)>& handler, bool shutdown_op = false,
541  bool throw_on_released = true);
542 
543 ROBOTRACONTEUR_CORE_API void PostHandlerWithException(
544  RR_WEAK_PTR<RobotRaconteurNode> node,
545  const boost::function<void(const RR_SHARED_PTR<RobotRaconteurException>&)>& handler,
546  const RR_SHARED_PTR<RobotRaconteurException>& exp, bool shutdown_op = false, bool throw_on_released = true);
547 
548 ROBOTRACONTEUR_CORE_API void PostHandlerWithException(
549  RR_WEAK_PTR<RobotRaconteurNode> node,
550  const boost::function<void(const RR_SHARED_PTR<RobotRaconteurException>&)>& handler, std::exception& exp,
551  MessageErrorType default_err = MessageErrorType_UnknownError, bool shutdown_op = false,
552  bool throw_on_released = true);
553 
554 template <typename T>
555 void PostHandler(RR_WEAK_PTR<RobotRaconteurNode> node,
556  const typename boost::function<void(const T&, const RR_SHARED_PTR<RobotRaconteurException>&)>& handler,
557  const T& value, bool shutdown_op = false, bool throw_on_released = true)
558 {
559  boost::function<void()> h = boost::bind(handler, value, RR_SHARED_PTR<RobotRaconteurException>());
560  InvokeHandler_DoPost(RR_MOVE(node), h, shutdown_op, throw_on_released);
561 }
562 
563 template <typename T>
564 void PostHandlerWithException(
565  RR_WEAK_PTR<RobotRaconteurNode> node,
566  const typename boost::function<void(T, const RR_SHARED_PTR<RobotRaconteurException>&)>& handler,
567  const RR_SHARED_PTR<RobotRaconteurException>& exp, bool shutdown_op = false, bool throw_on_released = true)
568 {
569  typename boost::initialized<typename boost::remove_reference<T>::type> default_value;
570  boost::function<void()> h = boost::bind(handler, default_value, exp);
571  InvokeHandler_DoPost(RR_MOVE(node), h, shutdown_op, throw_on_released);
572 }
573 
574 template <typename T>
575 void PostHandlerWithException(
576  RR_WEAK_PTR<RobotRaconteurNode> node,
577  const typename boost::function<void(T, const RR_SHARED_PTR<RobotRaconteurException>&)>& handler,
578  std::exception& exp, MessageErrorType default_err = MessageErrorType_UnknownError, bool shutdown_op = false,
579  bool throw_on_released = true)
580 {
581  typename boost::initialized<typename boost::remove_reference<T>::type> default_value;
582  RR_SHARED_PTR<RobotRaconteurException> err = RobotRaconteurExceptionUtil::ExceptionToSharedPtr(exp, default_err);
583  boost::function<void()> h = boost::bind(handler, default_value, err);
584  InvokeHandler_DoPost(RR_MOVE(node), h, shutdown_op, throw_on_released);
585 }
586 
587 } // namespace detail
588 } // namespace RobotRaconteur
#define RR_TIMEOUT_INFINITE
Disable timeout for asynchronous operations.
Definition: RobotRaconteurConstants.h:566