| 469 | } |
| 470 | |
| 471 | func CreateQRepWorkflowConfig( |
| 472 | t *testing.T, |
| 473 | flowJobName string, |
| 474 | sourceTable string, |
| 475 | dstTable string, |
| 476 | query string, |
| 477 | dest string, |
| 478 | stagingPath string, |
| 479 | setupDst bool, |
| 480 | syncedAtCol string, |
| 481 | isDeletedCol string, |
| 482 | ) *protos.QRepConfig { |
| 483 | t.Helper() |
| 484 | |
| 485 | return &protos.QRepConfig{ |
| 486 | FlowJobName: flowJobName, |
| 487 | WatermarkTable: sourceTable, |
| 488 | DestinationTableIdentifier: dstTable, |
| 489 | SourceName: GeneratePostgresPeer(t).Name, |
| 490 | DestinationName: dest, |
| 491 | Query: query, |
| 492 | WatermarkColumn: "updated_at", |
| 493 | StagingPath: stagingPath, |
| 494 | WriteMode: &protos.QRepWriteMode{ |
| 495 | WriteType: protos.QRepWriteType_QREP_WRITE_MODE_APPEND, |
| 496 | }, |
| 497 | NumRowsPerPartition: 1000, |
| 498 | InitialCopyOnly: true, |
| 499 | SyncedAtColName: syncedAtCol, |
| 500 | SetupWatermarkTableOnDestination: setupDst, |
| 501 | SoftDeleteColName: isDeletedCol, |
| 502 | Version: shared.InternalVersion_Latest, |
| 503 | } |
| 504 | } |
| 505 | |
| 506 | func RunQRepFlowWorkflow(t *testing.T, tc client.Client, config *protos.QRepConfig) WorkflowRun { |
| 507 | t.Helper() |