()
| 255 | } |
| 256 | |
| 257 | func (s PubSubSuite) TestInitialLoad() { |
| 258 | srcTableName := AttachSchema(s, "psinitial") |
| 259 | |
| 260 | _, err := s.Conn().Exec(s.t.Context(), fmt.Sprintf(` |
| 261 | CREATE TABLE IF NOT EXISTS %s ( |
| 262 | id SERIAL PRIMARY KEY, |
| 263 | val text |
| 264 | ); |
| 265 | `, srcTableName)) |
| 266 | require.NoError(s.t, err) |
| 267 | |
| 268 | sa, err := ServiceAccount() |
| 269 | require.NoError(s.t, err) |
| 270 | |
| 271 | _, err = s.Conn().Exec(s.t.Context(), `insert into public.scripts (name, lang, source) values |
| 272 | ('e2e_psinitial', 'lua', 'function onRecord(r) return r.row and r.row.val end') on conflict do nothing`) |
| 273 | require.NoError(s.t, err) |
| 274 | |
| 275 | flowName := AddSuffix(s, "e2epsinitial") |
| 276 | connectionGen := FlowConnectionGenerationConfig{ |
| 277 | FlowJobName: flowName, |
| 278 | TableNameMapping: map[string]string{srcTableName: flowName}, |
| 279 | Destination: s.Peer(sa).Name, |
| 280 | } |
| 281 | flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s) |
| 282 | flowConnConfig.Script = "e2e_psinitial" |
| 283 | flowConnConfig.DoInitialSnapshot = true |
| 284 | |
| 285 | psclient, err := sa.CreatePubSubClient(s.t.Context()) |
| 286 | require.NoError(s.t, err) |
| 287 | defer psclient.Close() |
| 288 | topicName := fmt.Sprintf("projects/%s/topics/%s", psclient.Project(), flowName) |
| 289 | _, err = psclient.TopicAdminClient.CreateTopic(s.t.Context(), &pubsubpb.Topic{ |
| 290 | Name: topicName, |
| 291 | }) |
| 292 | require.NoError(s.t, err) |
| 293 | sub, err := psclient.SubscriptionAdminClient.CreateSubscription(s.t.Context(), &pubsubpb.Subscription{ |
| 294 | Name: fmt.Sprintf("projects/%s/subscriptions/%s", psclient.Project(), flowName), |
| 295 | Topic: topicName, |
| 296 | MessageRetentionDuration: durationpb.New(10 * time.Minute), |
| 297 | ExpirationPolicy: &pubsubpb.ExpirationPolicy{Ttl: durationpb.New(24 * time.Hour)}, |
| 298 | }) |
| 299 | require.NoError(s.t, err) |
| 300 | _, err = s.Conn().Exec(s.t.Context(), fmt.Sprintf( |
| 301 | `INSERT INTO %s (id, val) VALUES (1, 'testval')`, srcTableName)) |
| 302 | require.NoError(s.t, err) |
| 303 | |
| 304 | tc := NewTemporalClient(s.t) |
| 305 | env := ExecutePeerflow(s.t, tc, flowConnConfig) |
| 306 | SetupCDCFlowStatusQuery(s.t, env, flowConnConfig) |
| 307 | |
| 308 | ctx, cancel := context.WithTimeout(s.t.Context(), 3*time.Minute) |
| 309 | defer cancel() |
| 310 | |
| 311 | msgs := make(chan *pubsub.Message) |
| 312 | go func() { |
| 313 | subclient := psclient.Subscriber(sub.Name) |
| 314 | _ = subclient.Receive(ctx, func(_ context.Context, m *pubsub.Message) { |
nothing calls this directly
no test coverage detected