MCPcopy
hub / github.com/PeerDB-io/peerdb / TestTerminateDuringResyncDropFlow

Method TestTerminateDuringResyncDropFlow

flow/e2e/drop_flow_test.go:21–101  ·  view source on GitHub ↗

TestTerminateDuringResyncDropFlow creates a pipe, resyncs it, then sends a TERMINATING signal while the DropFlowWorkflow is blocked dropping the replication slot. An idle open transaction on the source keeps the slot active (pg_drop_replication_slot blocks while the slot's xmin cannot advance). Once

()

Source from the content-addressed store, hash-verified

19// signal goroutine has already set TerminateSignal, so it does NOT ContinueAsNew back
20// to CDCFlowWorkflow and the pipe is fully torn down.
21func (s APITestSuite) TestTerminateDuringResyncDropFlow() {
22 if _, ok := s.source.(*PostgresSource); !ok {
23 s.t.Skip("only testing with PostgreSQL source")
24 }
25
26 // Open a separate raw PG connection and start a transaction that will block
27 // pg_drop_replication_slot during the resync drop flow.
28 pgCfg := internal.GetAncillaryPostgresConfigFromEnv()
29 blockConn, err := pgx.Connect(s.t.Context(), internal.GetPGConnectionString(pgCfg, ""))
30 require.NoError(s.t, err)
31 defer blockConn.Close(s.t.Context())
32
33 blockTx, err := blockConn.Begin(s.t.Context())
34 require.NoError(s.t, err)
35 // Acquire a transaction ID; keeping this transaction open prevents the replication
36 // slot's xmin from advancing, causing pg_drop_replication_slot to block.
37 var xid uint32
38 require.NoError(s.t, blockTx.QueryRow(s.t.Context(), "SELECT txid_current()").Scan(&xid))
39 s.t.Logf("blocking transaction XID: %d", xid)
40
41 tableName := "terminate_resync"
42 require.NoError(s.t, s.source.Exec(s.t.Context(),
43 fmt.Sprintf("CREATE TABLE %s(id int primary key, val text)", AttachSchema(s, tableName))))
44 require.NoError(s.t, s.source.Exec(s.t.Context(),
45 fmt.Sprintf("INSERT INTO %s(id, val) values (1,'first')", AttachSchema(s, tableName))))
46
47 connectionGen := FlowConnectionGenerationConfig{
48 FlowJobName: "terminate_resync_" + s.suffix,
49 TableNameMapping: map[string]string{AttachSchema(s, tableName): tableName},
50 Destination: s.ch.Peer().Name,
51 }
52 flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s)
53 flowConnConfig.DoInitialSnapshot = true
54
55 response, err := s.CreateCDCFlow(s.t.Context(), &protos.CreateCDCFlowRequest{ConnectionConfigs: flowConnConfig})
56 require.NoError(s.t, err)
57 require.NotNil(s.t, response)
58
59 tc := NewTemporalClient(s.t)
60 env, err := GetPeerflow(s.t.Context(), s.catalog, tc, flowConnConfig.FlowJobName)
61 require.NoError(s.t, err)
62
63 // Wait for STATUS_SNAPSHOT without using SetupCDCFlowStatusQuery, which would
64 // t.Fatal after 60s if the flow stays in STATUS_SNAPSHOT. We query the catalog
65 // directly so "not found yet" errors are silently skipped.
66 EnvWaitFor(s.t, env, 3*time.Minute, "wait for initial snapshot to start", func() bool {
67 status, err := internal.GetWorkflowStatus(s.t.Context(), s.catalog, env.GetID())
68 return err == nil && status == protos.FlowStatus_STATUS_SNAPSHOT
69 })
70
71 // Trigger resync: CDC flow ContinueAsNew → DropFlowWorkflow(Resync=true).
72 // The drop flow will block on pg_drop_replication_slot because of the open transaction.
73 _, err = s.FlowStateChange(s.t.Context(), &protos.FlowStateChangeRequest{
74 FlowJobName: flowConnConfig.FlowJobName,
75 RequestedFlowState: protos.FlowStatus_STATUS_RESYNC,
76 })
77 require.NoError(s.t, err)
78

Callers

nothing calls this directly

Calls 15

waitForFlowDroppedMethod · 0.95
AttachSchemaFunction · 0.85
NewTemporalClientFunction · 0.85
GetPeerflowFunction · 0.85
EnvWaitForFunction · 0.85
CreateCDCFlowMethod · 0.80
FlowStateChangeMethod · 0.80
RollbackMethod · 0.80
CloseMethod · 0.65
ExecMethod · 0.65
PeerMethod · 0.65

Tested by

no test coverage detected