MCPcopy
hub / github.com/dapr/dapr / TestResiliencyStreaming

Method TestResiliencyStreaming

pkg/api/grpc/proxy/handler_test.go:578–709  ·  view source on GitHub ↗
()

Source from the content-addressed store, hash-verified

576}
577
578func (s *proxyTestSuite) TestResiliencyStreaming() {
579 // Set the resiliency policy
580 s.setupResiliency()
581 defer func() {
582 s.policyDef = nil
583 }()
584
585 s.T().Run("retries are not allowed", func(t *testing.T) {
586 // We're purposely not setting dapr-stream=true in this context because we want to simulate the failure when the RPC is not marked as streaming
587 ctx, cancel := context.WithTimeout(t.Context(), time.Minute)
588 defer cancel()
589 ctx = metadata.NewOutgoingContext(ctx, metadata.Pairs(
590 diagConsts.GRPCProxyAppIDKey, "test",
591 "dapr-test", t.Name(),
592 ))
593
594 // Invoke the stream
595 stream, err := s.testClient.PingStream(ctx)
596 require.NoError(t, err, "PingStream request should be successful")
597
598 // First message should succeed
599 err = stream.Send(&pb.PingRequest{Value: "1"})
600 require.NoError(t, err, "First message should be sent")
601 res, err := stream.Recv()
602 require.NoError(t, err, "First response should be received")
603 require.NotNil(t, res)
604
605 // Second message should fail
606 s.service.expectPingStreamError.Store(true)
607 defer func() {
608 s.service.expectPingStreamError.Store(false)
609 }()
610 err = stream.SendMsg(&pb.PingRequest{Value: "2"})
611 require.NoError(t, err, "Second message should be sent")
612 _, err = stream.Recv()
613 require.Error(t, err, "Second Recv should fail with error")
614
615 grpcStatus, ok := status.FromError(err)
616 require.True(t, ok, "Error should have a gRPC status code")
617 require.Equal(t, codes.FailedPrecondition, grpcStatus.Code())
618 require.ErrorIs(t, err, errRetryOnStreamingRPC)
619 })
620
621 s.T().Run("timeouts do not apply after initial handshake", func(t *testing.T) {
622 ctx, cancel := context.WithTimeout(t.Context(), time.Minute)
623 defer cancel()
624
625 meter := setupMetrics(s)
626 t.Cleanup(func() {
627 meter.Stop()
628 })
629
630 ctx = metadata.NewOutgoingContext(ctx, metadata.Pairs(
631 diagConsts.GRPCProxyAppIDKey, testAppID,
632 StreamMetadataKey, "1",
633 "dapr-test", t.Name(),
634 ))
635

Callers

nothing calls this directly

Calls 15

setupResiliencyMethod · 0.95
setupMetricsFunction · 0.85
assertRequestSentMetricsFunction · 0.85
CloseSendMethod · 0.80
RunMethod · 0.65
NameMethod · 0.65
PingStreamMethod · 0.65
SendMethod · 0.65
RecvMethod · 0.65
CleanupMethod · 0.65
ContextMethod · 0.45

Tested by

no test coverage detected