MCPcopy
hub / github.com/dlt-hub/dlt / with_load_id_col

Method with_load_id_col

dlt/dataset/relation.py:733–780  ·  view source on GitHub ↗

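Return the relation with the `_dlt_load_id` column included. This only works on relations created via `.table()`. If the relation already includes `_dlt_load_id`, it is returned unchanged. Otherwise, the root table is joined to add the column to the current relation. Raises `ValueError` if called on a non-table relation, on a root table without `_dlt_load_id`, or on a relation whose root load ID column cannot be located.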

(self)

Source from the content-addressed store, hash-verified

731
732 # TODO could be refactored to join any column from `_dlt_loads` table
def with_load_id_col(self) -> dlt.Relation:
    """Ensure the relation exposes the `_dlt_load_id` column.

    Only relations built with `.table()` are supported. A relation that already
    carries the (normalized) `_dlt_load_id` column is handed back as-is. Any
    other relation is joined to its root table, and the root's load id is
    appended after the relation's own projection.

    Returns:
        `self` when the column is already present; otherwise a copy of this
        relation whose expression selects the original columns followed by
        the root table's load id.

    Raises:
        ValueError: If the relation was not created via `.table()`, if it is
            itself a root table that lacks the load id column, or if the
            joined projection contains no root load id column.
    """
    # Guard: a table name must be set and no custom query may be attached.
    if not (self._table_name and self._query is None):
        raise ValueError(
            "`with_load_id_col()` only works on relations created via .table()."
            " It can't be applied to arbitrary relation."
        )

    load_id_col = self._dataset.schema.naming.normalize_identifier(C_DLT_LOAD_ID)
    if load_id_col in self.columns:
        # Nothing to add; the caller gets the very same relation object.
        return self

    root_table = schema_utils.get_root_table(self._dataset.schema.tables, self._table_name)
    root_name = root_table["name"]
    if root_name == self._table_name:
        # A root table has no other table to borrow the column from.
        raise ValueError(f"{root_name} is a root table, but load id column is not present.")

    root_alias = "_dlt_root"
    # Work on a copy so the relation returned by `join()` is left untouched.
    select_expr = self.join(root_name, alias=root_alias).sqlglot_expression.copy()

    # The leading entries of the joined projection, as many as this relation
    # selects on its own, are kept verbatim.
    own_projection = select_expr.selects[: len(self.sqlglot_expression.selects)]

    # The root's load id is looked up by the `<alias>__<column>` output name.
    wanted_output = f"{root_alias}__{load_id_col}"
    for candidate in select_expr.selects:
        if candidate.output_name == wanted_output:
            root_load_id = candidate
            break
    else:
        raise ValueError(f"Could not locate column {load_id_col}")

    # Only the inner expression (`.this`) of the match is appended.
    # NOTE(review): presumably this drops the join alias so the column keeps
    # its plain name - confirm against `join()`.
    select_expr.set("expressions", [*own_projection, root_load_id.this.copy()])

    result = self.__copy__()
    result._sqlglot_expression = select_expr
    return result
781
782 def from_loads(
783 self,

Calls 5

joinMethod · 0.95
__copy__Method · 0.95
setMethod · 0.80
normalize_identifierMethod · 0.45
copyMethod · 0.45