MCPcopy
hub / github.com/apache/airflow / _run_parsing_loop

Method _run_parsing_loop

airflow/utils/dag_processing.py:598–711  ·  view source on GitHub ↗
(self)

Source from the content-addressed store, hash-verified

596 return self._run_parsing_loop()
597
598 def _run_parsing_loop(self):
599
600 # In sync mode we want timeout=None -- wait forever until a message is received
601 if self._async_mode:
602 poll_time = 0.0
603 else:
604 poll_time = None
605
606 self._refresh_dag_dir()
607 self.prepare_file_path_queue()
608
609 if self._async_mode:
610 # If we're in async mode, we can start up straight away. If we're
611 # in sync mode we need to be told to start a "loop"
612 self.start_new_processes()
613
614 while True:
615 loop_start_time = time.monotonic()
616
617 # pylint: disable=no-else-break
618 ready = multiprocessing.connection.wait(self.waitables.keys(), timeout=poll_time)
619 if self._signal_conn in ready:
620 agent_signal = self._signal_conn.recv()
621 self.log.debug("Received %s signal from DagFileProcessorAgent", agent_signal)
622 if agent_signal == DagParsingSignal.TERMINATE_MANAGER:
623 self.terminate()
624 break
625 elif agent_signal == DagParsingSignal.END_MANAGER:
626 self.end()
627 sys.exit(os.EX_OK)
628 elif agent_signal == DagParsingSignal.AGENT_RUN_ONCE:
629 # continue the loop to parse dags
630 pass
631 elif isinstance(agent_signal, CallbackRequest):
632 self._add_callback_to_queue(agent_signal)
633 else:
634 raise ValueError(f"Invalid message {type(agent_signal)}")
635
636 if not ready and not self._async_mode:
637 # In "sync" mode we don't want to parse the DAGs until we
638 # are told to (as that would open another connection to the
639 # SQLite DB which isn't a good practice
640
641 # This shouldn't happen, as in sync mode poll should block for
642 # ever. Lets be defensive about that.
643 self.log.warning(
644 "wait() unexpectedly returned nothing ready after infinite timeout (%r)!", poll_time
645 )
646
647 continue
648
649 for sentinel in ready:
650 if sentinel is self._signal_conn:
651 continue
652
653 processor = self.waitables.get(sentinel)
654 if not processor:
655 continue

Callers 3

startMethod · 0.95

Calls 15

_refresh_dag_dirMethod · 0.95
start_new_processesMethod · 0.95
terminateMethod · 0.95
endMethod · 0.95
_find_zombiesMethod · 0.95
emit_metricsMethod · 0.95
wait_until_finishedMethod · 0.95
collect_resultsMethod · 0.95

Tested by 2