The plan with a DAG of logical operators.
| 8 | |
| 9 | |
class LogicalPlan(Plan):
    """The plan with a DAG of logical operators."""

    def __init__(self, dag: LogicalOperator, context: "DataContext"):
        """Create a plan around ``dag``.

        Args:
            dag: The logical operator this plan wraps. Its
                ``input_dependencies`` are followed to reach the rest of
                the DAG.
            context: The data context, forwarded to ``Plan.__init__``.
        """
        super().__init__(context)
        self._dag = dag

    @property
    def dag(self) -> LogicalOperator:
        """Get the DAG of logical operators."""
        return self._dag

    def sources(self) -> List[LogicalOperator]:
        """List of operators that are sources for this plan's DAG.

        Returns:
            The operators reachable from ``dag`` that have no input
            dependencies, in depth-first, left-to-right order. An operator
            reachable through several paths is listed once per path.
        """
        found: List[LogicalOperator] = []
        # Explicit stack instead of recursion: no throwaway LogicalPlan per
        # operator and no dependence on the interpreter recursion limit.
        pending = [self._dag]
        while pending:
            op = pending.pop()
            upstream = list(op.input_dependencies)
            # If an operator has no input dependencies, it's a source.
            # Test emptiness, not ``any(...)``: the latter inspects the
            # truthiness of each operator rather than whether any exist.
            if not upstream:
                found.append(op)
            else:
                # Push reversed so the leftmost dependency is visited first.
                pending.extend(reversed(upstream))
        return found

    def has_lazy_input(self) -> bool:
        """Return whether this plan has lazy input blocks.

        Returns:
            True when every operator returned by ``sources()`` is a ``Read``.
        """
        # Imported at call time, as in the original implementation.
        from ray.data._internal.logical.operators import Read

        return all(isinstance(op, Read) for op in self.sources())

    def require_preserve_order(self) -> bool:
        """Whether this plan requires to preserve order.

        Returns:
            True when any operator yielded by ``dag.post_order_iter()`` is a
            ``Zip``.
        """
        # Imported at call time, as in the original implementation.
        from ray.data._internal.logical.operators import Zip

        return any(isinstance(op, Zip) for op in self.dag.post_order_iter())

    def input_files(self) -> Optional[List[str]]:
        """Get the input files of the dataset, if available.

        Returns:
            ``None`` when the metadata inferred from ``dag`` has no input
            files; otherwise the input files with duplicates removed, kept
            in first-occurrence order.
        """
        input_files = self.dag.infer_metadata().input_files
        if input_files is None:
            return None
        # dict.fromkeys de-duplicates like set() but keeps a deterministic
        # order; list(set(...)) ordering varies between interpreter runs.
        return list(dict.fromkeys(input_files))

    def initial_num_blocks(self) -> Optional[int]:
        """Get the estimated number of blocks from the logical plan.

        The estimate is taken after applying execution plan optimizations,
        but prior to fully executing the dataset.

        Returns:
            The value of ``dag.estimated_num_outputs()``.
        """
        return self.dag.estimated_num_outputs()
no outgoing calls
searching dependent graphs…