()
| 2801 | } |
| 2802 | |
| 2803 | func (s APITestSuite) TestQRep() { |
| 2804 | if _, ok := s.source.(*MongoSource); ok { |
| 2805 | s.t.Skip("QRepFlowWorkFlow is not implemented for MongoDB") |
| 2806 | } |
| 2807 | |
| 2808 | peerType, err := s.GetPeerType(s.t.Context(), &protos.PeerInfoRequest{ |
| 2809 | PeerName: s.source.GeneratePeer(s.t).Name, |
| 2810 | }) |
| 2811 | require.NoError(s.t, err) |
| 2812 | tableName := AddSuffix(s, "qrepapi") |
| 2813 | schemaQualified := AttachSchema(s, tableName) |
| 2814 | require.NoError(s.t, s.source.Exec(s.t.Context(), |
| 2815 | fmt.Sprintf("CREATE TABLE %s(id int primary key, val text)", schemaQualified))) |
| 2816 | require.NoError(s.t, s.source.Exec(s.t.Context(), |
| 2817 | fmt.Sprintf("INSERT INTO %s(id, val) values (1,'first')", schemaQualified))) |
| 2818 | |
| 2819 | flowName := fmt.Sprintf("qrepapiflow_%s_%s", peerType.PeerType, s.suffix) |
| 2820 | qrepConfig := CreateQRepWorkflowConfig( |
| 2821 | s.t, |
| 2822 | flowName, |
| 2823 | schemaQualified, |
| 2824 | tableName, |
| 2825 | fmt.Sprintf("SELECT * FROM %s WHERE id BETWEEN {{.start}} AND {{.end}}", schemaQualified), |
| 2826 | s.ch.Peer().Name, |
| 2827 | "", |
| 2828 | true, |
| 2829 | "", |
| 2830 | "", |
| 2831 | ) |
| 2832 | qrepConfig.SourceName = s.source.GeneratePeer(s.t).Name |
| 2833 | qrepConfig.WatermarkColumn = "id" |
| 2834 | qrepConfig.InitialCopyOnly = false |
| 2835 | qrepConfig.WaitBetweenBatchesSeconds = 5 |
| 2836 | qrepConfig.NumRowsPerPartition = 1 |
| 2837 | _, err = s.CreateQRepFlow(s.t.Context(), &protos.CreateQRepFlowRequest{ |
| 2838 | QrepConfig: qrepConfig, |
| 2839 | }) |
| 2840 | require.NoError(s.t, err) |
| 2841 | |
| 2842 | tc := NewTemporalClient(s.t) |
| 2843 | env, err := GetPeerflow(s.t.Context(), s.catalog, tc, qrepConfig.FlowJobName) |
| 2844 | require.NoError(s.t, err) |
| 2845 | |
| 2846 | EnvWaitForEqualTables(env, s.ch, "qrep initial load", tableName, "id,val") |
| 2847 | |
| 2848 | require.NoError(s.t, s.source.Exec(s.t.Context(), |
| 2849 | fmt.Sprintf("INSERT INTO %s(id, val) values (2,'second')", schemaQualified))) |
| 2850 | |
| 2851 | EnvWaitForEqualTables(env, s.ch, "insert post qrep initial load", tableName, "id,val") |
| 2852 | statusResponse, err := s.MirrorStatus(s.t.Context(), &protos.MirrorStatusRequest{ |
| 2853 | FlowJobName: qrepConfig.FlowJobName, |
| 2854 | IncludeFlowInfo: true, |
| 2855 | ExcludeBatches: false, |
| 2856 | }) |
| 2857 | require.NoError(s.t, err) |
| 2858 | qStatus := statusResponse.GetQrepStatus() |
| 2859 | require.NotNil(s.t, qStatus) |
| 2860 | require.Len(s.t, qStatus.Partitions, 2) |
nothing calls this directly
no test coverage detected