MCPcopy
hub / github.com/spacecloud-io/space-cloud / StreamIsActive

Method StreamIsActive

runner/modules/scaler/keda.go:74–116  ·  view source on GitHub ↗

StreamIsActive is called by keda controller once to allow us to stream isActive events to it.

(scaledObject *pb.ScaledObjectRef, epsServer pb.ExternalScaler_StreamIsActiveServer)

Source from the content-addressed store, hash-verified

72
73// StreamIsActive is called by keda controller once to allow us to stream isActive events to it.
74func (s *Scaler) StreamIsActive(scaledObject *pb.ScaledObjectRef, epsServer pb.ExternalScaler_StreamIsActiveServer) error {
75 ctx := epsServer.Context()
76
77 // Extract the service details first
78 project, p1 := scaledObject.ScalerMetadata["project"]
79 service, p2 := scaledObject.ScalerMetadata["service"]
80 version, p3 := scaledObject.ScalerMetadata["version"]
81
82 // Throw an error if the fields are not present
83 if !p1 || !p2 || !p3 {
84 return helpers.Logger.LogError(helpers.GetRequestID(ctx), "Invalid fields provided in keda scaler config", nil, map[string]interface{}{"project": project, "service": service, "version": version})
85 }
86
87 helpers.Logger.LogDebug(helpers.GetRequestID(ctx), "Keda stream is active called", map[string]interface{}{"project": project, "service": service, "version": version})
88
89 // Create a channel which we can use to write data to this stream
90 ch := make(chan bool, 5)
91
92 // Add stream to internal map
93 s.addIsActiveStream(project, service, version, ch)
94 defer s.removeIsActiveStream(project, service, version)
95
96 for {
97 select {
98 // Exit if the stream has closed
99 case <-ctx.Done():
100 helpers.Logger.LogDebug(helpers.GetRequestID(ctx), "Exiting from keda is active stream", map[string]interface{}{"project": project, "service": service, "version": version})
101 return nil
102
103 // Forward isActive messages to server
104 case isActive := <-ch:
105 // Quit if isActive is false
106 if !isActive {
107 return nil
108 }
109
110 helpers.Logger.LogDebug(helpers.GetRequestID(ctx), fmt.Sprintf("Streaming is active result: %v", isActive), map[string]interface{}{"project": project, "service": service, "version": version})
111 if err := epsServer.Send(&pb.IsActiveResponse{Result: isActive}); err != nil {
112 _ = helpers.Logger.LogError(helpers.GetRequestID(ctx), "Unable to forward is stream active resonse to keda", err, map[string]interface{}{"project": project, "service": service, "version": version})
113 }
114 }
115 }
116}
117
118// IsActive is called by the keda controller to check if keda should scale the deployment from 0 to 1
119func (s *Scaler) IsActive(ctx context.Context, scaledObject *pb.ScaledObjectRef) (*pb.IsActiveResponse, error) {

Callers

nothing calls this directly

Calls 4

addIsActiveStreamMethod · 0.95
removeIsActiveStreamMethod · 0.95
ContextMethod · 0.65
SendMethod · 0.65

Tested by

no test coverage detected