(config TestConfig)
| 53 | } |
| 54 | |
| 55 | func runTest(config TestConfig) error { |
| 56 | if config.Mode != "streammanager" && config.Mode != "writer" { |
| 57 | return fmt.Errorf("invalid mode: %s (must be 'streammanager' or 'writer')", config.Mode) |
| 58 | } |
| 59 | |
| 60 | fmt.Printf("Starting Streaming Integration Test\n") |
| 61 | fmt.Printf(" Mode: %s\n", config.Mode) |
| 62 | fmt.Printf(" Data Size: %d bytes\n", config.DataSize) |
| 63 | fmt.Printf(" Delay: %v, Skew: %v\n", config.Delay, config.Skew) |
| 64 | fmt.Printf(" Window Size: %d\n", config.WindowSize) |
| 65 | if config.SlowReader > 0 { |
| 66 | fmt.Printf(" Slow Reader: %d bytes/sec\n", config.SlowReader) |
| 67 | } |
| 68 | |
| 69 | // 1. Create metrics |
| 70 | metrics := NewMetrics() |
| 71 | |
| 72 | // 2. Create the delivery pipe |
| 73 | pipe := NewDeliveryPipe(DeliveryConfig{ |
| 74 | Delay: config.Delay, |
| 75 | Skew: config.Skew, |
| 76 | }, metrics) |
| 77 | |
| 78 | // 3. Create brokers with bridges |
| 79 | writerBridge := &WriterBridge{pipe: pipe} |
| 80 | readerBridge := &ReaderBridge{pipe: pipe} |
| 81 | |
| 82 | writerBroker := streamclient.NewBroker(writerBridge) |
| 83 | readerBroker := streamclient.NewBroker(readerBridge) |
| 84 | |
| 85 | // 4. Wire up delivery targets |
| 86 | pipe.SetDataTarget(readerBroker.RecvData) |
| 87 | pipe.SetAckTarget(writerBroker.RecvAck) |
| 88 | |
| 89 | // 5. Start the delivery pipe |
| 90 | pipe.Start() |
| 91 | |
| 92 | // 6. Create the reader side |
| 93 | reader, streamMeta := readerBroker.CreateStreamReader("reader-route", "writer-route", int64(config.WindowSize)) |
| 94 | |
| 95 | // 7. Set up writer side based on mode |
| 96 | var writerDone chan error |
| 97 | if config.Mode == "streammanager" { |
| 98 | writerDone = runStreamManagerMode(config, writerBroker, streamMeta) |
| 99 | } else { |
| 100 | writerDone = runWriterMode(config, writerBroker, streamMeta) |
| 101 | } |
| 102 | |
| 103 | // 8. Create verifier |
| 104 | verifier := NewVerifier(config.DataSize) |
| 105 | |
| 106 | // 9. Create metrics writer wrapper |
| 107 | metricsWriter := &MetricsWriter{ |
| 108 | writer: verifier, |
| 109 | metrics: metrics, |
| 110 | } |
| 111 | |
| 112 | // 10. Wrap reader with slow reader if configured |
no test coverage detected