MCPcopy
hub / github.com/dlt-hub/dlt / setup

Method setup

tests/load/utils.py:264–291  ·  view source on GitHub ↗

Sets up environment variables for this destination configuration

(self)

Source from the content-addressed store, hash-verified

262 }
263
264 def setup(self) -> None:
265 """Sets up environment variables for this destination configuration"""
266 env_key_prefix = ( # "DESTINATION__" + (self.destination_name or self.destination_type).upper()
267 "DESTINATION"
268 )
269 for k, v in self.factory_kwargs.items():
270 if k == "naming_convention":
271 os.environ["SCHEMA__NAMING"] = str(v)
272 else:
273 os.environ[f"{env_key_prefix}__{k.upper()}"] = str(v)
274
275 # For the filesystem destinations we disable compression to make analyzing the result easier
276 os.environ["DATA_WRITER__DISABLE_COMPRESSION"] = str(
277 self.destination_type == "filesystem"
278 if self.disable_compression is None
279 else self.disable_compression
280 )
281
282 if self.credentials is not None:
283 if isinstance(self.credentials, str):
284 os.environ[f"{env_key_prefix}__CREDENTIALS"] = self.credentials
285 else:
286 for key, value in dict(self.credentials).items():
287 os.environ[f"{env_key_prefix}__CREDENTIALS__{key.upper()}"] = str(value)
288
289 if self.env_vars is not None:
290 for k, v in self.env_vars.items():
291 os.environ[k] = v
292
293 def setup_pipeline(
294 self, pipeline_name: str, dataset_name: str = None, dev_mode: bool = False, **kwargs

Callers 10

destination_factoryMethod · 0.95
setup_pipelineMethod · 0.95
test_default_schema_nameFunction · 0.80
test_attach_pipelineFunction · 0.80
test_run_dev_modeFunction · 0.80
test_source_max_nestingFunction · 0.80
test_dataset_name_changeFunction · 0.80

Calls

no outgoing calls

Tested by 8

test_default_schema_nameFunction · 0.64
test_attach_pipelineFunction · 0.64
test_run_dev_modeFunction · 0.64
test_source_max_nestingFunction · 0.64
test_dataset_name_changeFunction · 0.64