()
| 576 | } |
| 577 | |
| 578 | func (s *proxyTestSuite) TestResiliencyStreaming() { |
| 579 | // Set the resiliency policy |
| 580 | s.setupResiliency() |
| 581 | defer func() { |
| 582 | s.policyDef = nil |
| 583 | }() |
| 584 | |
| 585 | s.T().Run("retries are not allowed", func(t *testing.T) { |
| 586 | // We're purposely not setting dapr-stream=true in this context because we want to simulate the failure when the RPC is not marked as streaming |
| 587 | ctx, cancel := context.WithTimeout(t.Context(), time.Minute) |
| 588 | defer cancel() |
| 589 | ctx = metadata.NewOutgoingContext(ctx, metadata.Pairs( |
| 590 | diagConsts.GRPCProxyAppIDKey, "test", |
| 591 | "dapr-test", t.Name(), |
| 592 | )) |
| 593 | |
| 594 | // Invoke the stream |
| 595 | stream, err := s.testClient.PingStream(ctx) |
| 596 | require.NoError(t, err, "PingStream request should be successful") |
| 597 | |
| 598 | // First message should succeed |
| 599 | err = stream.Send(&pb.PingRequest{Value: "1"}) |
| 600 | require.NoError(t, err, "First message should be sent") |
| 601 | res, err := stream.Recv() |
| 602 | require.NoError(t, err, "First response should be received") |
| 603 | require.NotNil(t, res) |
| 604 | |
| 605 | // Second message should fail |
| 606 | s.service.expectPingStreamError.Store(true) |
| 607 | defer func() { |
| 608 | s.service.expectPingStreamError.Store(false) |
| 609 | }() |
| 610 | err = stream.SendMsg(&pb.PingRequest{Value: "2"}) |
| 611 | require.NoError(t, err, "Second message should be sent") |
| 612 | _, err = stream.Recv() |
| 613 | require.Error(t, err, "Second Recv should fail with error") |
| 614 | |
| 615 | grpcStatus, ok := status.FromError(err) |
| 616 | require.True(t, ok, "Error should have a gRPC status code") |
| 617 | require.Equal(t, codes.FailedPrecondition, grpcStatus.Code()) |
| 618 | require.ErrorIs(t, err, errRetryOnStreamingRPC) |
| 619 | }) |
| 620 | |
| 621 | s.T().Run("timeouts do not apply after initial handshake", func(t *testing.T) { |
| 622 | ctx, cancel := context.WithTimeout(t.Context(), time.Minute) |
| 623 | defer cancel() |
| 624 | |
| 625 | meter := setupMetrics(s) |
| 626 | t.Cleanup(func() { |
| 627 | meter.Stop() |
| 628 | }) |
| 629 | |
| 630 | ctx = metadata.NewOutgoingContext(ctx, metadata.Pairs( |
| 631 | diagConsts.GRPCProxyAppIDKey, testAppID, |
| 632 | StreamMetadataKey, "1", |
| 633 | "dapr-test", t.Name(), |
| 634 | )) |
| 635 |
nothing calls this directly
no test coverage detected