StreamIsActive is called once by the KEDA controller so that we can stream isActive events to it.
(scaledObject *pb.ScaledObjectRef, epsServer pb.ExternalScaler_StreamIsActiveServer)
| 72 | |
| 73 | // StreamIsActive is called by keda controller once to allow us to stream isActive events to it. |
| 74 | func (s *Scaler) StreamIsActive(scaledObject *pb.ScaledObjectRef, epsServer pb.ExternalScaler_StreamIsActiveServer) error { |
| 75 | ctx := epsServer.Context() |
| 76 | |
| 77 | // Extract the service details first |
| 78 | project, p1 := scaledObject.ScalerMetadata["project"] |
| 79 | service, p2 := scaledObject.ScalerMetadata["service"] |
| 80 | version, p3 := scaledObject.ScalerMetadata["version"] |
| 81 | |
| 82 | // Throw an error if the fields are not present |
| 83 | if !p1 || !p2 || !p3 { |
| 84 | return helpers.Logger.LogError(helpers.GetRequestID(ctx), "Invalid fields provided in keda scaler config", nil, map[string]interface{}{"project": project, "service": service, "version": version}) |
| 85 | } |
| 86 | |
| 87 | helpers.Logger.LogDebug(helpers.GetRequestID(ctx), "Keda stream is active called", map[string]interface{}{"project": project, "service": service, "version": version}) |
| 88 | |
| 89 | // Create a channel which we can use to write data to this stream |
| 90 | ch := make(chan bool, 5) |
| 91 | |
| 92 | // Add stream to internal map |
| 93 | s.addIsActiveStream(project, service, version, ch) |
| 94 | defer s.removeIsActiveStream(project, service, version) |
| 95 | |
| 96 | for { |
| 97 | select { |
| 98 | // Exit if the stream has closed |
| 99 | case <-ctx.Done(): |
| 100 | helpers.Logger.LogDebug(helpers.GetRequestID(ctx), "Exiting from keda is active stream", map[string]interface{}{"project": project, "service": service, "version": version}) |
| 101 | return nil |
| 102 | |
| 103 | // Forward isActive messages to server |
| 104 | case isActive := <-ch: |
| 105 | // Quit if isActive is false |
| 106 | if !isActive { |
| 107 | return nil |
| 108 | } |
| 109 | |
| 110 | helpers.Logger.LogDebug(helpers.GetRequestID(ctx), fmt.Sprintf("Streaming is active result: %v", isActive), map[string]interface{}{"project": project, "service": service, "version": version}) |
| 111 | if err := epsServer.Send(&pb.IsActiveResponse{Result: isActive}); err != nil { |
| 112 | _ = helpers.Logger.LogError(helpers.GetRequestID(ctx), "Unable to forward is stream active resonse to keda", err, map[string]interface{}{"project": project, "service": service, "version": version}) |
| 113 | } |
| 114 | } |
| 115 | } |
| 116 | } |
| 117 | |
| 118 | // IsActive is called by the keda controller to check if keda should scale the deployment from 0 to 1 |
| 119 | func (s *Scaler) IsActive(ctx context.Context, scaledObject *pb.ScaledObjectRef) (*pb.IsActiveResponse, error) { |
nothing calls this directly
no test coverage detected