Repository: github.com/apache/airflow

Method start_new_processes

airflow/utils/dag_processing.py:992–1011

Start more processors if we have enough slots and files to process

(self)


```python
def start_new_processes(self):
    """Start more processors if we have enough slots and files to process"""
    while self._parallelism - len(self._processors) > 0 and self._file_path_queue:
        file_path = self._file_path_queue.pop(0)
        # Stop creating duplicate processor i.e. processor with the same filepath
        if file_path in self._processors.keys():
            continue

        callback_to_execute_for_file = self._callback_to_execute[file_path]
        processor = self._processor_factory(
            file_path, callback_to_execute_for_file, self._dag_ids, self._pickle_dags
        )

        del self._callback_to_execute[file_path]
        Stats.incr('dag_processing.processes')

        processor.start()
        self.log.debug("Started a process (PID: %s) to generate tasks for %s", processor.pid, file_path)
        self._processors[file_path] = processor
        self.waitables[processor.waitable_handle] = processor
```
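The loop above is easiest to follow by exercising it outside Airflow. The following is a minimal, hypothetical sketch: `FakeProcessor`, `MiniManager` and `started_count` are illustrative names, not Airflow APIs. The sketch's factory takes only a file path and a callback list, whereas the real `_processor_factory` also receives `self._dag_ids` and `self._pickle_dags`. `_callback_to_execute` is assumed to be a `defaultdict(list)`, which is what makes the unconditional lookup followed by `del` safe. The sketch shows two behaviors: processors are started only while free slots remain, and a file that already has a running processor is popped from the queue and dropped rather than re-queued.

```python
from collections import defaultdict
from typing import Callable, Dict, List


class FakeProcessor:
    """Hypothetical stand-in for the object returned by _processor_factory."""

    _next_pid = 1000  # class-wide counter used to hand out fake PIDs

    def __init__(self, file_path: str, callbacks: List[str]):
        self.file_path = file_path
        self.callbacks = callbacks
        self.pid = None
        self.started = False

    def start(self) -> None:
        # A real processor would launch a child process here; we only assign a fake PID.
        FakeProcessor._next_pid += 1
        self.pid = FakeProcessor._next_pid
        self.started = True

    @property
    def waitable_handle(self) -> int:
        # Assumption: a real processor exposes an OS-level handle; the fake PID is a placeholder.
        return self.pid


class MiniManager:
    """Hypothetical, stripped-down manager holding only the state the method touches."""

    def __init__(self, parallelism: int, factory: Callable[[str, List[str]], FakeProcessor]):
        self._parallelism = parallelism
        self._processor_factory = factory
        self._file_path_queue: List[str] = []
        self._processors: Dict[str, FakeProcessor] = {}
        # Assumed defaultdict(list): looking up a file with no callbacks yields [].
        self._callback_to_execute: Dict[str, List[str]] = defaultdict(list)
        self.waitables: Dict[int, FakeProcessor] = {}
        self.started_count = 0  # stands in for the Stats.incr counter

    def start_new_processes(self) -> None:
        # Keep going while at least one slot is free and files are still queued.
        while self._parallelism - len(self._processors) > 0 and self._file_path_queue:
            file_path = self._file_path_queue.pop(0)
            # Already being processed: the popped path is discarded, not re-queued.
            if file_path in self._processors:
                continue
            callbacks = self._callback_to_execute[file_path]
            processor = self._processor_factory(file_path, callbacks)
            # Callbacks are handed to the processor, so remove them from the pending map.
            del self._callback_to_execute[file_path]
            self.started_count += 1
            processor.start()
            self._processors[file_path] = processor
            self.waitables[processor.waitable_handle] = processor


manager = MiniManager(parallelism=3, factory=FakeProcessor)
manager._file_path_queue = ["a.py", "b.py"]
manager._callback_to_execute["b.py"].append("callback-for-b")
manager.start_new_processes()  # starts a.py and b.py; one slot is left free

# a.py is still "running", so it is skipped; c.py fills the last slot; d.py waits.
manager._file_path_queue = ["a.py", "c.py", "d.py"]
manager.start_new_processes()
print(sorted(manager._processors))  # ['a.py', 'b.py', 'c.py']
print(manager._file_path_queue)     # ['d.py']
```

Because the duplicate check runs before the callback lookup, a skipped file's pending callbacks (if any) stay in `_callback_to_execute` until a later pass actually starts a processor for it.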

Calls (3):

_processor_factory · Method · 0.80
incr · Method · 0.45
start · Method · 0.45