()
| 410 | } |
| 411 | |
| 412 | func (s BigQueryClickhouseSuite) Test_Trips_Flow() { |
| 413 | t := s.T() |
| 414 | ctx := t.Context() |
| 415 | |
| 416 | source := s.Source().(*bigQuerySource) |
| 417 | srcTable := "trips_1k" |
| 418 | dstTable := "trips_1k_dst" |
| 419 | |
| 420 | t.Logf("ClickHouse database: %s", s.Peer().Config.(*protos.Peer_ClickhouseConfig).ClickhouseConfig.Database) |
| 421 | |
| 422 | count, err := source.helper.countRowsWithDataset(ctx, source.config.DatasetId, srcTable, "") |
| 423 | require.NoError(t, err, "should be able to count rows in source table") |
| 424 | require.Positive(t, count, "source table should have data") |
| 425 | t.Logf("Source table %s has %d rows", srcTable, count) |
| 426 | |
| 427 | connectionGen := FlowConnectionGenerationConfig{ |
| 428 | FlowJobName: AddSuffix(s, srcTable), |
| 429 | TableMappings: []*protos.TableMapping{ |
| 430 | { |
| 431 | SourceTableIdentifier: fmt.Sprintf("%s.%s", source.config.DatasetId, srcTable), |
| 432 | DestinationTableIdentifier: s.DestinationTable(dstTable), |
| 433 | }, |
| 434 | }, |
| 435 | Destination: s.Peer().Name, |
| 436 | } |
| 437 | flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s) |
| 438 | flowConnConfig.DoInitialSnapshot = true |
| 439 | flowConnConfig.InitialSnapshotOnly = true |
| 440 | flowConnConfig.SnapshotStagingPath = bigQueryTestStagingPath(s, srcTable) |
| 441 | |
| 442 | tc := NewTemporalClient(t) |
| 443 | env := ExecutePeerflow(t, tc, flowConnConfig) |
| 444 | |
| 445 | EnvWaitForEqualTablesWithNames( |
| 446 | env, |
| 447 | s, |
| 448 | "initial load to match", |
| 449 | srcTable, |
| 450 | dstTable, |
| 451 | "trip_id,vendor_id,passenger_count,trip_distance,fare_amount", |
| 452 | ) |
| 453 | |
| 454 | EnvWaitForFinished(t, env, 3*time.Minute) |
| 455 | |
| 456 | apiClient, err := NewApiClient() |
| 457 | require.NoError(t, err) |
| 458 | |
| 459 | statusResp, err := apiClient.MirrorStatus(ctx, &protos.MirrorStatusRequest{ |
| 460 | FlowJobName: flowConnConfig.FlowJobName, |
| 461 | IncludeFlowInfo: true, |
| 462 | }) |
| 463 | require.NoError(t, err) |
| 464 | |
| 465 | cdcStatus := statusResp.GetCdcStatus() |
| 466 | require.NotNil(t, cdcStatus) |
| 467 | require.NotNil(t, cdcStatus.SnapshotStatus) |
| 468 | require.NotEmpty(t, cdcStatus.SnapshotStatus.Clones) |
| 469 |
nothing calls this directly
no test coverage detected