MCPcopy
hub / github.com/dlt-hub/dlt / test_dependent_resource

Function test_dependent_resource

tests/sources/rest_api/test_rest_api_source.py:88–151  ·  view source on GitHub ↗
(destination_name: str, invocation_type: str)

Source from the content-addressed store, hash-verified

86@pytest.mark.parametrize("destination_name", ALL_DESTINATIONS)
87@pytest.mark.parametrize("invocation_type", ("deco", "factory"))
88def test_dependent_resource(destination_name: str, invocation_type: str) -> None:
89 config: RESTAPIConfig = {
90 "client": {
91 "base_url": "https://pokeapi.apps.dlthub.com/api/v2/",
92 },
93 "resource_defaults": {
94 "endpoint": {
95 "params": {
96 "limit": 1000,
97 },
98 }
99 },
100 "resources": [
101 {
102 "name": "pokemon_list",
103 "endpoint": {
104 "path": "pokemon",
105 "paginator": SinglePagePaginator(),
106 "data_selector": "results",
107 "params": {
108 "limit": 2,
109 },
110 },
111 "selected": False,
112 },
113 {
114 "name": "pokemon",
115 "endpoint": {
116 "path": "pokemon/{name}",
117 "params": {
118 "name": {
119 "type": "resolve",
120 "resource": "pokemon_list",
121 "field": "name",
122 },
123 },
124 },
125 },
126 ],
127 }
128
129 if invocation_type == "deco":
130 data = rest_api(**config)
131 else:
132 data = rest_api_source(config)
133 pipeline = _make_pipeline(destination_name)
134 load_info = pipeline.run(data)
135 assert_load_info(load_info)
136 table_counts = load_table_counts(pipeline)
137
138 assert set(table_counts.keys()) == {
139 "pokemon",
140 "pokemon__types",
141 "pokemon__stats",
142 "pokemon__moves__version_group_details",
143 "pokemon__past_abilities",
144 "pokemon__past_abilities__abilities",
145 "pokemon__moves",

Callers

nothing calls this directly

Calls 7

SinglePagePaginator · Class · 0.90
rest_api · Function · 0.90
rest_api_source · Function · 0.90
assert_load_info · Function · 0.90
load_table_counts · Function · 0.90
_make_pipeline · Function · 0.70
run · Method · 0.45

Tested by

no test coverage detected