Pipe Member

Pipe Class

class RobotRaconteur.Pipe

“pipe” member type interface

The Pipe class implements the “pipe” member type. Pipes are declared in service definition files using the “pipe” keyword within object declarations. Pipes provide reliable packet streaming between clients and services. They work by creating pipe endpoint pairs (peers), with one endpoint in the client, and one in the service. Packets are transmitted between endpoint pairs. Packets sent by one endpoint are received by the other, where they are placed in a receive queue. Received packets can then be retrieved from the receive queue.

Pipe endpoints are created by the client using the Connect() or AsyncConnect() functions. Services receive incoming connection requests through a callback function. This callback is configured using the PipeConnectCallback property. Services may also use the PipeBroadcaster class to automate managing pipe endpoint lifecycles and sending packets to all connected client endpoints. If the PipeConnectCallback function is used, the service is responsible for keeping track of endpoints as they connect and disconnect. See PipeEndpoint for details on sending and receiving packets.

Pipe endpoints are indexed, meaning that more than one endpoint pair can be created between the client and the service.

Pipes may be unreliable, meaning that packets may arrive out of order or be dropped. Use IsUnreliable to check for unreliable pipes. The member modifier unreliable is used to specify that a pipe should be unreliable.

Pipes may be declared readonly or writeonly. If neither is specified, the pipe is assumed to be full duplex. readonly pipes may only send packets from service to client. writeonly pipes may only send packets from client to service. Use Direction to determine the direction of the pipe.

The PipeBroadcaster is often used to simplify the use of Pipes. See PipeBroadcaster for more information.

This class is instantiated by the node. It should not be instantiated by the user.

AsyncConnect(index, handler, timeout=-1)

Asynchronously connect a pipe endpoint.

Same as Connect(), but returns asynchronously.

Only valid on clients. Will throw InvalidOperationException on the service side.

If handler is None, returns an awaitable future.

Parameters:
  • index (int) – The index of the pipe endpoint, or -1 to automatically select an index

  • handler (Callable[[PipeEndpoint,Exception],None]) – A handler function to receive the connected endpoint, or an exception

  • timeout – Timeout in seconds, or -1 for no timeout
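The handler form described above can be sketched as follows. This is a minimal illustration, not the library's own example: `_StandInPipe` is a hypothetical stand-in that calls the handler immediately so the snippet runs without a node, and `on_connected`, `connected`, and `errors` are names chosen here. On a real client the handler would presumably be invoked later, once the connection completes.

```python
connected = []
errors = []


def on_connected(ep, err):
    """Handler with the (PipeEndpoint, Exception) signature listed above."""
    if err is not None:
        # The connect failed; record the exception instead of raising in the callback
        errors.append(err)
        return
    connected.append(ep)


class _StandInPipe:
    """Hypothetical stand-in for a client-side Pipe (assumption, not the real class)."""

    def AsyncConnect(self, index, handler, timeout=-1):
        # A real pipe would call the handler after the connect finishes
        handler("stand-in endpoint", None)


pipe = _StandInPipe()
pipe.AsyncConnect(-1, on_connected, 5)  # -1 selects an index automatically
```

Checking `err` before touching `ep` keeps the failure path explicit, since only one of the two arguments carries a useful value.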

Connect(index)

Connect a pipe endpoint

Creates a connected pipe endpoint pair, and returns the local endpoint. Use to create the streaming data connection to the service. Pipe endpoints are indexed, meaning that Connect() may be called multiple times for the same client connection to create multiple pipe endpoint pairs. For most cases Pipe.ANY_INDEX (-1) can be used to automatically select an available index.

Only valid on clients. Will throw InvalidOperationException on the service side.

Parameters:

index (int) – The index of the pipe endpoint, or -1 to automatically select an index

Returns:

The connected pipe endpoint

Return type:

RobotRaconteur.PipeEndpoint
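A minimal sketch of the connect, send, close sequence on the client side. The helper `send_all` is a name introduced here, and `_StandInPipe` and `_StandInEndpoint` are hypothetical stand-ins that only mimic the documented method names so the snippet runs without a node. With a real client, `pipe` would be the pipe member of a connected client object.

```python
class _StandInEndpoint:
    """Hypothetical stand-in for PipeEndpoint (assumption, not the real class)."""

    def __init__(self, index):
        self.Index = index
        self.sent = []
        self.closed = False

    def SendPacket(self, packet):
        self.sent.append(packet)
        return len(self.sent)  # stand-in packet number

    def Close(self):
        self.closed = True


class _StandInPipe:
    """Hypothetical stand-in for a client-side Pipe."""

    def __init__(self):
        self._next_index = 1

    def Connect(self, index):
        if index == -1:
            # Mimic automatic index selection
            index = self._next_index
            self._next_index += 1
        return _StandInEndpoint(index)


def send_all(pipe, packets):
    """Connect an endpoint on any free index, send the packets, then close it."""
    ep = pipe.Connect(-1)  # -1 selects an available index automatically
    try:
        return [ep.SendPacket(p) for p in packets]
    finally:
        ep.Close()  # always release the endpoint pair


packet_numbers = send_all(_StandInPipe(), ["a", "b", "c"])
```

Closing in a `finally` block means the endpoint pair is released even if a send fails.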

property Direction

The direction of the pipe

Pipes may be declared “readonly” or “writeonly” in the service definition file. (If neither is specified, the pipe is assumed to be full duplex.) “readonly” pipes may only send packets from service to client. “writeonly” pipes may only send packets from client to service.

See MemberDefinition_Direction constants for possible return values.

Return type:

int

property MemberName

Get the member name of the pipe

Return type:

str

property PipeConnectCallback

Set the pipe endpoint connected callback function

Callback function invoked when a client attempts to connect a pipe endpoint. The callback will receive the incoming pipe endpoint as a parameter. The service must maintain a reference to the pipe endpoint, but the pipe will retain ownership of the endpoint until it is closed.

The callback may throw an exception to reject the incoming connect request.

Note: Connect callback is configured automatically by PipeBroadcaster

Only valid for services. Will throw InvalidOperationException on the client side.

Return type:

Callable[[PipeEndpoint],None]
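One way a service might keep track of endpoints as they connect and disconnect is sketched below. `EndpointTracker` is a hypothetical helper, not part of the library. It relies only on the Endpoint, Index, and PipeEndpointClosedCallback members documented for PipeEndpoint, and the `SimpleNamespace` object stands in for a real endpoint so the snippet runs without a node.

```python
import threading
from types import SimpleNamespace


class EndpointTracker:
    """Keeps references to connected pipe endpoints on the service side."""

    def __init__(self):
        self._lock = threading.Lock()
        self._endpoints = {}

    def on_connect(self, ep):
        # Key by (client endpoint, pipe endpoint index)
        key = (ep.Endpoint, ep.Index)
        with self._lock:
            self._endpoints[key] = ep
        # Drop the reference again once the endpoint is closed
        ep.PipeEndpointClosedCallback = self.on_closed

    def on_closed(self, ep):
        with self._lock:
            self._endpoints.pop((ep.Endpoint, ep.Index), None)

    def count(self):
        with self._lock:
            return len(self._endpoints)


tracker = EndpointTracker()
# Stand-in for an incoming PipeEndpoint (assumption for illustration)
ep = SimpleNamespace(Endpoint=7, Index=1, PipeEndpointClosedCallback=None)
tracker.on_connect(ep)
count_after_connect = tracker.count()
ep.PipeEndpointClosedCallback(ep)  # simulate the endpoint closing
count_after_close = tracker.count()
```

On a real service object the tracker would be attached with something like `my_pipe.PipeConnectCallback = tracker.on_connect`, where `my_pipe` is the service-side pipe member.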

PipeEndpoint Class

class RobotRaconteur.PipeEndpoint

Pipe endpoint used to transmit reliable or unreliable data streams

Pipe endpoints are used to communicate data between connected pipe members. See Pipe for more information on pipe members.

Pipe endpoints are created by clients using the Pipe.Connect() or Pipe.AsyncConnect() functions. Services receive incoming pipe endpoint connection requests through a callback function specified using the Pipe.PipeConnectCallback property. Services may also use the PipeBroadcaster class to automate managing pipe endpoint lifecycles and sending packets to all connected client endpoints.

Pipe endpoints are indexed, meaning that more than one pipe endpoint pair can be created using the same member. This means that multiple data streams can be created independent of each other between the client and service using the same member.

Pipes send reliable packet streams between connected client/service endpoint pairs. Packets are sent using the SendPacket() or AsyncSendPacket() functions. Packets are read from the receive queue using the ReceivePacket(), ReceivePacketWait(), TryReceivePacketWait(), PeekNextPacket(), or PeekNextPacketWait() functions. The endpoint is closed using the Close() or AsyncClose() function.

This class is instantiated by the Pipe class. It should not be instantiated by the user.

AsyncClose(handler, timeout=2)

Asynchronously close the pipe endpoint

Same as Close() but returns asynchronously

If handler is None, returns an awaitable future.

Parameters:
  • handler (Callable[[Exception],None]) – A handler function to call on completion, possibly with an exception

  • timeout (float) – Timeout in seconds, or -1 for no timeout

AsyncSendPacket(packet, handler)

Send a packet to the peer endpoint asynchronously

Same as SendPacket(), but returns asynchronously.

If handler is None, returns an awaitable future.

Parameters:
  • packet – The packet to send

  • handler (Callable[[Exception],None]) – A handler function to receive the sent packet number or an exception

property Available

Return number of packets in the receive queue

Invalid for writeonly pipes.

Return type:

int

Close()

Close the pipe endpoint

Close the pipe endpoint. Blocks until the close is complete. The peer endpoint is destroyed automatically.

property Direction

The direction of the pipe

Pipes may be declared “readonly” or “writeonly” in the service definition file. (If neither is specified, the pipe is assumed to be full duplex.) “readonly” pipes may only send packets from service to client. “writeonly” pipes may only send packets from client to service.

See MemberDefinition_Direction constants for possible return values.

Return type:

int

property Endpoint

The endpoint associated with the ClientContext or ServerEndpoint that owns this pipe endpoint.

Return type:

int

property IgnoreReceived

Set if pipe endpoint is ignoring incoming packets

If true, pipe endpoint is ignoring incoming packets and is not adding incoming packets to the receive queue.

Return type:

bool

property Index

The pipe endpoint index used when the endpoint was connected

Return type:

int

property IsUnreliable

Get if pipe endpoint is unreliable

Pipe members may be declared as unreliable using member modifiers in the service definition. Pipes confirm unreliable operation when pipe endpoints are connected.

Return type:

bool

property PacketAckReceivedEvent

Event hook for received packet acks. Use to add handlers to be called when packet acks are received by the endpoint.

def my_ack_handler(ep, packet_num):
   # Do something with packet_num info
   pass

my_endpoint.PacketAckReceivedEvent += my_ack_handler

Handler must have signature Callable[[RobotRaconteur.PipeEndpoint,T],None]

property PacketReceivedEvent

Event hook for received packets. Use to add handlers to be called when packets are received by the endpoint.

def my_handler(ep):
   # Receive packets
   while ep.Available > 0:
       packet = ep.ReceivePacket()
       # Do something with packet

my_endpoint.PacketReceivedEvent += my_handler

Handler must have signature Callable[[RobotRaconteur.PipeEndpoint],None]

PeekNextPacket()

Peeks the next packet in the receive queue

Returns the first packet in the receive queue, but does not remove it from the queue. Throws an InvalidOperationException if there are no packets in the receive queue.

Returns:

The next packet in the receive queue

PeekNextPacketWait(timeout=-1)

Peek the next packet in the receive queue, block if queue is empty

Same as PeekNextPacket(), but blocks if queue is empty

Parameters:

timeout – Timeout in seconds to wait for a packet, or -1 for infinite

Returns:

The received packet

PipeEndpointClosedCallback

(Callable[[RobotRaconteur.PipeEndpoint],None]) The function to invoke when the pipe endpoint has been closed.

ReceivePacket()

Receive the next packet in the receive queue

Receive the next packet from the receive queue. This function will throw an InvalidOperationException if there are no packets in the receive queue. Use ReceivePacketWait() to block until a packet has been received.

Returns:

The received packet

ReceivePacketWait(timeout=-1)

Receive the next packet in the receive queue, block if queue is empty

Same as ReceivePacket(), but blocks if queue is empty

Parameters:

timeout (float) – Timeout in seconds to wait for a packet, or -1 for infinite

Returns:

The received packet

property RequestPacketAck

Get if pipe endpoint should request packet acks

Packet acks are generated by receiving endpoints to inform the sender that a packet has been received. The ack contains the packet index, which is the sequence number of the packet. Packet acks are used for flow control by PipeBroadcaster.

Return type:

bool
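The ack mechanism can be used to count packets still in flight. The sketch below is illustrative only: `AckTracker` is a hypothetical helper, and `_StandInEndpoint` is a stand-in that returns increasing packet numbers so the snippet runs without a node. It assumes the number returned by SendPacket() is the same number later passed to the ack handler.

```python
import threading


class AckTracker:
    """Tracks packet numbers that have been sent but not yet acknowledged."""

    def __init__(self):
        self._lock = threading.Lock()
        self.in_flight = set()

    def send(self, ep, packet):
        packet_num = ep.SendPacket(packet)  # returns the packet number
        with self._lock:
            self.in_flight.add(packet_num)
        return packet_num

    def on_ack(self, ep, packet_num):
        # Same signature as the PacketAckReceivedEvent handler
        with self._lock:
            self.in_flight.discard(packet_num)


class _StandInEndpoint:
    """Hypothetical stand-in for PipeEndpoint (assumption, not the real class)."""

    def __init__(self):
        self._count = 0

    def SendPacket(self, packet):
        self._count += 1
        return self._count


tracker = AckTracker()
ep = _StandInEndpoint()
first = tracker.send(ep, "a")
second = tracker.send(ep, "b")
tracker.on_ack(ep, first)  # simulate the peer acknowledging the first packet
remaining = sorted(tracker.in_flight)
```

With a real endpoint, the tracker would be hooked up with `ep.PacketAckReceivedEvent += tracker.on_ack`, after enabling acks, presumably by assigning `ep.RequestPacketAck = True`.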

SendPacket(packet)

Sends a packet to the peer endpoint

Sends a packet to the peer endpoint. If the pipe is reliable, the packets are guaranteed to arrive in order. If the pipe is set to unreliable, “best effort” is made to deliver packets, and they are not guaranteed to arrive in order. This function will block until the packet has been transmitted by the transport. It will return before the peer endpoint has received the packet.

Parameters:

packet – The packet to send

Returns:

The packet number of the sent packet

Return type:

int

TryReceivePacketWait(timeout=-1, peek=False)

Try receiving a packet, optionally blocking if the queue is empty

Try receiving a packet with various options. Returns True if a packet has been received, or False if no packet is available instead of throwing an exception on failure. The timeout and peek parameters can be used to modify behavior to provide functionality similar to the various Receive and Peek functions.

Parameters:
  • timeout (float) – The timeout in seconds. Set to zero for non-blocking operation, an arbitrary value in seconds for a finite duration timeout, or -1 for infinite

  • peek (bool) – If true, the packet is not removed from the receive queue

Returns:

Tuple of success and received packet

Return type:

Tuple[bool,T]
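A short sketch of non-blocking use: with a timeout of zero, TryReceivePacketWait() can drain whatever is currently in the receive queue without raising when the queue runs empty. The helper `drain` is a name introduced here, and `_StandInEndpoint` is a hypothetical stand-in that mimics the documented return tuple so the snippet runs without a node.

```python
from collections import deque


def drain(ep):
    """Return every packet currently queued, without blocking."""
    packets = []
    while True:
        ok, packet = ep.TryReceivePacketWait(0, False)  # timeout=0, peek=False
        if not ok:
            return packets
        packets.append(packet)


class _StandInEndpoint:
    """Hypothetical stand-in for PipeEndpoint (assumption, not the real class)."""

    def __init__(self, packets):
        self._queue = deque(packets)

    def TryReceivePacketWait(self, timeout=-1, peek=False):
        if not self._queue:
            return (False, None)
        packet = self._queue[0] if peek else self._queue.popleft()
        return (True, packet)


ep = _StandInEndpoint([1, 2, 3])
first_pass = drain(ep)
second_pass = drain(ep)  # queue is now empty
```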

PipeBroadcaster Class

class RobotRaconteur.PipeBroadcaster(pipe, maximum_backlog=-1)

Broadcaster to send packets to all connected clients

PipeBroadcaster is used by services to send packets to all connected client endpoints. It attaches to the pipe on the service side, and manages the lifecycle of connected endpoints. PipeBroadcaster should only be used with pipes that are declared readonly, since it has no provisions for receiving incoming packets from the client.

PipeBroadcaster is initialized by the user, or by default if the service object has no attribute for a readonly pipe. If the default is not used because an attribute is specified for the pipe, the broadcaster must be instantiated manually. It is recommended this be done in the RRServiceObjectInit() function. If present in the service object, this function is called after the pipes have been instantiated by the service.

Use SendPacket() or AsyncSendPacket() to broadcast packets to all connected clients.

PipeBroadcaster provides flow control by optionally tracking how many packets are in flight to each client pipe endpoint. (This is accomplished using packet acks.) If a maximum backlog is specified, pipe endpoints exceeding this count will stop sending packets. Specify the maximum backlog on construction or with the MaxBacklog property.

The rate that packets are sent can be regulated using a callback function configured with the SetPredicate() function, or using the BroadcastDownsampler class.

Parameters:
  • pipe (RobotRaconteur.Pipe) – The pipe to use for broadcasting. Must be a pipe from a service object. Specifying a client pipe will result in an exception.

  • maximum_backlog (int) – The maximum number of packets in flight, or -1 for unlimited
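The manual instantiation described above might look like the following sketch. Several things here are assumptions: the service object `SensorService`, its pipe member `samples`, and the `(ctx, service_path)` argument list of RRServiceObjectInit() are not taken from this page. The broadcaster type is passed in so the snippet runs without a node; `_StandInBroadcaster` is a hypothetical stand-in with the same constructor arguments as PipeBroadcaster.

```python
class SensorService:
    """Hypothetical service object with a readonly pipe member named `samples`."""

    def __init__(self, broadcaster_type):
        self._broadcaster_type = broadcaster_type
        self._broadcaster = None
        self.samples = None  # expected to be assigned by the node before init

    def RRServiceObjectInit(self, ctx, service_path):
        # Argument names are an assumption; pipes should already exist here
        self._broadcaster = self._broadcaster_type(self.samples, 3)

    def publish(self, value):
        # Broadcast to every connected client endpoint
        self._broadcaster.SendPacket(value)


class _StandInBroadcaster:
    """Hypothetical stand-in taking (pipe, maximum_backlog) like PipeBroadcaster."""

    def __init__(self, pipe, maximum_backlog=-1):
        self.pipe = pipe
        self.MaxBacklog = maximum_backlog
        self.sent = []

    def SendPacket(self, packet):
        self.sent.append(packet)


service = SensorService(_StandInBroadcaster)
service.samples = "stand-in pipe"           # the node would assign the real pipe
service.RRServiceObjectInit(None, "sensor")  # the node would call this
service.publish(1.5)
sent = service._broadcaster.sent
backlog = service._broadcaster.MaxBacklog
```

In a real service, `RobotRaconteur.PipeBroadcaster` would be passed as `broadcaster_type`. A backlog of 3 is an arbitrary value chosen for the sketch.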

property ActivePipeEndpointCount

Get the number of active pipe endpoints

AsyncSendPacket(packet, handler)

Asynchronously send packet to all connected pipe endpoint clients

Asynchronous version of SendPacket()

Parameters:
  • packet – The packet to send

  • handler (Callable[[],None]) – A handler function for when packet has been sent by all endpoints

property MaxBacklog

Set the maximum backlog

PipeBroadcaster provides flow control by optionally tracking how many packets are in flight to each client pipe endpoint. (This is accomplished using packet acks.) If a maximum backlog is specified, pipe endpoints exceeding this count will stop sending packets.

Set -1 for no flow control.

Return type:

int

SendPacket(packet)

Send a packet to all connected pipe endpoint clients

Blocks until packet has been sent by all endpoints

Parameters:

packet – The packet to send

SetPredicate(f)

Set the predicate callback function

A predicate is optionally used to regulate when packets are sent to clients. This is used by the BroadcastDownsampler to regulate update rates of packets sent to clients.

The predicate callback is invoked before the broadcaster sends a packet to an endpoint. If the predicate returns true, the packet will be sent. If it is false, the packet will not be sent to that endpoint.

The predicate receives the broadcaster, the client endpoint ID, and the pipe endpoint index. It returns true to send the packet, or false to not send the packet.

Parameters:

f (Callable[[RobotRaconteur.PipeBroadcaster,int,int],bool]) – The predicate
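As an illustration of the predicate signature, the sketch below lets only every n-th packet through to each endpoint. `make_every_nth_predicate` is a hypothetical helper written for this example and is not how BroadcastDownsampler is implemented. The counters are not protected by a lock, so this assumes the predicate is not called concurrently.

```python
def make_every_nth_predicate(n):
    """Build a predicate that passes every n-th packet per (client, index) pair."""
    counters = {}

    def predicate(broadcaster, client_endpoint, index):
        key = (client_endpoint, index)
        count = counters.get(key, 0)
        counters[key] = count + 1
        # True sends the packet to this endpoint, False skips it
        return count % n == 0

    return predicate


pred = make_every_nth_predicate(3)
# Simulate six broadcast attempts to client endpoint 7, pipe index 1
decisions = [pred(None, 7, 1) for _ in range(6)]
```

The predicate would be installed with `broadcaster.SetPredicate(pred)`, where `broadcaster` is a PipeBroadcaster instance.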