()
| 123 | } |
| 124 | |
| 125 | func (s PubSubSuite) TestCreateTopic() { |
| 126 | srcTableName := AttachSchema(s, "pscreate") |
| 127 | |
| 128 | _, err := s.Conn().Exec(s.t.Context(), fmt.Sprintf(` |
| 129 | CREATE TABLE IF NOT EXISTS %s ( |
| 130 | id SERIAL PRIMARY KEY, |
| 131 | val text |
| 132 | ); |
| 133 | `, srcTableName)) |
| 134 | require.NoError(s.t, err) |
| 135 | |
| 136 | sa, err := ServiceAccount() |
| 137 | require.NoError(s.t, err) |
| 138 | |
| 139 | _, err = s.Conn().Exec(s.t.Context(), `insert into public.scripts (name, lang, source) values |
| 140 | ('e2e_pscreate', 'lua', 'function onRecord(r) return r.row and r.row.val end') on conflict do nothing`) |
| 141 | require.NoError(s.t, err) |
| 142 | |
| 143 | flowName := AddSuffix(s, "e2epscreate") |
| 144 | connectionGen := FlowConnectionGenerationConfig{ |
| 145 | FlowJobName: flowName, |
| 146 | TableNameMapping: map[string]string{srcTableName: flowName}, |
| 147 | Destination: s.Peer(sa).Name, |
| 148 | } |
| 149 | flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s) |
| 150 | flowConnConfig.Script = "e2e_pscreate" |
| 151 | |
| 152 | tc := NewTemporalClient(s.t) |
| 153 | env := ExecutePeerflow(s.t, tc, flowConnConfig) |
| 154 | SetupCDCFlowStatusQuery(s.t, env, flowConnConfig) |
| 155 | |
| 156 | _, err = s.Conn().Exec(s.t.Context(), fmt.Sprintf(` |
| 157 | INSERT INTO %s (id, val) VALUES (1, 'testval') |
| 158 | `, srcTableName)) |
| 159 | require.NoError(s.t, err) |
| 160 | |
| 161 | EnvWaitFor(s.t, env, 3*time.Minute, "create topic", func() bool { |
| 162 | psclient, err := sa.CreatePubSubClient(s.t.Context()) |
| 163 | defer func() { |
| 164 | _ = psclient.Close() |
| 165 | }() |
| 166 | require.NoError(s.t, err) |
| 167 | topic := fmt.Sprintf("projects/%s/topics/%s", psclient.Project(), flowName) |
| 168 | if _, err := psclient.TopicAdminClient.GetTopic(s.t.Context(), &pubsubpb.GetTopicRequest{ |
| 169 | Topic: topic, |
| 170 | }); err == nil { |
| 171 | return true |
| 172 | } else if status.Code(err) != codes.NotFound { |
| 173 | require.NoError(s.t, err) |
| 174 | } |
| 175 | return false |
| 176 | }) |
| 177 | |
| 178 | env.Cancel(s.t.Context()) |
| 179 | RequireEnvCanceled(s.t, env) |
| 180 | } |
| 181 | |
| 182 | func (s PubSubSuite) TestSimple() { |
nothing calls this directly
no test coverage detected