| 191 | } |
| 192 | |
| 193 | func EnvWaitForEqualTablesWithNames_Only( |
| 194 | env WorkflowRun, |
| 195 | suite RowSource, |
| 196 | reason string, |
| 197 | srcTable string, |
| 198 | dstTable string, |
| 199 | cols string, |
| 200 | ) { |
| 201 | t := suite.T() |
| 202 | pgSource, ok := suite.Source().(*PostgresSource) |
| 203 | if !ok { |
| 204 | t.Fatal("EnvWaitForEqualTablesWithNames_Only only works with PostgresSource") |
| 205 | } |
| 206 | |
| 207 | t.Helper() |
| 208 | |
| 209 | EnvWaitFor(t, env, 3*time.Minute, reason, func() bool { |
| 210 | t.Helper() |
| 211 | |
| 212 | sourceRows, err := pgSource.GetRowsOnly(t.Context(), suite.Suffix(), srcTable, cols) |
| 213 | if err != nil { |
| 214 | t.Log(err) |
| 215 | return false |
| 216 | } |
| 217 | |
| 218 | rows, err := suite.GetRows(dstTable, cols) |
| 219 | if err != nil { |
| 220 | t.Log(err) |
| 221 | return false |
| 222 | } |
| 223 | |
| 224 | return e2eshared.CheckEqualRecordBatches(t, sourceRows, rows) |
| 225 | }) |
| 226 | } |
| 227 | |
| 228 | func EnvWaitForCount( |
| 229 | env WorkflowRun, |