Test that a column in a SQL query can be aliased correctly and processed by the pipeline. Specifically, this test ensures the resulting table contains the aliased column with the correct data.
(destination_config: DestinationTestConfiguration)
| 354 | ids=lambda x: x.name, |
| 355 | ) |
| 356 | def test_aliased_column(destination_config: DestinationTestConfiguration) -> None: |
| 357 | """ |
| 358 | Test that a column in a SQL query can be aliased correctly and processed by the pipeline. |
| 359 | Specifically, this test ensures the resulting table contains the aliased column with the correct data. |
| 360 | """ |
| 361 | pipeline = destination_config.setup_pipeline("test_aliased_column", dev_mode=True) |
| 362 | pipeline.run( |
| 363 | [{"a": i, "b": i + 1} for i in range(10)], |
| 364 | table_name="example_table", |
| 365 | **destination_config.run_kwargs, |
| 366 | ) |
| 367 | |
| 368 | dataset = pipeline.dataset() |
| 369 | example_table_columns = dataset.schema.tables["example_table"]["columns"] |
| 370 | |
| 371 | # Define a resource that aliases column "a" as "b" |
| 372 | @dlt.resource() |
| 373 | def copied_table_with_a_as_b() -> Any: |
| 374 | rel = dataset("SELECT a as b, _dlt_load_id, _dlt_id FROM example_table") |
| 375 | # parsed = rel._qualified_query |
| 376 | |
| 377 | # # sqlglot.parse_one(query, read=select_dialect) |
| 378 | # # Get first expression in the SELECT statement (e.g "a") |
| 379 | # first_expr = parsed.expressions[0] |
| 380 | # # Clickhouse aliases by default, so special handling is needed |
| 381 | # if isinstance(first_expr, sqlglot.exp.Alias): |
| 382 | # original_expr = first_expr.this |
| 383 | # else: |
| 384 | # original_expr = first_expr |
| 385 | # # Wrap the first expression with an alias: "a AS b" |
| 386 | # parsed.expressions[0] = sqlglot.exp.Alias(this=original_expr, alias="b") |
| 387 | # # Convert back to an SQL |
| 388 | # query = rel.to_sql() |
| 389 | yield dlt.mark.with_hints( |
| 390 | rel, |
| 391 | hints=make_hints(columns={k: v for k, v in example_table_columns.items() if k != "a"}), |
| 392 | ) |
| 393 | |
| 394 | pipeline.run( |
| 395 | [copied_table_with_a_as_b()], |
| 396 | loader_file_format="model", |
| 397 | table_format=destination_config.run_kwargs["table_format"], |
| 398 | ) |
| 399 | |
| 400 | assert load_table_counts(pipeline, "copied_table_with_a_as_b", "example_table") == { |
| 401 | "copied_table_with_a_as_b": 10, |
| 402 | "example_table": 10, |
| 403 | } |
| 404 | |
| 405 | assert set(pipeline.default_schema.tables["copied_table_with_a_as_b"]["columns"].keys()) == { |
| 406 | "b", |
| 407 | "_dlt_id", |
| 408 | "_dlt_load_id", |
| 409 | } |
| 410 | |
| 411 | # The sum of "b" should match the sum of the original "a" |
| 412 | result_df = dataset["copied_table_with_a_as_b"].df() |
| 413 | assert result_df["b"].sum() == sum(i for i in range(10)) |
nothing calls this directly
no test coverage detected