(t *testing.T)
| 450 | } |
| 451 | |
| 452 | func TestReceive_ProtocolErrorStreamReconnect(t *testing.T) { |
| 453 | server := newTestServer(t) |
| 454 | |
| 455 | ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) |
| 456 | t.Cleanup(cancel) |
| 457 | |
| 458 | client, err := flow.NewClient("http://"+server.addr, "test-payload", "test-signature", 1*time.Second) |
| 459 | require.NoError(t, err) |
| 460 | |
| 461 | // Cleanups run LIFO: the goroutine-drain registered here runs after Close below, |
| 462 | // which is when Receive has actually returned. Without this, the Receive goroutine |
| 463 | // can outlive the test and call t.Logf after teardown, panicking. |
| 464 | receiveDone := make(chan struct{}) |
| 465 | t.Cleanup(func() { |
| 466 | select { |
| 467 | case <-receiveDone: |
| 468 | case <-time.After(2 * time.Second): |
| 469 | t.Error("Receive goroutine did not exit after Close") |
| 470 | } |
| 471 | }) |
| 472 | t.Cleanup(func() { |
| 473 | err := client.Close() |
| 474 | assert.NoError(t, err, "failed to close flow") |
| 475 | }) |
| 476 | |
| 477 | // Track acks received before and after server-side stream close |
| 478 | var ackCount atomic.Int32 |
| 479 | receivedFirst := make(chan struct{}) |
| 480 | receivedAfterReconnect := make(chan struct{}) |
| 481 | |
| 482 | go func() { |
| 483 | defer close(receiveDone) |
| 484 | err := client.Receive(ctx, 1*time.Second, func(msg *proto.FlowEventAck) error { |
| 485 | if msg.IsInitiator || len(msg.EventId) == 0 { |
| 486 | return nil |
| 487 | } |
| 488 | n := ackCount.Add(1) |
| 489 | if n == 1 { |
| 490 | close(receivedFirst) |
| 491 | } |
| 492 | if n == 2 { |
| 493 | close(receivedAfterReconnect) |
| 494 | } |
| 495 | return nil |
| 496 | }) |
| 497 | if err != nil && !errors.Is(err, context.Canceled) { |
| 498 | t.Logf("receive error: %v", err) |
| 499 | } |
| 500 | }() |
| 501 | |
| 502 | // Wait for stream to be established, then send first ack |
| 503 | select { |
| 504 | case <-server.handlerStarted: |
| 505 | case <-time.After(3 * time.Second): |
| 506 | t.Fatal("timeout waiting for stream to be established") |
| 507 | } |
| 508 | server.acks <- &proto.FlowEventAck{EventId: []byte("before-close")} |
| 509 |
nothing calls this directly
no test coverage detected