Apply join projection contract onto `query`. Preserves the left-side projection and appends only columns from the explicitly joined `target_table` as `{projection_prefix}__{column}` aliases. `allow_existing_target_projection` is used for idempotent re-joins: when a join call contri
(
query: sge.Select,
*,
schema: Schema,
left_table: str,
target_table: str,
target_qualifier: str,
projection_prefix: str,
allow_existing_target_projection: bool,
)
| 286 | |
| 287 | |
| 288 | def _apply_join_projection( |
| 289 | query: sge.Select, |
| 290 | *, |
| 291 | schema: Schema, |
| 292 | left_table: str, |
| 293 | target_table: str, |
| 294 | target_qualifier: str, |
| 295 | projection_prefix: str, |
| 296 | allow_existing_target_projection: bool, |
| 297 | ) -> None: |
| 298 | """Apply join projection contract onto `query`. |
| 299 | |
| 300 | Preserves the left-side projection and appends only columns from the explicitly |
| 301 | joined `target_table` as `{projection_prefix}__{column}` aliases. |
| 302 | |
| 303 | `allow_existing_target_projection` is used for idempotent re-joins: when a |
| 304 | join call contributes no new join edges, all target-prefixed columns may already |
| 305 | exist in the left projection and should be accepted as a no-op instead of raising |
| 306 | a collision error. |
| 307 | """ |
| 308 | normalized_left_expressions = _normalize_left_projection(query, left_table) |
| 309 | |
| 310 | existing_projection_column_names = { |
| 311 | expr.output_name |
| 312 | for expr in normalized_left_expressions |
| 313 | if expr.output_name not in {"", "*"} |
| 314 | } |
| 315 | |
| 316 | target_columns = schema.get_table_columns(target_table) |
| 317 | target_output_names = { |
| 318 | f"{projection_prefix}__{column_name}" for column_name in target_columns.keys() |
| 319 | } |
| 320 | duplicate_output_names = target_output_names & existing_projection_column_names |
| 321 | if duplicate_output_names: |
| 322 | if duplicate_output_names == target_output_names and allow_existing_target_projection: |
| 323 | # no-op: all target columns are already projected (on duplicate join call for example) |
| 324 | return |
| 325 | duplicate_names_list = ", ".join(sorted(duplicate_output_names)) |
| 326 | raise ValueError( |
| 327 | "Join projection output names conflict with existing columns: " |
| 328 | f"{duplicate_names_list}. Choose a different `alias` for `join(...)`." |
| 329 | ) |
| 330 | |
| 331 | appended_target_columns: list[sge.Expression] = [] |
| 332 | for column_name in target_columns.keys(): |
| 333 | output_name = f"{projection_prefix}__{column_name}" |
| 334 | appended_target_columns.append( |
| 335 | sge.Alias( |
| 336 | this=sge.Column( |
| 337 | table=sge.to_identifier(target_qualifier, quoted=False), |
| 338 | this=sge.to_identifier(column_name, quoted=True), |
| 339 | ), |
| 340 | alias=sge.to_identifier(output_name, quoted=True), |
| 341 | ) |
| 342 | ) |
| 343 | |
| 344 | query.set("expressions", [*normalized_left_expressions, *appended_target_columns]) |
| 345 |
no test coverage detected