Launch DagFileProcessorManager processor and start DAG parsing loop in manager.
(self)
| 223 | self._last_parsing_stat_received_at: float = time.monotonic() |
| 224 | |
def start(self) -> None:
    """Start the DagFileProcessorManager in a separate process.

    Builds a multiprocessing context from the configured start method,
    resets the "last parsing stat received" timestamp, opens a pipe whose
    parent end is kept on this object and whose other end is handed to the
    new process, then starts that process and logs its pid.
    """
    start_method = self._get_multiprocessing_start_method()
    mp_context = multiprocessing.get_context(start_method)

    # Restart the clock that tracks when the last parsing stat arrived.
    self._last_parsing_stat_received_at = time.monotonic()

    # Keep the parent end on the instance; the other end goes to the new process.
    self._parent_signal_conn, manager_signal_conn = mp_context.Pipe()

    # Resolve the entry point on the class before assembling its arguments.
    manager_entrypoint = type(self)._run_processor_manager
    manager_args = (
        self._dag_directory,
        self._max_runs,
        # getattr prevents error while pickling an instance method.
        getattr(self, "_processor_factory"),
        self._processor_timeout,
        manager_signal_conn,
        self._dag_ids,
        self._pickle_dags,
        self._async_mode,
    )

    manager_process = mp_context.Process(target=manager_entrypoint, args=manager_args)
    self._process = manager_process

    manager_process.start()

    self.log.info("Launched DagFileProcessorManager with pid: %s", manager_process.pid)
| 251 | |
| 252 | def run_single_parsing_loop(self) -> None: |
| 253 | """ |