MCPcopy
hub / github.com/PeerDB-io/peerdb / TestCreateTopic

Method TestCreateTopic

flow/e2e/pubsub_test.go:125–180  ·  view source on GitHub ↗
()

Source from the content-addressed store, hash-verified

123}
124
125func (s PubSubSuite) TestCreateTopic() {
126 srcTableName := AttachSchema(s, "pscreate")
127
128 _, err := s.Conn().Exec(s.t.Context(), fmt.Sprintf(`
129 CREATE TABLE IF NOT EXISTS %s (
130 id SERIAL PRIMARY KEY,
131 val text
132 );
133 `, srcTableName))
134 require.NoError(s.t, err)
135
136 sa, err := ServiceAccount()
137 require.NoError(s.t, err)
138
139 _, err = s.Conn().Exec(s.t.Context(), `insert into public.scripts (name, lang, source) values
140 ('e2e_pscreate', 'lua', 'function onRecord(r) return r.row and r.row.val end') on conflict do nothing`)
141 require.NoError(s.t, err)
142
143 flowName := AddSuffix(s, "e2epscreate")
144 connectionGen := FlowConnectionGenerationConfig{
145 FlowJobName: flowName,
146 TableNameMapping: map[string]string{srcTableName: flowName},
147 Destination: s.Peer(sa).Name,
148 }
149 flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s)
150 flowConnConfig.Script = "e2e_pscreate"
151
152 tc := NewTemporalClient(s.t)
153 env := ExecutePeerflow(s.t, tc, flowConnConfig)
154 SetupCDCFlowStatusQuery(s.t, env, flowConnConfig)
155
156 _, err = s.Conn().Exec(s.t.Context(), fmt.Sprintf(`
157 INSERT INTO %s (id, val) VALUES (1, 'testval')
158 `, srcTableName))
159 require.NoError(s.t, err)
160
161 EnvWaitFor(s.t, env, 3*time.Minute, "create topic", func() bool {
162 psclient, err := sa.CreatePubSubClient(s.t.Context())
163 defer func() {
164 _ = psclient.Close()
165 }()
166 require.NoError(s.t, err)
167 topic := fmt.Sprintf("projects/%s/topics/%s", psclient.Project(), flowName)
168 if _, err := psclient.TopicAdminClient.GetTopic(s.t.Context(), &pubsubpb.GetTopicRequest{
169 Topic: topic,
170 }); err == nil {
171 return true
172 } else if status.Code(err) != codes.NotFound {
173 require.NoError(s.t, err)
174 }
175 return false
176 })
177
178 env.Cancel(s.t.Context())
179 RequireEnvCanceled(s.t, env)
180}
181
182func (s PubSubSuite) TestSimple() {

Callers

nothing calls this directly

Calls 15

ConnMethod · 0.95
PeerMethod · 0.95
AttachSchemaFunction · 0.85
ServiceAccountFunction · 0.85
AddSuffixFunction · 0.85
NewTemporalClientFunction · 0.85
ExecutePeerflowFunction · 0.85
SetupCDCFlowStatusQueryFunction · 0.85
EnvWaitForFunction · 0.85
RequireEnvCanceledFunction · 0.85
CreatePubSubClientMethod · 0.80

Tested by

no test coverage detected