Run DataFlow and send data to a ZMQ socket addr. It will serialize and send each datapoint to this address with a PUSH socket. This function never returns. Args: df (DataFlow): Will infinitely loop over the DataFlow. addr: a ZMQ socket endpoint. hwm (int): ZMQ high-water mark (buffer size).
(df, addr, hwm=50, format=None, bind=False)
| 23 | |
| 24 | |
def send_dataflow_zmq(df, addr, hwm=50, format=None, bind=False):
    """
    Run DataFlow and send data to a ZMQ socket addr.
    It will serialize and send each datapoint to this address with a PUSH socket.
    This function never returns normally: it loops over the DataFlow forever
    and only exits through an exception (e.g. ``KeyboardInterrupt``), at which
    point the socket is closed and the ZMQ context is destroyed.

    Args:
        df (DataFlow): Will infinitely loop over the DataFlow.
        addr: a ZMQ socket endpoint.
        hwm (int): ZMQ high-water mark (buffer size)
        format (str): The serialization format.
            Default format uses :mod:`utils.serialize`.
            This format works with :class:`dataflow.RemoteDataZMQ`.
            An alternate format is 'zmq_ops', used by https://github.com/tensorpack/zmq_ops
            and :class:`input_source.ZMQInput`.
        bind (bool): whether to bind or connect to the endpoint address.

    Raises:
        AssertionError: if ``format`` is not ``None``, ``'zmq_op'`` or ``'zmq_ops'``.
    """
    assert format in [None, 'zmq_op', 'zmq_ops']
    if format is None:
        dump_fn = dumps
    else:
        # Imported lazily so zmq_ops is only needed when this format is requested.
        from zmq_ops import dump_arrays
        dump_fn = dump_arrays

    ctx = zmq.Context()
    socket = ctx.socket(zmq.PUSH)
    try:
        # Socket configuration lives inside the try block so that a failing
        # bind()/connect() (bad or busy address) still reaches the cleanup below.
        socket.set_hwm(hwm)
        if bind:
            socket.bind(addr)
        else:
            socket.connect(addr)

        df.reset_state()
        logger.info("Serving data to {} with {} format ...".format(
            addr, 'default' if format is None else 'zmq_ops'))
        INTERVAL = 200
        # Sliding window holding the latency of the most recent INTERVAL sends.
        q = deque(maxlen=INTERVAL)

        try:
            total = len(df)
        except NotImplementedError:
            # The DataFlow does not report a size; use 0 as the bar total.
            total = 0
        tqdm_args = get_tqdm_kwargs(leave=True, smoothing=0.8)
        tqdm_args['bar_format'] = tqdm_args['bar_format'] + "{postfix}"
        while True:
            # One progress bar per full pass over the DataFlow.
            with tqdm.trange(total, **tqdm_args) as pbar:
                for dp in df:
                    start = time.time()
                    socket.send(dump_fn(dp), copy=False)
                    q.append(time.time() - start)
                    pbar.update(1)
                    # Refresh the displayed average only every INTERVAL sends.
                    if pbar.n % INTERVAL == 0:
                        avg = "{:.3f}".format(sum(q) / len(q))
                        pbar.set_postfix({'AvgSendLat': avg})
    finally:
        logger.info("Exiting send_dataflow_zmq ...")
        # LINGER=0 discards unsent messages so close() does not block.
        socket.setsockopt(zmq.LINGER, 0)
        socket.close()
        # Closing the socket alone leaves the context (and its I/O thread)
        # alive; destroy it as well so repeated calls do not leak contexts.
        if not ctx.closed:
            ctx.destroy(0)
no test coverage detected