| 200 | } |
| 201 | |
| 202 | func TestListPolling(t *testing.T) { |
| 203 | ctx := t.Context() |
| 204 | tests := []struct { |
| 205 | name string |
| 206 | config string |
| 207 | expectedCount int |
| 208 | }{ |
| 209 | { |
| 210 | name: "basic", |
| 211 | config: ` |
| 212 | source: s3 |
| 213 | bucket_name: bucket_no_prefix |
| 214 | polling_method: list |
| 215 | polling_interval: 1 |
| 216 | `, |
| 217 | expectedCount: 2, |
| 218 | }, |
| 219 | { |
| 220 | name: "with prefix", |
| 221 | config: ` |
| 222 | source: s3 |
| 223 | bucket_name: bucket_with_prefix |
| 224 | polling_method: list |
| 225 | polling_interval: 1 |
| 226 | prefix: foo/ |
| 227 | `, |
| 228 | expectedCount: 4, |
| 229 | }, |
| 230 | } |
| 231 | |
| 232 | for _, test := range tests { |
| 233 | t.Run(test.name, func(t *testing.T) { |
| 234 | linesRead := 0 |
| 235 | f := Source{} |
| 236 | logger := log.NewEntry(log.New()) |
| 237 | logger.Logger.SetLevel(log.TraceLevel) |
| 238 | |
| 239 | err := f.Configure(ctx, []byte(test.config), logger, metrics.AcquisitionMetricsLevelNone) |
| 240 | require.NoError(t, err) |
| 241 | |
| 242 | f.s3Client = mockS3Client{} |
| 243 | |
| 244 | if f.Config.PollingMethod != PollMethodList { |
| 245 | t.Fatalf("expected list polling, got %s", f.Config.PollingMethod) |
| 246 | } |
| 247 | |
| 248 | out := make(chan pipeline.Event) |
| 249 | tb := tomb.Tomb{} |
| 250 | |
| 251 | go func() { |
| 252 | for { |
| 253 | select { |
| 254 | case s := <-out: |
| 255 | fmt.Printf("got line %s\n", s.Line.Raw) |
| 256 | linesRead++ |
| 257 | case <-tb.Dying(): |
| 258 | return |
| 259 | } |