| 484 | } |
| 485 | |
| 486 | func (c *Cluster) syncStream(appId string) error { |
| 487 | var ( |
| 488 | streams *zalandov1.FabricEventStreamList |
| 489 | err error |
| 490 | ) |
| 491 | c.setProcessName("syncing stream with applicationId %s", appId) |
| 492 | c.logger.Debugf("syncing stream with applicationId %s", appId) |
| 493 | |
| 494 | listOptions := metav1.ListOptions{ |
| 495 | LabelSelector: c.labelsSet(false).String(), |
| 496 | } |
| 497 | streams, err = c.KubeClient.FabricEventStreams(c.Namespace).List(context.TODO(), listOptions) |
| 498 | if err != nil { |
| 499 | return fmt.Errorf("could not list of FabricEventStreams for applicationId %s: %v", appId, err) |
| 500 | } |
| 501 | |
| 502 | streamExists := false |
| 503 | for _, stream := range streams.Items { |
| 504 | if stream.Spec.ApplicationId != appId { |
| 505 | continue |
| 506 | } |
| 507 | streamExists = true |
| 508 | c.Streams[appId] = &stream |
| 509 | desiredStreams := c.generateFabricEventStream(appId) |
| 510 | if !reflect.DeepEqual(stream.ObjectMeta.OwnerReferences, desiredStreams.ObjectMeta.OwnerReferences) { |
| 511 | c.logger.Infof("owner references of event streams with applicationId %s do not match the current ones", appId) |
| 512 | stream.ObjectMeta.OwnerReferences = desiredStreams.ObjectMeta.OwnerReferences |
| 513 | c.setProcessName("updating event streams with applicationId %s", appId) |
| 514 | updatedStream, err := c.KubeClient.FabricEventStreams(stream.Namespace).Update(context.TODO(), &stream, metav1.UpdateOptions{}) |
| 515 | if err != nil { |
| 516 | return fmt.Errorf("could not update event streams with applicationId %s: %v", appId, err) |
| 517 | } |
| 518 | c.Streams[appId] = updatedStream |
| 519 | } |
| 520 | if match, reason := c.compareStreams(&stream, desiredStreams); !match { |
| 521 | c.logger.Infof("updating event streams with applicationId %s: %s", appId, reason) |
| 522 | // make sure to keep the old name with randomly generated suffix |
| 523 | desiredStreams.ObjectMeta.Name = stream.ObjectMeta.Name |
| 524 | updatedStream, err := c.updateStreams(desiredStreams) |
| 525 | if err != nil { |
| 526 | return fmt.Errorf("failed updating event streams %s with applicationId %s: %v", stream.Name, appId, err) |
| 527 | } |
| 528 | c.Streams[appId] = updatedStream |
| 529 | c.logger.Infof("event streams %q with applicationId %s have been successfully updated", updatedStream.Name, appId) |
| 530 | } |
| 531 | break |
| 532 | } |
| 533 | |
| 534 | if !streamExists { |
| 535 | c.logger.Infof("event streams with applicationId %s do not exist, create it", appId) |
| 536 | createdStream, err := c.createStreams(appId) |
| 537 | if err != nil { |
| 538 | return fmt.Errorf("failed creating event streams with applicationId %s: %v", appId, err) |
| 539 | } |
| 540 | c.logger.Infof("event streams %q have been successfully created", createdStream.Name) |
| 541 | c.Streams[appId] = createdStream |
| 542 | } |
| 543 | |