Robot Raconteur Core C++ Library
PipeMember.h
Go to the documentation of this file.
1 
24 #pragma once
25 
26 #include "RobotRaconteur/Message.h"
30 #include <boost/function.hpp>
31 #include <boost/unordered_map.hpp>
32 #include <boost/call_traits.hpp>
33 #include <list>
34 
35 #pragma warning(push)
36 #pragma warning(disable : 4250)
37 #pragma warning(disable : 4996)
38 
39 #include <boost/signals2.hpp>
40 
41 namespace RobotRaconteur
42 {
43 
44 class ROBOTRACONTEUR_CORE_API PipeBase;
45 class ROBOTRACONTEUR_CORE_API PipeEndpointBaseListener;
46 namespace detail
47 {
48 class PipeSubscription_connection;
49 }
50 
// Base class for PipeEndpoint.
//
// Holds the untyped send/receive machinery; PipeEndpoint<T> supplies the typed wrappers and the three
// fire_* event hooks declared pure virtual below.
//
// NOTE(review): the embedded listing numbers skip over several declarations in this class (around 93, 155 and
// 176). The member index at the end of this listing names GetRequestPacketAck(), Direction() and
// GetIgnoreReceived() for this class - confirm against the original header before editing.
57 class ROBOTRACONTEUR_CORE_API PipeEndpointBase : public RR_ENABLE_SHARED_FROM_THIS<PipeEndpointBase>,
58  private boost::noncopyable
59 {
60  friend class PipeBase;
61  friend class PipeClientBase;
62  friend class PipeServerBase;
63  friend class PipeBroadcasterBase;
64  friend class PipeSubscriptionBase;
65  friend class detail::PipeSubscription_connection;
66 
67  public:
68  virtual ~PipeEndpointBase() {}
69 
    // Returns the pipe endpoint index used when endpoint connected.
75  virtual int32_t GetIndex();
76 
    // Returns the Robot Raconteur node Endpoint ID.
85  virtual uint32_t GetEndpoint();
86 
94 
    // Set if pipe endpoint should request packet acks.
104  void SetRequestPacketAck(bool ack);
105 
    // Close the pipe endpoint.
113  virtual void Close();
114 
    // Asynchronously close the pipe endpoint.
    // handler receives a RobotRaconteurException pointer; timeout defaults to RR_TIMEOUT_INFINITE here.
123  virtual void AsyncClose(boost::function<void(const RR_SHARED_PTR<RobotRaconteurException>&)> handler,
124  int32_t timeout = RR_TIMEOUT_INFINITE);
125 
    // Return number of packets in the receive queue.
133  virtual size_t Available();
134 
    // Get if pipe endpoint is unreliable.
144  bool IsUnreliable();
145 
156 
    // Set whether pipe endpoint should ignore incoming packets.
177  void SetIgnoreReceived(bool ignore);
178 
179  virtual void AddListener(const RR_SHARED_PTR<PipeEndpointBaseListener>& listener);
180 
181  RR_SHARED_PTR<RobotRaconteurNode> GetNode();
182 
183  protected:
184  virtual void RemoteClose();
185 
    // Protected: endpoints are created by the typed subclass, not constructed directly.
186  PipeEndpointBase(const RR_SHARED_PTR<PipeBase>& parent, int32_t index, uint32_t endpoint = 0,
187  bool unreliable = false, MemberDefinition_Direction direction = MemberDefinition_Direction_both);
188 
189  bool unreliable;
190  MemberDefinition_Direction direction;
191 
192  bool RequestPacketAck;
193 
    // Untyped send; the handler receives a uint32_t (presumably the packet number - confirm) and an error pointer.
194  void AsyncSendPacketBase(const RR_INTRUSIVE_PTR<RRValue>& packet,
195  RR_MOVE_ARG(boost::function<void(uint32_t, const RR_SHARED_PTR<RobotRaconteurException>&)>)
196  handler);
197 
    // Untyped receive/peek primitives wrapped by the typed PipeEndpoint<T> methods.
198  RR_INTRUSIVE_PTR<RRValue> ReceivePacketBase();
199  RR_INTRUSIVE_PTR<RRValue> PeekPacketBase();
200 
201  RR_INTRUSIVE_PTR<RRValue> ReceivePacketBaseWait(int32_t timeout = RR_TIMEOUT_INFINITE);
202  RR_INTRUSIVE_PTR<RRValue> PeekPacketBaseWait(int32_t timeout = RR_TIMEOUT_INFINITE);
203 
    // Returns a bool instead of only a packet; the result is written to `packet`.
204  bool TryReceivePacketBaseWait(RR_INTRUSIVE_PTR<RRValue>& packet, int32_t timeout = RR_TIMEOUT_INFINITE,
205  bool peek = false);
206 
    // NOTE(review): presumably sendlock/recvlock guard the send and receive paths respectively - confirm in the .cpp.
207  boost::mutex sendlock;
208  boost::mutex recvlock;
209 
210  std::deque<RR_INTRUSIVE_PTR<RRValue> > recv_packets;
211  boost::condition_variable recv_packets_wait;
212 
213  uint32_t increment_packet_number(uint32_t packetnum);
214 
215  void PipePacketReceived(const RR_INTRUSIVE_PTR<RRValue>& packet, uint32_t packetnum);
216 
217  void PipePacketAckReceived(uint32_t packetnum);
218 
219  void Shutdown();
220 
    // Event hooks implemented by PipeEndpoint<T>, which knows the concrete endpoint type to pass to user callbacks.
221  virtual void fire_PipeEndpointClosedCallback() = 0;
222 
223  virtual void fire_PacketReceivedEvent() = 0;
224 
225  virtual void fire_PacketAckReceivedEvent(uint32_t packetnum) = 0;
226 
227  RR_SHARED_PTR<PipeBase> GetParent();
228 
229  bool closed;
230 
231  uint32_t send_packet_number;
232  uint32_t recv_packet_number;
233 
    // The owning pipe is held weakly; GetParent() is the accessor that yields a shared pointer.
234  RR_WEAK_PTR<PipeBase> parent;
235  int32_t index;
236  uint32_t endpoint;
237  std::string service_path;
238  std::string member_name;
239 
    // Packets keyed by a uint32_t (presumably the packet number - confirm).
240  RR_UNORDERED_MAP<uint32_t, RR_INTRUSIVE_PTR<RRValue> > out_of_order_packets;
241 
242  bool ignore_incoming_packets;
243 
    // Listeners are stored as weak pointers, so registering a listener does not extend its lifetime.
244  boost::mutex listeners_lock;
245  std::list<RR_WEAK_PTR<PipeEndpointBaseListener> > listeners;
246 
247  detail::async_signal_semaphore pipe_packet_received_semaphore;
248 
249  RR_WEAK_PTR<RobotRaconteurNode> node;
250 };
251 
// Pipe endpoint used to transmit reliable or unreliable data streams.
//
// Typed wrapper over PipeEndpointBase: converts packets with RRPrimUtil<T>::PrePack / PreUnpack and exposes
// the closed callback plus the packet-received and packet-ack-received signals.
//
// NOTE(review): listing line 281 (the class head) is absent from this view, as are lines 354, 478 and 511
// inside SendPacket(), Close() and AsyncClose(). Confirm against the original header before changing them.
280 template <typename T>
282 {
283  private:
    // Callback storage and the mutex that guards every read, write and clear of it in this class.
284  boost::function<void(RR_SHARED_PTR<PipeEndpoint<T> >)> PipeEndpointClosedCallback;
285  boost::mutex PipeEndpointClosedCallback_lock;
286 
287  public:
    // Get the currently configured endpoint closed callback function.
    // Returns a copy taken while holding PipeEndpointClosedCallback_lock.
293  boost::function<void(RR_SHARED_PTR<PipeEndpoint<T> >)> GetPipeEndpointClosedCallback()
294  {
295  boost::mutex::scoped_lock lock(PipeEndpointClosedCallback_lock);
296  return PipeEndpointClosedCallback;
297  }
298 
    // Set the endpoint closed callback function (stored under PipeEndpointClosedCallback_lock).
309  void SetPipeEndpointClosedCallback(boost::function<void(const RR_SHARED_PTR<PipeEndpoint<T> >&)> callback)
310  {
311  boost::mutex::scoped_lock lock(PipeEndpointClosedCallback_lock);
312  PipeEndpointClosedCallback = callback;
313  }
314 
    // Signal called when a packet has been received.
321  boost::signals2::signal<void(const RR_SHARED_PTR<PipeEndpoint<T> >&)> PacketReceivedEvent;
322 
    // Signal called when a packet ack has been received. The uint32_t argument is the value passed to
    // fire_PacketAckReceivedEvent().
332  boost::signals2::signal<void(const RR_SHARED_PTR<PipeEndpoint<T> >&, uint32_t)> PacketAckReceivedEvent;
333 
    // Sends a packet to the peer endpoint.
    // Builds a sync_async_handler, adapts it through send_handler, and returns the value produced by t->end().
    // NOTE(review): listing line 354 is missing; the call that receives the arguments on line 355 is not visible
    // here (presumably AsyncSendPacket - confirm).
345  virtual uint32_t SendPacket(typename boost::call_traits<T>::param_type packet)
346  {
347  ROBOTRACONTEUR_ASSERT_MULTITHREADED(node);
348 
349  RR_SHARED_PTR<detail::sync_async_handler<uint32_t> > t =
350  RR_MAKE_SHARED<detail::sync_async_handler<uint32_t> >();
351  boost::function<void(const RR_SHARED_PTR<uint32_t>&, const RR_SHARED_PTR<RobotRaconteurException>&)> h =
352  boost::bind(&detail::sync_async_handler<uint32_t>::operator(), t, RR_BOOST_PLACEHOLDERS(_1),
353  RR_BOOST_PLACEHOLDERS(_2));
355  packet, boost::bind(&PipeEndpoint::send_handler, RR_BOOST_PLACEHOLDERS(_1), RR_BOOST_PLACEHOLDERS(_2), h));
356  return *t->end();
357  }
358 
    // Send a packet to the peer endpoint asynchronously.
    // Pre-packs the value and forwards it, with the handler moved, to AsyncSendPacketBase().
367  virtual void AsyncSendPacket(typename boost::call_traits<T>::param_type packet,
368  boost::function<void(uint32_t, const RR_SHARED_PTR<RobotRaconteurException>&)> handler)
369  {
370  AsyncSendPacketBase(RRPrimUtil<T>::PrePack(packet), RR_MOVE(handler));
371  }
372 
    // Receive the next packet in the receive queue.
382  virtual T ReceivePacket() { return RRPrimUtil<T>::PreUnpack(ReceivePacketBase()); }
383 
    // Peeks the next packet in the receive queue.
393  virtual T PeekNextPacket() { return RRPrimUtil<T>::PreUnpack(PeekPacketBase()); }
394 
    // Receive the next packet in the receive queue, block if queue is empty.
403  virtual T ReceivePacketWait(int32_t timeout = RR_TIMEOUT_INFINITE)
404  {
405  return RRPrimUtil<T>::PreUnpack(ReceivePacketBaseWait(timeout));
406  }
407 
    // Peek the next packet in the receive queue, block if queue is empty.
416  virtual T PeekNextPacketWait(int32_t timeout = RR_TIMEOUT_INFINITE)
417  {
418  return RRPrimUtil<T>::PreUnpack(PeekPacketBaseWait(timeout));
419  }
420 
    // Try receiving a packet, optionally blocking if the queue is empty.
    // Returns false and leaves val untouched when TryReceivePacketBaseWait() fails; otherwise stores the
    // unpacked value in val and returns true.
436  virtual bool TryReceivePacketWait(T& val, int32_t timeout = RR_TIMEOUT_INFINITE, bool peek = false)
437  {
438  RR_INTRUSIVE_PTR<RRValue> o;
439  if (!TryReceivePacketBaseWait(o, timeout, peek))
440  return false;
441  val = RRPrimUtil<T>::PreUnpack(o);
442  return true;
443  }
444 
    // Forwards every argument unchanged to the PipeEndpointBase constructor.
445  PipeEndpoint(const RR_SHARED_PTR<PipeBase>& parent, int32_t index, uint32_t endpoint = 0, bool unreliable = false,
446  MemberDefinition_Direction direction = MemberDefinition_Direction_both)
447  : PipeEndpointBase(parent, index, endpoint, unreliable, direction){};
448 
449  protected:
    // Invokes the closed callback, if one is set, with this endpoint. The callback is copied out through the
    // locking getter first, so the user function runs without PipeEndpointClosedCallback_lock held.
450  RR_OVIRTUAL void fire_PipeEndpointClosedCallback() RR_OVERRIDE
451  {
452  boost::function<void(RR_SHARED_PTR<PipeEndpoint<T> >)> c = GetPipeEndpointClosedCallback();
453  if (!c)
454  return;
455  c(RR_STATIC_POINTER_CAST<PipeEndpoint<T> >(shared_from_this()));
456  }
457 
    // Raises PacketReceivedEvent with this endpoint as the argument.
458  RR_OVIRTUAL void fire_PacketReceivedEvent() RR_OVERRIDE
459  {
460  PacketReceivedEvent(RR_STATIC_POINTER_CAST<PipeEndpoint<T> >(shared_from_this()));
461  }
462 
    // Raises PacketAckReceivedEvent with this endpoint and packetnum.
463  RR_OVIRTUAL void fire_PacketAckReceivedEvent(uint32_t packetnum) RR_OVERRIDE
464  {
465  PacketAckReceivedEvent(RR_STATIC_POINTER_CAST<PipeEndpoint<T> >(shared_from_this()), packetnum);
466  }
467 
    // Adapter used by SendPacket(): wraps the packet number in a shared uint32_t so it fits the
    // (shared_ptr<uint32_t>, error) handler signature, then forwards err unchanged.
468  static void send_handler(uint32_t packetnumber, const RR_SHARED_PTR<RobotRaconteurException>& err,
469  const boost::function<void(const RR_SHARED_PTR<uint32_t>&,
470  const RR_SHARED_PTR<RobotRaconteurException>&)>& handler)
471  {
472  handler(RR_MAKE_SHARED<uint32_t>(packetnumber), err);
473  }
474 
475  public:
    // Close the pipe endpoint.
    // The visible part clears the closed callback under its lock and disconnects all slots of both signals.
    // NOTE(review): listing line 478 is missing (presumably the call to the base-class Close() - confirm).
476  RR_OVIRTUAL void Close() RR_OVERRIDE
477  {
479  {
480 
481  boost::mutex::scoped_lock lock(PipeEndpointClosedCallback_lock);
482  PipeEndpointClosedCallback.clear();
483  }
484  PacketReceivedEvent.disconnect_all_slots();
485  PacketAckReceivedEvent.disconnect_all_slots();
486  }
487 
488  protected:
    // Completion step for AsyncClose(): clears the callback and disconnects the signals, swallowing any
    // std::exception raised while doing so, then always calls handler(err).
489  virtual void AsyncClose1(const RR_SHARED_PTR<RobotRaconteurException>& err,
490  const boost::function<void(const RR_SHARED_PTR<RobotRaconteurException>&)>& handler)
491  {
492  try
493  {
494  {
495  boost::mutex::scoped_lock lock(PipeEndpointClosedCallback_lock);
496  PipeEndpointClosedCallback.clear();
497  }
498  PacketReceivedEvent.disconnect_all_slots();
499  PacketAckReceivedEvent.disconnect_all_slots();
500  }
501  catch (std::exception&)
502  {}
503 
504  handler(err);
505  }
506 
507  public:
    // Asynchronously close the pipe endpoint.
    // The default timeout here is 2000, whereas PipeEndpointBase::AsyncClose declares RR_TIMEOUT_INFINITE.
    // Default arguments are chosen from the static type of the call expression, so the default in effect
    // depends on whether the call is made through PipeEndpoint<T> or PipeEndpointBase.
    // NOTE(review): listing line 511 is missing; the call that receives the arguments below is not visible
    // here (presumably the base-class AsyncClose bound to AsyncClose1 - confirm).
508  RR_OVIRTUAL void AsyncClose(boost::function<void(const RR_SHARED_PTR<RobotRaconteurException>&)> handler,
509  int32_t timeout = 2000) RR_OVERRIDE
510  {
512  RR_STATIC_POINTER_CAST<PipeEndpoint<T> >(shared_from_this()),
513  RR_BOOST_PLACEHOLDERS(_1), handler),
514  timeout);
515  }
516 
517  protected:
    // Runs the base-class RemoteClose() first, then drops the closed callback and all signal slots.
518  RR_OVIRTUAL void RemoteClose() RR_OVERRIDE
519  {
520  PipeEndpointBase::RemoteClose();
521  {
522  boost::mutex::scoped_lock lock(PipeEndpointClosedCallback_lock);
523  PipeEndpointClosedCallback.clear();
524  }
525  PacketReceivedEvent.disconnect_all_slots();
526  PacketAckReceivedEvent.disconnect_all_slots();
527  }
528 };
529 
// Base class for Pipe.
//
// Declares the transport-facing interface shared by the client and server pipe implementations, plus the
// default PackData()/UnpackData() that delegate to the node's variant packing.
//
// NOTE(review): the member index at the end of this listing names Direction() for this class, but its
// declaration (around listing line 615) is not present in this view - confirm against the original header.
536 class ROBOTRACONTEUR_CORE_API PipeBase : public RR_ENABLE_SHARED_FROM_THIS<PipeBase>, private boost::noncopyable
537 {
538  friend class PipeEndpointBase;
539 
540  public:
541  virtual ~PipeBase() {}
542 
    // Index constant with value -1.
548  static const int32_t ANY_INDEX = -1;
549 
    // Get the member name of the pipe.
555  virtual std::string GetMemberName() = 0;
556 
557  virtual void PipePacketReceived(const RR_INTRUSIVE_PTR<MessageEntry>& m, uint32_t e = 0) = 0;
558 
559  virtual void Shutdown() = 0;
560 
561  virtual std::string GetServicePath() = 0;
562 
563  virtual void AsyncClose(const RR_SHARED_PTR<PipeEndpointBase>& endpoint, bool remote, uint32_t ee,
564  RR_MOVE_ARG(boost::function<void(const RR_SHARED_PTR<RobotRaconteurException>&)>) handler,
565  int32_t timeout) = 0;
566 
567  protected:
568  PipeBase();
569 
570  bool unreliable;
571 
    // Implemented separately by the client and server pipe bases.
572  virtual void AsyncSendPipePacket(
573  const RR_INTRUSIVE_PTR<RRValue>& data, int32_t index, uint32_t packetnumber, bool requestack, uint32_t endpoint,
574  bool unreliable,
575  RR_MOVE_ARG(boost::function<void(uint32_t, const RR_SHARED_PTR<RobotRaconteurException>&)>) handler) = 0;
576 
    // Set by PipeClient<T>/PipeServer<T> to true exactly when T is RR_INTRUSIVE_PTR<MessageElement>.
577  bool rawelements;
578 
579  void DispatchPacketAck(const RR_INTRUSIVE_PTR<MessageElement>& me, const RR_SHARED_PTR<PipeEndpointBase>& e);
580 
    // packetnumber is an output parameter.
581  bool DispatchPacket(const RR_INTRUSIVE_PTR<MessageElement>& me, const RR_SHARED_PTR<PipeEndpointBase>& e,
582  uint32_t& packetnumber);
583 
584  RR_INTRUSIVE_PTR<MessageElement> PackPacket(const RR_INTRUSIVE_PTR<RRValue>& data, int32_t index,
585  uint32_t packetnumber, bool requestack);
586 
587  virtual void DeleteEndpoint(const RR_SHARED_PTR<PipeEndpointBase>& e) = 0;
588 
    // Default packing: delegates to the node's PackVarType(). Pipe<T> overrides this with a typed version.
589  virtual RR_INTRUSIVE_PTR<MessageElementData> PackData(const RR_INTRUSIVE_PTR<RRValue>& data)
590  {
591  return GetNode()->PackVarType(data);
592  }
593 
    // Default unpacking: delegates to the node's UnpackVarType(). Pipe<T> overrides this with a typed version.
594  virtual RR_INTRUSIVE_PTR<RRValue> UnpackData(const RR_INTRUSIVE_PTR<MessageElement>& mdata)
595  {
596  return GetNode()->UnpackVarType(mdata);
597  }
598 
    // The node is held weakly; GetNode() is the accessor that yields a shared pointer.
599  RR_WEAK_PTR<RobotRaconteurNode> node;
600 
601  MemberDefinition_Direction direction;
602 
603  public:
604  RR_SHARED_PTR<RobotRaconteurNode> GetNode();
605 
616 
    // Get if pipe is declared unreliable.
626  bool IsUnreliable();
627 };
628 
662 template <typename T>
663 class Pipe : public virtual PipeBase
664 {
665  public:
666  friend class PipeEndpointBase;
667 
668  Pipe(boost::function<void(const RR_INTRUSIVE_PTR<RRValue>&)> verify) { this->verify = RR_MOVE(verify); }
669 
670  RR_OVIRTUAL ~Pipe() RR_OVERRIDE {}
671 
679  virtual boost::function<void(const RR_SHARED_PTR<PipeEndpoint<T> >&)> GetPipeConnectCallback() = 0;
680 
697  virtual void SetPipeConnectCallback(boost::function<void(const RR_SHARED_PTR<PipeEndpoint<T> >&)> function) = 0;
698 
712  virtual RR_SHARED_PTR<PipeEndpoint<T> > Connect(int32_t index) = 0;
713 
725  virtual void AsyncConnect(
726  int32_t index,
727  boost::function<void(const RR_SHARED_PTR<PipeEndpoint<T> >&, const RR_SHARED_PTR<RobotRaconteurException>&)>
728  handler,
729  int32_t timeout = RR_TIMEOUT_INFINITE) = 0;
730 
741  virtual void AsyncConnect(
742  boost::function<void(const RR_SHARED_PTR<PipeEndpoint<T> >&, const RR_SHARED_PTR<RobotRaconteurException>&)>
743  handler,
744  int32_t timeout = RR_TIMEOUT_INFINITE)
745  {
746  AsyncConnect(-1, RR_MOVE(handler), timeout);
747  }
748 
749  RR_OVIRTUAL RR_INTRUSIVE_PTR<MessageElementData> PackData(const RR_INTRUSIVE_PTR<RRValue>& data) RR_OVERRIDE
750  {
751  if (verify)
752  {
753  verify(data);
754  }
755  return GetNode()->template PackAnyType<typename RRPrimUtil<T>::BoxedType>(data);
756  }
757 
758  RR_OVIRTUAL RR_INTRUSIVE_PTR<RRValue> UnpackData(const RR_INTRUSIVE_PTR<MessageElement>& mdata) RR_OVERRIDE
759  {
760  if (!verify)
761  {
762  return GetNode()->template UnpackAnyType<typename RRPrimUtil<T>::BoxedType>(mdata);
763  }
764  else
765  {
766  RR_INTRUSIVE_PTR<RRValue> ret = GetNode()->template UnpackAnyType<typename RRPrimUtil<T>::BoxedType>(mdata);
767  verify(ret);
768  return ret;
769  }
770  }
771 
772  protected:
773  boost::function<void(const RR_INTRUSIVE_PTR<RRValue>&)> verify;
774 };
775 
776 class ROBOTRACONTEUR_CORE_API ServiceStub;
777 
// Client-side pipe base class.
//
// Implements the PipeBase interface for the client side (declarations only in this header); PipeClient<T>
// adds the typed layer and supplies CreateNewPipeEndpoint().
778 class ROBOTRACONTEUR_CORE_API PipeClientBase : public virtual PipeBase
779 {
780  public:
781  friend class PipeSubscriptionBase;
782  friend class detail::PipeSubscription_connection;
783 
784  RR_OVIRTUAL ~PipeClientBase() RR_OVERRIDE {}
785 
786  RR_OVIRTUAL std::string GetMemberName() RR_OVERRIDE;
787 
788  RR_OVIRTUAL void PipePacketReceived(const RR_INTRUSIVE_PTR<MessageEntry>& m, uint32_t e = 0) RR_OVERRIDE;
789 
790  RR_OVIRTUAL void Shutdown() RR_OVERRIDE;
791 
792  RR_OVIRTUAL void AsyncClose(const RR_SHARED_PTR<PipeEndpointBase>& endpoint, bool remote, uint32_t ee,
793  RR_MOVE_ARG(boost::function<void(const RR_SHARED_PTR<RobotRaconteurException>&)>)
794  handler,
795  int32_t timeout) RR_OVERRIDE;
796 
    // Returns a shared pointer to the ServiceStub; the `stub` member below holds it weakly.
797  RR_SHARED_PTR<ServiceStub> GetStub();
798 
799  RR_OVIRTUAL std::string GetServicePath() RR_OVERRIDE;
800 
801  protected:
802  RR_OVIRTUAL void AsyncSendPipePacket(
803  const RR_INTRUSIVE_PTR<RRValue>& data, int32_t index, uint32_t packetnumber, bool requestack, uint32_t endpoint,
804  bool unreliable,
805  RR_MOVE_ARG(boost::function<void(uint32_t, const RR_SHARED_PTR<RobotRaconteurException>&)>)
806  handler) RR_OVERRIDE;
807 
808  std::string m_MemberName;
809 
    // Endpoints keyed by an int32_t (presumably the pipe endpoint index - confirm in the .cpp).
810  RR_UNORDERED_MAP<int32_t, RR_SHARED_PTR<PipeEndpointBase> > pipeendpoints;
811  boost::mutex pipeendpoints_lock;
812 
813  RR_WEAK_PTR<ServiceStub> stub;
814 
    // NOTE(review): bookkeeping for connections in progress; the meaning of the tuple fields and of
    // early_endpoints is not visible from this header - confirm in the .cpp.
815  std::list<boost::tuple<int32_t, int32_t> > connecting_endpoints;
816  int32_t connecting_key_count;
817  RR_UNORDERED_MAP<int32_t, RR_SHARED_PTR<PipeEndpointBase> > early_endpoints;
818  std::string service_path;
819  uint32_t endpoint;
820 
    // Untyped connect used by PipeClient<T>::AsyncConnect(); the handler receives a PipeEndpointBase pointer.
821  void AsyncConnect_internal(int32_t index,
822  RR_MOVE_ARG(boost::function<void(const RR_SHARED_PTR<PipeEndpointBase>&,
823  const RR_SHARED_PTR<RobotRaconteurException>&)>)
824  handler,
825  int32_t timeout);
826 
827  void AsyncConnect_internal1(const RR_INTRUSIVE_PTR<MessageEntry>& ret,
828  const RR_SHARED_PTR<RobotRaconteurException>& err, int32_t index, int32_t key,
829  boost::function<void(const RR_SHARED_PTR<PipeEndpointBase>&,
830  const RR_SHARED_PTR<RobotRaconteurException>&)>& handler);
831 
    // Protected: constructed only through a derived class such as PipeClient<T>.
832  PipeClientBase(boost::string_ref name, const RR_SHARED_PTR<ServiceStub>& stub, bool unreliable,
833  MemberDefinition_Direction direction);
834 
    // Factory hook implemented by PipeClient<T> to create the typed endpoint.
835  virtual RR_SHARED_PTR<PipeEndpointBase> CreateNewPipeEndpoint(int32_t index, bool unreliable,
836  MemberDefinition_Direction direction) = 0;
837 
838  RR_OVIRTUAL void DeleteEndpoint(const RR_SHARED_PTR<PipeEndpointBase>& e) RR_OVERRIDE;
839 };
840 
841 template <typename T>
842 class PipeClient : public virtual Pipe<T>, public virtual PipeClientBase
843 {
844  public:
845  RR_OVIRTUAL ~PipeClient() RR_OVERRIDE {}
846 
847  RR_OVIRTUAL boost::function<void(const RR_SHARED_PTR<PipeEndpoint<T> >&)> GetPipeConnectCallback() RR_OVERRIDE
848  {
849  ROBOTRACONTEUR_LOG_DEBUG_COMPONENT_PATH(node, Member, endpoint, service_path, m_MemberName,
850  "GetPipeConnectCallback is not valid for PipeClient");
851  throw InvalidOperationException("Not valid for client");
852  }
853 
854  RR_OVIRTUAL void SetPipeConnectCallback(boost::function<void(const RR_SHARED_PTR<PipeEndpoint<T> >&)> function)
855  RR_OVERRIDE
856  {
857  RR_UNUSED(function);
858  ROBOTRACONTEUR_LOG_DEBUG_COMPONENT_PATH(node, Member, endpoint, service_path, m_MemberName,
859  "SetPipeConnectCallback is not valid for PipeClient");
860  throw InvalidOperationException("Not valid for client");
861  }
862 
863  RR_OVIRTUAL void AsyncConnect(
864  int32_t index,
865  boost::function<void(const RR_SHARED_PTR<PipeEndpoint<T> >&, const RR_SHARED_PTR<RobotRaconteurException>&)>
866  handler,
867  int32_t timeout = RR_TIMEOUT_INFINITE) RR_OVERRIDE
868  {
869 
870  AsyncConnect_internal(index,
871  boost::bind(handler,
872  boost::bind(&PipeClient<T>::AsyncConnect_cast, RR_BOOST_PLACEHOLDERS(_1)),
873  RR_BOOST_PLACEHOLDERS(_2)),
874  timeout);
875  }
876 
877  RR_OVIRTUAL RR_SHARED_PTR<PipeEndpoint<T> > Connect(int32_t index) RR_OVERRIDE
878  {
879  ROBOTRACONTEUR_ASSERT_MULTITHREADED(node);
880 
881  RR_SHARED_PTR<detail::sync_async_handler<PipeEndpoint<T> > > t =
882  RR_MAKE_SHARED<detail::sync_async_handler<PipeEndpoint<T> > >();
883  AsyncConnect(index,
884  boost::bind(&detail::sync_async_handler<PipeEndpoint<T> >::operator(), t,
885  RR_BOOST_PLACEHOLDERS(_1), RR_BOOST_PLACEHOLDERS(_2)),
886  GetNode()->GetRequestTimeout());
887  return t->end();
888  }
889 
890  PipeClient(boost::string_ref name, const RR_SHARED_PTR<ServiceStub>& stub, bool unreliable = false,
891  MemberDefinition_Direction direction = MemberDefinition_Direction_both,
892  boost::function<void(const RR_INTRUSIVE_PTR<RRValue>&)> verify = RR_NULL_FN)
893  : PipeClientBase(name, stub, unreliable, direction), Pipe<T>(verify)
894  {
895  rawelements = (boost::is_same<T, RR_INTRUSIVE_PTR<MessageElement> >::value);
896  }
897 
898  using PipeClientBase::AsyncClose;
899  using PipeClientBase::AsyncSendPipePacket;
900  using PipeClientBase::GetMemberName;
901  using PipeClientBase::PipePacketReceived;
902  using PipeClientBase::Shutdown;
903 
904  protected:
905  static RR_SHARED_PTR<PipeEndpoint<T> > AsyncConnect_cast(const RR_SHARED_PTR<PipeEndpointBase>& b)
906  {
907  return rr_cast<PipeEndpoint<T> >(b);
908  }
909 
910  RR_OVIRTUAL RR_SHARED_PTR<PipeEndpointBase> CreateNewPipeEndpoint(int32_t index, bool unreliable,
911  MemberDefinition_Direction direction) RR_OVERRIDE
912  {
913  return RR_MAKE_SHARED<PipeEndpoint<T> >(RR_STATIC_POINTER_CAST<PipeBase>(shared_from_this()), index, 0,
914  unreliable, direction);
915  }
916 };
917 
918 class ROBOTRACONTEUR_CORE_API ServiceSkel;
919 class ROBOTRACONTEUR_CORE_API PipeServerBase : public virtual PipeBase
920 {
921  public:
922  RR_OVIRTUAL ~PipeServerBase() RR_OVERRIDE {}
923 
924  RR_OVIRTUAL std::string GetMemberName() RR_OVERRIDE;
925 
926  RR_OVIRTUAL void PipePacketReceived(const RR_INTRUSIVE_PTR<MessageEntry>& m, uint32_t e = 0) RR_OVERRIDE;
927 
928  RR_OVIRTUAL void Shutdown() RR_OVERRIDE;
929 
930  RR_OVIRTUAL void AsyncSendPipePacket(
931  const RR_INTRUSIVE_PTR<RRValue>& data, int32_t index, uint32_t packetnumber, bool requestack, uint32_t endpoint,
932  bool unreliable,
933  RR_MOVE_ARG(boost::function<void(uint32_t, const RR_SHARED_PTR<RobotRaconteurException>&)>)
934  handler) RR_OVERRIDE;
935 
936  RR_OVIRTUAL void AsyncClose(const RR_SHARED_PTR<PipeEndpointBase>& endpoint, bool remote, uint32_t ee,
937  RR_MOVE_ARG(boost::function<void(const RR_SHARED_PTR<RobotRaconteurException>&)>)
938  handler,
939  int32_t timeout) RR_OVERRIDE;
940 
941  virtual RR_INTRUSIVE_PTR<MessageEntry> PipeCommand(const RR_INTRUSIVE_PTR<MessageEntry>& m, uint32_t e);
942 
943  RR_SHARED_PTR<ServiceSkel> GetSkel();
944 
945  RR_OVIRTUAL std::string GetServicePath() RR_OVERRIDE;
946 
947  protected:
948  std::string m_MemberName;
949  std::string service_path;
950 
951  struct pipe_endpoint_server_id
952  {
953  pipe_endpoint_server_id(uint32_t endpoint, int32_t index)
954  {
955  this->endpoint = endpoint;
956  this->index = index;
957  }
958 
959  uint32_t endpoint;
960  int32_t index;
961 
962  bool operator==(const pipe_endpoint_server_id& rhs) const
963  {
964  return (endpoint == rhs.endpoint && index == rhs.index);
965  }
966  };
967 
968  struct hash_value
969  {
970  std::size_t operator()(pipe_endpoint_server_id const& e) const
971  {
972  std::size_t seed = 0;
973  boost::hash_combine(seed, e.endpoint);
974  boost::hash_combine(seed, e.index);
975  return seed;
976  }
977  };
978 
979  RR_UNORDERED_MAP<pipe_endpoint_server_id, RR_SHARED_PTR<PipeEndpointBase>, hash_value> pipeendpoints;
980  boost::mutex pipeendpoints_lock;
981 
982  RR_WEAK_PTR<ServiceSkel> skel;
983 
984  PipeServerBase(boost::string_ref name, const RR_SHARED_PTR<ServiceSkel>& skel, bool unreliable,
985  MemberDefinition_Direction direction);
986 
987  virtual RR_SHARED_PTR<PipeEndpointBase> CreateNewPipeEndpoint(int32_t index, uint32_t endpoint, bool unreliable,
988  MemberDefinition_Direction direction) = 0;
989 
990  RR_OVIRTUAL void DeleteEndpoint(const RR_SHARED_PTR<PipeEndpointBase>& e) RR_OVERRIDE;
991 
992  virtual void fire_PipeConnectCallback(const RR_SHARED_PTR<PipeEndpointBase>& e) = 0;
993 
994  bool init;
995  boost::signals2::connection listener_connection;
996 
997  public:
998  void ClientDisconnected(const RR_SHARED_PTR<ServerContext>& context, ServerServiceListenerEventType ev,
999  const RR_SHARED_PTR<void>& param);
1000 };
1001 
1002 template <typename T>
1003 class PipeServer : public virtual PipeServerBase, public virtual Pipe<T>
1004 {
1005 
1006  public:
1007  RR_OVIRTUAL ~PipeServer() RR_OVERRIDE {}
1008 
1009  RR_OVIRTUAL boost::function<void(const RR_SHARED_PTR<PipeEndpoint<T> >&)> GetPipeConnectCallback() RR_OVERRIDE
1010  {
1011  return callback;
1012  }
1013 
1014  RR_OVIRTUAL void SetPipeConnectCallback(boost::function<void(const RR_SHARED_PTR<PipeEndpoint<T> >&)> function)
1015  RR_OVERRIDE
1016  {
1017  callback = function;
1018  }
1019 
1020  RR_OVIRTUAL RR_SHARED_PTR<PipeEndpoint<T> > Connect(int32_t index) RR_OVERRIDE
1021  {
1022  RR_UNUSED(index);
1023  ROBOTRACONTEUR_LOG_DEBUG_COMPONENT_PATH(node, Member, -1, service_path, m_MemberName,
1024  "Connect is not valid for PipeServer");
1025  throw InvalidOperationException("Not valid for server");
1026  }
1027 
1028  RR_OVIRTUAL void AsyncConnect(
1029  int32_t index,
1030  boost::function<void(const RR_SHARED_PTR<PipeEndpoint<T> >&, const RR_SHARED_PTR<RobotRaconteurException>&)>
1031  handler,
1032  int32_t timeout = RR_TIMEOUT_INFINITE) RR_OVERRIDE
1033  {
1034  RR_UNUSED(index);
1035  RR_UNUSED(handler);
1036  RR_UNUSED(timeout);
1037  ROBOTRACONTEUR_LOG_DEBUG_COMPONENT_PATH(node, Member, -1, service_path, m_MemberName,
1038  "AsyncConnect is not valid for PipeServer");
1039  throw InvalidOperationException("Not valid for server");
1040  }
1041 
1042  PipeServer(boost::string_ref name, const RR_SHARED_PTR<ServiceSkel>& skel, bool unreliable = false,
1043  MemberDefinition_Direction direction = MemberDefinition_Direction_both,
1044  boost::function<void(const RR_INTRUSIVE_PTR<RRValue>&)> verify = RR_NULL_FN)
1045  : PipeServerBase(name, skel, unreliable, direction), Pipe<T>(verify)
1046  {
1047  rawelements = (boost::is_same<T, RR_INTRUSIVE_PTR<MessageElement> >::value);
1048  }
1049 
1050  protected:
1051  RR_OVIRTUAL RR_SHARED_PTR<PipeEndpointBase> CreateNewPipeEndpoint(int32_t index, uint32_t endpoint, bool unreliable,
1052  MemberDefinition_Direction direction) RR_OVERRIDE
1053  {
1054  return RR_MAKE_SHARED<PipeEndpoint<T> >(RR_STATIC_POINTER_CAST<PipeBase>(shared_from_this()), index, endpoint,
1055  unreliable, direction);
1056  }
1057 
1058  boost::function<void(RR_SHARED_PTR<PipeEndpoint<T> >)> callback;
1059 
1060  RR_OVIRTUAL void fire_PipeConnectCallback(const RR_SHARED_PTR<PipeEndpointBase>& e) RR_OVERRIDE
1061  {
1062  if (!callback)
1063  return;
1064  callback(RR_STATIC_POINTER_CAST<PipeEndpoint<T> >(e));
1065  }
1066 
1067  public:
1068  RR_OVIRTUAL void Shutdown() RR_OVERRIDE
1069  {
1070  PipeServerBase::Shutdown();
1071 
1072  callback.clear();
1073  }
1074 };
1075 
1076 namespace detail
1077 {
1078 class PipeBroadcasterBase_connected_endpoint;
1079 struct PipeBroadcasterBase_async_send_operation;
1080 } // namespace detail
1081 
// Base class for PipeBroadcaster.
//
// Holds the untyped state and operations used by PipeBroadcaster<T>: the list of connected endpoints, the
// optional predicate and the maximum backlog setting.
//
// NOTE(review): listing line 1143 (directly after `protected:`) is absent from this view; presumably it is
// the constructor declaration - confirm against the original header.
1088 class ROBOTRACONTEUR_CORE_API PipeBroadcasterBase : public RR_ENABLE_SHARED_FROM_THIS<PipeBroadcasterBase>,
1089  private boost::noncopyable
1090 {
1091  public:
1092  virtual ~PipeBroadcasterBase();
1093 
1094  size_t GetActivePipeEndpointCount();
1095 
    // Get the current predicate callback function.
1102  boost::function<bool(const RR_SHARED_PTR<PipeBroadcasterBase>&, uint32_t, int32_t)> GetPredicate();
1103 
    // Set the predicate callback function.
    // The predicate receives this broadcaster, a uint32_t and an int32_t (presumably the client endpoint ID
    // and the pipe endpoint index - confirm) and returns a bool.
1122  void SetPredicate(boost::function<bool(const RR_SHARED_PTR<PipeBroadcasterBase>&, uint32_t, int32_t)> f);
1123 
    // Gets the currently configured maximum backlog.
1129  int32_t GetMaxBacklog();
1130 
    // Set the maximum backlog.
1140  void SetMaxBacklog(int32_t maximum_backlog);
1141 
1142  protected:
1144 
    // Untyped initialization used by PipeBroadcaster<T>::Init(); maximum_backlog defaults to -1.
1145  void InitBase(const RR_SHARED_PTR<PipeBase>& pipe, int32_t maximum_backlog = -1);
1146 
    // Bound by PipeBroadcaster<T> as the pipe server's connect callback.
1147  void EndpointConnectedBase(const RR_SHARED_PTR<PipeEndpointBase>& ep);
1148 
    // Bound by PipeBroadcaster<T> as each endpoint's closed callback.
1149  void EndpointClosedBase(const RR_SHARED_PTR<detail::PipeBroadcasterBase_connected_endpoint>& ep);
1150 
    // Bound by PipeBroadcaster<T> to each endpoint's PacketAckReceivedEvent.
1151  void PacketAckReceivedBase(const RR_SHARED_PTR<detail::PipeBroadcasterBase_connected_endpoint>& ep, uint32_t id);
1152 
1153  void handle_send(int32_t id, const RR_SHARED_PTR<RobotRaconteurException>& err,
1154  const RR_SHARED_PTR<detail::PipeBroadcasterBase_connected_endpoint>& ep,
1155  const RR_SHARED_PTR<detail::PipeBroadcasterBase_async_send_operation>& op, int32_t key,
1156  int32_t send_key, const boost::function<void()>& handler);
1157 
    // Untyped send entry points wrapped by PipeBroadcaster<T>::SendPacket()/AsyncSendPacket().
1158  void SendPacketBase(const RR_INTRUSIVE_PTR<RRValue>& packet);
1159 
1160  void AsyncSendPacketBase(const RR_INTRUSIVE_PTR<RRValue>& packet, RR_MOVE_ARG(boost::function<void()>) handler);
1161 
    // Hooks overridden by PipeBroadcaster<T>, which knows the concrete pipe and endpoint types to attach to.
1162  virtual void AttachPipeServerEvents(const RR_SHARED_PTR<PipeServerBase>& p);
1163 
1164  virtual void AttachPipeEndpointEvents(const RR_SHARED_PTR<PipeEndpointBase>& p,
1165  const RR_SHARED_PTR<detail::PipeBroadcasterBase_connected_endpoint>& cep);
1166 
1167  RR_SHARED_PTR<PipeBase> GetPipeBase();
1168 
1169  std::list<RR_SHARED_PTR<detail::PipeBroadcasterBase_connected_endpoint> > endpoints;
1170  boost::mutex endpoints_lock;
1171 
    // The pipe and node are held weakly.
1172  RR_WEAK_PTR<PipeServerBase> pipe;
1173  RR_WEAK_PTR<RobotRaconteurNode> node;
1174  int32_t maximum_backlog;
1175  std::string service_path;
1176  std::string member_name;
1177 
1178  bool copy_element;
1179 
1180  boost::function<bool(const RR_SHARED_PTR<PipeBroadcasterBase>&, uint32_t, int32_t)> predicate;
1181 };
1182 
// Broadcaster to send packets to all connected clients.
//
// Typed wrapper over PipeBroadcasterBase: pre-packs values with RRPrimUtil<T>::PrePack and wires the typed
// pipe server / endpoint events to the base-class handlers.
//
// NOTE(review): listing lines 1215 (the class head) and 1226 (the constructor, named in the member index at
// the end of this listing) are absent from this view - confirm against the original header.
1214 template <typename T>
1216 {
1217 
1218  public:
1227 
    // Initialize the PipeBroadcaster.
    // Forwards both arguments to InitBase(); maximum_backlog defaults to -1.
1237  void Init(RR_SHARED_PTR<Pipe<T> > pipe, int32_t maximum_backlog = -1) { InitBase(pipe, maximum_backlog); }
1238 
    // Send a packet to all connected pipe endpoint clients.
1246  void SendPacket(T packet) { SendPacketBase(RRPrimUtil<T>::PrePack(packet)); }
1247 
    // Asynchronously send packet to all connected pipe endpoint clients.
    // handler takes no arguments; it is moved into AsyncSendPacketBase().
1256  void AsyncSendPacket(T packet, boost::function<void()> handler)
1257  {
1258  AsyncSendPacketBase(RRPrimUtil<T>::PrePack(packet), RR_MOVE(handler));
1259  }
1260 
    // Get the associated pipe, obtained by applying rr_cast to GetPipeBase().
1266  RR_SHARED_PTR<Pipe<T> > GetPipe() { return rr_cast<Pipe<T> >(GetPipeBase()); }
1267 
1268  protected:
    // Installs EndpointConnectedBase as the typed pipe server's connect callback.
    // The bind captures shared_from_this(), so the stored callback holds a shared reference to this broadcaster.
1269  RR_OVIRTUAL void AttachPipeServerEvents(const RR_SHARED_PTR<PipeServerBase>& p) RR_OVERRIDE
1270  {
1271  RR_SHARED_PTR<PipeServer<T> > p_T = rr_cast<PipeServer<T> >(p);
1272 
1273  p_T->SetPipeConnectCallback(
1274  boost::bind(&PipeBroadcaster::EndpointConnectedBase, shared_from_this(), RR_BOOST_PLACEHOLDERS(_1)));
1275  }
1276 
    // Hooks a newly connected endpoint: its closed callback calls EndpointClosedBase(cep), and its
    // PacketAckReceivedEvent calls PacketAckReceivedBase(cep, <second signal argument>). The signal's first
    // argument (the endpoint pointer) is dropped because cep is bound instead.
1277  RR_OVIRTUAL void AttachPipeEndpointEvents(const RR_SHARED_PTR<PipeEndpointBase>& ep,
1278  const RR_SHARED_PTR<detail::PipeBroadcasterBase_connected_endpoint>& cep)
1279  RR_OVERRIDE
1280  {
1281  RR_SHARED_PTR<PipeEndpoint<T> > ep_T = rr_cast<PipeEndpoint<T> >(ep);
1282 
1283  ep_T->SetPipeEndpointClosedCallback(boost::bind(&PipeBroadcaster::EndpointClosedBase, shared_from_this(), cep));
1284  ep_T->PacketAckReceivedEvent.connect(
1285  boost::bind(&PipeBroadcaster::PacketAckReceivedBase, shared_from_this(), cep, RR_BOOST_PLACEHOLDERS(_2)));
1286  }
1287 };
// Alias templates require C++11; they are compiled out when ROBOTRACONTEUR_NO_CXX11_TEMPLATE_ALIASES is defined.
1288 #ifndef ROBOTRACONTEUR_NO_CXX11_TEMPLATE_ALIASES
// Convenience alias for PipeEndpointBase shared_ptr.
1290 using PipeEndpointBasePtr = RR_SHARED_PTR<PipeEndpointBase>;
// Convenience alias for PipeEndpoint shared_ptr.
1292 template <typename T>
1293 using PipeEndpointPtr = RR_SHARED_PTR<PipeEndpoint<T> >;
// Convenience alias for PipeBase shared_ptr.
1295 using PipeBasePtr = RR_SHARED_PTR<PipeBase>;
// Convenience alias for Pipe shared_ptr.
1297 template <typename T>
1298 using PipePtr = RR_SHARED_PTR<Pipe<T> >;
// Convenience alias for PipeBroadcaster shared_ptr.
1300 template <typename T>
1301 using PipeBroadcasterPtr = RR_SHARED_PTR<PipeBroadcaster<T> >;
1302 #endif
1303 } // namespace RobotRaconteur
1304 
1305 #pragma warning(pop)
boost::shared_ptr< PipeBase > PipeBasePtr
Convenience alias for PipeBase shared_ptr.
Definition: PipeMember.h:1295
boost::shared_ptr< PipeBroadcaster< T > > PipeBroadcasterPtr
Convenience alias for PipeBroadcaster shared_ptr.
Definition: PipeMember.h:1301
boost::shared_ptr< Pipe< T > > PipePtr
Convenience alias for Pipe shared_ptr.
Definition: PipeMember.h:1298
boost::shared_ptr< PipeEndpointBase > PipeEndpointBasePtr
Convenience alias for PipeEndpointBase shared_ptr.
Definition: PipeMember.h:1290
boost::shared_ptr< PipeEndpoint< T > > PipeEndpointPtr
Convenience alias for PipeEndpoint shared_ptr.
Definition: PipeMember.h:1293
MemberDefinition_Direction
Member direction enum.
Definition: RobotRaconteurConstants.h:534
#define RR_TIMEOUT_INFINITE
Disable timeout for asynchronous operations.
Definition: RobotRaconteurConstants.h:566
Base class for Pipe.
Definition: PipeMember.h:537
virtual std::string GetMemberName()=0
Get the member name of the pipe.
bool IsUnreliable()
Get if pipe is declared unreliable.
MemberDefinition_Direction Direction()
The direction of the pipe.
Base class for PipeBroadcaster.
Definition: PipeMember.h:1090
void SetPredicate(boost::function< bool(const boost::shared_ptr< PipeBroadcasterBase > &, uint32_t, int32_t)> f)
Set the predicate callback function.
void SetMaxBacklog(int32_t maximum_backlog)
Set the maximum backlog.
boost::function< bool(const boost::shared_ptr< PipeBroadcasterBase > &, uint32_t, int32_t)> GetPredicate()
Get the current predicate callback function.
int32_t GetMaxBacklog()
Gets the currently configured maximum backlog.
Broadcaster to send packets to all connected clients.
Definition: PipeMember.h:1216
void Init(boost::shared_ptr< Pipe< T > > pipe, int32_t maximum_backlog=-1)
Initialize the PipeBroadcaster.
Definition: PipeMember.h:1237
void AsyncSendPacket(T packet, boost::function< void()> handler)
Asynchronously send packet to all connected pipe endpoint clients.
Definition: PipeMember.h:1256
void SendPacket(T packet)
Send a packet to all connected pipe endpoint clients.
Definition: PipeMember.h:1246
PipeBroadcaster()
Construct a new PipeBroadcaster.
Definition: PipeMember.h:1226
boost::shared_ptr< Pipe< T > > GetPipe()
Get the associated pipe.
Definition: PipeMember.h:1266
Base class for PipeEndpoint.
Definition: PipeMember.h:59
bool GetRequestPacketAck()
Get if pipe endpoint is requesting acks.
bool IsUnreliable()
Get if pipe endpoint is unreliable.
MemberDefinition_Direction Direction()
The direction of the pipe.
void SetRequestPacketAck(bool ack)
Set if pipe endpoint should request packet acks.
virtual int32_t GetIndex()
Returns the pipe endpoint index used when endpoint connected.
void SetIgnoreReceived(bool ignore)
Set whether pipe endpoint should ignore incoming packets.
virtual void Close()
Close the pipe endpoint.
bool GetIgnoreReceived()
Get if pipe endpoint is ignoring incoming packets.
virtual void AsyncClose(boost::function< void(const boost::shared_ptr< RobotRaconteurException > &)> handler, int32_t timeout=RR_TIMEOUT_INFINITE)
Asynchronously close the pipe endpoint.
virtual uint32_t GetEndpoint()
Returns the Robot Raconteur node Endpoint ID.
virtual size_t Available()
Return number of packets in the receive queue.
Pipe endpoint used to transmit reliable or unreliable data streams.
Definition: PipeMember.h:282
virtual T PeekNextPacketWait(int32_t timeout=RR_TIMEOUT_INFINITE)
Peek the next packet in the receive queue, block if queue is empty.
Definition: PipeMember.h:416
boost::signals2::signal< void(const boost::shared_ptr< PipeEndpoint< T > > &, uint32_t)> PacketAckReceivedEvent
Signal called when a packet ack has been received.
Definition: PipeMember.h:332
RR_OVIRTUAL void Close() RR_OVERRIDE
Close the pipe endpoint.
Definition: PipeMember.h:476
boost::function< void(boost::shared_ptr< PipeEndpoint< T > >)> GetPipeEndpointClosedCallback()
Get the currently configured endpoint closed callback function.
Definition: PipeMember.h:293
virtual void AsyncSendPacket(typename boost::call_traits< T >::param_type packet, boost::function< void(uint32_t, const boost::shared_ptr< RobotRaconteurException > &)> handler)
Send a packet to the peer endpoint asynchronously.
Definition: PipeMember.h:367
boost::signals2::signal< void(const boost::shared_ptr< PipeEndpoint< T > > &)> PacketReceivedEvent
Signal called when a packet has been received.
Definition: PipeMember.h:321
virtual T ReceivePacket()
Receive the next packet in the receive queue.
Definition: PipeMember.h:382
void SetPipeEndpointClosedCallback(boost::function< void(const boost::shared_ptr< PipeEndpoint< T > > &)> callback)
Set the endpoint closed callback function.
Definition: PipeMember.h:309
RR_OVIRTUAL void AsyncClose(boost::function< void(const boost::shared_ptr< RobotRaconteurException > &)> handler, int32_t timeout=2000) RR_OVERRIDE
Asynchronously close the pipe endpoint.
Definition: PipeMember.h:508
virtual bool TryReceivePacketWait(T &val, int32_t timeout=RR_TIMEOUT_INFINITE, bool peek=false)
Try receiving a packet, optionally blocking if the queue is empty.
Definition: PipeMember.h:436
virtual T ReceivePacketWait(int32_t timeout=RR_TIMEOUT_INFINITE)
Receive the next packet in the receive queue, block if queue is empty.
Definition: PipeMember.h:403
virtual uint32_t SendPacket(typename boost::call_traits< T >::param_type packet)
Sends a packet to the peer endpoint.
Definition: PipeMember.h:345
virtual T PeekNextPacket()
Peeks the next packet in the receive queue.
Definition: PipeMember.h:393
pipe member type interface
Definition: PipeMember.h:664
virtual void AsyncConnect(int32_t index, boost::function< void(const boost::shared_ptr< PipeEndpoint< T > > &, const boost::shared_ptr< RobotRaconteurException > &)> handler, int32_t timeout=RR_TIMEOUT_INFINITE)=0
Asynchronously connect a pipe endpoint.
virtual boost::function< void(const boost::shared_ptr< PipeEndpoint< T > > &)> GetPipeConnectCallback()=0
Get the currently configured pipe endpoint connected callback function.
virtual void SetPipeConnectCallback(boost::function< void(const boost::shared_ptr< PipeEndpoint< T > > &)> function)=0
Set the pipe endpoint connected callback function.
virtual boost::shared_ptr< PipeEndpoint< T > > Connect(int32_t index)=0
Connect a pipe endpoint.
virtual void AsyncConnect(boost::function< void(const boost::shared_ptr< PipeEndpoint< T > > &, const boost::shared_ptr< RobotRaconteurException > &)> handler, int32_t timeout=RR_TIMEOUT_INFINITE)
Asynchronously connect a pipe endpoint.
Definition: PipeMember.h:741
Base class for PipeSubscription.
Definition: Subscription.h:1337