()
| 180 | } |
| 181 | |
| 182 | func (s PubSubSuite) TestSimple() { |
| 183 | srcTableName := AttachSchema(s, "pssimple") |
| 184 | |
| 185 | _, err := s.Conn().Exec(s.t.Context(), fmt.Sprintf(` |
| 186 | CREATE TABLE IF NOT EXISTS %s ( |
| 187 | id SERIAL PRIMARY KEY, |
| 188 | val text |
| 189 | ); |
| 190 | `, srcTableName)) |
| 191 | require.NoError(s.t, err) |
| 192 | |
| 193 | sa, err := ServiceAccount() |
| 194 | require.NoError(s.t, err) |
| 195 | |
| 196 | _, err = s.Conn().Exec(s.t.Context(), `insert into public.scripts (name, lang, source) values |
| 197 | ('e2e_pssimple', 'lua', 'function onRecord(r) return r.row and r.row.val end') on conflict do nothing`) |
| 198 | require.NoError(s.t, err) |
| 199 | |
| 200 | flowName := AddSuffix(s, "e2epssimple") |
| 201 | connectionGen := FlowConnectionGenerationConfig{ |
| 202 | FlowJobName: flowName, |
| 203 | TableNameMapping: map[string]string{srcTableName: flowName}, |
| 204 | Destination: s.Peer(sa).Name, |
| 205 | } |
| 206 | flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s) |
| 207 | flowConnConfig.Script = "e2e_pssimple" |
| 208 | |
| 209 | psclient, err := sa.CreatePubSubClient(s.t.Context()) |
| 210 | require.NoError(s.t, err) |
| 211 | defer psclient.Close() |
| 212 | topicName := fmt.Sprintf("projects/%s/topics/%s", psclient.Project(), flowName) |
| 213 | _, err = psclient.TopicAdminClient.CreateTopic(s.t.Context(), &pubsubpb.Topic{ |
| 214 | Name: topicName, |
| 215 | }) |
| 216 | require.NoError(s.t, err) |
| 217 | sub, err := psclient.SubscriptionAdminClient.CreateSubscription(s.t.Context(), &pubsubpb.Subscription{ |
| 218 | Name: fmt.Sprintf("projects/%s/subscriptions/%s", psclient.Project(), flowName), |
| 219 | Topic: topicName, |
| 220 | MessageRetentionDuration: durationpb.New(10 * time.Minute), |
| 221 | ExpirationPolicy: &pubsubpb.ExpirationPolicy{Ttl: durationpb.New(24 * time.Hour)}, |
| 222 | }) |
| 223 | require.NoError(s.t, err) |
| 224 | |
| 225 | tc := NewTemporalClient(s.t) |
| 226 | env := ExecutePeerflow(s.t, tc, flowConnConfig) |
| 227 | SetupCDCFlowStatusQuery(s.t, env, flowConnConfig) |
| 228 | |
| 229 | _, err = s.Conn().Exec(s.t.Context(), fmt.Sprintf(` |
| 230 | INSERT INTO %s (id, val) VALUES (1, 'testval') |
| 231 | `, srcTableName)) |
| 232 | require.NoError(s.t, err) |
| 233 | |
| 234 | ctx, cancel := context.WithTimeout(s.t.Context(), 3*time.Minute) |
| 235 | defer cancel() |
| 236 | |
| 237 | msgs := make(chan *pubsub.Message) |
| 238 | go func() { |
| 239 | subclient := psclient.Subscriber(sub.Name) |
nothing calls this directly
no test coverage detected