MCPcopy
hub / github.com/PeerDB-io/peerdb / Test_Multi_Table_SF

Method Test_Multi_Table_SF

flow/e2e/snowflake_test.go:550–600  ·  view source on GitHub ↗
()

Source from the content-addressed store, hash-verified

548}
549
550func (s PeerFlowE2ETestSuiteSF) Test_Multi_Table_SF() {
551 tc := NewTemporalClient(s.t)
552
553 srcTable1Name := s.attachSchemaSuffix("test1_sf")
554 srcTable2Name := s.attachSchemaSuffix("test2_sf")
555 dstTable1Name := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test1_sf")
556 dstTable2Name := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test2_sf")
557
558 _, err := s.Conn().Exec(s.t.Context(), fmt.Sprintf(`
559 CREATE TABLE IF NOT EXISTS %s (id serial primary key, c1 int, c2 text);
560 CREATE TABLE IF NOT EXISTS %s (id serial primary key, c1 int, c2 text);
561 `, srcTable1Name, srcTable2Name))
562 require.NoError(s.t, err)
563
564 connectionGen := FlowConnectionGenerationConfig{
565 FlowJobName: s.attachSuffix("test_multi_table"),
566 TableNameMapping: map[string]string{srcTable1Name: dstTable1Name, srcTable2Name: dstTable2Name},
567 Destination: s.Peer().Name,
568 }
569
570 flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s)
571 flowConnConfig.MaxBatchSize = 100
572
573 // wait for PeerFlowStatusQuery to finish setup
574 // and execute a transaction touching toast columns
575 env := ExecutePeerflow(s.t, tc, flowConnConfig)
576 SetupCDCFlowStatusQuery(s.t, env, flowConnConfig)
577 /* inserting across multiple tables*/
578 _, err = s.Conn().Exec(s.t.Context(), fmt.Sprintf(`
579 INSERT INTO %s (c1,c2) VALUES (1,'dummy_1');
580 INSERT INTO %s (c1,c2) VALUES (-1,'dummy_-1');
581 `, srcTable1Name, srcTable2Name))
582 EnvNoError(s.t, env, err)
583
584 EnvWaitFor(s.t, env, 2*time.Minute, "normalize both tables", func() bool {
585 count1, err := s.sfHelper.CountRows(s.t.Context(), "test1_sf")
586 if err != nil {
587 return false
588 }
589 count2, err := s.sfHelper.CountRows(s.t.Context(), "test2_sf")
590 if err != nil {
591 return false
592 }
593
594 return count1 == 1 && count2 == 1
595 })
596
597 env.Cancel(s.t.Context())
598
599 RequireEnvCanceled(s.t, env)
600}
601
602func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_SF() {
603 tc := NewTemporalClient(s.t)

Callers

nothing calls this directly

Calls 14

attachSchemaSuffixMethod · 0.95
ConnMethod · 0.95
attachSuffixMethod · 0.95
PeerMethod · 0.95
NewTemporalClientFunction · 0.85
ExecutePeerflowFunction · 0.85
SetupCDCFlowStatusQueryFunction · 0.85
EnvNoErrorFunction · 0.85
EnvWaitForFunction · 0.85
RequireEnvCanceledFunction · 0.85
ExecMethod · 0.65

Tested by

no test coverage detected