MCPcopy
hub / github.com/PeerDB-io/peerdb / TestQRep

Method TestQRep

flow/e2e/api_test.go:2803–2875  ·  view source on GitHub ↗
()

Source from the content-addressed store, hash-verified

2801}
2802
2803func (s APITestSuite) TestQRep() {
2804 if _, ok := s.source.(*MongoSource); ok {
2805 s.t.Skip("QRepFlowWorkFlow is not implemented for MongoDB")
2806 }
2807
2808 peerType, err := s.GetPeerType(s.t.Context(), &protos.PeerInfoRequest{
2809 PeerName: s.source.GeneratePeer(s.t).Name,
2810 })
2811 require.NoError(s.t, err)
2812 tableName := AddSuffix(s, "qrepapi")
2813 schemaQualified := AttachSchema(s, tableName)
2814 require.NoError(s.t, s.source.Exec(s.t.Context(),
2815 fmt.Sprintf("CREATE TABLE %s(id int primary key, val text)", schemaQualified)))
2816 require.NoError(s.t, s.source.Exec(s.t.Context(),
2817 fmt.Sprintf("INSERT INTO %s(id, val) values (1,'first')", schemaQualified)))
2818
2819 flowName := fmt.Sprintf("qrepapiflow_%s_%s", peerType.PeerType, s.suffix)
2820 qrepConfig := CreateQRepWorkflowConfig(
2821 s.t,
2822 flowName,
2823 schemaQualified,
2824 tableName,
2825 fmt.Sprintf("SELECT * FROM %s WHERE id BETWEEN {{.start}} AND {{.end}}", schemaQualified),
2826 s.ch.Peer().Name,
2827 "",
2828 true,
2829 "",
2830 "",
2831 )
2832 qrepConfig.SourceName = s.source.GeneratePeer(s.t).Name
2833 qrepConfig.WatermarkColumn = "id"
2834 qrepConfig.InitialCopyOnly = false
2835 qrepConfig.WaitBetweenBatchesSeconds = 5
2836 qrepConfig.NumRowsPerPartition = 1
2837 _, err = s.CreateQRepFlow(s.t.Context(), &protos.CreateQRepFlowRequest{
2838 QrepConfig: qrepConfig,
2839 })
2840 require.NoError(s.t, err)
2841
2842 tc := NewTemporalClient(s.t)
2843 env, err := GetPeerflow(s.t.Context(), s.catalog, tc, qrepConfig.FlowJobName)
2844 require.NoError(s.t, err)
2845
2846 EnvWaitForEqualTables(env, s.ch, "qrep initial load", tableName, "id,val")
2847
2848 require.NoError(s.t, s.source.Exec(s.t.Context(),
2849 fmt.Sprintf("INSERT INTO %s(id, val) values (2,'second')", schemaQualified)))
2850
2851 EnvWaitForEqualTables(env, s.ch, "insert post qrep initial load", tableName, "id,val")
2852 statusResponse, err := s.MirrorStatus(s.t.Context(), &protos.MirrorStatusRequest{
2853 FlowJobName: qrepConfig.FlowJobName,
2854 IncludeFlowInfo: true,
2855 ExcludeBatches: false,
2856 })
2857 require.NoError(s.t, err)
2858 qStatus := statusResponse.GetQrepStatus()
2859 require.NotNil(s.t, qStatus)
2860 require.Len(s.t, qStatus.Partitions, 2)

Callers

nothing calls this directly

Calls 15

waitForFlowDropped · Method · 0.95
AddSuffix · Function · 0.85
AttachSchema · Function · 0.85
CreateQRepWorkflowConfig · Function · 0.85
NewTemporalClient · Function · 0.85
GetPeerflow · Function · 0.85
EnvWaitForEqualTables · Function · 0.85
CreateQRepFlow · Method · 0.80
MirrorStatus · Method · 0.80
FlowStateChange · Method · 0.80
GeneratePeer · Method · 0.65
Exec · Method · 0.65

Tested by

no test coverage detected