MCPcopy
hub / github.com/PeerDB-io/peerdb / Test_Trips_Flow

Method Test_Trips_Flow

flow/e2e/bigquery_source_test.go:412–483  ·  view source on GitHub ↗
()

Source from the content-addressed store, hash-verified

410}
411
412func (s BigQueryClickhouseSuite) Test_Trips_Flow() {
413 t := s.T()
414 ctx := t.Context()
415
416 source := s.Source().(*bigQuerySource)
417 srcTable := "trips_1k"
418 dstTable := "trips_1k_dst"
419
420 t.Logf("ClickHouse database: %s", s.Peer().Config.(*protos.Peer_ClickhouseConfig).ClickhouseConfig.Database)
421
422 count, err := source.helper.countRowsWithDataset(ctx, source.config.DatasetId, srcTable, "")
423 require.NoError(t, err, "should be able to count rows in source table")
424 require.Positive(t, count, "source table should have data")
425 t.Logf("Source table %s has %d rows", srcTable, count)
426
427 connectionGen := FlowConnectionGenerationConfig{
428 FlowJobName: AddSuffix(s, srcTable),
429 TableMappings: []*protos.TableMapping{
430 {
431 SourceTableIdentifier: fmt.Sprintf("%s.%s", source.config.DatasetId, srcTable),
432 DestinationTableIdentifier: s.DestinationTable(dstTable),
433 },
434 },
435 Destination: s.Peer().Name,
436 }
437 flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s)
438 flowConnConfig.DoInitialSnapshot = true
439 flowConnConfig.InitialSnapshotOnly = true
440 flowConnConfig.SnapshotStagingPath = bigQueryTestStagingPath(s, srcTable)
441
442 tc := NewTemporalClient(t)
443 env := ExecutePeerflow(t, tc, flowConnConfig)
444
445 EnvWaitForEqualTablesWithNames(
446 env,
447 s,
448 "initial load to match",
449 srcTable,
450 dstTable,
451 "trip_id,vendor_id,passenger_count,trip_distance,fare_amount",
452 )
453
454 EnvWaitForFinished(t, env, 3*time.Minute)
455
456 apiClient, err := NewApiClient()
457 require.NoError(t, err)
458
459 statusResp, err := apiClient.MirrorStatus(ctx, &protos.MirrorStatusRequest{
460 FlowJobName: flowConnConfig.FlowJobName,
461 IncludeFlowInfo: true,
462 })
463 require.NoError(t, err)
464
465 cdcStatus := statusResp.GetCdcStatus()
466 require.NotNil(t, cdcStatus)
467 require.NotNil(t, cdcStatus.SnapshotStatus)
468 require.NotEmpty(t, cdcStatus.SnapshotStatus.Clones)
469

Callers

nothing calls this directly

Calls 15

AddSuffixFunction · 0.85
bigQueryTestStagingPathFunction · 0.85
NewTemporalClientFunction · 0.85
ExecutePeerflowFunction · 0.85
EnvWaitForFinishedFunction · 0.85
NewApiClientFunction · 0.85
countRowsWithDatasetMethod · 0.80
MirrorStatusMethod · 0.80
TMethod · 0.65

Tested by

no test coverage detected