MCPcopy
hub / github.com/PeerDB-io/peerdb / TestSimple

Method TestSimple

flow/e2e/pubsub_test.go:182–255  ·  view source on GitHub ↗
()

Source from the content-addressed store, hash-verified

180}
181
182func (s PubSubSuite) TestSimple() {
183 srcTableName := AttachSchema(s, "pssimple")
184
185 _, err := s.Conn().Exec(s.t.Context(), fmt.Sprintf(`
186 CREATE TABLE IF NOT EXISTS %s (
187 id SERIAL PRIMARY KEY,
188 val text
189 );
190 `, srcTableName))
191 require.NoError(s.t, err)
192
193 sa, err := ServiceAccount()
194 require.NoError(s.t, err)
195
196 _, err = s.Conn().Exec(s.t.Context(), `insert into public.scripts (name, lang, source) values
197 ('e2e_pssimple', 'lua', 'function onRecord(r) return r.row and r.row.val end') on conflict do nothing`)
198 require.NoError(s.t, err)
199
200 flowName := AddSuffix(s, "e2epssimple")
201 connectionGen := FlowConnectionGenerationConfig{
202 FlowJobName: flowName,
203 TableNameMapping: map[string]string{srcTableName: flowName},
204 Destination: s.Peer(sa).Name,
205 }
206 flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s)
207 flowConnConfig.Script = "e2e_pssimple"
208
209 psclient, err := sa.CreatePubSubClient(s.t.Context())
210 require.NoError(s.t, err)
211 defer psclient.Close()
212 topicName := fmt.Sprintf("projects/%s/topics/%s", psclient.Project(), flowName)
213 _, err = psclient.TopicAdminClient.CreateTopic(s.t.Context(), &pubsubpb.Topic{
214 Name: topicName,
215 })
216 require.NoError(s.t, err)
217 sub, err := psclient.SubscriptionAdminClient.CreateSubscription(s.t.Context(), &pubsubpb.Subscription{
218 Name: fmt.Sprintf("projects/%s/subscriptions/%s", psclient.Project(), flowName),
219 Topic: topicName,
220 MessageRetentionDuration: durationpb.New(10 * time.Minute),
221 ExpirationPolicy: &pubsubpb.ExpirationPolicy{Ttl: durationpb.New(24 * time.Hour)},
222 })
223 require.NoError(s.t, err)
224
225 tc := NewTemporalClient(s.t)
226 env := ExecutePeerflow(s.t, tc, flowConnConfig)
227 SetupCDCFlowStatusQuery(s.t, env, flowConnConfig)
228
229 _, err = s.Conn().Exec(s.t.Context(), fmt.Sprintf(`
230 INSERT INTO %s (id, val) VALUES (1, 'testval')
231 `, srcTableName))
232 require.NoError(s.t, err)
233
234 ctx, cancel := context.WithTimeout(s.t.Context(), 3*time.Minute)
235 defer cancel()
236
237 msgs := make(chan *pubsub.Message)
238 go func() {
239 subclient := psclient.Subscriber(sub.Name)

Callers

nothing calls this directly

Calls 15

Conn · Method · 0.95
Peer · Method · 0.95
AttachSchema · Function · 0.85
ServiceAccount · Function · 0.85
AddSuffix · Function · 0.85
NewTemporalClient · Function · 0.85
ExecutePeerflow · Function · 0.85
SetupCDCFlowStatusQuery · Function · 0.85
RequireEnvCanceled · Function · 0.85
CreatePubSubClient · Method · 0.80
Receive · Method · 0.80

Tested by

no test coverage detected