Given a possibly nested Pydantic class, return a flattened version of it by constructing top-level fields whose names are formed from the path through the nested structure, with the path components separated by double underscores.

Args:
    model (Type[BaseModel]): The Pydantic model to flatten.
(
model: Type[BaseModel],
base_model: Type[BaseModel] = BaseModel,
)
| 102 | |
| 103 | |
def flatten_pydantic_model(
    model: Type[BaseModel],
    base_model: Type[BaseModel] = BaseModel,
) -> Type[BaseModel]:
    """
    Given a possibly nested Pydantic class, return a flattened version of it,
    by constructing top-level fields, whose names are formed from the path
    through the nested structure, separated by double underscores.

    A field is expanded only when its annotation is directly a ``BaseModel``
    subclass; every other field is copied as a leaf together with its
    default value or default factory.

    Args:
        model (Type[BaseModel]): The Pydantic model to flatten.
        base_model (Type[BaseModel], optional): The base model to use for the
            flattened model. Defaults to BaseModel.

    Returns:
        Type[BaseModel]: The flattened Pydantic model, named ``FlatModel``.
    """
    # Local import: only this function needs `Field`, and importing it here
    # keeps the fix independent of the module-level import list.
    from pydantic import Field

    flattened_fields: Dict[str, Any] = {}
    # Explicit work stack of (model, name prefix) pairs instead of recursion.
    models_to_process = [(model, "")]

    while models_to_process:
        current_model, current_prefix = models_to_process.pop()

        for name, field in current_model.model_fields.items():
            field_type = field.annotation if hasattr(field, "annotation") else field

            if isinstance(field_type, type) and issubclass(field_type, BaseModel):
                # Nested model: defer it, extending the prefix with this
                # field's name so its leaves become "<prefix><name>__<leaf>".
                models_to_process.append((field_type, f"{current_prefix}{name}__"))
                continue

            flattened_name = f"{current_prefix}{name}"
            default_factory = getattr(field, "default_factory", None)

            if default_factory is not None:
                # create_model treats the second tuple element as the default
                # VALUE, so the factory has to be wrapped in Field(...);
                # passing the callable directly would make the callable
                # itself the field's default.
                flattened_fields[flattened_name] = (
                    field_type,
                    Field(default_factory=default_factory),
                )
            elif hasattr(field, "default") and field.default is not ...:
                # NOTE(review): a required field may carry pydantic's own
                # "undefined" sentinel rather than Ellipsis and so land in
                # this branch; the sentinel is forwarded unchanged — confirm
                # create_model keeps such fields required.
                flattened_fields[flattened_name] = (field_type, field.default)
            else:
                # Ellipsis marks the field as required.
                flattened_fields[flattened_name] = (field_type, ...)

    return create_model("FlatModel", __base__=base_model, **flattened_fields)
| 155 | |
| 156 | |
| 157 | def get_field_names(model: Type[BaseModel]) -> List[str]: |
no outgoing calls
no test coverage detected
searching dependent graphs…