MCPcopy Index your code
hub / github.com/tensorpack/tensorpack / send_dataflow_zmq

Function send_dataflow_zmq

tensorpack/dataflow/remote.py:25–84  ·  view source on GitHub ↗

Run DataFlow and send data to a ZMQ socket addr. It will serialize and send each datapoint to this address with a PUSH socket. This function never returns. Args: df (DataFlow): Will infinitely loop over the DataFlow. addr: a ZMQ socket endpoint. hwm (int): Z

(df, addr, hwm=50, format=None, bind=False)

Source from the content-addressed store, hash-verified

23
24
25def send_dataflow_zmq(df, addr, hwm=50, format=None, bind=False):
26 """
27 Run DataFlow and send data to a ZMQ socket addr.
28 It will serialize and send each datapoint to this address with a PUSH socket.
29 This function never returns.
30
31 Args:
32 df (DataFlow): Will infinitely loop over the DataFlow.
33 addr: a ZMQ socket endpoint.
34 hwm (int): ZMQ high-water mark (buffer size)
35 format (str): The serialization format.
36 Default format uses :mod:`utils.serialize`.
37 This format works with :class:`dataflow.RemoteDataZMQ`.
38 An alternate format is 'zmq_ops', used by https://github.com/tensorpack/zmq_ops
39 and :class:`input_source.ZMQInput`.
40 bind (bool): whether to bind or connect to the endpoint address.
41 """
42 assert format in [None, 'zmq_op', 'zmq_ops']
43 if format is None:
44 dump_fn = dumps
45 else:
46 from zmq_ops import dump_arrays
47 dump_fn = dump_arrays
48
49 ctx = zmq.Context()
50 socket = ctx.socket(zmq.PUSH)
51 socket.set_hwm(hwm)
52 if bind:
53 socket.bind(addr)
54 else:
55 socket.connect(addr)
56 try:
57 df.reset_state()
58 logger.info("Serving data to {} with {} format ...".format(
59 addr, 'default' if format is None else 'zmq_ops'))
60 INTERVAL = 200
61 q = deque(maxlen=INTERVAL)
62
63 try:
64 total = len(df)
65 except NotImplementedError:
66 total = 0
67 tqdm_args = get_tqdm_kwargs(leave=True, smoothing=0.8)
68 tqdm_args['bar_format'] = tqdm_args['bar_format'] + "{postfix}"
69 while True:
70 with tqdm.trange(total, **tqdm_args) as pbar:
71 for dp in df:
72 start = time.time()
73 socket.send(dump_fn(dp), copy=False)
74 q.append(time.time() - start)
75 pbar.update(1)
76 if pbar.n % INTERVAL == 0:
77 avg = "{:.3f}".format(sum(q) / len(q))
78 pbar.set_postfix({'AvgSendLat': avg})
79 finally:
80 logger.info("Exiting send_dataflow_zmq ...")
81 socket.setsockopt(zmq.LINGER, 0)
82 socket.close()

Callers 1

remote.pyFile · 0.85

Calls 5

get_tqdm_kwargsFunction · 0.85
formatMethod · 0.80
appendMethod · 0.80
updateMethod · 0.80
reset_stateMethod · 0.45

Tested by

no test coverage detected