MCPcopy
hub / github.com/PeerDB-io/peerdb / FlowStateChange

Method FlowStateChange

flow/cmd/handler.go:423–561  ·  view source on GitHub ↗
(
	ctx context.Context,
	req *protos.FlowStateChangeRequest,
)

Source from the content-addressed store, hash-verified

421}
422
423func (h *FlowRequestHandler) FlowStateChange(
424 ctx context.Context,
425 req *protos.FlowStateChangeRequest,
426) (*protos.FlowStateChangeResponse, APIError) {
427 logs := slog.String(string(shared.FlowNameKey), req.FlowJobName)
428 slog.InfoContext(ctx, "FlowStateChange called", logs, slog.Any("req", req))
429 if underMaintenance, err := internal.PeerDBMaintenanceModeEnabled(ctx, nil); err != nil {
430 slog.ErrorContext(ctx, "unable to check maintenance mode", logs, slog.Any("error", err))
431 return nil, NewInternalApiError(fmt.Errorf("unable to check maintenance mode: %w", err))
432 } else if underMaintenance {
433 slog.WarnContext(ctx, "Flow state change request denied due to maintenance", logs)
434 return nil, NewUnavailableApiError(ErrUnderMaintenance)
435 }
436
437 workflowID, err := h.getWorkflowID(ctx, req.FlowJobName)
438 if err != nil {
439 slog.ErrorContext(ctx, "[flow-state-change] unable to get workflowID", logs, slog.Any("error", err))
440 if _, ok := errors.AsType[*exceptions.NotFoundError](err); ok {
441 return nil, NewNotFoundApiError(fmt.Errorf("flow %s not found", req.FlowJobName))
442 }
443 return nil, NewInternalApiError(fmt.Errorf("unable to get workflowID: %w", err))
444 }
445 currState, err := h.getWorkflowStatus(ctx, workflowID)
446 if err != nil {
447 slog.ErrorContext(ctx, "[flow-state-change] unable to get workflow status", logs, slog.Any("error", err))
448 return nil, NewInternalApiError(err)
449 }
450
451 if req.FlowConfigUpdate != nil && req.FlowConfigUpdate.GetCdcFlowConfigUpdate() != nil &&
452 // Skip the config update when the flow is terminating or already in a terminal state (terminated, failed,
453 // completed); otherwise the config would appear updated while the finished flow never reflects the change.
454 (currState != protos.FlowStatus_STATUS_TERMINATED &&
455 currState != protos.FlowStatus_STATUS_TERMINATING &&
456 currState != protos.FlowStatus_STATUS_FAILED &&
457 currState != protos.FlowStatus_STATUS_COMPLETED) {
458 if err := model.CDCDynamicPropertiesSignal.SignalClientWorkflow(
459 ctx,
460 h.temporalClient,
461 workflowID,
462 "",
463 req.FlowConfigUpdate.GetCdcFlowConfigUpdate(),
464 ); err != nil {
465 slog.ErrorContext(ctx, "unable to signal workflow update", logs, slog.Any("error", err))
466 return nil, NewInternalApiError(fmt.Errorf("unable to signal workflow update: %w", err))
467 }
468 telemetry.LogActivityStartFlowConfigUpdate(ctx, req.FlowJobName, req.FlowConfigUpdate.GetCdcFlowConfigUpdate())
469 h.alerter.LogFlowInfo(ctx, req.FlowJobName, "Flow config update signaled")
470 }
471
472 slog.InfoContext(ctx, "[flow-state-change] received request", logs,
473 slog.Any("requestedFlowState", req.RequestedFlowState), slog.Any("currState", currState))
474 if req.RequestedFlowState != currState {
475 var changeErr error
476 switch req.RequestedFlowState {
477 case protos.FlowStatus_STATUS_PAUSED:
478 if currState == protos.FlowStatus_STATUS_RUNNING {
479 changeErr = model.FlowSignal.SignalClientWorkflow(ctx, h.temporalClient, workflowID, "", model.PauseSignal)
480 if changeErr == nil {

Calls 15

getWorkflowIDMethod · 0.95
getWorkflowStatusMethod · 0.95
isCDCFlowMethod · 0.95
ValidateCDCMirrorMethod · 0.95
dropFlowMethod · 0.95
shutdownFlowMethod · 0.95
NewInternalApiErrorFunction · 0.85
NewUnavailableApiErrorFunction · 0.85
NewNotFoundApiErrorFunction · 0.85