Start more processors if we have enough slots and files to process.
`start_new_processes(self)`
| 990 | self.log.debug("%s file paths queued for processing", len(self._file_path_queue)) |
| 991 | |
def start_new_processes(self):
    """Start more processors if we have enough slots and files to process.

    Pops file paths off the front of ``self._file_path_queue`` and starts one
    processor per path until either every parallelism slot is taken
    (``len(self._processors) >= self._parallelism``) or the queue is empty.

    A path that already has a processor registered in ``self._processors`` is
    removed from the queue without starting a duplicate. For every processor
    that is started, the callbacks queued for its path are passed to
    ``self._processor_factory`` and then removed from
    ``self._callback_to_execute``. The processor is then registered in both
    ``self._processors`` (keyed by file path) and ``self.waitables`` (keyed by
    its ``waitable_handle``).
    """
    while len(self._processors) < self._parallelism and self._file_path_queue:
        # NOTE(review): list.pop(0) is O(len(queue)); acceptable for modest
        # queue sizes, but a deque would make this O(1) if the queue grows large.
        file_path = self._file_path_queue.pop(0)

        # Stop creating duplicate processors, i.e. processors with the same
        # file path. The popped path is not put back on the queue.
        if file_path in self._processors:
            continue

        # NOTE(review): presumably _callback_to_execute is a defaultdict(list),
        # so paths with no pending callbacks yield an empty entry; a plain dict
        # would raise KeyError here -- confirm against the initializer.
        callback_to_execute_for_file = self._callback_to_execute[file_path]
        processor = self._processor_factory(
            file_path, callback_to_execute_for_file, self._dag_ids, self._pickle_dags
        )

        # Remove the callbacks only after the factory call, so they remain in
        # _callback_to_execute if constructing the processor raises.
        del self._callback_to_execute[file_path]
        Stats.incr('dag_processing.processes')

        processor.start()
        self.log.debug("Started a process (PID: %s) to generate tasks for %s", processor.pid, file_path)
        self._processors[file_path] = processor
        self.waitables[processor.waitable_handle] = processor
| 1013 | def prepare_file_path_queue(self): |
| 1014 | """Generate more file paths to process. Result are saved in _file_path_queue.""" |