MCPcopy
hub / github.com/dlt-hub/dlt / test_aliased_column

Function test_aliased_column

tests/load/test_model_item_format.py:356–413  ·  view source on GitHub ↗

Test that a column in a SQL query can be aliased correctly and processed by the pipeline. Specifically, this test ensures the resulting table contains the aliased column with the correct data.

(destination_config: DestinationTestConfiguration)

Source from the content-addressed store, hash-verified

354 ids=lambda x: x.name,
355)
def test_aliased_column(destination_config: DestinationTestConfiguration) -> None:
    """
    Test that a column in a SQL query can be aliased correctly and processed by the pipeline.

    Ten rows with columns "a" and "b" are loaded into "example_table". A model
    resource then selects ``a as b`` (plus "_dlt_load_id" and "_dlt_id") from that
    table, and the test checks that the copied table has the same row count,
    exposes exactly the columns "b", "_dlt_id" and "_dlt_load_id", and that the
    values in "b" are those of the original "a".

    Args:
        destination_config: Destination configuration used to set up the pipeline
            and to supply the keyword arguments for the pipeline runs.
    """
    pipeline = destination_config.setup_pipeline("test_aliased_column", dev_mode=True)
    pipeline.run(
        [{"a": i, "b": i + 1} for i in range(10)],
        table_name="example_table",
        **destination_config.run_kwargs,
    )

    dataset = pipeline.dataset()
    example_table_columns = dataset.schema.tables["example_table"]["columns"]

    # Define a resource that aliases column "a" as "b"
    @dlt.resource()
    def copied_table_with_a_as_b() -> Any:
        rel = dataset("SELECT a as b, _dlt_load_id, _dlt_id FROM example_table")
        # The query output has no column named "a", so its hint is left out;
        # the remaining column hints of "example_table" are reused as they are.
        yield dlt.mark.with_hints(
            rel,
            hints=make_hints(columns={k: v for k, v in example_table_columns.items() if k != "a"}),
        )

    pipeline.run(
        [copied_table_with_a_as_b()],
        loader_file_format="model",
        table_format=destination_config.run_kwargs["table_format"],
    )

    assert load_table_counts(pipeline, "copied_table_with_a_as_b", "example_table") == {
        "copied_table_with_a_as_b": 10,
        "example_table": 10,
    }

    assert set(pipeline.default_schema.tables["copied_table_with_a_as_b"]["columns"].keys()) == {
        "b",
        "_dlt_id",
        "_dlt_load_id",
    }

    # The sum of "b" should match the sum of the original "a" (0..9), not the
    # original "b" (1..10), which proves the alias was applied to the data.
    result_df = dataset["copied_table_with_a_as_b"].df()
    assert result_df["b"].sum() == sum(range(10))

Callers

nothing calls this directly

Calls 6

load_table_countsFunction · 0.90
copied_table_with_a_as_bFunction · 0.85
setup_pipelineMethod · 0.80
datasetMethod · 0.80
runMethod · 0.45
dfMethod · 0.45

Tested by

no test coverage detected