Gather/Scatter operations
In many situations it is necessary to query data from dozens or hundreds of sensors. Thanks to the asynchronous
functionality discussed in Async this is possible and is limited only by memory and
network bandwidth. An example of a practical application of gather/scatter operations is that of an advanced lighting
system that needs to rapidly query large numbers of sensors, make control decisions, and then distribute updated
commands to a large number of lighting fixtures. Consider a list of connections that have already been connected and are
stored in a variable c_list
. Each connection is to a service that has the function ReadSensor()
that returns
some important data. The following example will query each sensor concurrently and call the
handler when all the sensors have been queried.
global_err=[]
global_data=[]
ev=threading.Event()
def read_finished(data, err):
# "data" now contains a list of data
# "err" contains a list with each element containing "None" or the exception that occurred for that read
# Store data in global variables
global global_err, global_data
global_err=err
global_data=data
# Notify "main()" that the read is complete
ev.set()
# c_list contains a list of connections created with RobotRaconteur.s.ConnectService
def start_read(c_list, handler, timeout):
N=len(c_list}
keys=[]
keys_lock=threading.Lock()
ret=[None]*N
err=[None]*N
def h(key, d, erri):
done=False
with keys_lock:
if (erri is not None):
err[key]=erri
else:
ret[key]=d
keys.remove(key)
if (len(keys)==0): done=True
if (done):
handler(ret,err)
with keys_lock:
for i in xrange(N):
try:
c_list[i].async_ReadSensor(functools.partial(h,i),timeout)
keys.append(i)
except Exception as erri:
err[i]=erri
if (len(keys)==0):
raise Exception("Could not read any sensors")
def main()
# Create all the c_list connections here
# Start the read with a 100 ms timeout
start_read(c_list,read_finished,0.1)
# Wait for completion
ev.wait()
# Do something with the results
print global_data
print global_err