| 671 | } |
| 672 | |
| 673 | func ExecutePeerflow(t *testing.T, tc client.Client, config *protos.FlowConnectionConfigs) WorkflowRun { |
| 674 | t.Helper() |
| 675 | |
| 676 | client, err := NewApiClient() |
| 677 | require.NoError(t, err) |
| 678 | res, err := client.CreateCDCFlow(t.Context(), &protos.CreateCDCFlowRequest{ConnectionConfigs: config}) |
| 679 | require.NoError(t, err) |
| 680 | return WorkflowRun{ |
| 681 | WorkflowRun: tc.GetWorkflow(t.Context(), res.WorkflowId, ""), |
| 682 | c: tc, |
| 683 | } |
| 684 | } |
| 685 | |
| 686 | func ExecuteDropFlow(ctx context.Context, tc client.Client, config *protos.FlowConnectionConfigs, tableMappingsVersion int64) WorkflowRun { |
| 687 | return ExecuteWorkflow(ctx, tc, shared.PeerFlowTaskQueue, peerflow.DropFlowWorkflow, &protos.DropFlowInput{ |