| 421 | } |
| 422 | |
| 423 | func (h *FlowRequestHandler) FlowStateChange( |
| 424 | ctx context.Context, |
| 425 | req *protos.FlowStateChangeRequest, |
| 426 | ) (*protos.FlowStateChangeResponse, APIError) { |
| 427 | logs := slog.String(string(shared.FlowNameKey), req.FlowJobName) |
| 428 | slog.InfoContext(ctx, "FlowStateChange called", logs, slog.Any("req", req)) |
| 429 | if underMaintenance, err := internal.PeerDBMaintenanceModeEnabled(ctx, nil); err != nil { |
| 430 | slog.ErrorContext(ctx, "unable to check maintenance mode", logs, slog.Any("error", err)) |
| 431 | return nil, NewInternalApiError(fmt.Errorf("unable to check maintenance mode: %w", err)) |
| 432 | } else if underMaintenance { |
| 433 | slog.WarnContext(ctx, "Flow state change request denied due to maintenance", logs) |
| 434 | return nil, NewUnavailableApiError(ErrUnderMaintenance) |
| 435 | } |
| 436 | |
| 437 | workflowID, err := h.getWorkflowID(ctx, req.FlowJobName) |
| 438 | if err != nil { |
| 439 | slog.ErrorContext(ctx, "[flow-state-change] unable to get workflowID", logs, slog.Any("error", err)) |
| 440 | if _, ok := errors.AsType[*exceptions.NotFoundError](err); ok { |
| 441 | return nil, NewNotFoundApiError(fmt.Errorf("flow %s not found", req.FlowJobName)) |
| 442 | } |
| 443 | return nil, NewInternalApiError(fmt.Errorf("unable to get workflowID: %w", err)) |
| 444 | } |
| 445 | currState, err := h.getWorkflowStatus(ctx, workflowID) |
| 446 | if err != nil { |
| 447 | slog.ErrorContext(ctx, "[flow-state-change] unable to get workflow status", logs, slog.Any("error", err)) |
| 448 | return nil, NewInternalApiError(err) |
| 449 | } |
| 450 | |
| 451 | if req.FlowConfigUpdate != nil && req.FlowConfigUpdate.GetCdcFlowConfigUpdate() != nil && |
| 452 | // Don't allow config updates if the flow is already in a terminal state since it can lead to confusion |
| 453 | // where the config is updated but the flow is not reflecting those changes since it's already completed/failed |
| 454 | (currState != protos.FlowStatus_STATUS_TERMINATED && |
| 455 | currState != protos.FlowStatus_STATUS_TERMINATING && |
| 456 | currState != protos.FlowStatus_STATUS_FAILED && |
| 457 | currState != protos.FlowStatus_STATUS_COMPLETED) { |
| 458 | if err := model.CDCDynamicPropertiesSignal.SignalClientWorkflow( |
| 459 | ctx, |
| 460 | h.temporalClient, |
| 461 | workflowID, |
| 462 | "", |
| 463 | req.FlowConfigUpdate.GetCdcFlowConfigUpdate(), |
| 464 | ); err != nil { |
| 465 | slog.ErrorContext(ctx, "unable to signal workflow update", logs, slog.Any("error", err)) |
| 466 | return nil, NewInternalApiError(fmt.Errorf("unable to signal workflow update: %w", err)) |
| 467 | } |
| 468 | telemetry.LogActivityStartFlowConfigUpdate(ctx, req.FlowJobName, req.FlowConfigUpdate.GetCdcFlowConfigUpdate()) |
| 469 | h.alerter.LogFlowInfo(ctx, req.FlowJobName, "Flow config update signaled") |
| 470 | } |
| 471 | |
| 472 | slog.InfoContext(ctx, "[flow-state-change] received request", logs, |
| 473 | slog.Any("requestedFlowState", req.RequestedFlowState), slog.Any("currState", currState)) |
| 474 | if req.RequestedFlowState != currState { |
| 475 | var changeErr error |
| 476 | switch req.RequestedFlowState { |
| 477 | case protos.FlowStatus_STATUS_PAUSED: |
| 478 | if currState == protos.FlowStatus_STATUS_RUNNING { |
| 479 | changeErr = model.FlowSignal.SignalClientWorkflow(ctx, h.temporalClient, workflowID, "", model.PauseSignal) |
| 480 | if changeErr == nil { |