Return the relation with the `_dlt_load_id` included. This only works on relations created via `.table()`. If the relation already includes `_dlt_load_id`, it is returned unchanged. Otherwise, the root table is joined to add the column to the current relation. Rais
(self)
| 731 | |
| 732 | # TODO could be refactored to join any column from `_dlt_loads` table |
| 733 | def with_load_id_col(self) -> dlt.Relation: |
| 734 | """Return the relation with the `_dlt_load_id` included. |
| 735 | |
| 736 | This only works on relations created via `.table()`. |
| 737 | |
| 738 | If the relation already includes `_dlt_load_id`, it is returned unchanged. |
| 739 | Otherwise, the root table is joined to add the column to the current relation. |
| 740 | |
| 741 | Raises: |
| 742 | ValueError: If called on a non-table relation, a root table without |
| 743 | `_dlt_load_id`, or a relation whose root load ID column cannot be located. |
| 744 | """ |
| 745 | if not self._table_name or self._query is not None: |
| 746 | raise ValueError( |
| 747 | "`with_load_id_col()` only works on relations created via .table()." |
| 748 | " It can't be applied to arbitrary relation." |
| 749 | ) |
| 750 | |
| 751 | normalized_load_id = self._dataset.schema.naming.normalize_identifier(C_DLT_LOAD_ID) |
| 752 | |
| 753 | if normalized_load_id in self.columns: |
| 754 | return self |
| 755 | |
| 756 | root_table_name = schema_utils.get_root_table( |
| 757 | self._dataset.schema.tables, self._table_name |
| 758 | )["name"] |
| 759 | if root_table_name == self._table_name: |
| 760 | raise ValueError( |
| 761 | f"{root_table_name} is a root table, but load id column is not present." |
| 762 | ) |
| 763 | |
| 764 | join_alias = "_dlt_root" |
| 765 | joined = self.join(root_table_name, alias=join_alias) |
| 766 | joined_expression = joined.sqlglot_expression.copy() |
| 767 | left_projection = joined_expression.selects[: len(self.sqlglot_expression.selects)] |
| 768 | load_id_output_name = f"{join_alias}__{normalized_load_id}" |
| 769 | load_id_expr = next( |
| 770 | (expr for expr in joined_expression.selects if expr.output_name == load_id_output_name), |
| 771 | None, |
| 772 | ) |
| 773 | if load_id_expr is None: |
| 774 | raise ValueError(f"Could not locate column {normalized_load_id}") |
| 775 | |
| 776 | joined_expression.set("expressions", [*left_projection, load_id_expr.this.copy()]) |
| 777 | |
| 778 | rel = self.__copy__() |
| 779 | rel._sqlglot_expression = joined_expression |
| 780 | return rel |
| 781 | |
| 782 | def from_loads( |
| 783 | self, |