(
format: str,
*,
schema: type[Schema] | None = None,
with_metadata: bool = False,
autogenerate_key: bool = False,
csv_settings: CsvParserSettings | None = None,
json_field_paths: dict[str, str] | None = None,
schema_registry_settings: SchemaRegistrySettings | None = None,
with_native_record_key: bool = False,
_stacklevel: int = 1,
)
| 235 | |
| 236 | |
| 237 | def construct_schema_and_data_format( |
| 238 | format: str, |
| 239 | *, |
| 240 | schema: type[Schema] | None = None, |
| 241 | with_metadata: bool = False, |
| 242 | autogenerate_key: bool = False, |
| 243 | csv_settings: CsvParserSettings | None = None, |
| 244 | json_field_paths: dict[str, str] | None = None, |
| 245 | schema_registry_settings: SchemaRegistrySettings | None = None, |
| 246 | with_native_record_key: bool = False, |
| 247 | _stacklevel: int = 1, |
| 248 | ) -> tuple[type[Schema], api.DataFormat]: |
| 249 | data_format_type = get_data_format_type(format, SUPPORTED_INPUT_FORMATS) |
| 250 | |
| 251 | if data_format_type == "identity": |
| 252 | kwargs = locals() |
| 253 | unexpected_params = [ |
| 254 | "schema", |
| 255 | "csv_settings", |
| 256 | "json_field_paths", |
| 257 | "schema_registry_settings", |
| 258 | ] |
| 259 | for param in unexpected_params: |
| 260 | if param in kwargs and kwargs[param] is not None: |
| 261 | raise ValueError(f"Unexpected argument for {format!r} format: {param}") |
| 262 | |
| 263 | parse_utf8 = format not in ("binary", "only_metadata", "raw") |
| 264 | schema = construct_raw_data_schema_by_flags( |
| 265 | with_native_record_key=with_native_record_key, |
| 266 | parse_utf8=parse_utf8, |
| 267 | with_metadata=with_metadata, |
| 268 | ) |
| 269 | schema, api_schema = read_schema(schema) |
| 270 | |
| 271 | return schema, api.DataFormat( |
| 272 | format_type=data_format_type, |
| 273 | **api_schema, |
| 274 | parse_utf8=parse_utf8, |
| 275 | key_generation_policy=( |
| 276 | api.KeyGenerationPolicy.ALWAYS_AUTOGENERATE |
| 277 | if autogenerate_key |
| 278 | else api.KeyGenerationPolicy.PREFER_MESSAGE_KEY |
| 279 | ), |
| 280 | schema_registry_settings=maybe_schema_registry_settings( |
| 281 | schema_registry_settings |
| 282 | ), |
| 283 | message_queue_key_field=( |
| 284 | MESSAGE_QUEUE_KEY_COLUMN_NAME if with_native_record_key else None |
| 285 | ), |
| 286 | ) |
| 287 | |
| 288 | schema = assert_schema_not_none(schema, data_format_type) |
| 289 | if METADATA_COLUMN_NAME in schema.column_names(): |
| 290 | if with_metadata: |
| 291 | raise ValueError( |
| 292 | f"The schema already declares a {METADATA_COLUMN_NAME!r} column, " |
| 293 | f"which conflicts with 'with_metadata=True'. Either remove " |
| 294 | f"{METADATA_COLUMN_NAME!r} from the schema or set " |
no test coverage detected