TestTerminateDuringResyncDropFlow creates a pipe, triggers a resync on it, and then sends a TERMINATING signal while the DropFlowWorkflow is blocked dropping the replication slot. An idle open transaction on the source keeps the slot active (pg_drop_replication_slot blocks while the slot's xmin cannot advance). Once …
()
| 19 | // signal goroutine has already set TerminateSignal, so it does NOT ContinueAsNew back |
| 20 | // to CDCFlowWorkflow and the pipe is fully torn down. |
| 21 | func (s APITestSuite) TestTerminateDuringResyncDropFlow() { |
| 22 | if _, ok := s.source.(*PostgresSource); !ok { |
| 23 | s.t.Skip("only testing with PostgreSQL source") |
| 24 | } |
| 25 | |
| 26 | // Open a separate raw PG connection and start a transaction that will block |
| 27 | // pg_drop_replication_slot during the resync drop flow. |
| 28 | pgCfg := internal.GetAncillaryPostgresConfigFromEnv() |
| 29 | blockConn, err := pgx.Connect(s.t.Context(), internal.GetPGConnectionString(pgCfg, "")) |
| 30 | require.NoError(s.t, err) |
| 31 | defer blockConn.Close(s.t.Context()) |
| 32 | |
| 33 | blockTx, err := blockConn.Begin(s.t.Context()) |
| 34 | require.NoError(s.t, err) |
| 35 | // Acquire a transaction ID; keeping this transaction open prevents the replication |
| 36 | // slot's xmin from advancing, causing pg_drop_replication_slot to block. |
| 37 | var xid uint32 |
| 38 | require.NoError(s.t, blockTx.QueryRow(s.t.Context(), "SELECT txid_current()").Scan(&xid)) |
| 39 | s.t.Logf("blocking transaction XID: %d", xid) |
| 40 | |
| 41 | tableName := "terminate_resync" |
| 42 | require.NoError(s.t, s.source.Exec(s.t.Context(), |
| 43 | fmt.Sprintf("CREATE TABLE %s(id int primary key, val text)", AttachSchema(s, tableName)))) |
| 44 | require.NoError(s.t, s.source.Exec(s.t.Context(), |
| 45 | fmt.Sprintf("INSERT INTO %s(id, val) values (1,'first')", AttachSchema(s, tableName)))) |
| 46 | |
| 47 | connectionGen := FlowConnectionGenerationConfig{ |
| 48 | FlowJobName: "terminate_resync_" + s.suffix, |
| 49 | TableNameMapping: map[string]string{AttachSchema(s, tableName): tableName}, |
| 50 | Destination: s.ch.Peer().Name, |
| 51 | } |
| 52 | flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s) |
| 53 | flowConnConfig.DoInitialSnapshot = true |
| 54 | |
| 55 | response, err := s.CreateCDCFlow(s.t.Context(), &protos.CreateCDCFlowRequest{ConnectionConfigs: flowConnConfig}) |
| 56 | require.NoError(s.t, err) |
| 57 | require.NotNil(s.t, response) |
| 58 | |
| 59 | tc := NewTemporalClient(s.t) |
| 60 | env, err := GetPeerflow(s.t.Context(), s.catalog, tc, flowConnConfig.FlowJobName) |
| 61 | require.NoError(s.t, err) |
| 62 | |
| 63 | // Wait for STATUS_SNAPSHOT without using SetupCDCFlowStatusQuery, which would |
| 64 | // t.Fatal after 60s if the flow stays in STATUS_SNAPSHOT. We query the catalog |
| 65 | // directly so "not found yet" errors are silently skipped. |
| 66 | EnvWaitFor(s.t, env, 3*time.Minute, "wait for initial snapshot to start", func() bool { |
| 67 | status, err := internal.GetWorkflowStatus(s.t.Context(), s.catalog, env.GetID()) |
| 68 | return err == nil && status == protos.FlowStatus_STATUS_SNAPSHOT |
| 69 | }) |
| 70 | |
| 71 | // Trigger resync: CDC flow ContinueAsNew → DropFlowWorkflow(Resync=true). |
| 72 | // The drop flow will block on pg_drop_replication_slot because of the open transaction. |
| 73 | _, err = s.FlowStateChange(s.t.Context(), &protos.FlowStateChangeRequest{ |
| 74 | FlowJobName: flowConnConfig.FlowJobName, |
| 75 | RequestedFlowState: protos.FlowStatus_STATUS_RESYNC, |
| 76 | }) |
| 77 | require.NoError(s.t, err) |
| 78 |
nothing calls this directly
no test coverage detected