(name, group, final_outputs, reader, num_threads,
output, capacity)
| 187 | |
| 188 | |
| 189 | def _runtime_threads_task(name, group, final_outputs, reader, num_threads, |
| 190 | output, capacity): |
| 191 | node_name = str(Node.current()) |
| 192 | profiler_name = "{0}/{1}/{2}/{3}/{4}".format( |
| 193 | node_name, |
| 194 | "pipe", |
| 195 | name, |
| 196 | processor_name(input) if input else "NoInput", |
| 197 | processor_name(output) if output else "NoOutput") |
| 198 | |
| 199 | with Task(name=name, group=group, outputs=final_outputs, |
| 200 | num_instances=num_threads) as task: |
| 201 | global_exit_net = core.Net('pipe:exit') |
| 202 | global_init_net = core.Net('pipe:init') |
| 203 | reader.setup_ex(global_init_net, global_exit_net) |
| 204 | |
| 205 | init_net = core.Net('pipe:instance:init') |
| 206 | exit_net = core.Net('pipe:instance:exit') |
| 207 | read_nets, status, rec = reader.read_record_ex(init_net, exit_net) |
| 208 | init_net.ConstantFill( |
| 209 | [], [status], |
| 210 | shape=[], |
| 211 | value=False, |
| 212 | dtype=core.DataType.BOOL |
| 213 | ) |
| 214 | |
| 215 | if rec is not None: |
| 216 | out_queue, writer = _init_output( |
| 217 | output, capacity, global_init_net, global_exit_net) |
| 218 | write_nets, _ = writer.write_record_ex( |
| 219 | rec, init_net, exit_net, status) |
| 220 | else: |
| 221 | out_queue = None |
| 222 | write_nets = [] |
| 223 | |
| 224 | with ops.task_init(): |
| 225 | ops.net(global_init_net) |
| 226 | with ops.task_instance_init(): |
| 227 | ops.net(init_net) |
| 228 | |
| 229 | timer_start_net = core.Net('timer_start') |
| 230 | timer = timer_start_net.TimerBegin([], counter_name=profiler_name) |
| 231 | timer_end_net = core.Net('timer_end') |
| 232 | timer_end_net.TimerEnd(timer, []) |
| 233 | |
| 234 | ops.net(core.execution_step( |
| 235 | 'body', |
| 236 | [timer_start_net] + list(read_nets) + list(write_nets) + |
| 237 | [timer_end_net], |
| 238 | should_stop_blob=status)) |
| 239 | ops.net(timer_end_net) |
| 240 | |
| 241 | with ops.task_instance_exit(): |
| 242 | ops.net(exit_net) |
| 243 | with ops.task_exit(): |
| 244 | ops.net(global_exit_net) |
| 245 | |
| 246 | return out_queue, task |
no test coverage detected
searching dependent graphs…