MCPcopy
hub / github.com/PeerDB-io/peerdb / Test_Composite_PKey_SF

Method Test_Composite_PKey_SF

flow/e2e/snowflake_test.go:602–654  ·  view source on GitHub ↗
()

Source from the content-addressed store, hash-verified

600}
601
602func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_SF() {
603 tc := NewTemporalClient(s.t)
604
605 srcTableName := s.attachSchemaSuffix("test_simple_cpkey")
606 dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test_simple_cpkey")
607
608 _, err := s.Conn().Exec(s.t.Context(), fmt.Sprintf(`
609 CREATE TABLE IF NOT EXISTS %s (
610 id INT GENERATED ALWAYS AS IDENTITY,
611 c1 INT GENERATED BY DEFAULT AS IDENTITY,
612 c2 INT,
613 t TEXT,
614 PRIMARY KEY(id,t)
615 );
616 `, srcTableName))
617 require.NoError(s.t, err)
618
619 connectionGen := FlowConnectionGenerationConfig{
620 FlowJobName: s.attachSuffix("test_cpkey_flow"),
621 TableNameMapping: map[string]string{srcTableName: dstTableName},
622 Destination: s.Peer().Name,
623 }
624
625 flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s)
626 flowConnConfig.MaxBatchSize = 100
627
628 // wait for PeerFlowStatusQuery to finish setup
629 // and then insert, update and delete rows in the table.
630 env := ExecutePeerflow(s.t, tc, flowConnConfig)
631 SetupCDCFlowStatusQuery(s.t, env, flowConnConfig)
632 // insert 10 rows into the source table
633 for i := range 10 {
634 testValue := fmt.Sprintf("test_value_%d", i)
635 _, err = s.Conn().Exec(s.t.Context(), fmt.Sprintf(`
636 INSERT INTO %s(c2,t) VALUES ($1,$2)
637 `, srcTableName), i, testValue)
638 EnvNoError(s.t, env, err)
639 }
640 s.t.Log("Inserted 10 rows into the source table")
641
642 EnvWaitForEqualTables(env, s, "normalize table", "test_simple_cpkey", "id,c1,c2,t")
643
644 _, err = s.Conn().Exec(s.t.Context(),
645 fmt.Sprintf(`UPDATE %s SET c1=c1+1 WHERE MOD(c2,2)=$1`, srcTableName), 1)
646 EnvNoError(s.t, env, err)
647 _, err = s.Conn().Exec(s.t.Context(), fmt.Sprintf(`DELETE FROM %s WHERE MOD(c2,2)=$1`, srcTableName), 0)
648 EnvNoError(s.t, env, err)
649 EnvWaitForEqualTables(env, s, "normalize update/delete", "test_simple_cpkey", "id,c1,c2,t")
650
651 env.Cancel(s.t.Context())
652
653 RequireEnvCanceled(s.t, env)
654}
655
656func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_1_SF() {
657 tc := NewTemporalClient(s.t)

Callers

nothing calls this directly

Calls 14

attachSchemaSuffixMethod · 0.95
ConnMethod · 0.95
attachSuffixMethod · 0.95
PeerMethod · 0.95
NewTemporalClientFunction · 0.85
ExecutePeerflowFunction · 0.85
SetupCDCFlowStatusQueryFunction · 0.85
EnvNoErrorFunction · 0.85
EnvWaitForEqualTablesFunction · 0.85
RequireEnvCanceledFunction · 0.85
LogMethod · 0.80

Tested by

no test coverage detected