| 262 | } |
| 263 | |
| 264 | func SetupCDCFlowStatusQuery(t *testing.T, env WorkflowRun, config *protos.FlowConnectionConfigs) { |
| 265 | t.Helper() |
| 266 | pool, err := internal.GetCatalogConnectionPoolFromEnv(t.Context()) |
| 267 | if err != nil { |
| 268 | env.Cancel(t.Context()) |
| 269 | t.Fatal("could not get catalog connection", err) |
| 270 | } |
| 271 | // errors expected while PeerFlowStatusQuery is setup |
| 272 | counter := 0 |
| 273 | for { |
| 274 | time.Sleep(time.Second) |
| 275 | counter++ |
| 276 | status, err := internal.GetWorkflowStatus(t.Context(), pool, env.GetID()) |
| 277 | if err == nil { |
| 278 | if status == protos.FlowStatus_STATUS_RUNNING || status == protos.FlowStatus_STATUS_COMPLETED { |
| 279 | return |
| 280 | } else if counter > 60 { |
| 281 | env.Cancel(t.Context()) |
| 282 | t.Fatal("UNEXPECTED STATUS TIMEOUT", status) |
| 283 | } |
| 284 | } else if counter > 30 { |
| 285 | env.Cancel(t.Context()) |
| 286 | t.Fatal("UNEXPECTED STATUS QUERY TIMEOUT", err.Error()) |
| 287 | } else if counter > 5 { |
| 288 | // log the error for informational purposes |
| 289 | t.Log(err.Error()) |
| 290 | } |
| 291 | } |
| 292 | } |
| 293 | |
| 294 | func CreateTableForQRep(ctx context.Context, conn *pgx.Conn, suffix string, tableName string) error { |
| 295 | createMoodEnum := "CREATE TYPE mood AS ENUM ('happy', 'sad', 'angry');" |