MCPcopy
hub / github.com/PeerDB-io/peerdb / TestInitialLoad

Method TestInitialLoad

flow/e2e/pubsub_test.go:257–329  ·  view source on GitHub ↗
()

Source from the content-addressed store, hash-verified

255}
256
257func (s PubSubSuite) TestInitialLoad() {
258 srcTableName := AttachSchema(s, "psinitial")
259
260 _, err := s.Conn().Exec(s.t.Context(), fmt.Sprintf(`
261 CREATE TABLE IF NOT EXISTS %s (
262 id SERIAL PRIMARY KEY,
263 val text
264 );
265 `, srcTableName))
266 require.NoError(s.t, err)
267
268 sa, err := ServiceAccount()
269 require.NoError(s.t, err)
270
271 _, err = s.Conn().Exec(s.t.Context(), `insert into public.scripts (name, lang, source) values
272 ('e2e_psinitial', 'lua', 'function onRecord(r) return r.row and r.row.val end') on conflict do nothing`)
273 require.NoError(s.t, err)
274
275 flowName := AddSuffix(s, "e2epsinitial")
276 connectionGen := FlowConnectionGenerationConfig{
277 FlowJobName: flowName,
278 TableNameMapping: map[string]string{srcTableName: flowName},
279 Destination: s.Peer(sa).Name,
280 }
281 flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s)
282 flowConnConfig.Script = "e2e_psinitial"
283 flowConnConfig.DoInitialSnapshot = true
284
285 psclient, err := sa.CreatePubSubClient(s.t.Context())
286 require.NoError(s.t, err)
287 defer psclient.Close()
288 topicName := fmt.Sprintf("projects/%s/topics/%s", psclient.Project(), flowName)
289 _, err = psclient.TopicAdminClient.CreateTopic(s.t.Context(), &pubsubpb.Topic{
290 Name: topicName,
291 })
292 require.NoError(s.t, err)
293 sub, err := psclient.SubscriptionAdminClient.CreateSubscription(s.t.Context(), &pubsubpb.Subscription{
294 Name: fmt.Sprintf("projects/%s/subscriptions/%s", psclient.Project(), flowName),
295 Topic: topicName,
296 MessageRetentionDuration: durationpb.New(10 * time.Minute),
297 ExpirationPolicy: &pubsubpb.ExpirationPolicy{Ttl: durationpb.New(24 * time.Hour)},
298 })
299 require.NoError(s.t, err)
300 _, err = s.Conn().Exec(s.t.Context(), fmt.Sprintf(
301 `INSERT INTO %s (id, val) VALUES (1, 'testval')`, srcTableName))
302 require.NoError(s.t, err)
303
304 tc := NewTemporalClient(s.t)
305 env := ExecutePeerflow(s.t, tc, flowConnConfig)
306 SetupCDCFlowStatusQuery(s.t, env, flowConnConfig)
307
308 ctx, cancel := context.WithTimeout(s.t.Context(), 3*time.Minute)
309 defer cancel()
310
311 msgs := make(chan *pubsub.Message)
312 go func() {
313 subclient := psclient.Subscriber(sub.Name)
314 _ = subclient.Receive(ctx, func(_ context.Context, m *pubsub.Message) {

Callers

nothing calls this directly

Calls 15

ConnMethod · 0.95
PeerMethod · 0.95
AttachSchemaFunction · 0.85
ServiceAccountFunction · 0.85
AddSuffixFunction · 0.85
NewTemporalClientFunction · 0.85
ExecutePeerflowFunction · 0.85
SetupCDCFlowStatusQueryFunction · 0.85
RequireEnvCanceledFunction · 0.85
CreatePubSubClientMethod · 0.80
ReceiveMethod · 0.80

Tested by

no test coverage detected