()
| 1839 | } |
| 1840 | |
| 1841 | func (s APITestSuite) TestResyncSourceTableMissing() { |
| 1842 | tableNames := []string{"missing_src_a", "missing_src_b"} |
| 1843 | qualifiedSourceTables := []string{AttachSchema(s, tableNames[0]), AttachSchema(s, tableNames[1])} |
| 1844 | var cols string |
| 1845 | switch s.source.(type) { |
| 1846 | case *PostgresSource, *MySqlSource: |
| 1847 | for _, qt := range qualifiedSourceTables { |
| 1848 | require.NoError(s.t, s.source.Exec(s.t.Context(), |
| 1849 | fmt.Sprintf("CREATE TABLE %s(id int primary key, val text)", qt))) |
| 1850 | require.NoError(s.t, s.source.Exec(s.t.Context(), |
| 1851 | fmt.Sprintf("INSERT INTO %s(id, val) values (1,'first')", qt))) |
| 1852 | } |
| 1853 | cols = "id,val" |
| 1854 | case *MongoSource: |
| 1855 | for _, tn := range tableNames { |
| 1856 | res, err := s.Source().(*MongoSource).AdminClient(). |
| 1857 | Database(Schema(s)).Collection(tn). |
| 1858 | InsertOne(s.t.Context(), bson.D{bson.E{Key: "id", Value: 1}, bson.E{Key: "val", Value: "first"}}, options.InsertOne()) |
| 1859 | require.NoError(s.t, err) |
| 1860 | require.True(s.t, res.Acknowledged) |
| 1861 | } |
| 1862 | cols = fmt.Sprintf("%s,%s", connmongo.DefaultDocumentKeyColumnName, connmongo.DefaultFullDocumentColumnName) |
| 1863 | default: |
| 1864 | require.Fail(s.t, fmt.Sprintf("unknown source type %T", s.source)) |
| 1865 | } |
| 1866 | |
| 1867 | connectionGen := FlowConnectionGenerationConfig{ |
| 1868 | FlowJobName: "resync_missing_" + s.suffix, |
| 1869 | TableNameMapping: map[string]string{ |
| 1870 | qualifiedSourceTables[0]: tableNames[0], |
| 1871 | qualifiedSourceTables[1]: tableNames[1], |
| 1872 | }, |
| 1873 | Destination: s.ch.Peer().Name, |
| 1874 | } |
| 1875 | flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s) |
| 1876 | flowConnConfig.DoInitialSnapshot = true |
| 1877 | flowConnConfig.InitialSnapshotOnly = true |
| 1878 | |
| 1879 | response, err := s.CreateCDCFlow(s.t.Context(), &protos.CreateCDCFlowRequest{ConnectionConfigs: flowConnConfig}) |
| 1880 | require.NoError(s.t, err) |
| 1881 | require.NotNil(s.t, response) |
| 1882 | |
| 1883 | tc := NewTemporalClient(s.t) |
| 1884 | env, err := GetPeerflow(s.t.Context(), s.catalog, tc, flowConnConfig.FlowJobName) |
| 1885 | require.NoError(s.t, err) |
| 1886 | SetupCDCFlowStatusQuery(s.t, env, flowConnConfig) |
| 1887 | EnvWaitForFinished(s.t, env, 3*time.Minute) |
| 1888 | for _, tn := range tableNames { |
| 1889 | RequireEqualTables(s.ch, tn, cols) |
| 1890 | } |
| 1891 | |
| 1892 | switch src := s.source.(type) { |
| 1893 | case *PostgresSource, *MySqlSource: |
| 1894 | for _, qt := range qualifiedSourceTables { |
| 1895 | require.NoError(s.t, s.source.Exec(s.t.Context(), "DROP TABLE "+qt)) |
| 1896 | } |
| 1897 | case *MongoSource: |
| 1898 | for _, tn := range tableNames { |
nothing calls this directly
no test coverage detected