(project, service, version, minReplicas string)
| 15 | } |
| 16 | |
| 17 | func (s *Scaler) isStreamActive(project, service, version, minReplicas string) bool { |
| 18 | // Create a context |
| 19 | ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) |
| 20 | defer cancel() |
| 21 | |
| 22 | // Return true if min replicas is not equal to zero. |
| 23 | if minReplicas != "0" { |
| 24 | return true |
| 25 | } |
| 26 | |
| 27 | // Check if stream exists |
| 28 | count := 0 |
| 29 | for { |
| 30 | // Return if count is 3 |
| 31 | if count == 3 { |
| 32 | return false |
| 33 | } |
| 34 | |
| 35 | // Check if key exists |
| 36 | exists, err := s.pubsubClient.CheckIfKeyExists(ctx, generateKey(project, service, version)) |
| 37 | if err != nil { |
| 38 | count++ |
| 39 | time.Sleep(2 * time.Second) |
| 40 | continue |
| 41 | } |
| 42 | |
| 43 | return exists |
| 44 | } |
| 45 | } |
| 46 | |
| 47 | func (s *Scaler) addIsActiveStream(project, service, version string, ch chan bool) { |
| 48 | // First remove the previous stream with the same name |
no test coverage detected