()
| 600 | } |
| 601 | |
// Test_Composite_PKey_SF exercises a peer flow whose source table has a
// composite primary key (id, t).
//
// Sequence, as visible in this function:
//  1. Create the source table "test_simple_cpkey" (schema-suffixed name).
//  2. Start a peer flow mapping it to "<testSchemaName>.test_simple_cpkey" on
//     the suite's destination peer.
//  3. Insert 10 rows and wait for the tables to compare equal.
//  4. Update the odd-c2 rows, delete the even-c2 rows, and wait again.
//  5. Cancel the flow and require that it ended up canceled.
//
// NOTE(review): the "SF" suffix and s.sfHelper suggest the destination is
// Snowflake — confirm against the suite definition, which is not visible here.
| 602 | func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_SF() { |
| 603 | tc := NewTemporalClient(s.t) |
| 604 | |
| 605 | srcTableName := s.attachSchemaSuffix("test_simple_cpkey") |
| 606 | dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test_simple_cpkey") |
| 607 | |
// Both id and c1 are identity columns; the inserts below supply only c2 and t,
// so id and c1 are left for the database to generate. The primary key spans
// (id, t).
| 608 | _, err := s.Conn().Exec(s.t.Context(), fmt.Sprintf(` |
| 609 | CREATE TABLE IF NOT EXISTS %s ( |
| 610 | id INT GENERATED ALWAYS AS IDENTITY, |
| 611 | c1 INT GENERATED BY DEFAULT AS IDENTITY, |
| 612 | c2 INT, |
| 613 | t TEXT, |
| 614 | PRIMARY KEY(id,t) |
| 615 | ); |
| 616 | `, srcTableName)) |
| 617 | require.NoError(s.t, err) |
| 618 | |
// Flow configuration: a single source -> destination table mapping, targeting
// the suite's peer by name.
| 619 | connectionGen := FlowConnectionGenerationConfig{ |
| 620 | FlowJobName: s.attachSuffix("test_cpkey_flow"), |
| 621 | TableNameMapping: map[string]string{srcTableName: dstTableName}, |
| 622 | Destination: s.Peer().Name, |
| 623 | } |
| 624 | |
| 625 | flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s) |
| 626 | flowConnConfig.MaxBatchSize = 100 |
| 627 | |
| 628 | // wait for PeerFlowStatusQuery to finish setup |
| 629 | // and then insert, update and delete rows in the table. |
| 630 | env := ExecutePeerflow(s.t, tc, flowConnConfig) |
| 631 | SetupCDCFlowStatusQuery(s.t, env, flowConnConfig) |
| 632 | // insert 10 rows into the source table |
// Row i gets c2 = i and t = "test_value_<i>", for i in 0..9.
| 633 | for i := range 10 { |
| 634 | testValue := fmt.Sprintf("test_value_%d", i) |
| 635 | _, err = s.Conn().Exec(s.t.Context(), fmt.Sprintf(` |
| 636 | INSERT INTO %s(c2,t) VALUES ($1,$2) |
| 637 | `, srcTableName), i, testValue) |
| 638 | EnvNoError(s.t, env, err) |
| 639 | } |
| 640 | s.t.Log("Inserted 10 rows into the source table") |
| 641 | |
// Presumably blocks until source and destination agree on the listed columns;
// the helper's exact semantics are defined elsewhere — verify there.
| 642 | EnvWaitForEqualTables(env, s, "normalize table", "test_simple_cpkey", "id,c1,c2,t") |
| 643 | |
// Update touches only c1 (not part of the primary key) on rows with odd c2;
// the delete then removes the rows with even c2. Together the two statements
// affect every inserted row.
| 644 | _, err = s.Conn().Exec(s.t.Context(), |
| 645 | fmt.Sprintf(`UPDATE %s SET c1=c1+1 WHERE MOD(c2,2)=$1`, srcTableName), 1) |
| 646 | EnvNoError(s.t, env, err) |
| 647 | _, err = s.Conn().Exec(s.t.Context(), fmt.Sprintf(`DELETE FROM %s WHERE MOD(c2,2)=$1`, srcTableName), 0) |
| 648 | EnvNoError(s.t, env, err) |
| 649 | EnvWaitForEqualTables(env, s, "normalize update/delete", "test_simple_cpkey", "id,c1,c2,t") |
| 650 | |
// Stop the flow, then assert that it reached the canceled state.
| 651 | env.Cancel(s.t.Context()) |
| 652 | |
| 653 | RequireEnvCanceled(s.t, env) |
| 654 | } |
| 655 | |
| 656 | func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_1_SF() { |
| 657 | tc := NewTemporalClient(s.t) |
nothing calls this directly
no test coverage detected