(
sql_store: SqlZenStore,
*,
project_id: UUID,
run_id: UUID,
key: str,
value: object,
type_: MetadataTypeEnum,
)
| 120 | |
| 121 | |
| 122 | def _create_run_metadata( |
| 123 | sql_store: SqlZenStore, |
| 124 | *, |
| 125 | project_id: UUID, |
| 126 | run_id: UUID, |
| 127 | key: str, |
| 128 | value: object, |
| 129 | type_: MetadataTypeEnum, |
| 130 | ) -> None: |
| 131 | rm = RunMetadataSchema( |
| 132 | project_id=project_id, |
| 133 | key=key, |
| 134 | value=json.dumps(value), |
| 135 | type=type_.value, |
| 136 | ) |
| 137 | with Session(sql_store.engine) as session: |
| 138 | session.add(rm) |
| 139 | session.flush() |
| 140 | session.add( |
| 141 | RunMetadataResourceSchema( |
| 142 | resource_id=run_id, |
| 143 | resource_type=MetadataResourceTypes.PIPELINE_RUN.value, |
| 144 | run_metadata_id=rm.id, |
| 145 | ) |
| 146 | ) |
| 147 | session.commit() |
| 148 | |
| 149 | |
| 150 | def _create_pipeline(sql_store: SqlZenStore, project_id: UUID) -> UUID: |
no test coverage detected