MCPcopy
hub / github.com/crowdsecurity/crowdsec / TestStopStreaming

Function TestStopStreaming

pkg/acquisition/modules/loki/loki_test.go:370–416  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

368}
369
370func TestStopStreaming(t *testing.T) {
371 cstest.SetAWSTestEnv(t)
372
373 ctx := t.Context()
374
375 config := `
376mode: tail
377source: loki
378url: http://127.0.0.1:3100
379headers:
380 x-scope-orgid: "1234"
381query: >
382 {server="demo"}
383`
384 logger := log.New()
385 subLogger := logger.WithField("type", loki.ModuleName)
386 title := time.Now().String()
387 lokiSource := loki.Source{}
388
389 err := lokiSource.Configure(ctx, []byte(config), subLogger, metrics.AcquisitionMetricsLevelNone)
390 if err != nil {
391 t.Fatalf("Unexpected error : %s", err)
392 }
393
394 out := make(chan pipeline.Event)
395
396 lokiTomb := &tomb.Tomb{}
397
398 err = lokiSource.StreamingAcquisition(ctx, out, lokiTomb)
399 if err != nil {
400 t.Fatalf("Unexpected error : %s", err)
401 }
402
403 time.Sleep(time.Second * 2)
404
405 err = feedLoki(ctx, subLogger, 1, title)
406 if err != nil {
407 t.Fatalf("Unexpected error : %s", err)
408 }
409
410 lokiTomb.Kill(nil)
411
412 err = lokiTomb.Wait()
413 if err != nil {
414 t.Fatalf("Unexpected error : %s", err)
415 }
416}
417
418type LogStreams struct {
419 Streams []LogStream `json:"streams"`

Callers

nothing calls this directly

Calls 5

Configure — Method · 0.95
StreamingAcquisition — Method · 0.95
feedLoki — Function · 0.85
Kill — Method · 0.80
String — Method · 0.45

Tested by

no test coverage detected

Used in the wild — real call sites across dependent graphs

searching dependent graphs…