MCPcopy
hub / github.com/PeerDB-io/peerdb / CreateQRepFlow

Method CreateQRepFlow

flow/cmd/handler.go:249–299  ·  view source on GitHub ↗
(
	ctx context.Context, req *protos.CreateQRepFlowRequest,
)

Source from the content-addressed store, hash-verified

247}
248
249func (h *FlowRequestHandler) CreateQRepFlow(
250 ctx context.Context, req *protos.CreateQRepFlowRequest,
251) (*protos.CreateQRepFlowResponse, APIError) {
252 cfg := req.QrepConfig
253 if internalVersion, err := internal.PeerDBForceInternalVersion(ctx, cfg.Env); err != nil {
254 return nil, NewInternalApiError(err)
255 } else {
256 cfg.Version = internalVersion
257 }
258 if flags, err := h.determineFlags(ctx, cfg.Env, cfg.DestinationName); err != nil {
259 return nil, NewInternalApiError(err)
260 } else {
261 cfg.Flags = flags
262 }
263
264 workflowID := fmt.Sprintf("%s-qrepflow-%s", cfg.FlowJobName, uuid.New())
265 workflowOptions := client.StartWorkflowOptions{
266 ID: workflowID,
267 TaskQueue: h.peerflowTaskQueueID,
268 TypedSearchAttributes: shared.NewSearchAttributes(cfg.FlowJobName),
269 }
270 if err := h.createQRepJobEntry(ctx, req, workflowID); err != nil {
271 slog.ErrorContext(ctx, "unable to create flow job entry",
272 slog.Any("error", err), slog.String("flowName", cfg.FlowJobName))
273 return nil, NewInternalApiError(fmt.Errorf("unable to create flow job entry: %w", err))
274 }
275 dbtype, err := connectors.LoadPeerType(ctx, h.pool, cfg.SourceName)
276 if err != nil {
277 return nil, NewInternalApiError(err)
278 }
279 var workflowFn any
280 if dbtype == protos.DBType_POSTGRES && cfg.WatermarkColumn == "xmin" {
281 workflowFn = peerflow.XminFlowWorkflow
282 } else {
283 workflowFn = peerflow.QRepFlowWorkflow
284 }
285
286 cfg.ParentMirrorName = cfg.FlowJobName
287
288 if _, err := h.temporalClient.ExecuteWorkflow(ctx, workflowOptions, workflowFn, cfg, nil); err != nil {
289 slog.ErrorContext(ctx, "unable to start QRepFlow workflow",
290 slog.Any("error", err), slog.String("flowName", cfg.FlowJobName))
291 return nil, NewInternalApiError(fmt.Errorf("unable to start QRepFlow workflow: %w", err))
292 }
293
294 telemetry.LogActivityCreateFlow(ctx, cfg.FlowJobName)
295
296 return &protos.CreateQRepFlowResponse{
297 WorkflowId: workflowID,
298 }, nil
299}
300
301func (h *FlowRequestHandler) dropFlow(
302 ctx context.Context,

Callers 3

RunQRepFlowWorkflowFunction · 0.80
TestQRepMethod · 0.80
TestDropQRepMethod · 0.80

Calls 5

determineFlagsMethod · 0.95
createQRepJobEntryMethod · 0.95
NewInternalApiErrorFunction · 0.85
ExecuteWorkflowMethod · 0.80
StringMethod · 0.45

Tested by 2

TestQRepMethod · 0.64
TestDropQRepMethod · 0.64