MCPcopy
hub / github.com/PeerDB-io/peerdb / TestResyncSourceTableMissing

Method TestResyncSourceTableMissing

flow/e2e/api_test.go:1841–1952  ·  view source on GitHub ↗
()

Source from the content-addressed store, hash-verified

1839}
1840
1841func (s APITestSuite) TestResyncSourceTableMissing() {
1842 tableNames := []string{"missing_src_a", "missing_src_b"}
1843 qualifiedSourceTables := []string{AttachSchema(s, tableNames[0]), AttachSchema(s, tableNames[1])}
1844 var cols string
1845 switch s.source.(type) {
1846 case *PostgresSource, *MySqlSource:
1847 for _, qt := range qualifiedSourceTables {
1848 require.NoError(s.t, s.source.Exec(s.t.Context(),
1849 fmt.Sprintf("CREATE TABLE %s(id int primary key, val text)", qt)))
1850 require.NoError(s.t, s.source.Exec(s.t.Context(),
1851 fmt.Sprintf("INSERT INTO %s(id, val) values (1,'first')", qt)))
1852 }
1853 cols = "id,val"
1854 case *MongoSource:
1855 for _, tn := range tableNames {
1856 res, err := s.Source().(*MongoSource).AdminClient().
1857 Database(Schema(s)).Collection(tn).
1858 InsertOne(s.t.Context(), bson.D{bson.E{Key: "id", Value: 1}, bson.E{Key: "val", Value: "first"}}, options.InsertOne())
1859 require.NoError(s.t, err)
1860 require.True(s.t, res.Acknowledged)
1861 }
1862 cols = fmt.Sprintf("%s,%s", connmongo.DefaultDocumentKeyColumnName, connmongo.DefaultFullDocumentColumnName)
1863 default:
1864 require.Fail(s.t, fmt.Sprintf("unknown source type %T", s.source))
1865 }
1866
1867 connectionGen := FlowConnectionGenerationConfig{
1868 FlowJobName: "resync_missing_" + s.suffix,
1869 TableNameMapping: map[string]string{
1870 qualifiedSourceTables[0]: tableNames[0],
1871 qualifiedSourceTables[1]: tableNames[1],
1872 },
1873 Destination: s.ch.Peer().Name,
1874 }
1875 flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s)
1876 flowConnConfig.DoInitialSnapshot = true
1877 flowConnConfig.InitialSnapshotOnly = true
1878
1879 response, err := s.CreateCDCFlow(s.t.Context(), &protos.CreateCDCFlowRequest{ConnectionConfigs: flowConnConfig})
1880 require.NoError(s.t, err)
1881 require.NotNil(s.t, response)
1882
1883 tc := NewTemporalClient(s.t)
1884 env, err := GetPeerflow(s.t.Context(), s.catalog, tc, flowConnConfig.FlowJobName)
1885 require.NoError(s.t, err)
1886 SetupCDCFlowStatusQuery(s.t, env, flowConnConfig)
1887 EnvWaitForFinished(s.t, env, 3*time.Minute)
1888 for _, tn := range tableNames {
1889 RequireEqualTables(s.ch, tn, cols)
1890 }
1891
1892 switch src := s.source.(type) {
1893 case *PostgresSource, *MySqlSource:
1894 for _, qt := range qualifiedSourceTables {
1895 require.NoError(s.t, s.source.Exec(s.t.Context(), "DROP TABLE "+qt))
1896 }
1897 case *MongoSource:
1898 for _, tn := range tableNames {

Callers

nothing calls this directly

Calls 15

Source (Method) · 0.95
AttachSchema (Function) · 0.85
Schema (Function) · 0.85
NewTemporalClient (Function) · 0.85
GetPeerflow (Function) · 0.85
SetupCDCFlowStatusQuery (Function) · 0.85
EnvWaitForFinished (Function) · 0.85
RequireEqualTables (Function) · 0.85
AdminClient (Method) · 0.80
CreateCDCFlow (Method) · 0.80
FlowStateChange (Method) · 0.80

Tested by

no test coverage detected