MCPcopy
hub / github.com/pathwaycom/pathway / construct_schema_and_data_format

Function construct_schema_and_data_format

python/pathway/io/_utils.py:237–359  ·  view source on GitHub ↗
(
    format: str,
    *,
    schema: type[Schema] | None = None,
    with_metadata: bool = False,
    autogenerate_key: bool = False,
    csv_settings: CsvParserSettings | None = None,
    json_field_paths: dict[str, str] | None = None,
    schema_registry_settings: SchemaRegistrySettings | None = None,
    with_native_record_key: bool = False,
    _stacklevel: int = 1,
)

Source from the content-addressed store, hash-verified

235
236
237def construct_schema_and_data_format(
238 format: str,
239 *,
240 schema: type[Schema] | None = None,
241 with_metadata: bool = False,
242 autogenerate_key: bool = False,
243 csv_settings: CsvParserSettings | None = None,
244 json_field_paths: dict[str, str] | None = None,
245 schema_registry_settings: SchemaRegistrySettings | None = None,
246 with_native_record_key: bool = False,
247 _stacklevel: int = 1,
248) -> tuple[type[Schema], api.DataFormat]:
249 data_format_type = get_data_format_type(format, SUPPORTED_INPUT_FORMATS)
250
251 if data_format_type == "identity":
252 kwargs = locals()
253 unexpected_params = [
254 "schema",
255 "csv_settings",
256 "json_field_paths",
257 "schema_registry_settings",
258 ]
259 for param in unexpected_params:
260 if param in kwargs and kwargs[param] is not None:
261 raise ValueError(f"Unexpected argument for {format!r} format: {param}")
262
263 parse_utf8 = format not in ("binary", "only_metadata", "raw")
264 schema = construct_raw_data_schema_by_flags(
265 with_native_record_key=with_native_record_key,
266 parse_utf8=parse_utf8,
267 with_metadata=with_metadata,
268 )
269 schema, api_schema = read_schema(schema)
270
271 return schema, api.DataFormat(
272 format_type=data_format_type,
273 **api_schema,
274 parse_utf8=parse_utf8,
275 key_generation_policy=(
276 api.KeyGenerationPolicy.ALWAYS_AUTOGENERATE
277 if autogenerate_key
278 else api.KeyGenerationPolicy.PREFER_MESSAGE_KEY
279 ),
280 schema_registry_settings=maybe_schema_registry_settings(
281 schema_registry_settings
282 ),
283 message_queue_key_field=(
284 MESSAGE_QUEUE_KEY_COLUMN_NAME if with_native_record_key else None
285 ),
286 )
287
288 schema = assert_schema_not_none(schema, data_format_type)
289 if METADATA_COLUMN_NAME in schema.column_names():
290 if with_metadata:
291 raise ValueError(
292 f"The schema already declares a {METADATA_COLUMN_NAME!r} column, "
293 f"which conflicts with 'with_metadata=True'. Either remove "
294 f"{METADATA_COLUMN_NAME!r} from the schema or set "

Callers 9

read (Function) · 0.90
read (Function) · 0.90
read_from_digital_ocean (Function) · 0.90
read_from_wasabi (Function) · 0.90
read (Function) · 0.90
read (Function) · 0.90
read (Function) · 0.90
read (Function) · 0.90
read (Function) · 0.90

Calls 8

get_data_format_type (Function) · 0.85
read_schema (Function) · 0.85
assert_schema_not_none (Function) · 0.85
items (Method) · 0.80
startswith (Method) · 0.80
column_names (Method) · 0.45

Tested by

no test coverage detected