30 #include <boost/function.hpp>
31 #include <boost/unordered_map.hpp>
32 #include <boost/call_traits.hpp>
36 #pragma warning(disable : 4250)
37 #pragma warning(disable : 4996)
39 #include <boost/signals2.hpp>
41 namespace RobotRaconteur
44 class ROBOTRACONTEUR_CORE_API PipeBase;
45 class ROBOTRACONTEUR_CORE_API PipeEndpointBaseListener;
48 class PipeSubscription_connection;
57 class ROBOTRACONTEUR_CORE_API
PipeEndpointBase :
public RR_ENABLE_SHARED_FROM_THIS<PipeEndpointBase>,
58 private boost::noncopyable
61 friend class PipeClientBase;
62 friend class PipeServerBase;
65 friend class detail::PipeSubscription_connection;
123 virtual void AsyncClose(boost::function<
void(
const RR_SHARED_PTR<RobotRaconteurException>&)> handler,
179 virtual void AddListener(
const RR_SHARED_PTR<PipeEndpointBaseListener>& listener);
181 RR_SHARED_PTR<RobotRaconteurNode> GetNode();
184 virtual void RemoteClose();
186 PipeEndpointBase(
const RR_SHARED_PTR<PipeBase>& parent, int32_t index, uint32_t endpoint = 0,
192 bool RequestPacketAck;
194 void AsyncSendPacketBase(
const RR_INTRUSIVE_PTR<RRValue>& packet,
195 RR_MOVE_ARG(boost::function<
void(uint32_t,
const RR_SHARED_PTR<RobotRaconteurException>&)>)
198 RR_INTRUSIVE_PTR<RRValue> ReceivePacketBase();
199 RR_INTRUSIVE_PTR<RRValue> PeekPacketBase();
204 bool TryReceivePacketBaseWait(RR_INTRUSIVE_PTR<RRValue>& packet, int32_t timeout =
RR_TIMEOUT_INFINITE,
207 boost::mutex sendlock;
208 boost::mutex recvlock;
210 std::deque<RR_INTRUSIVE_PTR<RRValue> > recv_packets;
211 boost::condition_variable recv_packets_wait;
213 uint32_t increment_packet_number(uint32_t packetnum);
215 void PipePacketReceived(
const RR_INTRUSIVE_PTR<RRValue>& packet, uint32_t packetnum);
217 void PipePacketAckReceived(uint32_t packetnum);
221 virtual void fire_PipeEndpointClosedCallback() = 0;
223 virtual void fire_PacketReceivedEvent() = 0;
225 virtual void fire_PacketAckReceivedEvent(uint32_t packetnum) = 0;
227 RR_SHARED_PTR<PipeBase> GetParent();
231 uint32_t send_packet_number;
232 uint32_t recv_packet_number;
234 RR_WEAK_PTR<PipeBase> parent;
237 std::string service_path;
238 std::string member_name;
240 RR_UNORDERED_MAP<uint32_t, RR_INTRUSIVE_PTR<RRValue> > out_of_order_packets;
242 bool ignore_incoming_packets;
244 boost::mutex listeners_lock;
245 std::list<RR_WEAK_PTR<PipeEndpointBaseListener> > listeners;
247 detail::async_signal_semaphore pipe_packet_received_semaphore;
249 RR_WEAK_PTR<RobotRaconteurNode> node;
280 template <
typename T>
284 boost::function<void(RR_SHARED_PTR<
PipeEndpoint<T> >)> PipeEndpointClosedCallback;
285 boost::mutex PipeEndpointClosedCallback_lock;
295 boost::mutex::scoped_lock lock(PipeEndpointClosedCallback_lock);
296 return PipeEndpointClosedCallback;
311 boost::mutex::scoped_lock lock(PipeEndpointClosedCallback_lock);
312 PipeEndpointClosedCallback = callback;
345 virtual uint32_t
SendPacket(
typename boost::call_traits<T>::param_type packet)
347 ROBOTRACONTEUR_ASSERT_MULTITHREADED(node);
349 RR_SHARED_PTR<detail::sync_async_handler<uint32_t> > t =
350 RR_MAKE_SHARED<detail::sync_async_handler<uint32_t> >();
351 boost::function<void(
const RR_SHARED_PTR<uint32_t>&,
const RR_SHARED_PTR<RobotRaconteurException>&)> h =
352 boost::bind(&detail::sync_async_handler<uint32_t>::operator(), t, RR_BOOST_PLACEHOLDERS(_1),
353 RR_BOOST_PLACEHOLDERS(_2));
355 packet, boost::bind(&PipeEndpoint::send_handler, RR_BOOST_PLACEHOLDERS(_1), RR_BOOST_PLACEHOLDERS(_2), h));
368 boost::function<
void(uint32_t,
const RR_SHARED_PTR<RobotRaconteurException>&)> handler)
370 AsyncSendPacketBase(RRPrimUtil<T>::PrePack(packet), RR_MOVE(handler));
/**
 * @brief Receive the next packet in the receive queue
 *
 * Obtains the packet from ReceivePacketBase() and converts it to T with
 * RRPrimUtil<T>::PreUnpack().
 *
 * NOTE(review): behavior when the receive queue is empty is decided inside
 * ReceivePacketBase(), which is not visible here -- confirm before relying on it.
 *
 * @return T The packet converted by RRPrimUtil<T>::PreUnpack()
 */
382 virtual T
ReceivePacket() {
return RRPrimUtil<T>::PreUnpack(ReceivePacketBase()); }
/**
 * @brief Peek the next packet in the receive queue
 *
 * Obtains the packet from PeekPacketBase() and converts it to T with
 * RRPrimUtil<T>::PreUnpack().
 *
 * NOTE(review): presumably the packet stays in the queue after a peek;
 * that is determined by PeekPacketBase(), which is not visible here -- confirm.
 *
 * @return T The packet converted by RRPrimUtil<T>::PreUnpack()
 */
393 virtual T
PeekNextPacket() {
return RRPrimUtil<T>::PreUnpack(PeekPacketBase()); }
405 return RRPrimUtil<T>::PreUnpack(ReceivePacketBaseWait(timeout));
418 return RRPrimUtil<T>::PreUnpack(PeekPacketBaseWait(timeout));
438 RR_INTRUSIVE_PTR<RRValue> o;
439 if (!TryReceivePacketBaseWait(o, timeout, peek))
441 val = RRPrimUtil<T>::PreUnpack(o);
445 PipeEndpoint(
const RR_SHARED_PTR<PipeBase>& parent, int32_t index, uint32_t endpoint = 0,
bool unreliable =
false,
450 RR_OVIRTUAL
void fire_PipeEndpointClosedCallback() RR_OVERRIDE
455 c(RR_STATIC_POINTER_CAST<PipeEndpoint<T> >(shared_from_this()));
458 RR_OVIRTUAL
void fire_PacketReceivedEvent() RR_OVERRIDE
463 RR_OVIRTUAL
void fire_PacketAckReceivedEvent(uint32_t packetnum) RR_OVERRIDE
468 static void send_handler(uint32_t packetnumber,
const RR_SHARED_PTR<RobotRaconteurException>& err,
469 const boost::function<
void(
const RR_SHARED_PTR<uint32_t>&,
470 const RR_SHARED_PTR<RobotRaconteurException>&)>& handler)
472 handler(RR_MAKE_SHARED<uint32_t>(packetnumber), err);
476 RR_OVIRTUAL
void Close() RR_OVERRIDE
481 boost::mutex::scoped_lock lock(PipeEndpointClosedCallback_lock);
482 PipeEndpointClosedCallback.clear();
489 virtual void AsyncClose1(
const RR_SHARED_PTR<RobotRaconteurException>& err,
490 const boost::function<
void(
const RR_SHARED_PTR<RobotRaconteurException>&)>& handler)
495 boost::mutex::scoped_lock lock(PipeEndpointClosedCallback_lock);
496 PipeEndpointClosedCallback.clear();
501 catch (std::exception&)
508 RR_OVIRTUAL
void AsyncClose(boost::function<
void(
const RR_SHARED_PTR<RobotRaconteurException>&)> handler,
509 int32_t timeout = 2000) RR_OVERRIDE
513 RR_BOOST_PLACEHOLDERS(_1), handler),
518 RR_OVIRTUAL
void RemoteClose() RR_OVERRIDE
520 PipeEndpointBase::RemoteClose();
522 boost::mutex::scoped_lock lock(PipeEndpointClosedCallback_lock);
523 PipeEndpointClosedCallback.clear();
536 class ROBOTRACONTEUR_CORE_API
PipeBase :
public RR_ENABLE_SHARED_FROM_THIS<PipeBase>,
private boost::noncopyable
548 static const int32_t ANY_INDEX = -1;
557 virtual void PipePacketReceived(
const RR_INTRUSIVE_PTR<MessageEntry>& m, uint32_t e = 0) = 0;
559 virtual void Shutdown() = 0;
561 virtual std::string GetServicePath() = 0;
563 virtual void AsyncClose(
const RR_SHARED_PTR<PipeEndpointBase>& endpoint,
bool remote, uint32_t ee,
564 RR_MOVE_ARG(boost::function<
void(
const RR_SHARED_PTR<RobotRaconteurException>&)>) handler,
565 int32_t timeout) = 0;
572 virtual void AsyncSendPipePacket(
573 const RR_INTRUSIVE_PTR<RRValue>& data, int32_t index, uint32_t packetnumber,
bool requestack, uint32_t endpoint,
575 RR_MOVE_ARG(boost::function<
void(uint32_t,
const RR_SHARED_PTR<RobotRaconteurException>&)>) handler) = 0;
579 void DispatchPacketAck(
const RR_INTRUSIVE_PTR<MessageElement>& me,
const RR_SHARED_PTR<PipeEndpointBase>& e);
581 bool DispatchPacket(
const RR_INTRUSIVE_PTR<MessageElement>& me,
const RR_SHARED_PTR<PipeEndpointBase>& e,
582 uint32_t& packetnumber);
584 RR_INTRUSIVE_PTR<MessageElement> PackPacket(
const RR_INTRUSIVE_PTR<RRValue>& data, int32_t index,
585 uint32_t packetnumber,
bool requestack);
587 virtual void DeleteEndpoint(
const RR_SHARED_PTR<PipeEndpointBase>& e) = 0;
589 virtual RR_INTRUSIVE_PTR<MessageElementData> PackData(
const RR_INTRUSIVE_PTR<RRValue>& data)
591 return GetNode()->PackVarType(data);
594 virtual RR_INTRUSIVE_PTR<RRValue> UnpackData(
const RR_INTRUSIVE_PTR<MessageElement>& mdata)
596 return GetNode()->UnpackVarType(mdata);
599 RR_WEAK_PTR<RobotRaconteurNode> node;
601 MemberDefinition_Direction direction;
604 RR_SHARED_PTR<RobotRaconteurNode> GetNode();
662 template <
typename T>
/**
 * @brief Construct a Pipe and store the verify callback
 *
 * The callback is taken by value and moved into the `verify` member with
 * RR_MOVE, so the caller's copy is not referenced afterwards.
 *
 * @param verify Callback receiving a const RR_INTRUSIVE_PTR<RRValue>&.
 *               NOTE(review): presumably used to validate unpacked packet
 *               values -- confirm against the use sites of `verify`.
 */
668 Pipe(boost::function<
void(
const RR_INTRUSIVE_PTR<RRValue>&)> verify) { this->verify = RR_MOVE(verify); }
670 RR_OVIRTUAL ~
Pipe() RR_OVERRIDE {}
712 virtual RR_SHARED_PTR<PipeEndpoint<T> >
Connect(int32_t index) = 0;
727 boost::function<
void(
const RR_SHARED_PTR<
PipeEndpoint<T> >&,
const RR_SHARED_PTR<RobotRaconteurException>&)>
742 boost::function<
void(
const RR_SHARED_PTR<
PipeEndpoint<T> >&,
const RR_SHARED_PTR<RobotRaconteurException>&)>
749 RR_OVIRTUAL RR_INTRUSIVE_PTR<MessageElementData> PackData(
const RR_INTRUSIVE_PTR<RRValue>& data) RR_OVERRIDE
755 return GetNode()->template PackAnyType<typename RRPrimUtil<T>::BoxedType>(data);
758 RR_OVIRTUAL RR_INTRUSIVE_PTR<RRValue> UnpackData(
const RR_INTRUSIVE_PTR<MessageElement>& mdata) RR_OVERRIDE
762 return GetNode()->template UnpackAnyType<typename RRPrimUtil<T>::BoxedType>(mdata);
766 RR_INTRUSIVE_PTR<RRValue> ret = GetNode()->template UnpackAnyType<typename RRPrimUtil<T>::BoxedType>(mdata);
773 boost::function<void(
const RR_INTRUSIVE_PTR<RRValue>&)> verify;
776 class ROBOTRACONTEUR_CORE_API ServiceStub;
778 class ROBOTRACONTEUR_CORE_API PipeClientBase :
public virtual PipeBase
781 friend class PipeSubscriptionBase;
782 friend class detail::PipeSubscription_connection;
784 RR_OVIRTUAL ~PipeClientBase() RR_OVERRIDE {}
786 RR_OVIRTUAL std::string GetMemberName() RR_OVERRIDE;
788 RR_OVIRTUAL
void PipePacketReceived(const RR_INTRUSIVE_PTR<MessageEntry>& m, uint32_t e = 0) RR_OVERRIDE;
790 RR_OVIRTUAL
void Shutdown() RR_OVERRIDE;
792 RR_OVIRTUAL
void AsyncClose(const RR_SHARED_PTR<PipeEndpointBase>& endpoint,
bool remote, uint32_t ee,
793 RR_MOVE_ARG(boost::function<
void(const RR_SHARED_PTR<RobotRaconteurException>&)>)
795 int32_t timeout) RR_OVERRIDE;
797 RR_SHARED_PTR<ServiceStub> GetStub();
799 RR_OVIRTUAL std::
string GetServicePath() RR_OVERRIDE;
802 RR_OVIRTUAL
void AsyncSendPipePacket(
803 const RR_INTRUSIVE_PTR<RRValue>& data, int32_t index, uint32_t packetnumber,
bool requestack, uint32_t endpoint,
805 RR_MOVE_ARG(boost::function<
void(uint32_t, const RR_SHARED_PTR<RobotRaconteurException>&)>)
806 handler) RR_OVERRIDE;
808 std::
string m_MemberName;
810 RR_UNORDERED_MAP<int32_t, RR_SHARED_PTR<PipeEndpointBase> > pipeendpoints;
811 boost::mutex pipeendpoints_lock;
813 RR_WEAK_PTR<ServiceStub> stub;
815 std::list<boost::tuple<int32_t, int32_t> > connecting_endpoints;
816 int32_t connecting_key_count;
817 RR_UNORDERED_MAP<int32_t, RR_SHARED_PTR<PipeEndpointBase> > early_endpoints;
818 std::
string service_path;
821 void AsyncConnect_internal(int32_t index,
822 RR_MOVE_ARG(boost::function<
void(const RR_SHARED_PTR<PipeEndpointBase>&,
823 const RR_SHARED_PTR<RobotRaconteurException>&)>)
827 void AsyncConnect_internal1(const RR_INTRUSIVE_PTR<MessageEntry>& ret,
828 const RR_SHARED_PTR<RobotRaconteurException>& err, int32_t index, int32_t key,
829 boost::function<
void(const RR_SHARED_PTR<PipeEndpointBase>&,
830 const RR_SHARED_PTR<RobotRaconteurException>&)>& handler);
832 PipeClientBase(boost::string_ref name, const RR_SHARED_PTR<ServiceStub>& stub,
bool unreliable,
833 MemberDefinition_Direction direction);
835 virtual RR_SHARED_PTR<PipeEndpointBase> CreateNewPipeEndpoint(int32_t index,
bool unreliable,
836 MemberDefinition_Direction direction) = 0;
838 RR_OVIRTUAL
void DeleteEndpoint(const RR_SHARED_PTR<PipeEndpointBase>& e) RR_OVERRIDE;
841 template <typename T>
842 class PipeClient : public virtual Pipe<T>, public virtual PipeClientBase
845 RR_OVIRTUAL ~PipeClient() RR_OVERRIDE {}
847 RR_OVIRTUAL boost::function<void(
const RR_SHARED_PTR<PipeEndpoint<T> >&)> GetPipeConnectCallback() RR_OVERRIDE
849 ROBOTRACONTEUR_LOG_DEBUG_COMPONENT_PATH(node, Member, endpoint, service_path, m_MemberName,
850 "GetPipeConnectCallback is not valid for PipeClient");
851 throw InvalidOperationException(
"Not valid for client");
854 RR_OVIRTUAL
void SetPipeConnectCallback(boost::function<
void(
const RR_SHARED_PTR<PipeEndpoint<T> >&)>
function)
858 ROBOTRACONTEUR_LOG_DEBUG_COMPONENT_PATH(node, Member, endpoint, service_path, m_MemberName,
859 "SetPipeConnectCallback is not valid for PipeClient");
860 throw InvalidOperationException(
"Not valid for client");
863 RR_OVIRTUAL
void AsyncConnect(
865 boost::function<
void(
const RR_SHARED_PTR<PipeEndpoint<T> >&,
const RR_SHARED_PTR<RobotRaconteurException>&)>
870 AsyncConnect_internal(index,
872 boost::bind(&PipeClient<T>::AsyncConnect_cast, RR_BOOST_PLACEHOLDERS(_1)),
873 RR_BOOST_PLACEHOLDERS(_2)),
877 RR_OVIRTUAL RR_SHARED_PTR<PipeEndpoint<T> > Connect(int32_t index) RR_OVERRIDE
879 ROBOTRACONTEUR_ASSERT_MULTITHREADED(node);
881 RR_SHARED_PTR<detail::sync_async_handler<PipeEndpoint<T> > > t =
882 RR_MAKE_SHARED<detail::sync_async_handler<PipeEndpoint<T> > >();
884 boost::bind(&detail::sync_async_handler<PipeEndpoint<T> >::
operator(), t,
885 RR_BOOST_PLACEHOLDERS(_1), RR_BOOST_PLACEHOLDERS(_2)),
886 GetNode()->GetRequestTimeout());
890 PipeClient(boost::string_ref name,
const RR_SHARED_PTR<ServiceStub>& stub,
bool unreliable =
false,
891 MemberDefinition_Direction direction = MemberDefinition_Direction_both,
892 boost::function<
void(
const RR_INTRUSIVE_PTR<RRValue>&)> verify = RR_NULL_FN)
893 : PipeClientBase(name, stub, unreliable, direction), Pipe<T>(verify)
895 rawelements = (boost::is_same<T, RR_INTRUSIVE_PTR<MessageElement> >::value);
898 using PipeClientBase::AsyncClose;
899 using PipeClientBase::AsyncSendPipePacket;
900 using PipeClientBase::GetMemberName;
901 using PipeClientBase::PipePacketReceived;
902 using PipeClientBase::Shutdown;
905 static RR_SHARED_PTR<PipeEndpoint<T> > AsyncConnect_cast(
const RR_SHARED_PTR<PipeEndpointBase>& b)
907 return rr_cast<PipeEndpoint<T> >(b);
910 RR_OVIRTUAL RR_SHARED_PTR<PipeEndpointBase> CreateNewPipeEndpoint(int32_t index,
bool unreliable,
911 MemberDefinition_Direction direction) RR_OVERRIDE
913 return RR_MAKE_SHARED<PipeEndpoint<T> >(RR_STATIC_POINTER_CAST<PipeBase>(shared_from_this()), index, 0,
914 unreliable, direction);
918 class ROBOTRACONTEUR_CORE_API ServiceSkel;
919 class ROBOTRACONTEUR_CORE_API PipeServerBase :
public virtual PipeBase
922 RR_OVIRTUAL ~PipeServerBase() RR_OVERRIDE {}
924 RR_OVIRTUAL std::string GetMemberName() RR_OVERRIDE;
926 RR_OVIRTUAL
void PipePacketReceived(const RR_INTRUSIVE_PTR<MessageEntry>& m, uint32_t e = 0) RR_OVERRIDE;
928 RR_OVIRTUAL
void Shutdown() RR_OVERRIDE;
930 RR_OVIRTUAL
void AsyncSendPipePacket(
931 const RR_INTRUSIVE_PTR<RRValue>& data, int32_t index, uint32_t packetnumber,
bool requestack, uint32_t endpoint,
933 RR_MOVE_ARG(boost::function<
void(uint32_t, const RR_SHARED_PTR<RobotRaconteurException>&)>)
934 handler) RR_OVERRIDE;
936 RR_OVIRTUAL
void AsyncClose(const RR_SHARED_PTR<PipeEndpointBase>& endpoint,
bool remote, uint32_t ee,
937 RR_MOVE_ARG(boost::function<
void(const RR_SHARED_PTR<RobotRaconteurException>&)>)
939 int32_t timeout) RR_OVERRIDE;
941 virtual RR_INTRUSIVE_PTR<MessageEntry> PipeCommand(const RR_INTRUSIVE_PTR<MessageEntry>& m, uint32_t e);
943 RR_SHARED_PTR<ServiceSkel> GetSkel();
945 RR_OVIRTUAL std::
string GetServicePath() RR_OVERRIDE;
948 std::
string m_MemberName;
949 std::
string service_path;
951 struct pipe_endpoint_server_id
953 pipe_endpoint_server_id(uint32_t endpoint, int32_t index)
955 this->endpoint = endpoint;
962 bool operator==(
const pipe_endpoint_server_id& rhs)
const
964 return (endpoint == rhs.endpoint && index == rhs.index);
970 std::size_t operator()(pipe_endpoint_server_id
const& e)
const
972 std::size_t seed = 0;
973 boost::hash_combine(seed, e.endpoint);
974 boost::hash_combine(seed, e.index);
979 RR_UNORDERED_MAP<pipe_endpoint_server_id, RR_SHARED_PTR<PipeEndpointBase>, hash_value> pipeendpoints;
980 boost::mutex pipeendpoints_lock;
982 RR_WEAK_PTR<ServiceSkel> skel;
984 PipeServerBase(boost::string_ref name,
const RR_SHARED_PTR<ServiceSkel>& skel,
bool unreliable,
985 MemberDefinition_Direction direction);
987 virtual RR_SHARED_PTR<PipeEndpointBase> CreateNewPipeEndpoint(int32_t index, uint32_t endpoint,
bool unreliable,
988 MemberDefinition_Direction direction) = 0;
990 RR_OVIRTUAL
void DeleteEndpoint(
const RR_SHARED_PTR<PipeEndpointBase>& e) RR_OVERRIDE;
992 virtual void fire_PipeConnectCallback(
const RR_SHARED_PTR<PipeEndpointBase>& e) = 0;
995 boost::signals2::connection listener_connection;
998 void ClientDisconnected(
const RR_SHARED_PTR<ServerContext>& context, ServerServiceListenerEventType ev,
999 const RR_SHARED_PTR<void>& param);
1002 template <
typename T>
1003 class PipeServer :
public virtual PipeServerBase,
public virtual Pipe<T>
1007 RR_OVIRTUAL ~PipeServer() RR_OVERRIDE {}
1009 RR_OVIRTUAL boost::function<void(
const RR_SHARED_PTR<PipeEndpoint<T> >&)> GetPipeConnectCallback() RR_OVERRIDE
1014 RR_OVIRTUAL
void SetPipeConnectCallback(boost::function<
void(
const RR_SHARED_PTR<PipeEndpoint<T> >&)>
function)
1017 callback =
function;
1020 RR_OVIRTUAL RR_SHARED_PTR<PipeEndpoint<T> > Connect(int32_t index) RR_OVERRIDE
1023 ROBOTRACONTEUR_LOG_DEBUG_COMPONENT_PATH(node, Member, -1, service_path, m_MemberName,
1024 "Connect is not valid for PipeServer");
1025 throw InvalidOperationException(
"Not valid for server");
1028 RR_OVIRTUAL
void AsyncConnect(
1030 boost::function<
void(
const RR_SHARED_PTR<PipeEndpoint<T> >&,
const RR_SHARED_PTR<RobotRaconteurException>&)>
1037 ROBOTRACONTEUR_LOG_DEBUG_COMPONENT_PATH(node, Member, -1, service_path, m_MemberName,
1038 "AsyncConnect is not valid for PipeServer");
1039 throw InvalidOperationException(
"Not valid for server");
1042 PipeServer(boost::string_ref name,
const RR_SHARED_PTR<ServiceSkel>& skel,
bool unreliable =
false,
1043 MemberDefinition_Direction direction = MemberDefinition_Direction_both,
1044 boost::function<
void(
const RR_INTRUSIVE_PTR<RRValue>&)> verify = RR_NULL_FN)
1045 : PipeServerBase(name, skel, unreliable, direction), Pipe<T>(verify)
1047 rawelements = (boost::is_same<T, RR_INTRUSIVE_PTR<MessageElement> >::value);
1051 RR_OVIRTUAL RR_SHARED_PTR<PipeEndpointBase> CreateNewPipeEndpoint(int32_t index, uint32_t endpoint,
bool unreliable,
1052 MemberDefinition_Direction direction) RR_OVERRIDE
1054 return RR_MAKE_SHARED<PipeEndpoint<T> >(RR_STATIC_POINTER_CAST<PipeBase>(shared_from_this()), index, endpoint,
1055 unreliable, direction);
1058 boost::function<void(RR_SHARED_PTR<PipeEndpoint<T> >)> callback;
1060 RR_OVIRTUAL
void fire_PipeConnectCallback(
const RR_SHARED_PTR<PipeEndpointBase>& e) RR_OVERRIDE
1064 callback(RR_STATIC_POINTER_CAST<PipeEndpoint<T> >(e));
1068 RR_OVIRTUAL
void Shutdown() RR_OVERRIDE
1070 PipeServerBase::Shutdown();
1078 class PipeBroadcasterBase_connected_endpoint;
1079 struct PipeBroadcasterBase_async_send_operation;
1089 private boost::noncopyable
1094 size_t GetActivePipeEndpointCount();
1102 boost::function<bool(
const RR_SHARED_PTR<PipeBroadcasterBase>&, uint32_t, int32_t)>
GetPredicate();
1122 void SetPredicate(boost::function<
bool(
const RR_SHARED_PTR<PipeBroadcasterBase>&, uint32_t, int32_t)> f);
1145 void InitBase(
const RR_SHARED_PTR<PipeBase>& pipe, int32_t maximum_backlog = -1);
1147 void EndpointConnectedBase(
const RR_SHARED_PTR<PipeEndpointBase>& ep);
1149 void EndpointClosedBase(
const RR_SHARED_PTR<detail::PipeBroadcasterBase_connected_endpoint>& ep);
1151 void PacketAckReceivedBase(
const RR_SHARED_PTR<detail::PipeBroadcasterBase_connected_endpoint>& ep, uint32_t
id);
1153 void handle_send(int32_t
id,
const RR_SHARED_PTR<RobotRaconteurException>& err,
1154 const RR_SHARED_PTR<detail::PipeBroadcasterBase_connected_endpoint>& ep,
1155 const RR_SHARED_PTR<detail::PipeBroadcasterBase_async_send_operation>& op, int32_t key,
1156 int32_t send_key,
const boost::function<
void()>& handler);
1158 void SendPacketBase(
const RR_INTRUSIVE_PTR<RRValue>& packet);
1160 void AsyncSendPacketBase(
const RR_INTRUSIVE_PTR<RRValue>& packet, RR_MOVE_ARG(boost::function<
void()>) handler);
1162 virtual void AttachPipeServerEvents(
const RR_SHARED_PTR<PipeServerBase>& p);
1164 virtual void AttachPipeEndpointEvents(
const RR_SHARED_PTR<PipeEndpointBase>& p,
1165 const RR_SHARED_PTR<detail::PipeBroadcasterBase_connected_endpoint>& cep);
1167 RR_SHARED_PTR<PipeBase> GetPipeBase();
1169 std::list<RR_SHARED_PTR<detail::PipeBroadcasterBase_connected_endpoint> > endpoints;
1170 boost::mutex endpoints_lock;
1172 RR_WEAK_PTR<PipeServerBase> pipe;
1173 RR_WEAK_PTR<RobotRaconteurNode> node;
1174 int32_t maximum_backlog;
1175 std::string service_path;
1176 std::string member_name;
1180 boost::function<bool(
const RR_SHARED_PTR<PipeBroadcasterBase>&, uint32_t, int32_t)> predicate;
1214 template <
typename T>
/**
 * @brief Initialize the PipeBroadcaster
 *
 * Typed front end that forwards both arguments unchanged to InitBase().
 *
 * @param pipe The pipe to use for broadcasting
 * @param maximum_backlog The maximum backlog, forwarded to InitBase().
 *                        NOTE(review): the default of -1 presumably means
 *                        "no limit" -- confirm in InitBase().
 */
1237 void Init(RR_SHARED_PTR<
Pipe<T> > pipe, int32_t maximum_backlog = -1) { InitBase(pipe, maximum_backlog); }
/**
 * @brief Send a packet to all connected pipe endpoint clients
 *
 * Converts the packet with RRPrimUtil<T>::PrePack() and hands the result to
 * SendPacketBase().
 *
 * @param packet The packet to send (taken by value)
 */
1246 void SendPacket(T packet) { SendPacketBase(RRPrimUtil<T>::PrePack(packet)); }
1258 AsyncSendPacketBase(RRPrimUtil<T>::PrePack(packet), RR_MOVE(handler));
/**
 * @brief Get the associated pipe
 *
 * Fetches the untyped pipe from GetPipeBase() and converts it to Pipe<T>
 * with rr_cast.
 *
 * NOTE(review): failure behavior of rr_cast (and of GetPipeBase() when the
 * pipe is gone) is not visible here -- confirm before relying on it.
 *
 * @return RR_SHARED_PTR<Pipe<T> > The pipe as a typed shared pointer
 */
1266 RR_SHARED_PTR<Pipe<T> >
GetPipe() {
return rr_cast<Pipe<T> >(GetPipeBase()); }
1269 RR_OVIRTUAL
void AttachPipeServerEvents(
const RR_SHARED_PTR<PipeServerBase>& p) RR_OVERRIDE
1271 RR_SHARED_PTR<PipeServer<T> > p_T = rr_cast<PipeServer<T> >(p);
1273 p_T->SetPipeConnectCallback(
1274 boost::bind(&PipeBroadcaster::EndpointConnectedBase, shared_from_this(), RR_BOOST_PLACEHOLDERS(_1)));
1277 RR_OVIRTUAL
void AttachPipeEndpointEvents(
const RR_SHARED_PTR<PipeEndpointBase>& ep,
1278 const RR_SHARED_PTR<detail::PipeBroadcasterBase_connected_endpoint>& cep)
1281 RR_SHARED_PTR<PipeEndpoint<T> > ep_T = rr_cast<PipeEndpoint<T> >(ep);
1283 ep_T->SetPipeEndpointClosedCallback(boost::bind(&PipeBroadcaster::EndpointClosedBase, shared_from_this(), cep));
1284 ep_T->PacketAckReceivedEvent.connect(
1285 boost::bind(&PipeBroadcaster::PacketAckReceivedBase, shared_from_this(), cep, RR_BOOST_PLACEHOLDERS(_2)));
1288 #ifndef ROBOTRACONTEUR_NO_CXX11_TEMPLATE_ALIASES
1292 template <
typename T>
1297 template <
typename T>
1300 template <
typename T>
1305 #pragma warning(pop)
boost::shared_ptr< PipeBase > PipeBasePtr
Convenience alias for PipeBase shared_ptr.
Definition: PipeMember.h:1295
boost::shared_ptr< PipeBroadcaster< T > > PipeBroadcasterPtr
Convenience alias for PipeBroadcaster shared_ptr.
Definition: PipeMember.h:1301
boost::shared_ptr< Pipe< T > > PipePtr
Convenience alias for Pipe shared_ptr.
Definition: PipeMember.h:1298
boost::shared_ptr< PipeEndpointBase > PipeEndpointBasePtr
Convenience alias for PipeEndpointBase shared_ptr.
Definition: PipeMember.h:1290
boost::shared_ptr< PipeEndpoint< T > > PipeEndpointPtr
Convenience alias for PipeEndpoint shared_ptr.
Definition: PipeMember.h:1293
MemberDefinition_Direction
Member direction enum.
Definition: RobotRaconteurConstants.h:534
#define RR_TIMEOUT_INFINITE
Disable timeout for asynchronous operations.
Definition: RobotRaconteurConstants.h:566
Base class for Pipe.
Definition: PipeMember.h:537
virtual std::string GetMemberName()=0
Get the member name of the pipe.
bool IsUnreliable()
Get if pipe is declared unreliable.
MemberDefinition_Direction Direction()
The direction of the pipe.
Base class for PipeBroadcaster.
Definition: PipeMember.h:1090
void SetPredicate(boost::function< bool(const boost::shared_ptr< PipeBroadcasterBase > &, uint32_t, int32_t)> f)
Set the predicate callback function.
void SetMaxBacklog(int32_t maximum_backlog)
Set the maximum backlog.
boost::function< bool(const boost::shared_ptr< PipeBroadcasterBase > &, uint32_t, int32_t)> GetPredicate()
Get the current predicate callback function.
int32_t GetMaxBacklog()
Gets the currently configured maximum backlog.
Broadcaster to send packets to all connected clients.
Definition: PipeMember.h:1216
void Init(boost::shared_ptr< Pipe< T > > pipe, int32_t maximum_backlog=-1)
Initialize the PipeBroadcaster.
Definition: PipeMember.h:1237
void AsyncSendPacket(T packet, boost::function< void()> handler)
Asynchronously send packet to all connected pipe endpoint clients.
Definition: PipeMember.h:1256
void SendPacket(T packet)
Send a packet to all connected pipe endpoint clients.
Definition: PipeMember.h:1246
PipeBroadcaster()
Construct a new PipeBroadcaster.
Definition: PipeMember.h:1226
boost::shared_ptr< Pipe< T > > GetPipe()
Get the associated pipe.
Definition: PipeMember.h:1266
Base class for PipeEndpoint.
Definition: PipeMember.h:59
bool GetRequestPacketAck()
Get if pipe endpoint is requesting acks.
bool IsUnreliable()
Get if pipe endpoint is unreliable.
MemberDefinition_Direction Direction()
The direction of the pipe.
void SetRequestPacketAck(bool ack)
Set if pipe endpoint should request packet acks.
virtual int32_t GetIndex()
Returns the pipe endpoint index used when endpoint connected.
void SetIgnoreReceived(bool ignore)
Set whether pipe endpoint should ignore incoming packets.
virtual void Close()
Close the pipe endpoint.
bool GetIgnoreReceived()
Get if pipe endpoint is ignoring incoming packets.
virtual void AsyncClose(boost::function< void(const boost::shared_ptr< RobotRaconteurException > &)> handler, int32_t timeout=RR_TIMEOUT_INFINITE)
Asynchronously close the pipe endpoint.
virtual uint32_t GetEndpoint()
Returns the Robot Raconteur node Endpoint ID.
virtual size_t Available()
Return number of packets in the receive queue.
Pipe endpoint used to transmit reliable or unreliable data streams.
Definition: PipeMember.h:282
virtual T PeekNextPacketWait(int32_t timeout=RR_TIMEOUT_INFINITE)
Peek the next packet in the receive queue, block if queue is empty.
Definition: PipeMember.h:416
boost::signals2::signal< void(const boost::shared_ptr< PipeEndpoint< T > > &, uint32_t)> PacketAckReceivedEvent
Signal called when a packet ack has been received.
Definition: PipeMember.h:332
RR_OVIRTUAL void Close() RR_OVERRIDE
Close the pipe endpoint.
Definition: PipeMember.h:476
boost::function< void(boost::shared_ptr< PipeEndpoint< T > >)> GetPipeEndpointClosedCallback()
Get the currently configured endpoint closed callback function.
Definition: PipeMember.h:293
virtual void AsyncSendPacket(typename boost::call_traits< T >::param_type packet, boost::function< void(uint32_t, const boost::shared_ptr< RobotRaconteurException > &)> handler)
Send a packet to the peer endpoint asynchronously.
Definition: PipeMember.h:367
boost::signals2::signal< void(const boost::shared_ptr< PipeEndpoint< T > > &)> PacketReceivedEvent
Signal called when a packet has been received.
Definition: PipeMember.h:321
virtual T ReceivePacket()
Receive the next packet in the receive queue.
Definition: PipeMember.h:382
void SetPipeEndpointClosedCallback(boost::function< void(const boost::shared_ptr< PipeEndpoint< T > > &)> callback)
Set the endpoint closed callback function.
Definition: PipeMember.h:309
RR_OVIRTUAL void AsyncClose(boost::function< void(const boost::shared_ptr< RobotRaconteurException > &)> handler, int32_t timeout=2000) RR_OVERRIDE
Asynchronously close the pipe endpoint.
Definition: PipeMember.h:508
virtual bool TryReceivePacketWait(T &val, int32_t timeout=RR_TIMEOUT_INFINITE, bool peek=false)
Try receiving a packet, optionally blocking if the queue is empty.
Definition: PipeMember.h:436
virtual T ReceivePacketWait(int32_t timeout=RR_TIMEOUT_INFINITE)
Receive the next packet in the receive queue, block if queue is empty.
Definition: PipeMember.h:403
virtual uint32_t SendPacket(typename boost::call_traits< T >::param_type packet)
Sends a packet to the peer endpoint.
Definition: PipeMember.h:345
virtual T PeekNextPacket()
Peeks the next packet in the receive queue.
Definition: PipeMember.h:393
pipe member type interface
Definition: PipeMember.h:664
virtual void AsyncConnect(int32_t index, boost::function< void(const boost::shared_ptr< PipeEndpoint< T > > &, const boost::shared_ptr< RobotRaconteurException > &)> handler, int32_t timeout=RR_TIMEOUT_INFINITE)=0
Asynchronously connect a pipe endpoint.
virtual boost::function< void(const boost::shared_ptr< PipeEndpoint< T > > &)> GetPipeConnectCallback()=0
Get the currently configured pipe endpoint connected callback function.
virtual void SetPipeConnectCallback(boost::function< void(const boost::shared_ptr< PipeEndpoint< T > > &)> function)=0
Set the pipe endpoint connected callback function.
virtual boost::shared_ptr< PipeEndpoint< T > > Connect(int32_t index)=0
Connect a pipe endpoint.
virtual void AsyncConnect(boost::function< void(const boost::shared_ptr< PipeEndpoint< T > > &, const boost::shared_ptr< RobotRaconteurException > &)> handler, int32_t timeout=RR_TIMEOUT_INFINITE)
Asynchronously connect a pipe endpoint.
Definition: PipeMember.h:741
Base class for PipeSubscription.
Definition: Subscription.h:1337