Robot Raconteur Core C++ Library
WireMember.h
Go to the documentation of this file.
1 
24 #pragma once
25 
27 #include "RobotRaconteur/Message.h"
30 #include <boost/call_traits.hpp>
31 
32 #pragma warning(push)
33 #pragma warning(disable : 4250)
34 #pragma warning(disable : 4996)
35 #include <boost/signals2.hpp>
36 
37 namespace RobotRaconteur
38 {
39 class ROBOTRACONTEUR_CORE_API WireBase;
40 class ROBOTRACONTEUR_CORE_API WireConnectionBase;
41 class ROBOTRACONTEUR_CORE_API WireConnectionBaseListener;
// Internal helpers used by the wire connection classes declared after this namespace.
42 namespace detail
43 {
// Forward declaration only; the class is named as a friend by WireConnectionBase and
// WireClientBase further down in this header.
44 class WireSubscription_connection;
// Reports whether a value received at recv_time counts as expired for the given lifespan.
// Only the prototype is visible here: the expiry rule, the role of node and the unit of
// lifespan are defined out of line (presumably milliseconds, by analogy with
// SetInValueLifespan(int32_t millis) -- TODO confirm).
45 ROBOTRACONTEUR_CORE_API bool WireConnectionBase_IsValueExpired(RR_WEAK_PTR<RobotRaconteurNode> node,
46  const boost::posix_time::ptime& recv_time,
47  int32_t lifespan);
48 } // namespace detail
49 
/**
 * @brief Untyped base class for a single wire connection.
 *
 * Stores the latest in/out values as RRValue pointers together with validity flags,
 * send/receive timestamps, lifespans, mutexes and condition variables. The typed
 * WireConnection<T> interface is layered on the protected *Base accessors declared here.
 * The class is non-copyable and is meant to be owned through a shared pointer
 * (it derives from RR_ENABLE_SHARED_FROM_THIS).
 *
 * Every member function without a body is implemented out of line, so its precise behavior
 * cannot be read from this header.
 *
 * NOTE(review): this text is a rendered source listing. The embedded numbering skips where
 * doc comments were dropped, and a few declarations appear to be missing as well (see the
 * notes below) -- compare against the real header before relying on it.
 */
56 class ROBOTRACONTEUR_CORE_API WireConnectionBase : public RR_ENABLE_SHARED_FROM_THIS<WireConnectionBase>,
57  private boost::noncopyable
58 {
59 
// These classes access the protected state and accessors below directly.
60  friend class WireBase;
61  friend class WireClientBase;
62  friend class WireServerBase;
63  friend class WireBroadcasterBase;
64  friend class WireSubscriptionBase;
65  friend class detail::WireSubscription_connection;
66 
67  public:
/** @brief Endpoint identifier of this connection (presumably the `endpoint` member -- confirm). */
76  virtual uint32_t GetEndpoint();
77 
// NOTE(review): listing lines 78-85 and 87-94 are absent and are each followed by a blank
// numbered line, which suggests two declarations were lost here during extraction.
86 
95 
/** @brief Close the connection. WireConnection<T> overrides this to also drop its callbacks. */
103  virtual void Close();
104 
/**
 * @brief Asynchronous variant of Close().
 * @param handler Invoked with an exception pointer when the operation finishes
 *                (presumably null on success -- confirm).
 * @param timeout Timeout for the operation; the unit is not visible in this header.
 */
113  virtual void AsyncClose(boost::function<void(const RR_SHARED_PTR<RobotRaconteurException>&)> handler,
114  int32_t timeout);
115 
/**
 * @brief Construct a connection belonging to `parent`.
 * @param parent    Owning wire.
 * @param endpoint  Endpoint identifier; defaults to 0.
 * @param direction Member direction; defaults to MemberDefinition_Direction_both.
 */
116  WireConnectionBase(const RR_SHARED_PTR<WireBase>& parent, uint32_t endpoint = 0,
117  MemberDefinition_Direction direction = MemberDefinition_Direction_both);
118 
119  virtual ~WireConnectionBase() {}
120 
/** @brief Entry point for a packet received for this connection, with its timestamp. */
121  virtual void WirePacketReceived(TimeSpec timespec, const RR_INTRUSIVE_PTR<RRValue>& packet);
122 
/** @brief Whether an in value is currently considered valid (exact rule defined out of line). */
132  virtual bool GetInValueValid();
133 
/** @brief Whether an out value is currently considered valid (exact rule defined out of line). */
143  virtual bool GetOutValueValid();
144 
/**
 * @brief Wait for the in value to become valid.
 * @param timeout Defaults to RR_TIMEOUT_INFINITE.
 * @return bool result of the wait (presumably false on timeout -- confirm).
 */
156  bool WaitInValueValid(int32_t timeout = RR_TIMEOUT_INFINITE);
157 
/**
 * @brief Wait for the out value to become valid.
 * @param timeout Defaults to RR_TIMEOUT_INFINITE.
 * @return bool result of the wait (presumably false on timeout -- confirm).
 */
169  bool WaitOutValueValid(int32_t timeout = RR_TIMEOUT_INFINITE);
170 
/** @brief Node this connection belongs to (a weak reference is stored in `node`). */
171  RR_SHARED_PTR<RobotRaconteurNode> GetNode();
172 
/** @brief Current state of the "ignore in value" setting. */
182  virtual bool GetIgnoreInValue();
183 
/** @brief Change the "ignore in value" setting. */
193  virtual void SetIgnoreInValue(bool ignore);
194 
/** @brief Register a listener; listeners are kept as weak pointers in `listeners`. */
195  virtual void AddListener(const RR_SHARED_PTR<WireConnectionBaseListener>& listener);
196 
// NOTE(review): listing lines 197-206 are absent; a declaration may have been lost here.
207 
/** @brief Lifespan configured for the in value. */
217  virtual int32_t GetInValueLifespan();
/** @brief Set the in value lifespan; the parameter name indicates milliseconds. */
231  virtual void SetInValueLifespan(int32_t millis);
232 
/** @brief Lifespan configured for the out value. */
242  virtual int32_t GetOutValueLifespan();
/** @brief Set the out value lifespan; the parameter name indicates milliseconds. */
256  virtual void SetOutValueLifespan(int32_t millis);
257 
258  protected:
// Hook for a close initiated by the remote side; WireConnection<T> extends it.
259  virtual void RemoteClose();
260 
// Most recent in and out values, stored untyped.
261  RR_INTRUSIVE_PTR<RRValue> inval;
262  RR_INTRUSIVE_PTR<RRValue> outval;
263 
// Validity flags and timestamps (remote TimeSpec plus local clock) for send and receive.
264  bool inval_valid;
265  TimeSpec lasttime_send;
266  boost::posix_time::ptime lasttime_send_local;
267 
268  bool outval_valid;
269  TimeSpec lasttime_recv;
270  boost::posix_time::ptime lasttime_recv_local;
271 
// Condition variables, presumably signalled when the matching value becomes valid
// (used by WaitInValueValid / WaitOutValueValid -- confirm in the implementation).
272  boost::condition_variable inval_wait;
273  boost::condition_variable outval_wait;
274 
// Lifespans read and written by the Get/Set*ValueLifespan accessors.
275  int32_t inval_lifespan;
276  int32_t outval_lifespan;
277 
278  uint32_t endpoint;
// Weak reference to the owning wire; see GetParent().
279  RR_WEAK_PTR<WireBase> parent;
280  std::string service_path;
281  std::string member_name;
282 
283  boost::mutex sendlock;
284  boost::mutex recvlock;
285 
286  bool send_closed;
287  bool recv_closed;
288 
// Untyped accessors; WireConnection<T> wraps them with RRPrimUtil<T> pack/unpack calls.
289  RR_INTRUSIVE_PTR<RRValue> GetInValueBase();
290 
291  RR_INTRUSIVE_PTR<RRValue> GetOutValueBase();
292 
293  void SetOutValueBase(const RR_INTRUSIVE_PTR<RRValue>& value);
294 
// Return false when no value could be obtained; on success `value` and `time` are filled in.
295  bool TryGetInValueBase(RR_INTRUSIVE_PTR<RRValue>& value, TimeSpec& time);
296  bool TryGetOutValueBase(RR_INTRUSIVE_PTR<RRValue>& value, TimeSpec& time);
297 
// Notification hooks implemented by the typed subclass.
298  virtual void fire_WireValueChanged(const RR_INTRUSIVE_PTR<RRValue>& value, TimeSpec time) = 0;
299 
300  virtual void fire_WireClosedCallback() = 0;
301 
302  void Shutdown();
303 
// Strong reference to the owning wire (behavior when the wire is gone is defined out of line).
304  RR_SHARED_PTR<WireBase> GetParent();
305 
306  boost::mutex inval_lock;
307  boost::mutex outval_lock;
308 
309  bool ignore_inval;
310 
// Guards `listeners`.
311  boost::mutex listeners_lock;
312  std::list<RR_WEAK_PTR<WireConnectionBaseListener> > listeners;
313 
314  detail::async_signal_semaphore wire_value_changed_semaphore;
315 
316  RR_WEAK_PTR<RobotRaconteurNode> node;
317 
318  MemberDefinition_Direction direction;
319 };
320 
/**
 * @brief Typed wire connection: wraps the untyped WireConnectionBase accessors with
 *        RRPrimUtil<T> pack/unpack calls and exposes a value-changed signal and a
 *        connection-closed callback.
 *
 * NOTE(review): this rendered listing has lost several lines of the class (numbering jumps
 * 348->350, 362->364, 493->495 and 523->525):
 *  - the class-head line between `template <typename T>` and `{` (the constructor's
 *    initializer list shows the base is WireConnectionBase);
 *  - the declarator of the signal member (the bodies below call it `WireValueChanged`);
 *  - one statement at the start of Close() and the first line of the call in AsyncClose().
 * Restore them from the real header; nothing below has been altered.
 */
348 template <typename T>
350 {
351  private:
// Callback run by fire_WireClosedCallback(); guarded by WireConnectionClosedCallback_lock.
// Stored with a by-value shared pointer parameter, while the setter accepts a const-reference
// signature.
352  boost::function<void(RR_SHARED_PTR<WireConnection<T> >)> WireConnectionClosedCallback;
353  boost::mutex WireConnectionClosedCallback_lock;
354 
355  public:
// Signal fired by fire_WireValueChanged() with (connection, unpacked value, time).
362  boost::signals2::signal<void(const RR_SHARED_PTR<WireConnection<T> >& connection, T value, TimeSpec time)>
364 
/** @brief Return a copy of the closed callback, taken under the callback lock. */
371  boost::function<void(RR_SHARED_PTR<WireConnection<T> >)> GetWireConnectionClosedCallback()
372  {
373  boost::mutex::scoped_lock lock(WireConnectionClosedCallback_lock);
374  return WireConnectionClosedCallback;
375  }
376 
/** @brief Replace the closed callback under the callback lock. */
387  void SetWireConnectionClosedCallback(boost::function<void(const RR_SHARED_PTR<WireConnection<T> >&)> callback)
388  {
389  boost::mutex::scoped_lock lock(WireConnectionClosedCallback_lock);
390  WireConnectionClosedCallback = callback;
391  }
392 
393  RR_OVIRTUAL ~WireConnection() RR_OVERRIDE {}
394 
/** @brief Unpacked result of GetInValueBase(); whatever that call throws propagates. */
404  virtual T GetInValue() { return RRPrimUtil<T>::PreUnpack(GetInValueBase()); }
405 
/** @brief Unpacked result of GetOutValueBase(); whatever that call throws propagates. */
415  virtual T GetOutValue() { return RRPrimUtil<T>::PreUnpack(GetOutValueBase()); }
416 
/** @brief Pack `value` and hand it to SetOutValueBase(). */
426  virtual void SetOutValue(typename boost::call_traits<T>::param_type value)
427  {
428  SetOutValueBase(RRPrimUtil<T>::PrePack(value));
429  }
430 
/**
 * @brief Non-throwing-style read of the in value.
 * @param[out] value Unpacked value; written only when true is returned.
 * @param[out] time  Passed through to TryGetInValueBase().
 * @return false when TryGetInValueBase() reports failure, true otherwise.
 */
442  bool TryGetInValue(T& value, TimeSpec& time)
443  {
444  RR_INTRUSIVE_PTR<RRValue> o;
445  if (!TryGetInValueBase(o, time))
446  return false;
447  value = RRPrimUtil<T>::PreUnpack(o);
448  return true;
449  }
450 
/**
 * @brief Non-throwing-style read of the out value.
 * @param[out] value Unpacked value; written only when true is returned.
 * @param[out] time  Passed through to TryGetOutValueBase().
 * @return false when TryGetOutValueBase() reports failure, true otherwise.
 */
462  bool TryGetOutValue(T& value, TimeSpec& time)
463  {
464  RR_INTRUSIVE_PTR<RRValue> o;
465  if (!TryGetOutValueBase(o, time))
466  return false;
467  value = RRPrimUtil<T>::PreUnpack(o);
468  return true;
469  }
470 
/** @brief Forward all arguments to the WireConnectionBase constructor. */
471  WireConnection(const RR_SHARED_PTR<WireBase>& parent, uint32_t endpoint = 0,
472  MemberDefinition_Direction direction = MemberDefinition_Direction_both)
473  : WireConnectionBase(parent, endpoint, direction)
474  {}
475 
476  protected:
// Unpack the raw value and emit the WireValueChanged signal with this connection.
477  RR_OVIRTUAL void fire_WireValueChanged(const RR_INTRUSIVE_PTR<RRValue>& value, TimeSpec time) RR_OVERRIDE
478  {
479  WireValueChanged(RR_STATIC_POINTER_CAST<WireConnection<T> >(shared_from_this()),
480  RRPrimUtil<T>::PreUnpack(value), time);
481  }
482 
// Invoke the closed callback, if one is set, on a copy taken under the lock so the
// callback itself runs without the lock held.
483  RR_OVIRTUAL void fire_WireClosedCallback() RR_OVERRIDE
484  {
485  boost::function<void(RR_SHARED_PTR<WireConnection<T> >)> c = GetWireConnectionClosedCallback();
486  if (!c)
487  return;
488  c(RR_STATIC_POINTER_CAST<WireConnection<T> >(shared_from_this()));
489  }
490 
491  public:
/** @brief Close the connection, then clear the closed callback and disconnect all signal slots. */
492  RR_OVIRTUAL void Close() RR_OVERRIDE
493  {
// NOTE(review): listing line 494 is missing here (presumably the base-class Close() call,
// by analogy with RemoteClose() below -- confirm against the real header).
495  {
496  boost::mutex::scoped_lock lock(WireConnectionClosedCallback_lock);
497  WireConnectionClosedCallback.clear();
498  }
499  WireValueChanged.disconnect_all_slots();
500  }
501 
502  protected:
// Completion step for AsyncClose(): drops the callback and signal slots, swallowing any
// std::exception from that cleanup, then always reports `err` to `handler`.
503  virtual void AsyncClose1(const RR_SHARED_PTR<RobotRaconteurException>& err,
504  const boost::function<void(const RR_SHARED_PTR<RobotRaconteurException>&)>& handler)
505  {
506  try
507  {
508  {
509  boost::mutex::scoped_lock lock(WireConnectionClosedCallback_lock);
510  WireConnectionClosedCallback.clear();
511  }
512  WireValueChanged.disconnect_all_slots();
513  }
514  catch (std::exception&)
515  {}
516 
517  handler(err);
518  }
519 
520  public:
/**
 * @brief Asynchronous close.
 * @param handler Completion handler; it is bound into AsyncClose1 (see the bind arguments below).
 * @param timeout Defaults to 2000 here; the unit is not visible in this header.
 */
521  RR_OVIRTUAL void AsyncClose(boost::function<void(const RR_SHARED_PTR<RobotRaconteurException>&)> handler,
522  int32_t timeout = 2000) RR_OVERRIDE
523  {
// NOTE(review): listing line 524, the start of this call expression, is missing.
525  RR_STATIC_POINTER_CAST<WireConnection<T> >(shared_from_this()),
526  RR_BOOST_PLACEHOLDERS(_1), handler),
527  timeout);
528  }
529 
530  protected:
// Remote-initiated close: run the base handling first, then drop the callback and slots.
531  RR_OVIRTUAL void RemoteClose() RR_OVERRIDE
532  {
533  WireConnectionBase::RemoteClose();
534  {
535  boost::mutex::scoped_lock lock(WireConnectionClosedCallback_lock);
536  WireConnectionClosedCallback.clear();
537  }
538  WireValueChanged.disconnect_all_slots();
539  }
540 };
541 
/**
 * @brief Abstract, untyped base class for wire members.
 *
 * Declares the interface shared by the client and server wire implementations (member
 * name, service path, packet handling, shutdown, closing a connection) plus helpers for
 * packing and unpacking packets. Non-copyable; intended to be held by shared pointer.
 * Functions without a body are implemented out of line.
 */
546 class ROBOTRACONTEUR_CORE_API WireBase : public RR_ENABLE_SHARED_FROM_THIS<WireBase>, private boost::noncopyable
547 {
548 
549  public:
550  friend class WireConnectionBase;
551 
552  virtual ~WireBase() {}
553 
/** @brief Name of the wire member. */
559  virtual std::string GetMemberName() = 0;
560 
/** @brief Service path of the object that owns this wire. */
561  virtual std::string GetServicePath() = 0;
562 
/** @brief Handle a received message entry; `e` identifies the endpoint and defaults to 0. */
563  virtual void WirePacketReceived(const RR_INTRUSIVE_PTR<MessageEntry>& m, uint32_t e = 0) = 0;
564 
565  virtual void Shutdown() = 0;
566 
/**
 * @brief Asynchronously close one connection of this wire.
 * @param endpoint Connection being closed (despite the parameter name, this is the connection object).
 * @param remote   Presumably true when the close was initiated remotely -- confirm.
 * @param ee       Endpoint identifier.
 * @param handler  Completion handler receiving an exception pointer.
 * @param timeout  Timeout; the unit is not visible in this header.
 */
567  virtual void AsyncClose(const RR_SHARED_PTR<WireConnectionBase>& endpoint, bool remote, uint32_t ee,
568  RR_MOVE_ARG(boost::function<void(const RR_SHARED_PTR<RobotRaconteurException>&)>) handler,
569  int32_t timeout) = 0;
570 
571  protected:
572  WireBase();
573 
// Transmit one value with its timestamp to the given endpoint.
574  virtual void SendWirePacket(const RR_INTRUSIVE_PTR<RRValue>& data, TimeSpec time, uint32_t endpoint) = 0;
575 
// Set by WireClient<T>/WireServer<T> to true when T is RR_INTRUSIVE_PTR<MessageElement>.
576  bool rawelements;
577 
578  void DispatchPacket(const RR_INTRUSIVE_PTR<MessageEntry>& me, const RR_SHARED_PTR<WireConnectionBase>& e);
579 
// Extract the value from a message entry; the timestamp is returned through `ts`.
580  RR_INTRUSIVE_PTR<RRValue> UnpackPacket(const RR_INTRUSIVE_PTR<MessageEntry>& me, TimeSpec& ts);
581 
582  RR_INTRUSIVE_PTR<MessageEntry> PackPacket(const RR_INTRUSIVE_PTR<RRValue>& data, TimeSpec time);
583 
// Default packing via the node's variant-type packer; Wire<T> overrides this with a typed version.
584  virtual RR_INTRUSIVE_PTR<MessageElementData> PackData(const RR_INTRUSIVE_PTR<RRValue>& data)
585  {
586  return GetNode()->PackVarType(data);
587  }
588 
// Default unpacking via the node's variant-type unpacker; Wire<T> overrides this with a typed version.
589  virtual RR_INTRUSIVE_PTR<RRValue> UnpackData(const RR_INTRUSIVE_PTR<MessageElement>& mdata)
590  {
591  return GetNode()->UnpackVarType(mdata);
592  }
593 
594  RR_WEAK_PTR<RobotRaconteurNode> node;
595 
596  MemberDefinition_Direction direction;
597 
598  public:
/** @brief Node this wire belongs to (a weak reference is stored in `node`). */
599  RR_SHARED_PTR<RobotRaconteurNode> GetNode();
600 
// NOTE(review): listing lines 601-610 are absent; a public declaration may have been lost here.
611 };
612 
/**
 * @brief Typed wire interface shared by WireClient<T> and WireServer<T>.
 *
 * Each implementation provides only part of this interface and throws
 * InvalidOperationException from the rest: WireClient<T> implements connect/peek/poke,
 * WireServer<T> implements the callback accessors. This class itself adds typed
 * PackData/UnpackData overrides with an optional verification hook.
 *
 * NOTE(review): the first line of the SetPokeOutValueCallback declaration is missing from
 * this rendered listing (numbering jumps from 906 to 932); restore it from the real header.
 */
665 template <typename T>
666 class Wire : public virtual WireBase
667 {
668 
669  friend class WireConnectionBase;
670 
671  public:
/** @brief Store `verify`, an optional hook applied to values when packing and after unpacking. */
672  Wire(boost::function<void(const RR_INTRUSIVE_PTR<RRValue>&)> verify) { this->verify = RR_MOVE(verify); }
673 
674  RR_OVIRTUAL ~Wire() RR_OVERRIDE {}
675 
/** @brief Connect to the wire and return the new connection (client side). */
689  virtual RR_SHARED_PTR<WireConnection<T> > Connect() = 0;
690 
/** @brief Asynchronous Connect(); `handler` receives the connection and an exception pointer. */
701  virtual void AsyncConnect(
702  boost::function<void(const RR_SHARED_PTR<WireConnection<T> >&, const RR_SHARED_PTR<RobotRaconteurException>&)>
703  handler,
704  int32_t timeout = RR_TIMEOUT_INFINITE) = 0;
705 
/** @brief Read the in value without a connection; the timestamp is returned through `ts`. */
722  virtual T PeekInValue(TimeSpec& ts) = 0;
723 
/** @brief Read the out value without a connection; the timestamp is returned through `ts`. */
740  virtual T PeekOutValue(TimeSpec& ts) = 0;
741 
/** @brief Write the out value without a connection. */
755  virtual void PokeOutValue(const T& value) = 0;
756 
/** @brief Asynchronous PeekInValue(); `handler` receives value, timestamp and exception pointer. */
767  virtual void AsyncPeekInValue(
768  boost::function<void(const T&, const TimeSpec&, const RR_SHARED_PTR<RobotRaconteurException>&)> handler,
769  int32_t timeout = RR_TIMEOUT_INFINITE) = 0;
770 
/** @brief Asynchronous PeekOutValue(); `handler` receives value, timestamp and exception pointer. */
781  virtual void AsyncPeekOutValue(
782  boost::function<void(const T&, const TimeSpec&, const RR_SHARED_PTR<RobotRaconteurException>&)> handler,
783  int32_t timeout = RR_TIMEOUT_INFINITE) = 0;
784 
/** @brief Asynchronous PokeOutValue(); `handler` receives an exception pointer. */
796  virtual void AsyncPokeOutValue(const T& value,
797  boost::function<void(const RR_SHARED_PTR<RobotRaconteurException>&)> handler,
798  int32_t timeout = RR_TIMEOUT_INFINITE) = 0;
799 
/** @brief Callback invoked for a new connection (service side). */
807  virtual boost::function<void(const RR_SHARED_PTR<WireConnection<T> >&)> GetWireConnectCallback() = 0;
808 
/** @brief Set the callback invoked for a new connection (service side). */
827  virtual void SetWireConnectCallback(boost::function<void(const RR_SHARED_PTR<WireConnection<T> >&)> function) = 0;
828 
/** @brief Callback that supplies the value for in-value peek requests (service side). */
836  virtual boost::function<T(const uint32_t&)> GetPeekInValueCallback() = 0;
837 
/** @brief Set the in-value peek callback; it receives a uint32_t endpoint identifier. */
861  virtual void SetPeekInValueCallback(boost::function<T(const uint32_t&)> function) = 0;
862 
/** @brief Callback that supplies the value for out-value peek requests (service side). */
870  virtual boost::function<T(const uint32_t&)> GetPeekOutValueCallback() = 0;
871 
/** @brief Set the out-value peek callback; it receives a uint32_t endpoint identifier. */
895  virtual void SetPeekOutValueCallback(boost::function<T(const uint32_t&)> function) = 0;
896 
/** @brief Callback that consumes poked out values (service side). */
905  virtual boost::function<void(const T&, const TimeSpec&, const uint32_t&)> GetPokeOutValueCallback() = 0;
906 
// NOTE(review): the line that begins this declaration is missing from the listing; the
// overrides in WireClient<T>/WireServer<T> show it is `SetPokeOutValueCallback`.
932  boost::function<void(const T&, const TimeSpec&, const uint32_t&)> function) = 0;
933 
934  protected:
// Typed packing: run the verify hook first (when set), then pack as the boxed type of T.
935  RR_OVIRTUAL RR_INTRUSIVE_PTR<MessageElementData> PackData(const RR_INTRUSIVE_PTR<RRValue>& data) RR_OVERRIDE
936  {
937  if (verify)
938  {
939  verify(data);
940  }
941  return GetNode()->template PackAnyType<typename RRPrimUtil<T>::BoxedType>(data);
942  }
943 
// Typed unpacking: unpack as the boxed type of T, then run the verify hook on the result
// (when set) before returning it.
944  RR_OVIRTUAL RR_INTRUSIVE_PTR<RRValue> UnpackData(const RR_INTRUSIVE_PTR<MessageElement>& mdata) RR_OVERRIDE
945  {
946  if (!verify)
947  {
948  return GetNode()->template UnpackAnyType<typename RRPrimUtil<T>::BoxedType>(mdata);
949  }
950  else
951  {
952  RR_INTRUSIVE_PTR<RRValue> ret = GetNode()->template UnpackAnyType<typename RRPrimUtil<T>::BoxedType>(mdata);
953  verify(ret);
954  return ret;
955  }
956  }
957 
// Optional verification hook; may be empty.
958  boost::function<void(const RR_INTRUSIVE_PTR<RRValue>&)> verify;
959 };
960 
961 class ROBOTRACONTEUR_CORE_API ServiceStub;
962 
/**
 * @brief Untyped client-side wire implementation.
 *
 * Implements the WireBase interface for a client, holds a single connection guarded by
 * connection_lock and a weak reference to the owning ServiceStub, and offers untyped
 * connect/peek/poke helpers that WireClient<T> wraps with typed signatures. All member
 * functions are implemented out of line.
 */
963 class ROBOTRACONTEUR_CORE_API WireClientBase : public virtual WireBase
964 {
965  friend class WireConnectionBase;
966  friend class WireSubscriptionBase;
967  friend class detail::WireSubscription_connection;
968 
969  public:
970  RR_OVIRTUAL ~WireClientBase() RR_OVERRIDE {}
971 
972  RR_OVIRTUAL std::string GetMemberName() RR_OVERRIDE;
973 
974  RR_OVIRTUAL std::string GetServicePath() RR_OVERRIDE;
975 
976  RR_OVIRTUAL void WirePacketReceived(const RR_INTRUSIVE_PTR<MessageEntry>& m, uint32_t e = 0) RR_OVERRIDE;
977 
978  RR_OVIRTUAL void Shutdown() RR_OVERRIDE;
979 
980  RR_OVIRTUAL void AsyncClose(const RR_SHARED_PTR<WireConnectionBase>& endpoint, bool remote, uint32_t ee,
981  RR_MOVE_ARG(boost::function<void(const RR_SHARED_PTR<RobotRaconteurException>&)>)
982  handler,
983  int32_t timeout) RR_OVERRIDE;
984 
/** @brief Strong reference to the owning stub (behavior when it has expired is defined out of line). */
985  RR_SHARED_PTR<ServiceStub> GetStub();
986 
987  protected:
988  RR_OVIRTUAL void SendWirePacket(const RR_INTRUSIVE_PTR<RRValue>& packet, TimeSpec time,
989  uint32_t endpoint) RR_OVERRIDE;
990 
991  std::string m_MemberName;
992  std::string service_path;
993  uint32_t endpoint;
994 
// The client's single connection; guarded by connection_lock.
995  RR_SHARED_PTR<WireConnectionBase> connection;
996  boost::mutex connection_lock;
997 
998  RR_WEAK_PTR<ServiceStub> stub;
999 
// Untyped asynchronous connect used by WireClient<T>::AsyncConnect.
1000  void AsyncConnect_internal(RR_MOVE_ARG(boost::function<void(const RR_SHARED_PTR<WireConnectionBase>&,
1001  const RR_SHARED_PTR<RobotRaconteurException>&)>)
1002  handler,
1003  int32_t timeout);
1004 
// Continuation of AsyncConnect_internal (presumably handles the service's reply -- confirm).
1005  void AsyncConnect_internal1(const RR_INTRUSIVE_PTR<MessageEntry>& ret,
1006  const RR_SHARED_PTR<RobotRaconteurException>& err,
1007  boost::function<void(const RR_SHARED_PTR<WireConnectionBase>&,
1008  const RR_SHARED_PTR<RobotRaconteurException>&)>& handler);
1009 
1010  WireClientBase(boost::string_ref name, const RR_SHARED_PTR<ServiceStub>& stub,
1011  MemberDefinition_Direction direction);
1012 
// Factory for the typed connection object; implemented by WireClient<T>.
1013  virtual RR_SHARED_PTR<WireConnectionBase> CreateNewWireConnection(MemberDefinition_Direction direction) = 0;
1014 
// Untyped synchronous peek/poke; the peek variants return the timestamp through `ts`.
1015  RR_INTRUSIVE_PTR<RRValue> PeekInValueBase(TimeSpec& ts);
1016  RR_INTRUSIVE_PTR<RRValue> PeekOutValueBase(TimeSpec& ts);
1017  void PokeOutValueBase(const RR_INTRUSIVE_PTR<RRValue>& value);
1018 
// Untyped asynchronous peek/poke; timeout defaults to RR_TIMEOUT_INFINITE.
1019  void AsyncPeekInValueBase(RR_MOVE_ARG(boost::function<void(const RR_INTRUSIVE_PTR<RRValue>&, const TimeSpec&,
1020  const RR_SHARED_PTR<RobotRaconteurException>&)>) handler,
1021  int32_t timeout = RR_TIMEOUT_INFINITE);
1022  void AsyncPeekOutValueBase(RR_MOVE_ARG(boost::function<void(const RR_INTRUSIVE_PTR<RRValue>&, const TimeSpec&,
1023  const RR_SHARED_PTR<RobotRaconteurException>&)>)
1024  handler,
1025  int32_t timeout = RR_TIMEOUT_INFINITE);
1026  void AsyncPokeOutValueBase(const RR_INTRUSIVE_PTR<RRValue>& value,
1027  RR_MOVE_ARG(boost::function<void(const RR_SHARED_PTR<RobotRaconteurException>&)>)
1028  handler,
1029  int32_t timeout = RR_TIMEOUT_INFINITE);
1030 
// Continuation shared by the asynchronous peek calls (presumably processes the reply -- confirm).
1031  void AsyncPeekValueBaseEnd1(const RR_INTRUSIVE_PTR<MessageEntry>& m,
1032  const RR_SHARED_PTR<RobotRaconteurException>& err,
1033  boost::function<void(const RR_INTRUSIVE_PTR<RRValue>&, const TimeSpec&,
1034  const RR_SHARED_PTR<RobotRaconteurException>&)>& handler);
1035 };
1036 
1037 template <typename T>
1038 class WireClient : public virtual Wire<T>, public virtual WireClientBase
1039 {
1040  public:
1041  WireClient(boost::string_ref name, const RR_SHARED_PTR<ServiceStub>& stub,
1042  MemberDefinition_Direction direction = MemberDefinition_Direction_both,
1043  boost::function<void(const RR_INTRUSIVE_PTR<RRValue>&)> verify = RR_NULL_FN)
1044  : WireClientBase(name, stub, direction), Wire<T>(verify)
1045  {
1046  rawelements = (boost::is_same<T, RR_INTRUSIVE_PTR<MessageElement> >::value);
1047  }
1048 
1049  RR_OVIRTUAL ~WireClient() RR_OVERRIDE {}
1050 
1051  RR_OVIRTUAL void AsyncConnect(
1052  boost::function<void(const RR_SHARED_PTR<WireConnection<T> >&, const RR_SHARED_PTR<RobotRaconteurException>&)>
1053  handler,
1054  int32_t timeout = RR_TIMEOUT_INFINITE) RR_OVERRIDE
1055  {
1056  AsyncConnect_internal(boost::bind(handler,
1057  boost::bind(&WireClient<T>::AsyncConnect_cast, RR_BOOST_PLACEHOLDERS(_1)),
1058  RR_BOOST_PLACEHOLDERS(_2)),
1059  timeout);
1060  }
1061 
1062  RR_OVIRTUAL RR_SHARED_PTR<WireConnection<T> > Connect() RR_OVERRIDE
1063  {
1064  ROBOTRACONTEUR_ASSERT_MULTITHREADED(node);
1065 
1066  RR_SHARED_PTR<detail::sync_async_handler<WireConnection<T> > > t =
1067  RR_MAKE_SHARED<detail::sync_async_handler<WireConnection<T> > >();
1068  AsyncConnect(boost::bind(&detail::sync_async_handler<WireConnection<T> >::operator(), t,
1069  RR_BOOST_PLACEHOLDERS(_1), RR_BOOST_PLACEHOLDERS(_2)),
1070  GetNode()->GetRequestTimeout());
1071  return t->end();
1072  }
1073 
1074  protected:
1075  static RR_SHARED_PTR<WireConnection<T> > AsyncConnect_cast(const RR_SHARED_PTR<WireConnectionBase>& b)
1076  {
1077  return rr_cast<WireConnection<T> >(b);
1078  }
1079 
1080  void AsyncPeekValueBaseEnd2(
1081  const RR_INTRUSIVE_PTR<RRValue>& value, const TimeSpec& ts, const RR_SHARED_PTR<RobotRaconteurException>& err,
1082  const boost::function<void(const T&, const TimeSpec&, const RR_SHARED_PTR<RobotRaconteurException>&)>& handler)
1083  {
1084 
1085  if (err)
1086  {
1087  typename boost::initialized<T> err_value;
1088  handler(err_value, ts, err);
1089  return;
1090  }
1091 
1092  T value2;
1093  try
1094  {
1095  value2 = RRPrimUtil<T>::PreUnpack(value);
1096  }
1097  catch (std::exception& exp)
1098  {
1099  typename boost::initialized<T> err_value;
1100  RR_SHARED_PTR<RobotRaconteurException> err = RobotRaconteurExceptionUtil::ExceptionToSharedPtr(exp);
1101  handler(err_value, ts, err);
1102  return;
1103  }
1104 
1105  handler(value2, ts, err);
1106  }
1107 
1108  public:
1109  RR_OVIRTUAL T PeekInValue(TimeSpec& ts) RR_OVERRIDE { return RRPrimUtil<T>::PreUnpack(PeekInValueBase(ts)); }
1110  RR_OVIRTUAL T PeekOutValue(TimeSpec& ts) RR_OVERRIDE { return RRPrimUtil<T>::PreUnpack(PeekOutValueBase(ts)); }
1111  RR_OVIRTUAL void PokeOutValue(const T& value) RR_OVERRIDE
1112  {
1113  return PokeOutValueBase(RRPrimUtil<T>::PrePack(value));
1114  }
1115  RR_OVIRTUAL void AsyncPeekInValue(
1116  boost::function<void(const T&, const TimeSpec&, const RR_SHARED_PTR<RobotRaconteurException>&)> handler,
1117  int32_t timeout = RR_TIMEOUT_INFINITE) RR_OVERRIDE
1118  {
1119  AsyncPeekInValueBase(boost::bind(&WireClient::AsyncPeekValueBaseEnd2,
1120  RR_DYNAMIC_POINTER_CAST<WireClient>(shared_from_this()),
1121  RR_BOOST_PLACEHOLDERS(_1), RR_BOOST_PLACEHOLDERS(_2),
1122  RR_BOOST_PLACEHOLDERS(_3), RR_MOVE(handler)),
1123  timeout);
1124  }
1125  RR_OVIRTUAL void AsyncPeekOutValue(
1126  boost::function<void(const T&, const TimeSpec&, const RR_SHARED_PTR<RobotRaconteurException>&)> handler,
1127  int32_t timeout = RR_TIMEOUT_INFINITE) RR_OVERRIDE
1128  {
1129  AsyncPeekOutValueBase(boost::bind(&WireClient::AsyncPeekValueBaseEnd2,
1130  RR_DYNAMIC_POINTER_CAST<WireClient>(shared_from_this()),
1131  RR_BOOST_PLACEHOLDERS(_1), RR_BOOST_PLACEHOLDERS(_2),
1132  RR_BOOST_PLACEHOLDERS(_3), RR_MOVE(handler)),
1133  timeout);
1134  }
1135  RR_OVIRTUAL void AsyncPokeOutValue(const T& value,
1136  boost::function<void(const RR_SHARED_PTR<RobotRaconteurException>&)> handler,
1137  int32_t timeout = RR_TIMEOUT_INFINITE) RR_OVERRIDE
1138  {
1139  AsyncPokeOutValueBase(RRPrimUtil<T>::PrePack(value), RR_MOVE(handler), timeout);
1140  }
1141 
1142  // Unused service-side functions
1143  RR_OVIRTUAL boost::function<void(const RR_SHARED_PTR<WireConnection<T> >&)> GetWireConnectCallback() RR_OVERRIDE
1144  {
1145  ROBOTRACONTEUR_LOG_DEBUG_COMPONENT_PATH(node, Member, endpoint, service_path, m_MemberName,
1146  "GetWireConnectCallback is not valid for WireClient");
1147  throw InvalidOperationException("Not valid for client");
1148  }
1149  RR_OVIRTUAL void SetWireConnectCallback(boost::function<void(const RR_SHARED_PTR<WireConnection<T> >&)> function)
1150  RR_OVERRIDE
1151  {
1152  RR_UNUSED(function);
1153  ROBOTRACONTEUR_LOG_DEBUG_COMPONENT_PATH(node, Member, endpoint, service_path, m_MemberName,
1154  "SetWireConnectCallback is not valid for WireClient");
1155  throw InvalidOperationException("Not valid for client");
1156  }
1157  RR_OVIRTUAL boost::function<T(const uint32_t&)> GetPeekInValueCallback() RR_OVERRIDE
1158  {
1159  ROBOTRACONTEUR_LOG_DEBUG_COMPONENT_PATH(node, Member, endpoint, service_path, m_MemberName,
1160  "GetPeekInValueCallback is not valid for WireClient");
1161  throw InvalidOperationException("Not valid for client");
1162  }
1163  RR_OVIRTUAL void SetPeekInValueCallback(boost::function<T(const uint32_t&)> function) RR_OVERRIDE
1164  {
1165  RR_UNUSED(function);
1166  ROBOTRACONTEUR_LOG_DEBUG_COMPONENT_PATH(node, Member, endpoint, service_path, m_MemberName,
1167  "SetPeekInValueCallback is not valid for WireClient");
1168  throw InvalidOperationException("Not valid for client");
1169  }
1170  RR_OVIRTUAL boost::function<T(const uint32_t&)> GetPeekOutValueCallback() RR_OVERRIDE
1171  {
1172  ROBOTRACONTEUR_LOG_DEBUG_COMPONENT_PATH(node, Member, endpoint, service_path, m_MemberName,
1173  "GetPeekOutValueCallback is not valid for WireClient");
1174  throw InvalidOperationException("Not valid for client");
1175  }
1176  RR_OVIRTUAL void SetPeekOutValueCallback(boost::function<T(const uint32_t&)> function) RR_OVERRIDE
1177  {
1178  RR_UNUSED(function);
1179  ROBOTRACONTEUR_LOG_DEBUG_COMPONENT_PATH(node, Member, endpoint, service_path, m_MemberName,
1180  "SetPeekOutValueCallback is not valid for WireClient");
1181  throw InvalidOperationException("Not valid for client");
1182  }
1183  RR_OVIRTUAL boost::function<void(const T&, const TimeSpec&, const uint32_t&)> GetPokeOutValueCallback() RR_OVERRIDE
1184  {
1185  ROBOTRACONTEUR_LOG_DEBUG_COMPONENT_PATH(node, Member, endpoint, service_path, m_MemberName,
1186  "GetPokeOutValueCallback is not valid for WireClient");
1187  throw InvalidOperationException("Not valid for client");
1188  }
1189  RR_OVIRTUAL void SetPokeOutValueCallback(boost::function<void(const T&, const TimeSpec&, const uint32_t&)> function)
1190  RR_OVERRIDE
1191  {
1192  RR_UNUSED(function);
1193  ROBOTRACONTEUR_LOG_DEBUG_COMPONENT_PATH(node, Member, endpoint, service_path, m_MemberName,
1194  "SetPokeOutValueCallback is not valid for WireClient");
1195  throw InvalidOperationException("Not valid for client");
1196  }
1197 
1198  protected:
1199  RR_OVIRTUAL RR_SHARED_PTR<WireConnectionBase> CreateNewWireConnection(MemberDefinition_Direction direction)
1200  RR_OVERRIDE
1201  {
1202  return RR_MAKE_SHARED<WireConnection<T> >(RR_STATIC_POINTER_CAST<WireBase>(shared_from_this()), 0, direction);
1203  }
1204 };
1205 
1206 class ROBOTRACONTEUR_CORE_API ServiceSkel;
/**
 * @brief Untyped service-side wire implementation.
 *
 * Implements the WireBase interface for a service, keeps one connection per endpoint in
 * `connections` (guarded by connections_lock) and a weak reference to the owning
 * ServiceSkel. Typed behavior is supplied by WireServer<T> through the pure virtual hooks
 * at the bottom. All member functions are implemented out of line.
 */
1207 class ROBOTRACONTEUR_CORE_API WireServerBase : public virtual WireBase
1208 {
1209  friend class WireConnectionBase;
1210 
1211  public:
1212  RR_OVIRTUAL ~WireServerBase() RR_OVERRIDE {}
1213 
1214  RR_OVIRTUAL std::string GetMemberName() RR_OVERRIDE;
1215 
1216  RR_OVIRTUAL std::string GetServicePath() RR_OVERRIDE;
1217 
1218  RR_OVIRTUAL void WirePacketReceived(const RR_INTRUSIVE_PTR<MessageEntry>& m, uint32_t e = 0) RR_OVERRIDE;
1219 
1220  RR_OVIRTUAL void Shutdown() RR_OVERRIDE;
1221 
1222  RR_OVIRTUAL void AsyncClose(const RR_SHARED_PTR<WireConnectionBase>& endpoint, bool remote, uint32_t ee,
1223  RR_MOVE_ARG(boost::function<void(const RR_SHARED_PTR<RobotRaconteurException>&)>)
1224  handler,
1225  int32_t timeout) RR_OVERRIDE;
1226 
/** @brief Handle a wire command message from endpoint `e` and return the reply entry. */
1227  virtual RR_INTRUSIVE_PTR<MessageEntry> WireCommand(const RR_INTRUSIVE_PTR<MessageEntry>& m, uint32_t e);
1228 
/** @brief Strong reference to the owning skel (behavior when it has expired is defined out of line). */
1229  RR_SHARED_PTR<ServiceSkel> GetSkel();
1230 
1231  protected:
1232  RR_OVIRTUAL void SendWirePacket(const RR_INTRUSIVE_PTR<RRValue>& packet, TimeSpec time,
1233  uint32_t endpoint) RR_OVERRIDE;
1234 
1235  std::string m_MemberName;
1236  std::string service_path;
1237 
// Connections keyed by a uint32_t (presumably the client endpoint id -- confirm);
// guarded by connections_lock.
1238  RR_UNORDERED_MAP<uint32_t, RR_SHARED_PTR<WireConnectionBase> > connections;
1239  boost::mutex connections_lock;
1240 
1241  RR_WEAK_PTR<ServiceSkel> skel;
1242 
1243  WireServerBase(boost::string_ref name, const RR_SHARED_PTR<ServiceSkel>& skel,
1244  MemberDefinition_Direction direction);
1245 
// Factory for the typed connection object; implemented by WireServer<T>.
1246  virtual RR_SHARED_PTR<WireConnectionBase> CreateNewWireConnection(uint32_t e,
1247  MemberDefinition_Direction direction) = 0;
1248 
// Typed notification that a connection was created; implemented by WireServer<T>.
1249  virtual void fire_WireConnectCallback(const RR_SHARED_PTR<WireConnectionBase>& e) = 0;
1250 
1251  bool init;
// Handle for a signals2 subscription (presumably the server-context listener that drives
// ClientDisconnected -- confirm in the implementation).
1252  boost::signals2::connection listener_connection;
1253 
1254  public:
/** @brief Server listener callback; the name indicates it handles client disconnect events. */
1255  void ClientDisconnected(const RR_SHARED_PTR<ServerContext>& context, ServerServiceListenerEventType ev,
1256  const RR_SHARED_PTR<void>& param);
1257 
1258  protected:
// Untyped peek/poke hooks; WireServer<T> implements them with the user callbacks. The
// uint32_t argument is an endpoint identifier.
1259  virtual RR_INTRUSIVE_PTR<RRValue> do_PeekInValue(const uint32_t&) = 0;
1260  virtual RR_INTRUSIVE_PTR<RRValue> do_PeekOutValue(const uint32_t&) = 0;
1261  virtual void do_PokeOutValue(const RR_INTRUSIVE_PTR<RRValue>& value, const TimeSpec&, const uint32_t& ep) = 0;
1262 };
1263 
1264 template <typename T>
1265 class WireServer : public virtual WireServerBase, public virtual Wire<T>
1266 {
1267 
1268  public:
1269  WireServer(boost::string_ref name, const RR_SHARED_PTR<ServiceSkel>& skel,
1270  MemberDefinition_Direction direction = MemberDefinition_Direction_both,
1271  boost::function<void(const RR_INTRUSIVE_PTR<RRValue>&)> verify = RR_NULL_FN)
1272  : WireServerBase(name, skel, direction), Wire<T>(verify)
1273  {
1274  rawelements = (boost::is_same<T, RR_INTRUSIVE_PTR<MessageElement> >::value);
1275  }
1276 
1277  RR_OVIRTUAL ~WireServer() RR_OVERRIDE {}
1278 
1279  RR_OVIRTUAL void AsyncConnect(
1280  boost::function<void(const RR_SHARED_PTR<WireConnection<T> >&, const RR_SHARED_PTR<RobotRaconteurException>&)>
1281  handler,
1282  int32_t timeout = RR_TIMEOUT_INFINITE) RR_OVERRIDE
1283  {
1284  RR_UNUSED(handler);
1285  RR_UNUSED(timeout);
1286  ROBOTRACONTEUR_LOG_DEBUG_COMPONENT_PATH(node, Member, -1, service_path, m_MemberName,
1287  "AsyncConnect is not valid for WireServer");
1288  throw InvalidOperationException("Not valid for server");
1289  }
1290  RR_OVIRTUAL RR_SHARED_PTR<WireConnection<T> > Connect() RR_OVERRIDE
1291  {
1292  ROBOTRACONTEUR_LOG_DEBUG_COMPONENT_PATH(node, Member, -1, service_path, m_MemberName,
1293  "Connect is not valid for WireServer");
1294  throw InvalidOperationException("Not valid for server");
1295  }
1296  RR_OVIRTUAL T PeekInValue(TimeSpec& ts) RR_OVERRIDE
1297  {
1298  RR_UNUSED(ts);
1299  ROBOTRACONTEUR_LOG_DEBUG_COMPONENT_PATH(node, Member, -1, service_path, m_MemberName,
1300  "PeekInValue is not valid for WireServer");
1301  throw InvalidOperationException("Not valid for server");
1302  }
1303  RR_OVIRTUAL T PeekOutValue(TimeSpec& ts) RR_OVERRIDE
1304  {
1305  RR_UNUSED(ts);
1306  ROBOTRACONTEUR_LOG_DEBUG_COMPONENT_PATH(node, Member, -1, service_path, m_MemberName,
1307  "PeekOutValue is not valid for WireServer");
1308  throw InvalidOperationException("Not valid for server");
1309  }
1310  RR_OVIRTUAL void PokeOutValue(const T& value) RR_OVERRIDE
1311  {
1312  RR_UNUSED(value);
1313  ROBOTRACONTEUR_LOG_DEBUG_COMPONENT_PATH(node, Member, -1, service_path, m_MemberName,
1314  "PokeOutValue is not valid for WireServer");
1315  throw InvalidOperationException("Not valid for server");
1316  }
1317  RR_OVIRTUAL void AsyncPeekInValue(
1318  boost::function<void(const T&, const TimeSpec&, const RR_SHARED_PTR<RobotRaconteurException>&)> handler,
1319  int32_t timeout = RR_TIMEOUT_INFINITE) RR_OVERRIDE
1320  {
1321  RR_UNUSED(handler);
1322  RR_UNUSED(timeout);
1323  ROBOTRACONTEUR_LOG_DEBUG_COMPONENT_PATH(node, Member, -1, service_path, m_MemberName,
1324  "AsyncPeekInValue is not valid for WireServer");
1325  throw InvalidOperationException("Not valid for server");
1326  }
1327  RR_OVIRTUAL void AsyncPeekOutValue(
1328  boost::function<void(const T&, const TimeSpec&, const RR_SHARED_PTR<RobotRaconteurException>&)> handler,
1329  int32_t timeout = RR_TIMEOUT_INFINITE) RR_OVERRIDE
1330  {
1331  RR_UNUSED(handler);
1332  RR_UNUSED(timeout);
1333  ROBOTRACONTEUR_LOG_DEBUG_COMPONENT_PATH(node, Member, -1, service_path, m_MemberName,
1334  "AsyncPeekOutValue is not valid for WireServer");
1335  throw InvalidOperationException("Not valid for server");
1336  }
1337  RR_OVIRTUAL void AsyncPokeOutValue(const T& value,
1338  boost::function<void(const RR_SHARED_PTR<RobotRaconteurException>&)> handler,
1339  int32_t timeout = RR_TIMEOUT_INFINITE) RR_OVERRIDE
1340  {
1341  RR_UNUSED(value);
1342  RR_UNUSED(handler);
1343  RR_UNUSED(timeout);
1344  ROBOTRACONTEUR_LOG_DEBUG_COMPONENT_PATH(node, Member, -1, service_path, m_MemberName,
1345  "AsyncPokeOutValue is not valid for WireServer");
1346  throw InvalidOperationException("Not valid for server");
1347  }
1348 
1349  RR_OVIRTUAL boost::function<void(const RR_SHARED_PTR<WireConnection<T> >&)> GetWireConnectCallback() RR_OVERRIDE
1350  {
1351  return callback;
1352  }
1353  RR_OVIRTUAL void SetWireConnectCallback(boost::function<void(const RR_SHARED_PTR<WireConnection<T> >&)> function)
1354  RR_OVERRIDE
1355  {
1356  callback = function;
1357  }
1358  RR_OVIRTUAL boost::function<T(const uint32_t&)> GetPeekInValueCallback() RR_OVERRIDE { return peek_in_callback; }
1359  RR_OVIRTUAL void SetPeekInValueCallback(boost::function<T(const uint32_t&)> function) RR_OVERRIDE
1360  {
1361  peek_in_callback = function;
1362  }
1363  RR_OVIRTUAL boost::function<T(const uint32_t&)> GetPeekOutValueCallback() RR_OVERRIDE { return peek_out_callback; }
1364  RR_OVIRTUAL void SetPeekOutValueCallback(boost::function<T(const uint32_t&)> function) RR_OVERRIDE
1365  {
1366  peek_out_callback = function;
1367  }
1368  RR_OVIRTUAL boost::function<void(const T&, const TimeSpec&, const uint32_t&)> GetPokeOutValueCallback() RR_OVERRIDE
1369  {
1370  return poke_out_callback;
1371  }
1372  RR_OVIRTUAL void SetPokeOutValueCallback(boost::function<void(const T&, const TimeSpec&, const uint32_t&)> function)
1373  RR_OVERRIDE
1374  {
1375  poke_out_callback = function;
1376  }
1377 
1378  protected:
1379  RR_OVIRTUAL RR_SHARED_PTR<WireConnectionBase> CreateNewWireConnection(
1380  uint32_t e, MemberDefinition_Direction direction) RR_OVERRIDE
1381  {
1382  return RR_MAKE_SHARED<WireConnection<T> >(RR_STATIC_POINTER_CAST<WireBase>(shared_from_this()), e, direction);
1383  }
1384 
1385  boost::function<void(RR_SHARED_PTR<WireConnection<T> >)> callback;
1386  boost::function<T(const uint32_t&)> peek_in_callback;
1387  boost::function<T(const uint32_t&)> peek_out_callback;
1388  boost::function<void(const T&, const TimeSpec&, const uint32_t&)> poke_out_callback;
1389 
1390  RR_OVIRTUAL void fire_WireConnectCallback(const RR_SHARED_PTR<WireConnectionBase>& e) RR_OVERRIDE
1391  {
1392  if (!callback)
1393  return;
1394  callback(RR_STATIC_POINTER_CAST<WireConnection<T> >(e));
1395  }
1396 
1397  RR_OVIRTUAL RR_INTRUSIVE_PTR<RRValue> do_PeekInValue(const uint32_t& ep) RR_OVERRIDE
1398  {
1399  if (!peek_in_callback)
1400  {
1401  ROBOTRACONTEUR_LOG_DEBUG_COMPONENT_PATH(node, Member, ep, service_path, m_MemberName,
1402  "Attempt to call PeekInValue when callback not set");
1403  throw InvalidOperationException("Invalid operation");
1404  }
1405  return RRPrimUtil<T>::PrePack(peek_in_callback(ep));
1406  }
1407 
1408  RR_OVIRTUAL RR_INTRUSIVE_PTR<RRValue> do_PeekOutValue(const uint32_t& ep) RR_OVERRIDE
1409  {
1410  if (!peek_out_callback)
1411  {
1412  ROBOTRACONTEUR_LOG_DEBUG_COMPONENT_PATH(node, Member, ep, service_path, m_MemberName,
1413  "Attempt to call PeekOutValue when callback not set");
1414  throw InvalidOperationException("Invalid operation");
1415  }
1416  return RRPrimUtil<T>::PrePack(peek_out_callback(ep));
1417  }
1418 
1419  RR_OVIRTUAL void do_PokeOutValue(const RR_INTRUSIVE_PTR<RRValue>& value, const TimeSpec& ts,
1420  const uint32_t& ep) RR_OVERRIDE
1421  {
1422  if (!poke_out_callback)
1423  {
1424  ROBOTRACONTEUR_LOG_DEBUG_COMPONENT_PATH(node, Member, ep, service_path, m_MemberName,
1425  "Attempt to call PokeOutValue when callback not set");
1426  throw InvalidOperationException("Invalid operation");
1427  }
1428  return poke_out_callback(RRPrimUtil<T>::PreUnpack(value), ts, ep);
1429  }
1430 
1431  public:
1432  RR_OVIRTUAL void Shutdown() RR_OVERRIDE
1433  {
1434  WireServerBase::Shutdown();
1435  callback.clear();
1436  peek_in_callback.clear();
1437  peek_out_callback.clear();
1438  poke_out_callback.clear();
1439  }
1440 };
1441 
1442 namespace detail
1443 {
1444 template <typename T>
1445 class Wire_traits;
1446 
1447 template <typename T>
1448 class Wire_traits<Wire<T> >
1449 {
1450  public:
1451  typedef WireConnection<T> wireconnection_type;
1452  typedef WireClient<T> wireclient_type;
1453  typedef WireServer<T> wireserver_type;
1454 };
1455 
1456 } // namespace detail
1457 
1458 namespace detail
1459 {
// Forward declaration only; WireBroadcasterBase stores shared pointers to this
// type in its connected_wires list and passes them to its connection hooks.
1460 class WireBroadcaster_connected_connection;
1461 }
1462 
1469 class ROBOTRACONTEUR_CORE_API WireBroadcasterBase : public RR_ENABLE_SHARED_FROM_THIS<WireBroadcasterBase>
1470 {
1471  public:
1472  size_t GetActiveWireConnectionCount();
1473 
1474  virtual ~WireBroadcasterBase();
1475 
1482  boost::function<bool(RR_SHARED_PTR<WireBroadcasterBase>&, uint32_t)> GetPredicate();
1483 
1501  void SetPredicate(boost::function<bool(const RR_SHARED_PTR<WireBroadcasterBase>&, uint32_t)> f);
1502 
1505 
1507  void SetOutValueLifespan(int32_t millis);
1508 
1509  protected:
1511 
1512  void InitBase(const RR_SHARED_PTR<WireBase>& wire);
1513 
1514  void ConnectionClosedBase(const RR_SHARED_PTR<detail::WireBroadcaster_connected_connection>& ep);
1515 
1516  void ConnectionConnectedBase(const RR_SHARED_PTR<WireConnectionBase>& ep);
1517 
1518  void SetOutValueBase(const RR_INTRUSIVE_PTR<RRValue>& value);
1519 
1520  virtual void AttachWireServerEvents(const RR_SHARED_PTR<WireServerBase>& w);
1521 
1522  virtual void AttachWireConnectionEvents(const RR_SHARED_PTR<WireConnectionBase>& w,
1523  const RR_SHARED_PTR<detail::WireBroadcaster_connected_connection>& cep);
1524 
1525  RR_INTRUSIVE_PTR<RRValue> ClientPeekInValueBase();
1526 
1527  std::list<RR_SHARED_PTR<detail::WireBroadcaster_connected_connection> > connected_wires;
1528  boost::mutex connected_wires_lock;
1529  RR_WEAK_PTR<WireServerBase> wire;
1530  RR_WEAK_PTR<RobotRaconteurNode> node;
1531  std::string service_path;
1532  std::string member_name;
1533 
1534  bool copy_element;
1535 
1536  boost::function<bool(RR_SHARED_PTR<WireBroadcasterBase>&, uint32_t)> predicate;
1537 
1538  RR_INTRUSIVE_PTR<RRValue> out_value;
1539  boost::initialized<bool> out_value_valid;
1540 
1541  int32_t out_value_lifespan;
1542  boost::posix_time::ptime out_value_lasttime_local;
1543 
1544  void ServiceEvent(ServerServiceListenerEventType evt);
1545 
1546  RR_SHARED_PTR<WireBase> GetWireBase();
1547 };
1548 
1574 template <typename T>
1576 {
1577  public:
1586 
1595  void Init(RR_SHARED_PTR<Wire<T> > wire) { InitBase(wire); }
1596 
1609  void SetOutValue(T value) { SetOutValueBase(RRPrimUtil<T>::PrePack(value)); }
1610 
1611  RR_SHARED_PTR<Wire<T> > GetWire() { return rr_cast<Wire<T> >(GetWireBase()); }
1612 
1613  protected:
1614  T ClientPeekInValue() { return RRPrimUtil<T>::PreUnpack(ClientPeekInValueBase()); }
1615 
1616  static T ClientPeekOutValue() { throw ReadOnlyMemberException("Read only wire"); }
1617  static T ClientPokeOutValue() { throw ReadOnlyMemberException("Read only wire"); }
1618 
1619  RR_OVIRTUAL void AttachWireServerEvents(const RR_SHARED_PTR<WireServerBase>& w) RR_OVERRIDE
1620  {
1621  RR_SHARED_PTR<WireServer<T> > w_T = rr_cast<WireServer<T> >(w);
1622  w_T->SetWireConnectCallback(boost::bind(&WireBroadcaster::ConnectionConnectedBase, this->shared_from_this(),
1623  RR_BOOST_PLACEHOLDERS(_1)));
1624  w_T->SetPeekInValueCallback(boost::bind(&WireBroadcaster<T>::ClientPeekInValue,
1625  RR_STATIC_POINTER_CAST<WireBroadcaster<T> >(this->shared_from_this())));
1626  w_T->SetPeekOutValueCallback(boost::bind(&WireBroadcaster<T>::ClientPeekOutValue));
1627  w_T->SetPokeOutValueCallback(boost::bind(&WireBroadcaster<T>::ClientPokeOutValue));
1628  }
1629 
1630  RR_OVIRTUAL void AttachWireConnectionEvents(const RR_SHARED_PTR<WireConnectionBase>& w,
1631  const RR_SHARED_PTR<detail::WireBroadcaster_connected_connection>& c)
1632  RR_OVERRIDE
1633  {
1634  RR_SHARED_PTR<WireConnection<T> > w_T = rr_cast<WireConnection<T> >(w);
1635  w_T->SetWireConnectionClosedCallback(
1636  boost::bind(&WireBroadcaster::ConnectionClosedBase, this->shared_from_this(), c));
1637  }
1638 };
1639 
1640 namespace detail
1641 {
1642 static void WireUnicastReceiverBase_empty_close_handler(const RR_SHARED_PTR<RobotRaconteurException>& err) {}
1643 } // namespace detail
1644 
1645 template <typename T, typename U>
1646 class WireUnicastReceiverBase : public RR_ENABLE_SHARED_FROM_THIS<WireUnicastReceiverBase<T, U> >
1647 {
1648  public:
1649  typedef typename detail::Wire_traits<T>::wireserver_type wireserver_type;
1650  typedef typename detail::Wire_traits<T>::wireconnection_type wireconnection_type;
1651 
1659  WireUnicastReceiverBase() : in_value_lifespan(-1) {}
1660  virtual ~WireUnicastReceiverBase() {}
1661 
1670  void Init(const RR_SHARED_PTR<T>& wire)
1671  {
1672  node = wire->GetNode();
1673  in_value_lifespan = -1;
1674  RR_SHARED_PTR<wireserver_type> wire_server = RR_DYNAMIC_POINTER_CAST<wireserver_type>(wire);
1675  if (!wire_server)
1676  {
1677  ROBOTRACONTEUR_LOG_DEBUG_COMPONENT_PATH(node, Member, -1, service_path, member_name,
1678  "WireUnicastReceiver init must be passed a WireServer");
1679  throw InvalidOperationException("WireServer required for WireUnicastReceiver");
1680  }
1681  this->wire = wire_server;
1682  wire_server->SetWireConnectCallback(boost::bind(&WireUnicastReceiverBase<T, U>::ConnectionConnected,
1683  this->shared_from_this(), RR_BOOST_PLACEHOLDERS(_1)));
1684  wire_server->SetPeekInValueCallback(boost::bind(&WireUnicastReceiverBase<T, U>::ClientPeekInValue));
1685  wire_server->SetPeekOutValueCallback(
1686  boost::bind(&WireUnicastReceiverBase<T, U>::ClientPeekOutValue, this->shared_from_this()));
1687  wire_server->SetPokeOutValueCallback(boost::bind(&WireUnicastReceiverBase<T, U>::ClientPokeOutValue,
1688  this->shared_from_this(), RR_BOOST_PLACEHOLDERS(_1),
1689  RR_BOOST_PLACEHOLDERS(_2), RR_BOOST_PLACEHOLDERS(_3)));
1690 
1691  wire_server->GetSkel()->GetContext()->ServerServiceListener.connect(
1692  boost::signals2::signal<void(
1693  const RR_SHARED_PTR<ServerContext>&, ServerServiceListenerEventType,
1694  const RR_SHARED_PTR<void>&)>::slot_type(boost::bind(&WireUnicastReceiverBase::ServiceEvent, this,
1695  RR_BOOST_PLACEHOLDERS(_2)))
1696  .track(this->shared_from_this()));
1697 
1698  this->service_path = wire_server->GetServicePath();
1699  this->member_name = wire_server->GetMemberName();
1700 
1701  ROBOTRACONTEUR_LOG_TRACE_COMPONENT_PATH(node, Member, -1, service_path, member_name,
1702  "WireUnicastReceiver initialized");
1703  }
1704 
1716  U GetInValue(TimeSpec& ts, uint32_t& ep)
1717  {
1718  boost::mutex::scoped_lock lock(this_lock);
1719  if (!in_value_valid.data())
1720  throw ValueNotSetException("Value not set");
1721  if (detail::WireConnectionBase_IsValueExpired(node, in_value_lasttime_local, in_value_lifespan))
1722  {
1723  throw ValueNotSetException("Value expired");
1724  }
1725  ts = in_value_ts;
1726  ep = in_value_ep;
1727  return in_value;
1728  }
1729 
1743  bool TryGetInValue(U& value, TimeSpec& ts, uint32_t& ep)
1744  {
1745  boost::mutex::scoped_lock lock(this_lock);
1746  if (!in_value_valid)
1747  return false;
1748  if (detail::WireConnectionBase_IsValueExpired(node, in_value_lasttime_local, in_value_lifespan))
1749  {
1750  return false;
1751  }
1752  value = in_value;
1753  ts = in_value_ts;
1754  ep = in_value_ep;
1755  return true;
1756  }
1757 
1764  boost::signals2::signal<void(const U&, const TimeSpec&, const uint32_t&)> InValueChanged;
1765 
1767  int32_t GetInValueLifespan()
1768  {
1769  boost::mutex::scoped_lock lock(this_lock);
1770  return in_value_lifespan;
1771  }
1772 
1774  void SetInValueLifespan(int32_t millis)
1775  {
1776  boost::mutex::scoped_lock lock(this_lock);
1777  in_value_lifespan = millis;
1778  }
1779 
1780  RR_SHARED_PTR<T> GetWire() { return wire; }
1781 
1782  protected:
1783  void ConnectionConnected(const RR_SHARED_PTR<wireconnection_type>& connection)
1784  {
1785  boost::mutex::scoped_lock lock(this_lock);
1786  if (active_connection)
1787  {
1788  uint32_t active_ep = active_connection->GetEndpoint();
1789  try
1790  {
1791  active_connection->AsyncClose(&detail::WireUnicastReceiverBase_empty_close_handler);
1792  }
1793  catch (std::exception&)
1794  {}
1795  active_connection.reset();
1796 
1797  ROBOTRACONTEUR_LOG_TRACE_COMPONENT_PATH(node, Member, active_ep, service_path, member_name,
1798  "WireUnicastReceiver active wire closed for new connection");
1799  }
1800  active_connection = connection;
1801  connection->SetWireConnectionClosedCallback(boost::bind(&WireUnicastReceiverBase<T, U>::ConnectionClosed,
1802  this->shared_from_this(), RR_BOOST_PLACEHOLDERS(_1)));
1803  connection->WireValueChanged.connect(boost::bind(&WireUnicastReceiverBase<T, U>::ConnectionInValueChanged,
1804  this->shared_from_this(), RR_BOOST_PLACEHOLDERS(_1),
1805  RR_BOOST_PLACEHOLDERS(_2), RR_BOOST_PLACEHOLDERS(_3)));
1806 
1807  ROBOTRACONTEUR_LOG_TRACE_COMPONENT_PATH(node, Member, -1, service_path, member_name,
1808  "WireUnicastReceiver wire connected, made active wire");
1809  }
1810 
1811  void ConnectionClosed(const RR_SHARED_PTR<wireconnection_type>& connection)
1812  {
1813  boost::mutex::scoped_lock lock(this_lock);
1814  if (active_connection == connection)
1815  {
1816  active_connection.reset();
1817  }
1818  }
1819 
1820  void ConnectionInValueChanged(const RR_SHARED_PTR<wireconnection_type>& connection, const U& value,
1821  const TimeSpec& time)
1822  {
1823  ClientPokeOutValue(value, time, connection->GetEndpoint());
1824  }
1825 
1826  static U ClientPeekInValue() { throw WriteOnlyMemberException("Write only wire"); }
1827 
1828  U ClientPeekOutValue()
1829  {
1830  boost::mutex::scoped_lock lock(this_lock);
1831  if (!in_value_valid)
1832  throw ValueNotSetException("Value not set");
1833  return in_value;
1834  }
1835 
1836  void ClientPokeOutValue(const U& value, const TimeSpec& ts, const uint32_t& ep)
1837  {
1838  RR_SHARED_PTR<RobotRaconteurNode> n = node.lock();
1839  boost::mutex::scoped_lock lock(this_lock);
1840  in_value = value;
1841  in_value_ts = ts;
1842  in_value_valid.data() = true;
1843  in_value_ep.data() = ep;
1844  if (n)
1845  {
1846  in_value_lasttime_local = n->NowNodeTime();
1847  }
1848 
1849  lock.unlock();
1850 
1851  InValueChanged(value, ts, ep);
1852 
1853  ROBOTRACONTEUR_LOG_TRACE_COMPONENT_PATH(node, Member, ep, service_path, member_name,
1854  "WireUnicastReceiver value changed");
1855  }
1856 
1857  void ServiceEvent(ServerServiceListenerEventType evt)
1858  {
1859  if (evt != ServerServiceListenerEventType_ServiceClosed)
1860  return;
1861  boost::mutex::scoped_lock lock(this_lock);
1862  InValueChanged.disconnect_all_slots();
1863  }
1864 
1865  RR_SHARED_PTR<wireserver_type> wire;
1866  RR_SHARED_PTR<wireconnection_type> active_connection;
1867  boost::mutex this_lock;
1868  U in_value;
1869  TimeSpec in_value_ts;
1870  boost::initialized<bool> in_value_valid;
1871  boost::initialized<uint32_t> in_value_ep;
1872  boost::posix_time::ptime in_value_lasttime_local;
1873  int32_t in_value_lifespan;
1874 
1875  std::string member_name;
1876  std::string service_path;
1877  RR_WEAK_PTR<RobotRaconteurNode> node;
1878 };
1879 
1910 template <typename T>
1911 class WireUnicastReceiver : public WireUnicastReceiverBase<Wire<T>, T>
1912 {
1913 };
// Shared-pointer convenience aliases; compiled out when
// ROBOTRACONTEUR_NO_CXX11_TEMPLATE_ALIASES is defined.
1914 #ifndef ROBOTRACONTEUR_NO_CXX11_TEMPLATE_ALIASES
/** @brief Convenience alias for WireConnectionBase shared_ptr */
1916 using WireConnectionBasePtr = RR_SHARED_PTR<WireConnectionBase>;
/** @brief Convenience alias for WireConnection shared_ptr */
1918 template <typename T>
1919 using WireConnectionPtr = RR_SHARED_PTR<WireConnection<T> >;
/** @brief Convenience alias for WireBase shared_ptr */
1921 using WireBasePtr = RR_SHARED_PTR<WireBase>;
/** @brief Convenience alias for Wire shared_ptr */
1923 template <typename T>
1924 using WirePtr = RR_SHARED_PTR<Wire<T> >;
/** @brief Convenience alias for WireBroadcaster shared_ptr */
1926 template <typename T>
1927 using WireBroadcasterPtr = RR_SHARED_PTR<WireBroadcaster<T> >;
/** @brief Convenience alias for WireUnicastReceiver shared_ptr */
1929 template <typename T>
1930 using WireUnicastReceiverPtr = RR_SHARED_PTR<WireUnicastReceiver<T> >;
1931 #endif
1932 
1933 } // namespace RobotRaconteur
1934 
1935 #pragma warning(pop)
ServerServiceListenerEventType
Enum of service listener events.
Definition: RobotRaconteurConstants.h:518
MemberDefinition_Direction
Member direction enum.
Definition: RobotRaconteurConstants.h:534
#define RR_TIMEOUT_INFINITE
Disable timeout for asynchronous operations.
Definition: RobotRaconteurConstants.h:566
boost::shared_ptr< WireConnectionBase > WireConnectionBasePtr
Convenience alias for WireConnectionBase shared_ptr.
Definition: WireMember.h:1916
boost::shared_ptr< Wire< T > > WirePtr
Convenience alias for Wire shared_ptr.
Definition: WireMember.h:1924
boost::shared_ptr< WireBase > WireBasePtr
Convenience alias for WireBase shared_ptr.
Definition: WireMember.h:1921
boost::shared_ptr< WireBroadcaster< T > > WireBroadcasterPtr
Convenience alias for WireBroadcaster shared_ptr.
Definition: WireMember.h:1927
boost::shared_ptr< WireConnection< T > > WireConnectionPtr
Convenience alias for WireConnection shared_ptr.
Definition: WireMember.h:1919
boost::shared_ptr< WireUnicastReceiver< T > > WireUnicastReceiverPtr
Convenience alias for WireUnicastReceiver shared_ptr.
Definition: WireMember.h:1930
Represents a point in time. Used by wire members to timestamp packets.
Definition: DataTypes.h:2666
Base class for Wire.
Definition: WireMember.h:547
virtual std::string GetMemberName()=0
Get the member name of the wire.
MemberDefinition_Direction Direction()
The direction of the wire.
Base class for WireBroadcaster.
Definition: WireMember.h:1470
void SetOutValueLifespan(int32_t millis)
Set the lifespan of OutValue.
boost::function< bool(boost::shared_ptr< WireBroadcasterBase > &, uint32_t)> GetPredicate()
Get the current predicate callback function.
int32_t GetOutValueLifespan()
Get the lifespan of OutValue.
void SetPredicate(boost::function< bool(const boost::shared_ptr< WireBroadcasterBase > &, uint32_t)> f)
Set the predicate callback function.
Broadcaster to send values to all connected clients.
Definition: WireMember.h:1576
WireBroadcaster()
Construct a new WireBroadcaster.
Definition: WireMember.h:1585
void SetOutValue(T value)
Set the OutValue for all connections.
Definition: WireMember.h:1609
void Init(boost::shared_ptr< Wire< T > > wire)
Initialize the WireBroadcaster.
Definition: WireMember.h:1595
Base class for WireConnection.
Definition: WireMember.h:58
virtual int32_t GetOutValueLifespan()
Get the lifespan of OutValue.
virtual void SetIgnoreInValue(bool ignore)
Set whether wire connection should ignore incoming values.
virtual TimeSpec GetLastValueSentTime()
Get the timestamp of the last sent value.
bool WaitInValueValid(int32_t timeout=RR_TIMEOUT_INFINITE)
Waits for InValue to be valid.
virtual void Close()
Close the wire connection.
bool WaitOutValueValid(int32_t timeout=RR_TIMEOUT_INFINITE)
Waits for OutValue to be valid.
virtual uint32_t GetEndpoint()
Returns the Robot Raconteur node Endpoint ID.
virtual void AsyncClose(boost::function< void(const boost::shared_ptr< RobotRaconteurException > &)> handler, int32_t timeout)
Asynchronously close the wire connection.
virtual bool GetOutValueValid()
Get if the OutValue is valid.
MemberDefinition_Direction Direction()
The direction of the wire.
virtual void SetOutValueLifespan(int32_t millis)
Set the lifespan of OutValue.
virtual int32_t GetInValueLifespan()
Get the lifespan of InValue.
virtual bool GetIgnoreInValue()
Get if wire connection is ignoring incoming values.
virtual void SetInValueLifespan(int32_t millis)
Set the lifespan of InValue.
virtual TimeSpec GetLastValueReceivedTime()
Get the timestamp of the last received value.
virtual bool GetInValueValid()
Get if the InValue is valid.
Wire connection used to transmit "most recent" values.
Definition: WireMember.h:350
virtual T GetOutValue()
Get the current OutValue.
Definition: WireMember.h:415
RR_OVIRTUAL void AsyncClose(boost::function< void(const boost::shared_ptr< RobotRaconteurException > &)> handler, int32_t timeout=2000) RR_OVERRIDE
Asynchronously close the wire connection.
Definition: WireMember.h:521
virtual T GetInValue()
Get the current InValue.
Definition: WireMember.h:404
bool TryGetInValue(T &value, TimeSpec &time)
Try getting the InValue, returning true on success or false on failure.
Definition: WireMember.h:442
RR_OVIRTUAL void Close() RR_OVERRIDE
Close the wire connection.
Definition: WireMember.h:492
bool TryGetOutValue(T &value, TimeSpec &time)
Try getting the OutValue, returning true on success or false on failure.
Definition: WireMember.h:462
boost::signals2::signal< void(const boost::shared_ptr< WireConnection< T > > &connection, T value, TimeSpec time)> WireValueChanged
Signal invoked when the InValue is changed.
Definition: WireMember.h:363
virtual void SetOutValue(typename boost::call_traits< T >::param_type value)
Set the OutValue and transmit to the peer connection.
Definition: WireMember.h:426
void SetWireConnectionClosedCallback(boost::function< void(const boost::shared_ptr< WireConnection< T > > &)> callback)
Set the connection closed callback function.
Definition: WireMember.h:387
boost::function< void(boost::shared_ptr< WireConnection< T > >)> GetWireConnectionClosedCallback()
Get the currently configured connection closed callback function.
Definition: WireMember.h:371
wire member type interface
Definition: WireMember.h:667
virtual void SetWireConnectCallback(boost::function< void(const boost::shared_ptr< WireConnection< T > > &)> function)=0
Set wire connected callback function.
virtual boost::function< void(const T &, const TimeSpec &, const uint32_t &)> GetPokeOutValueCallback()=0
Get the currently configured PokeOutValue callback.
virtual T PeekInValue(TimeSpec &ts)=0
Peek the current InValue.
virtual boost::shared_ptr< WireConnection< T > > Connect()=0
Connect the wire.
virtual void SetPeekInValueCallback(boost::function< T(const uint32_t &)> function)=0
Set the PeekInValue callback function.
virtual boost::function< T(const uint32_t &)> GetPeekOutValueCallback()=0
Get the currently configured PeekOutValue callback.
virtual void AsyncConnect(boost::function< void(const boost::shared_ptr< WireConnection< T > > &, const boost::shared_ptr< RobotRaconteurException > &)> handler, int32_t timeout=RR_TIMEOUT_INFINITE)=0
Asynchronously connect the wire.
virtual boost::function< void(const boost::shared_ptr< WireConnection< T > > &)> GetWireConnectCallback()=0
Get the currently configured wire connected callback function.
virtual void AsyncPeekInValue(boost::function< void(const T &, const TimeSpec &, const boost::shared_ptr< RobotRaconteurException > &)> handler, int32_t timeout=RR_TIMEOUT_INFINITE)=0
Asynchronously peek the current InValue.
virtual boost::function< T(const uint32_t &)> GetPeekInValueCallback()=0
Get the currently configured PeekInValue callback.
virtual void SetPokeOutValueCallback(boost::function< void(const T &, const TimeSpec &, const uint32_t &)> function)=0
Set the PokeOutValue callback function.
virtual void SetPeekOutValueCallback(boost::function< T(const uint32_t &)> function)=0
Set the PeekOutValue callback function.
virtual void PokeOutValue(const T &value)=0
Poke the OutValue.
virtual void AsyncPeekOutValue(boost::function< void(const T &, const TimeSpec &, const boost::shared_ptr< RobotRaconteurException > &)> handler, int32_t timeout=RR_TIMEOUT_INFINITE)=0
Asynchronously peek the current OutValue.
virtual void AsyncPokeOutValue(const T &value, boost::function< void(const boost::shared_ptr< RobotRaconteurException > &)> handler, int32_t timeout=RR_TIMEOUT_INFINITE)=0
Asynchronously poke the OutValue.
virtual T PeekOutValue(TimeSpec &ts)=0
Peek the current OutValue.
Base class for WireSubscription.
Definition: Subscription.h:1083
Receive the InValue from the most recent connection.
Definition: WireMember.h:1912