(
config: NormalizeConfiguration,
normalize_storage_config: NormalizeStorageConfiguration,
loader_storage_config: LoadStorageConfiguration,
stored_schema: TStoredSchema,
load_id: str,
extracted_items_files: Sequence[str],
report_progress: bool = False,
collector: Collector = NULL_COLLECTOR,
)
| 62 | |
| 63 | |
| 64 | def w_normalize_files( |
| 65 | config: NormalizeConfiguration, |
| 66 | normalize_storage_config: NormalizeStorageConfiguration, |
| 67 | loader_storage_config: LoadStorageConfiguration, |
| 68 | stored_schema: TStoredSchema, |
| 69 | load_id: str, |
| 70 | extracted_items_files: Sequence[str], |
| 71 | report_progress: bool = False, |
| 72 | collector: Collector = NULL_COLLECTOR, |
| 73 | ) -> TWorkerRV: |
| 74 | destination_caps = config.destination_capabilities |
| 75 | schema_updates: List[TSchemaUpdate] = [] |
| 76 | # normalizers are cached per {table_name}.{item_format} |
| 77 | item_normalizers: Dict[str, ItemsNormalizer] = {} |
| 78 | |
| 79 | preferred_file_format = ( |
| 80 | destination_caps.preferred_loader_file_format |
| 81 | or destination_caps.preferred_staging_file_format |
| 82 | ) |
| 83 | # TODO: capabilities.supported_*_formats can be None, it should have defaults |
| 84 | supported_file_formats = destination_caps.supported_loader_file_formats or [] |
| 85 | |
| 86 | # process all files with data items and write to buffered item storage |
| 87 | with Container().injectable_context(destination_caps): |
| 88 | schema = Schema.from_stored_schema(stored_schema) |
| 89 | normalize_storage = NormalizeStorage(False, normalize_storage_config) |
| 90 | load_storage = LoadStorage(False, supported_file_formats, loader_storage_config) |
| 91 | |
| 92 | def _get_items_normalizer( |
| 93 | parsed_file_name: ParsedLoadJobFileName, table_schema: TTableSchema |
| 94 | ) -> ItemsNormalizer: |
| 95 | item_format = DataWriter.item_format_from_file_extension(parsed_file_name.file_format) |
| 96 | |
| 97 | table_name = table_schema["name"] |
| 98 | normalizer_key = f"{table_name}.{item_format}" |
| 99 | |
| 100 | if normalizer_key in item_normalizers: |
| 101 | return item_normalizers[normalizer_key] |
| 102 | |
| 103 | # TODO: extract code that resolves file_format from preferred to utils |
| 104 | |
| 105 | items_preferred_file_format = preferred_file_format |
| 106 | items_supported_file_formats = supported_file_formats |
| 107 | if destination_caps.loader_file_format_selector is not None: |
| 108 | items_preferred_file_format, items_supported_file_formats = ( |
| 109 | destination_caps.loader_file_format_selector( |
| 110 | preferred_file_format, |
| 111 | ( |
| 112 | supported_file_formats.copy() |
| 113 | if isinstance(supported_file_formats, list) |
| 114 | else supported_file_formats |
| 115 | ), |
| 116 | table_schema=table_schema, |
| 117 | ) |
| 118 | ) |
| 119 | |
| 120 | best_writer_spec = None |
| 121 | if item_format == "file": |
no test coverage detected