()
| 390 | } |
| 391 | |
| 392 | func (s PeerFlowE2ETestSuiteSF) Test_Toast_Advance_3_SF() { |
| 393 | tc := NewTemporalClient(s.t) |
| 394 | |
| 395 | srcTableName := s.attachSchemaSuffix("test_toast_sf_5") |
| 396 | dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test_toast_sf_5") |
| 397 | |
| 398 | _, err := s.Conn().Exec(s.t.Context(), fmt.Sprintf(` |
| 399 | CREATE TABLE IF NOT EXISTS %s ( |
| 400 | id SERIAL PRIMARY KEY, |
| 401 | t1 text, |
| 402 | t2 text, |
| 403 | k int |
| 404 | ); |
| 405 | `, srcTableName)) |
| 406 | require.NoError(s.t, err) |
| 407 | |
| 408 | connectionGen := FlowConnectionGenerationConfig{ |
| 409 | FlowJobName: s.attachSuffix("test_toast_sf_5"), |
| 410 | TableNameMapping: map[string]string{srcTableName: dstTableName}, |
| 411 | Destination: s.Peer().Name, |
| 412 | } |
| 413 | |
| 414 | flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s) |
| 415 | flowConnConfig.MaxBatchSize = 100 |
| 416 | |
| 417 | // wait for PeerFlowStatusQuery to finish setup |
| 418 | // and execute a transaction touching toast columns |
| 419 | env := ExecutePeerflow(s.t, tc, flowConnConfig) |
| 420 | SetupCDCFlowStatusQuery(s.t, env, flowConnConfig) |
| 421 | /* |
| 422 | transaction updating a single row |
| 423 | multiple times with changed/unchanged toast columns |
| 424 | */ |
| 425 | _, err = s.Conn().Exec(s.t.Context(), fmt.Sprintf(` |
| 426 | BEGIN; |
| 427 | INSERT INTO %s (t1,t2,k) SELECT random_string(9000),random_string(9000), |
| 428 | 1 FROM generate_series(1,1); |
| 429 | UPDATE %s SET k=102 WHERE id=1; |
| 430 | UPDATE %s SET t1='dummy' WHERE id=1; |
| 431 | UPDATE %s SET t2='dummy' WHERE id=1; |
| 432 | END; |
| 433 | `, srcTableName, srcTableName, srcTableName, srcTableName)) |
| 434 | EnvNoError(s.t, env, err) |
| 435 | s.t.Log("Executed a transaction touching toast columns") |
| 436 | |
| 437 | EnvWaitForEqualTables(env, s, "normalizing tx", "test_toast_sf_5", `id,t1,t2,k`) |
| 438 | env.Cancel(s.t.Context()) |
| 439 | |
| 440 | RequireEnvCanceled(s.t, env) |
| 441 | } |
| 442 | |
| 443 | func (s PeerFlowE2ETestSuiteSF) Test_Types_SF() { |
| 444 | tc := NewTemporalClient(s.t) |
nothing calls this directly
no test coverage detected