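Build the producer_step to feed data from reader into the queue, and return the reader interface. Inputs: reader: read data which will be stored in the queue. process: preprocess data before enqueue. Outputs: reader: reader to fetch the data from the queue. producer_step: the step that inserts the data into the queue; should be run together with consume_step. exit_step: the step to close the queue. schema: the schema for the reader.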
(self, reader, process=None)
| 78 | exit_net) |
| 79 | |
def build(self, reader, process=None):
    """
    Create the producer step that moves records from `reader` into the
    queue, and hand back the queue-side reader interface.

    One sub-step is built per thread (`self.num_threads` in total). Each
    sub-step first reads a record from `reader` and then writes the
    (optionally processed) field blobs through `self.writer`. The
    per-thread sub-steps are grouped under a single 'producers' step
    created with concurrent_substeps=True.

    Inputs:
        reader: source of the data which will be stored in the queue.
        process: optional callable, invoked as process(net, fields), to
                 preprocess a record before enqueue; the field_blobs()
                 of its result are what get written.
    Outputs:
        reader: reader to fetch the data from the queue.
        producer_step: the step that inserts the data into the queue.
                       Should be run together with consume_step.
        exit_step: the step to close the queue.
        schema: the schema for the reader.
    """
    per_thread_steps = []
    for thread_idx in range(self.num_threads):
        suffix = str(thread_idx)

        # Net + step that pull one record from the upstream reader.
        read_name = 'reader_' + suffix
        read_net = core.Net(read_name)
        stop_blob, record = reader.read_record(read_net)
        read_step = core.execution_step(read_name, read_net)

        # Net + step that push the record's blobs through the writer.
        write_name = 'queue_writer' + suffix
        write_net = core.Net(write_name)
        blobs = record.field_blobs()
        if process:
            # Preprocessing ops are added to the writer net, so they run
            # as part of the write step rather than the read step.
            blobs = process(write_net, record).field_blobs()
        self.writer.write(write_net, blobs)
        write_step = core.execution_step(write_name, write_net)

        # Read-then-write pair, repeated until the reader signals stop.
        per_thread_steps.append(
            core.execution_step(
                'producer_' + suffix,
                [read_step, write_step],
                should_stop_blob=stop_blob))

    all_producers = core.execution_step(
        'producers', per_thread_steps, concurrent_substeps=True)
    return self.reader, all_producers, self.exit_step, self.schema