Create OpenLineageConfig from a dictionary. Args: config_dict: Dictionary containing configuration values Returns: OpenLineageConfig instance
(cls, config_dict: Dict[str, Any])
| 53 | |
| 54 | @classmethod |
| 55 | def from_dict(cls, config_dict: Dict[str, Any]) -> "OpenLineageConfig": |
| 56 | """ |
| 57 | Create OpenLineageConfig from a dictionary. |
| 58 | |
| 59 | Args: |
| 60 | config_dict: Dictionary containing configuration values |
| 61 | |
| 62 | Returns: |
| 63 | OpenLineageConfig instance |
| 64 | """ |
| 65 | return cls( |
| 66 | enabled=config_dict.get("enabled", True), |
| 67 | transport_type=config_dict.get("transport_type"), |
| 68 | transport_url=config_dict.get("transport_url"), |
| 69 | transport_endpoint=config_dict.get("transport_endpoint", "api/v1/lineage"), |
| 70 | api_key=config_dict.get("api_key"), |
| 71 | namespace=config_dict.get("namespace", "feast"), |
| 72 | producer=config_dict.get("producer", "feast"), |
| 73 | emit_on_apply=config_dict.get("emit_on_apply", True), |
| 74 | emit_on_materialize=config_dict.get("emit_on_materialize", True), |
| 75 | additional_config=config_dict.get("additional_config", {}), |
| 76 | ) |
| 77 | |
| 78 | @classmethod |
| 79 | def from_env(cls) -> "OpenLineageConfig": |