| 596 | return self._run_parsing_loop() |
| 597 | |
| 598 | def _run_parsing_loop(self): |
| 599 | |
| 600 | # In sync mode we want timeout=None -- wait forever until a message is received |
| 601 | if self._async_mode: |
| 602 | poll_time = 0.0 |
| 603 | else: |
| 604 | poll_time = None |
| 605 | |
| 606 | self._refresh_dag_dir() |
| 607 | self.prepare_file_path_queue() |
| 608 | |
| 609 | if self._async_mode: |
| 610 | # If we're in async mode, we can start up straight away. If we're |
| 611 | # in sync mode we need to be told to start a "loop" |
| 612 | self.start_new_processes() |
| 613 | |
| 614 | while True: |
| 615 | loop_start_time = time.monotonic() |
| 616 | |
| 617 | # pylint: disable=no-else-break |
| 618 | ready = multiprocessing.connection.wait(self.waitables.keys(), timeout=poll_time) |
| 619 | if self._signal_conn in ready: |
| 620 | agent_signal = self._signal_conn.recv() |
| 621 | self.log.debug("Received %s signal from DagFileProcessorAgent", agent_signal) |
| 622 | if agent_signal == DagParsingSignal.TERMINATE_MANAGER: |
| 623 | self.terminate() |
| 624 | break |
| 625 | elif agent_signal == DagParsingSignal.END_MANAGER: |
| 626 | self.end() |
| 627 | sys.exit(os.EX_OK) |
| 628 | elif agent_signal == DagParsingSignal.AGENT_RUN_ONCE: |
| 629 | # continue the loop to parse dags |
| 630 | pass |
| 631 | elif isinstance(agent_signal, CallbackRequest): |
| 632 | self._add_callback_to_queue(agent_signal) |
| 633 | else: |
| 634 | raise ValueError(f"Invalid message {type(agent_signal)}") |
| 635 | |
| 636 | if not ready and not self._async_mode: |
| 637 | # In "sync" mode we don't want to parse the DAGs until we |
| 638 | # are told to (as that would open another connection to the |
| 639 | # SQLite DB which isn't a good practice |
| 640 | |
| 641 | # This shouldn't happen, as in sync mode poll should block for |
| 642 | # ever. Lets be defensive about that. |
| 643 | self.log.warning( |
| 644 | "wait() unexpectedly returned nothing ready after infinite timeout (%r)!", poll_time |
| 645 | ) |
| 646 | |
| 647 | continue |
| 648 | |
| 649 | for sentinel in ready: |
| 650 | if sentinel is self._signal_conn: |
| 651 | continue |
| 652 | |
| 653 | processor = self.waitables.get(sentinel) |
| 654 | if not processor: |
| 655 | continue |