MCPcopy
hub / github.com/PeerDB-io/peerdb / SetupCDCFlowStatusQuery

Function SetupCDCFlowStatusQuery

flow/e2e/test_utils.go:264–292  ·  view source on GitHub ↗
(t *testing.T, env WorkflowRun, config *protos.FlowConnectionConfigs)

Source from the content-addressed store, hash-verified

262}
263
264func SetupCDCFlowStatusQuery(t *testing.T, env WorkflowRun, config *protos.FlowConnectionConfigs) {
265 t.Helper()
266 pool, err := internal.GetCatalogConnectionPoolFromEnv(t.Context())
267 if err != nil {
268 env.Cancel(t.Context())
269 t.Fatal("could not get catalog connection", err)
270 }
271 // errors expected while PeerFlowStatusQuery is setup
272 counter := 0
273 for {
274 time.Sleep(time.Second)
275 counter++
276 status, err := internal.GetWorkflowStatus(t.Context(), pool, env.GetID())
277 if err == nil {
278 if status == protos.FlowStatus_STATUS_RUNNING || status == protos.FlowStatus_STATUS_COMPLETED {
279 return
280 } else if counter > 60 {
281 env.Cancel(t.Context())
282 t.Fatal("UNEXPECTED STATUS TIMEOUT", status)
283 }
284 } else if counter > 30 {
285 env.Cancel(t.Context())
286 t.Fatal("UNEXPECTED STATUS QUERY TIMEOUT", err.Error())
287 } else if counter > 5 {
288 // log the error for informational purposes
289 t.Log(err.Error())
290 }
291 }
292}
293
294func CreateTableForQRep(ctx context.Context, conn *pgx.Conn, suffix string, tableName string) error {
295 createMoodEnum := "CREATE TYPE mood AS ENUM ('happy', 'sad', 'angry');"

Calls 3

LogMethod · 0.80
CancelMethod · 0.65
ErrorMethod · 0.45