(
input, output=None, num_threads=1, processor=None, name=None,
capacity=None, group=None, num_runtime_threads=None, final_outputs=None)
| 312 | |
| 313 | |
| 314 | def _pipe_step( |
| 315 | input, output=None, num_threads=1, processor=None, name=None, |
| 316 | capacity=None, group=None, num_runtime_threads=None, final_outputs=None): |
| 317 | """ |
| 318 | """ |
| 319 | assert num_threads <= 1 or num_runtime_threads <= 1, ( |
| 320 | 'Only one of num_threads or num_runtime_threads must be set.') |
| 321 | |
| 322 | if isinstance(input, Reader): |
| 323 | reader = input |
| 324 | elif hasattr(input, 'reader'): |
| 325 | reader = input.reader() |
| 326 | else: |
| 327 | raise ValueError( |
| 328 | 'Input must be a reader, queue or stream. Got {}'.format(type(input))) |
| 329 | |
| 330 | if processor is not None: |
| 331 | reader = ProcessingReader(reader, processor) |
| 332 | |
| 333 | if num_threads == 0 or num_runtime_threads == 0: |
| 334 | assert output is None |
| 335 | return reader, None |
| 336 | |
| 337 | if name is None and processor is not None: |
| 338 | name = processor_name(processor) |
| 339 | if name is None and output is not None: |
| 340 | name = 'pipe_into:%s' % processor_name(output) |
| 341 | if name is None: |
| 342 | name = 'pipe_from:%s' % processor_name(input) |
| 343 | |
| 344 | if num_threads > 1: |
| 345 | return _static_threads_task( |
| 346 | name, group, final_outputs, reader, num_threads, output, capacity) |
| 347 | else: |
| 348 | return _runtime_threads_task( |
| 349 | name, group, final_outputs, reader, num_runtime_threads, output, |
| 350 | capacity) |
| 351 | |
| 352 | |
| 353 | class ProcessingReader(Reader): |
no test coverage detected
searching dependent graphs…