Runs the graph (see parent class for full docstring).
(
self,
inputs: Optional[Dict[Text, Any]] = None,
targets: Optional[List[Text]] = None,
)
| 80 | return run_graph |
| 81 | |
def run(
    self,
    inputs: Optional[Dict[Text, Any]] = None,
    targets: Optional[List[Text]] = None,
) -> Dict[Text, Any]:
    """Runs the graph (see parent class for full docstring).

    Args:
        inputs: Optional values passed to `_add_inputs_to_graph` before the
            graph is executed; skipped when `None` or empty.
        targets: Names of the targets to compute. When `None` or empty, the
            graph schema's `target_names` are used instead.

    Returns:
        The result of `dask.get` for the selected targets, converted to a
        dictionary.

    Raises:
        GraphRunError: If executing the graph raises a `RuntimeError`.
    """
    # A falsy `targets` (None or an empty list) falls back to the schema's
    # own target names.
    run_targets = targets or self._graph_schema.target_names

    # Build the dask graph from the schema that `minimal_graph_schema`
    # returns for these targets.
    dask_graph = self._build_dask_graph(
        self._graph_schema.minimal_graph_schema(run_targets)
    )

    if inputs:
        self._add_inputs_to_graph(inputs, dask_graph)

    # The message reports `targets` exactly as the caller supplied them,
    # not the resolved `run_targets`.
    logger.debug(
        f"Running graph with inputs: {inputs}, targets: {targets} "
        f"and {self._execution_context}."
    )

    try:
        # NOTE(review): `dict(...)` presumably relies on every target result
        # being a (key, value) pair -- confirm against the node wrappers.
        return dict(dask.get(dask_graph, run_targets))
    except RuntimeError as e:
        raise GraphRunError("Error running runner.") from e
| 105 | |
| 106 | @staticmethod |
| 107 | def _add_inputs_to_graph(inputs: Optional[Dict[Text, Any]], graph: Any) -> None: |