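Applies the given diffs to the metadata store and infrastructure.

Args:
    registry_diff: The diff between the current registry and the desired registry.
    infra_diff: The diff between the current infra and the desired infra.
    new_infra: The desired infra.
    progress_ctx: Optional progress context for tracking apply progress.
    no_promote: Passed through to apply_diff_to_registry (defaults to False).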
(
self,
registry_diff: RegistryDiff,
infra_diff: InfraDiff,
new_infra: Infra,
progress_ctx: Optional["ApplyProgressContext"] = None,
no_promote: bool = False,
)
| 1235 | return registry_diff, infra_diff, new_infra |
| 1236 | |
def _apply_diffs(
    self,
    registry_diff: RegistryDiff,
    infra_diff: InfraDiff,
    new_infra: Infra,
    progress_ctx: Optional["ApplyProgressContext"] = None,
    no_promote: bool = False,
):
    """Roll out a computed plan: infrastructure first, then the registry.

    The infra diff is applied via ``infra_diff.update``. The registry diff is
    then applied with ``commit=False`` and the new infra is stored through
    ``self.registry.update_infra`` with ``commit=True``. Once both steps have
    succeeded, OpenLineage and MLflow hooks are invoked for the registry diff.

    Args:
        registry_diff: Difference between the current and the desired registry.
        infra_diff: Difference between the current and the desired infra.
        new_infra: The infra state to record in the registry.
        progress_ctx: Optional progress tracker. When truthy it is told about
            each phase and is always cleaned up, even if a step raises.
        no_promote: Forwarded unchanged to ``apply_diff_to_registry``.
    """
    try:
        # --- Phase 1: infrastructure ---
        if progress_ctx:
            # One progress unit per infra object diff.
            progress_ctx.start_phase(
                "Updating infrastructure", len(infra_diff.infra_object_diffs)
            )
        infra_diff.update(progress_ctx=progress_ctx)

        # --- Phase 2: registry (two tracked steps) ---
        if progress_ctx:
            progress_ctx.complete_phase()
            progress_ctx.start_phase("Updating registry", 2)

        # Applied without committing; the update_infra call below is the one
        # that passes commit=True (presumably persisting both - confirm).
        apply_diff_to_registry(
            self.registry,
            registry_diff,
            self.project,
            commit=False,
            no_promote=no_promote,
        )
        if progress_ctx:
            progress_ctx.update_phase_progress("Committing registry changes")

        self.registry.update_infra(new_infra, self.project, commit=True)
        if progress_ctx:
            progress_ctx.update_phase_progress("Registry update complete")
            progress_ctx.complete_phase()
    finally:
        # Progress bars must be torn down on success and on failure alike.
        if progress_ctx:
            progress_ctx.cleanup()

    # Post-apply notifications, reached only when nothing above raised.
    self._emit_openlineage_apply_diffs(registry_diff)
    self._mlflow_log_apply_diffs(registry_diff)
| 1292 | |
| 1293 | def _mlflow_log_apply_diffs(self, registry_diff: RegistryDiff): |
| 1294 | """Log apply operation to MLflow ops experiment.""" |
no test coverage detected