MCPcopy
hub / github.com/dlt-hub/dlt / test_apply_hints_incremental

Function test_apply_hints_incremental

tests/extract/test_incremental.py:1720–1838  ·  view source on GitHub ↗
(item_type: TestDataItemFormat)

Source from the content-addressed store, hash-verified

1718
1719@pytest.mark.parametrize("item_type", ALL_TEST_DATA_ITEM_FORMATS)
1720def test_apply_hints_incremental(item_type: TestDataItemFormat) -> None:
1721 os.environ["COMPLETED_PROB"] = "1.0" # make it complete immediately
1722 p = dlt.pipeline(pipeline_name="p" + uniq_id(), destination="dummy")
1723 data = [{"created_at": 1}, {"created_at": 2}, {"created_at": 3}]
1724 source_items = data_to_item_format(item_type, data)
1725
1726 should_have_arg = True
1727
1728 @dlt.resource
1729 def some_data(created_at: Optional[dlt.sources.incremental[int]] = None):
1730 # make sure that incremental from apply_hints is here
1731 if should_have_arg:
1732 assert created_at is not None
1733 assert created_at.cursor_path == "created_at"
1734 assert created_at.last_value_func is max
1735 yield source_items
1736
1737 # the incremental wrapper is created for a resource and the incremental value is provided via apply hints
1738 r = some_data()
1739 assert r is not some_data
1740 assert r.incremental is not None
1741 assert r.incremental.incremental is None
1742 r.apply_hints(incremental=dlt.sources.incremental("created_at", last_value_func=max))
1743 if item_type == "pandas":
1744 assert list(r)[0].equals(source_items[0])
1745 else:
1746 assert list(r) == source_items
1747 p.extract(r)
1748 assert "incremental" in r.state
1749 assert r.incremental.incremental is not None
1750 assert len(r._pipe) == 2
1751 # no more elements
1752 assert list(r) == []
1753
1754 # same thing with explicit None
1755 r = some_data(created_at=None).with_name("copy")
1756 r.apply_hints(incremental=dlt.sources.incremental("created_at", last_value_func=max))
1757 if item_type == "pandas":
1758 assert list(r)[0].equals(source_items[0])
1759 else:
1760 assert list(r) == source_items
1761 p.extract(r)
1762 assert "incremental" in r.state
1763 assert list(r) == []
1764
1765 # remove incremental
1766 should_have_arg = False
1767 r.apply_hints(incremental=dlt.sources.incremental.EMPTY)
1768 assert r.incremental is not None
1769 assert r.incremental.incremental is None
1770 if item_type == "pandas":
1771 assert list(r)[0].equals(source_items[0])
1772 else:
1773 assert list(r) == source_items
1774
1775 # as above but we provide explicit incremental when creating resource
1776 p = p.drop()
1777 should_have_arg = True

Callers

nothing calls this directly

Calls 12

uniq_id · Function · 0.90
data_to_item_format · Function · 0.90
some_data_w_default · Function · 0.85
some_data_no_incremental · Function · 0.85
pipeline · Method · 0.80
apply_hints · Method · 0.80
with_name · Method · 0.80
drop · Method · 0.80
normalize · Method · 0.80
some_data · Function · 0.70
incremental · Method · 0.45
extract · Method · 0.45

Tested by

no test coverage detected