()
| 548 | } |
| 549 | |
| 550 | func (s PeerFlowE2ETestSuiteSF) Test_Multi_Table_SF() { |
| 551 | tc := NewTemporalClient(s.t) |
| 552 | |
| 553 | srcTable1Name := s.attachSchemaSuffix("test1_sf") |
| 554 | srcTable2Name := s.attachSchemaSuffix("test2_sf") |
| 555 | dstTable1Name := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test1_sf") |
| 556 | dstTable2Name := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test2_sf") |
| 557 | |
| 558 | _, err := s.Conn().Exec(s.t.Context(), fmt.Sprintf(` |
| 559 | CREATE TABLE IF NOT EXISTS %s (id serial primary key, c1 int, c2 text); |
| 560 | CREATE TABLE IF NOT EXISTS %s (id serial primary key, c1 int, c2 text); |
| 561 | `, srcTable1Name, srcTable2Name)) |
| 562 | require.NoError(s.t, err) |
| 563 | |
| 564 | connectionGen := FlowConnectionGenerationConfig{ |
| 565 | FlowJobName: s.attachSuffix("test_multi_table"), |
| 566 | TableNameMapping: map[string]string{srcTable1Name: dstTable1Name, srcTable2Name: dstTable2Name}, |
| 567 | Destination: s.Peer().Name, |
| 568 | } |
| 569 | |
| 570 | flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s) |
| 571 | flowConnConfig.MaxBatchSize = 100 |
| 572 | |
| 573 | // wait for PeerFlowStatusQuery to finish setup |
| 574 | // and execute a transaction touching toast columns |
| 575 | env := ExecutePeerflow(s.t, tc, flowConnConfig) |
| 576 | SetupCDCFlowStatusQuery(s.t, env, flowConnConfig) |
| 577 | /* inserting across multiple tables*/ |
| 578 | _, err = s.Conn().Exec(s.t.Context(), fmt.Sprintf(` |
| 579 | INSERT INTO %s (c1,c2) VALUES (1,'dummy_1'); |
| 580 | INSERT INTO %s (c1,c2) VALUES (-1,'dummy_-1'); |
| 581 | `, srcTable1Name, srcTable2Name)) |
| 582 | EnvNoError(s.t, env, err) |
| 583 | |
| 584 | EnvWaitFor(s.t, env, 2*time.Minute, "normalize both tables", func() bool { |
| 585 | count1, err := s.sfHelper.CountRows(s.t.Context(), "test1_sf") |
| 586 | if err != nil { |
| 587 | return false |
| 588 | } |
| 589 | count2, err := s.sfHelper.CountRows(s.t.Context(), "test2_sf") |
| 590 | if err != nil { |
| 591 | return false |
| 592 | } |
| 593 | |
| 594 | return count1 == 1 && count2 == 1 |
| 595 | }) |
| 596 | |
| 597 | env.Cancel(s.t.Context()) |
| 598 | |
| 599 | RequireEnvCanceled(s.t, env) |
| 600 | } |
| 601 | |
| 602 | func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_SF() { |
| 603 | tc := NewTemporalClient(s.t) |
nothing calls this directly
no test coverage detected