()
| 495 | } |
| 496 | |
| 497 | func (s PeerFlowE2ETestSuitePG) TestTransform() { |
| 498 | numRows := 10 |
| 499 | |
| 500 | srcTable := "test_transform" |
| 501 | s.setupSourceTable(srcTable, numRows) |
| 502 | |
| 503 | dstTable := "test_transformdst" |
| 504 | |
| 505 | srcSchemaQualified := fmt.Sprintf("%s_%s.%s", "e2e_test", s.suffix, srcTable) |
| 506 | dstSchemaQualified := fmt.Sprintf("%s_%s.%s", "e2e_test", s.suffix, dstTable) |
| 507 | |
| 508 | query := fmt.Sprintf("SELECT * FROM %s WHERE updated_at BETWEEN {{.start}} AND {{.end}}", srcSchemaQualified) |
| 509 | |
| 510 | _, err := s.Conn().Exec(s.t.Context(), `insert into public.scripts (name, lang, source) values |
| 511 | ('pgtransform', 'lua', 'function transformRow(row) row.myreal = 1729 end') on conflict do nothing`) |
| 512 | require.NoError(s.t, err) |
| 513 | |
| 514 | jobName := AddSuffix(s, srcTable) |
| 515 | qrepConfig := CreateQRepWorkflowConfig( |
| 516 | s.t, |
| 517 | jobName, |
| 518 | srcSchemaQualified, |
| 519 | dstSchemaQualified, |
| 520 | query, |
| 521 | GeneratePostgresPeer(s.t).Name, |
| 522 | "", |
| 523 | true, |
| 524 | "_PEERDB_SYNCED_AT", |
| 525 | "", |
| 526 | ) |
| 527 | qrepConfig.WriteMode = &protos.QRepWriteMode{ |
| 528 | WriteType: protos.QRepWriteType_QREP_WRITE_MODE_OVERWRITE, |
| 529 | } |
| 530 | qrepConfig.InitialCopyOnly = false |
| 531 | qrepConfig.Script = "pgtransform" |
| 532 | |
| 533 | tc := NewTemporalClient(s.t) |
| 534 | env := RunQRepFlowWorkflow(s.t, tc, qrepConfig) |
| 535 | EnvWaitFor(s.t, env, 3*time.Minute, "waiting for first sync to complete", func() bool { |
| 536 | return s.compareCounts(dstSchemaQualified, int64(numRows)) == nil |
| 537 | }) |
| 538 | require.NoError(s.t, env.Error(s.t.Context())) |
| 539 | |
| 540 | var exists bool |
| 541 | err = s.Conn().QueryRow(s.t.Context(), |
| 542 | fmt.Sprintf("select exists(select * from %s where myreal <> 1729)", dstSchemaQualified)).Scan(&exists) |
| 543 | require.NoError(s.t, err) |
| 544 | require.False(s.t, exists) |
| 545 | } |
nothing calls this directly
no test coverage detected