MCP · copy
hub / github.com/netbirdio/netbird / TestReceive_ProtocolErrorStreamReconnect

Function TestReceive_ProtocolErrorStreamReconnect

flow/client/client_test.go:452–549  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

450}
451
452func TestReceive_ProtocolErrorStreamReconnect(t *testing.T) {
453 server := newTestServer(t)
454
455 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
456 t.Cleanup(cancel)
457
458 client, err := flow.NewClient("http://"+server.addr, "test-payload", "test-signature", 1*time.Second)
459 require.NoError(t, err)
460
461 // Cleanups run LIFO: the goroutine-drain registered here runs after Close below,
462 // which is when Receive has actually returned. Without this, the Receive goroutine
463 // can outlive the test and call t.Logf after teardown, panicking.
464 receiveDone := make(chan struct{})
465 t.Cleanup(func() {
466 select {
467 case <-receiveDone:
468 case <-time.After(2 * time.Second):
469 t.Error("Receive goroutine did not exit after Close")
470 }
471 })
472 t.Cleanup(func() {
473 err := client.Close()
474 assert.NoError(t, err, "failed to close flow")
475 })
476
477 // Track acks received before and after server-side stream close
478 var ackCount atomic.Int32
479 receivedFirst := make(chan struct{})
480 receivedAfterReconnect := make(chan struct{})
481
482 go func() {
483 defer close(receiveDone)
484 err := client.Receive(ctx, 1*time.Second, func(msg *proto.FlowEventAck) error {
485 if msg.IsInitiator || len(msg.EventId) == 0 {
486 return nil
487 }
488 n := ackCount.Add(1)
489 if n == 1 {
490 close(receivedFirst)
491 }
492 if n == 2 {
493 close(receivedAfterReconnect)
494 }
495 return nil
496 })
497 if err != nil && !errors.Is(err, context.Canceled) {
498 t.Logf("receive error: %v", err)
499 }
500 }()
501
502 // Wait for stream to be established, then send first ack
503 select {
504 case <-server.handlerStarted:
505 case <-time.After(3 * time.Second):
506 t.Fatal("timeout waiting for stream to be established")
507 }
508 server.acks <- &proto.FlowEventAck{EventId: []byte("before-close")}
509

Callers

nothing calls this directly

Calls 10

Close (Method) · 0.95
Receive (Method) · 0.95
connCount (Method) · 0.80
sendRSTStream (Method) · 0.80
Load (Method) · 0.80
newTestServer (Function) · 0.70
Cleanup (Method) · 0.65
Error (Method) · 0.65
Add (Method) · 0.65
Is (Method) · 0.45

Tested by

no test coverage detected