MCPcopy
hub / github.com/dlt-hub/dlt / _apply_join_projection

Function _apply_join_projection

dlt/dataset/_join.py:288–344  ·  view source on GitHub ↗

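Apply join projection contract onto `query`. Preserves the left-side projection and appends only columns from the explicitly joined `target_table` as `{projection_prefix}__{column}` aliases. `allow_existing_target_projection` is used for idempotent re-joins: when a join call contributes no new join edges, all target-prefixed columns may already exist in the left projection and should be accepted as a no-op instead of raising a collision error.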

(
    query: sge.Select,
    *,
    schema: Schema,
    left_table: str,
    target_table: str,
    target_qualifier: str,
    projection_prefix: str,
    allow_existing_target_projection: bool,
)

Source from the content-addressed store, hash-verified

286
287
def _apply_join_projection(
    query: sge.Select,
    *,
    schema: Schema,
    left_table: str,
    target_table: str,
    target_qualifier: str,
    projection_prefix: str,
    allow_existing_target_projection: bool,
) -> None:
    """Rewrite the projection of `query` so it also exposes the joined table's columns.

    The left-side projection is kept as returned by `_normalize_left_projection`, and
    one aliased column per column of `target_table` is appended after it. Each appended
    column is read from `target_qualifier` and named `{projection_prefix}__{column}`.

    Args:
        query: Select expression whose `expressions` are replaced in place.
        schema: Schema used to look up the columns of `target_table`.
        left_table: Left table name, forwarded to `_normalize_left_projection`.
        target_table: Table whose columns are appended to the projection.
        target_qualifier: Table qualifier put on every appended column reference.
        projection_prefix: Prefix used to build the output alias of each appended column.
        allow_existing_target_projection: Supports idempotent re-joins. When set and
            *every* prefixed target column is already present in the left projection,
            the function returns without modifying `query` instead of raising.

    Raises:
        ValueError: If some prefixed output names already exist in the left projection
            and the case is not the fully-projected no-op described above.
    """
    left_projection = _normalize_left_projection(query, left_table)

    # output names already taken on the left side; unnamed and star expressions are ignored
    taken_names = {expr.output_name for expr in left_projection} - {"", "*"}

    # column name -> prefixed output alias, in schema column order
    alias_by_column = {
        column_name: f"{projection_prefix}__{column_name}"
        for column_name in schema.get_table_columns(target_table)
    }
    wanted_names = set(alias_by_column.values())

    clashing_names = wanted_names & taken_names
    if clashing_names:
        if allow_existing_target_projection and clashing_names == wanted_names:
            # repeated join call: the whole target projection is already in place
            return
        duplicate_names_list = ", ".join(sorted(clashing_names))
        raise ValueError(
            "Join projection output names conflict with existing columns: "
            f"{duplicate_names_list}. Choose a different `alias` for `join(...)`."
        )

    joined_columns: list[sge.Expression] = [
        sge.Alias(
            this=sge.Column(
                table=sge.to_identifier(target_qualifier, quoted=False),
                this=sge.to_identifier(column_name, quoted=True),
            ),
            alias=sge.to_identifier(output_name, quoted=True),
        )
        for column_name, output_name in alias_by_column.items()
    ]

    query.set("expressions", [*left_projection, *joined_columns])
345

Callers 1

_apply_joinFunction · 0.85

Calls 4

get_table_columnsMethod · 0.80
joinMethod · 0.80
setMethod · 0.80

Tested by

no test coverage detected