MCPcopy
hub / github.com/dlt-hub/dlt / w_normalize_files

Function w_normalize_files

dlt/normalize/worker.py:64–285  ·  view source on GitHub ↗
(
    config: NormalizeConfiguration,
    normalize_storage_config: NormalizeStorageConfiguration,
    loader_storage_config: LoadStorageConfiguration,
    stored_schema: TStoredSchema,
    load_id: str,
    extracted_items_files: Sequence[str],
    report_progress: bool = False,
    collector: Collector = NULL_COLLECTOR,
)

Source from the content-addressed store, hash-verified

62
63
64def w_normalize_files(
65 config: NormalizeConfiguration,
66 normalize_storage_config: NormalizeStorageConfiguration,
67 loader_storage_config: LoadStorageConfiguration,
68 stored_schema: TStoredSchema,
69 load_id: str,
70 extracted_items_files: Sequence[str],
71 report_progress: bool = False,
72 collector: Collector = NULL_COLLECTOR,
73) -> TWorkerRV:
74 destination_caps = config.destination_capabilities
75 schema_updates: List[TSchemaUpdate] = []
76 # normalizers are cached per {table_name}.{item_format}
77 item_normalizers: Dict[str, ItemsNormalizer] = {}
78
79 preferred_file_format = (
80 destination_caps.preferred_loader_file_format
81 or destination_caps.preferred_staging_file_format
82 )
83 # TODO: capabilities.supported_*_formats can be None, it should have defaults
84 supported_file_formats = destination_caps.supported_loader_file_formats or []
85
86 # process all files with data items and write to buffered item storage
87 with Container().injectable_context(destination_caps):
88 schema = Schema.from_stored_schema(stored_schema)
89 normalize_storage = NormalizeStorage(False, normalize_storage_config)
90 load_storage = LoadStorage(False, supported_file_formats, loader_storage_config)
91
92 def _get_items_normalizer(
93 parsed_file_name: ParsedLoadJobFileName, table_schema: TTableSchema
94 ) -> ItemsNormalizer:
95 item_format = DataWriter.item_format_from_file_extension(parsed_file_name.file_format)
96
97 table_name = table_schema["name"]
98 normalizer_key = f"{table_name}.{item_format}"
99
100 if normalizer_key in item_normalizers:
101 return item_normalizers[normalizer_key]
102
103 # TODO: extract code that resolves file_format from preferred to utils
104
105 items_preferred_file_format = preferred_file_format
106 items_supported_file_formats = supported_file_formats
107 if destination_caps.loader_file_format_selector is not None:
108 items_preferred_file_format, items_supported_file_formats = (
109 destination_caps.loader_file_format_selector(
110 preferred_file_format,
111 (
112 supported_file_formats.copy()
113 if isinstance(supported_file_formats, list)
114 else supported_file_formats
115 ),
116 table_schema=table_schema,
117 )
118 )
119
120 best_writer_spec = None
121 if item_format == "file":

Callers 1

map_single · Method · 0.90

Calls 15

Container · Class · 0.90
NormalizeStorage · Class · 0.90
LoadStorage · Class · 0.90
new_table · Function · 0.90
prepare_load_table · Function · 0.90
NormalizeJobFailed · Class · 0.90
_get_items_normalizer · Function · 0.85
TWorkerRV · Class · 0.85
injectable_context · Method · 0.80
from_stored_schema · Method · 0.80
parse · Method · 0.80

Tested by

no test coverage detected