MCPcopy
hub / github.com/PeerDB-io/peerdb / TestTransform

Method TestTransform

flow/e2e/postgres_qrep_test.go:497–545  ·  view source on GitHub ↗
()

Source from the content-addressed store, hash-verified

495}
496
497func (s PeerFlowE2ETestSuitePG) TestTransform() {
498 numRows := 10
499
500 srcTable := "test_transform"
501 s.setupSourceTable(srcTable, numRows)
502
503 dstTable := "test_transformdst"
504
505 srcSchemaQualified := fmt.Sprintf("%s_%s.%s", "e2e_test", s.suffix, srcTable)
506 dstSchemaQualified := fmt.Sprintf("%s_%s.%s", "e2e_test", s.suffix, dstTable)
507
508 query := fmt.Sprintf("SELECT * FROM %s WHERE updated_at BETWEEN {{.start}} AND {{.end}}", srcSchemaQualified)
509
510 _, err := s.Conn().Exec(s.t.Context(), `insert into public.scripts (name, lang, source) values
511 ('pgtransform', 'lua', 'function transformRow(row) row.myreal = 1729 end') on conflict do nothing`)
512 require.NoError(s.t, err)
513
514 jobName := AddSuffix(s, srcTable)
515 qrepConfig := CreateQRepWorkflowConfig(
516 s.t,
517 jobName,
518 srcSchemaQualified,
519 dstSchemaQualified,
520 query,
521 GeneratePostgresPeer(s.t).Name,
522 "",
523 true,
524 "_PEERDB_SYNCED_AT",
525 "",
526 )
527 qrepConfig.WriteMode = &protos.QRepWriteMode{
528 WriteType: protos.QRepWriteType_QREP_WRITE_MODE_OVERWRITE,
529 }
530 qrepConfig.InitialCopyOnly = false
531 qrepConfig.Script = "pgtransform"
532
533 tc := NewTemporalClient(s.t)
534 env := RunQRepFlowWorkflow(s.t, tc, qrepConfig)
535 EnvWaitFor(s.t, env, 3*time.Minute, "waiting for first sync to complete", func() bool {
536 return s.compareCounts(dstSchemaQualified, int64(numRows)) == nil
537 })
538 require.NoError(s.t, env.Error(s.t.Context()))
539
540 var exists bool
541 err = s.Conn().QueryRow(s.t.Context(),
542 fmt.Sprintf("select exists(select * from %s where myreal <> 1729)", dstSchemaQualified)).Scan(&exists)
543 require.NoError(s.t, err)
544 require.False(s.t, exists)
545}

Callers

nothing calls this directly

Calls 13

setupSourceTableMethod · 0.95
ConnMethod · 0.95
compareCountsMethod · 0.95
AddSuffixFunction · 0.85
CreateQRepWorkflowConfigFunction · 0.85
GeneratePostgresPeerFunction · 0.85
NewTemporalClientFunction · 0.85
RunQRepFlowWorkflowFunction · 0.85
EnvWaitForFunction · 0.85
ExecMethod · 0.65
ErrorMethod · 0.45
ScanMethod · 0.45

Tested by

no test coverage detected