MCPcopy Index your code
hub / github.com/zalando/postgres-operator / syncStream

Method syncStream

pkg/cluster/streams.go:486–545  ·  view source on GitHub ↗
(appId string)

Source from the content-addressed store, hash-verified

484}
485
486func (c *Cluster) syncStream(appId string) error {
487 var (
488 streams *zalandov1.FabricEventStreamList
489 err error
490 )
491 c.setProcessName("syncing stream with applicationId %s", appId)
492 c.logger.Debugf("syncing stream with applicationId %s", appId)
493
494 listOptions := metav1.ListOptions{
495 LabelSelector: c.labelsSet(false).String(),
496 }
497 streams, err = c.KubeClient.FabricEventStreams(c.Namespace).List(context.TODO(), listOptions)
498 if err != nil {
499 return fmt.Errorf("could not list of FabricEventStreams for applicationId %s: %v", appId, err)
500 }
501
502 streamExists := false
503 for _, stream := range streams.Items {
504 if stream.Spec.ApplicationId != appId {
505 continue
506 }
507 streamExists = true
508 c.Streams[appId] = &stream
509 desiredStreams := c.generateFabricEventStream(appId)
510 if !reflect.DeepEqual(stream.ObjectMeta.OwnerReferences, desiredStreams.ObjectMeta.OwnerReferences) {
511 c.logger.Infof("owner references of event streams with applicationId %s do not match the current ones", appId)
512 stream.ObjectMeta.OwnerReferences = desiredStreams.ObjectMeta.OwnerReferences
513 c.setProcessName("updating event streams with applicationId %s", appId)
514 updatedStream, err := c.KubeClient.FabricEventStreams(stream.Namespace).Update(context.TODO(), &stream, metav1.UpdateOptions{})
515 if err != nil {
516 return fmt.Errorf("could not update event streams with applicationId %s: %v", appId, err)
517 }
518 c.Streams[appId] = updatedStream
519 }
520 if match, reason := c.compareStreams(&stream, desiredStreams); !match {
521 c.logger.Infof("updating event streams with applicationId %s: %s", appId, reason)
522 // make sure to keep the old name with randomly generated suffix
523 desiredStreams.ObjectMeta.Name = stream.ObjectMeta.Name
524 updatedStream, err := c.updateStreams(desiredStreams)
525 if err != nil {
526 return fmt.Errorf("failed updating event streams %s with applicationId %s: %v", stream.Name, appId, err)
527 }
528 c.Streams[appId] = updatedStream
529 c.logger.Infof("event streams %q with applicationId %s have been successfully updated", updatedStream.Name, appId)
530 }
531 break
532 }
533
534 if !streamExists {
535 c.logger.Infof("event streams with applicationId %s do not exist, create it", appId)
536 createdStream, err := c.createStreams(appId)
537 if err != nil {
538 return fmt.Errorf("failed creating event streams with applicationId %s: %v", appId, err)
539 }
540 c.logger.Infof("event streams %q have been successfully created", createdStream.Name)
541 c.Streams[appId] = createdStream
542 }
543

Callers 6

syncStreams (Method) · 0.95
TestSyncStreams (Function) · 0.80
TestUpdateStreams (Function) · 0.80
patchPostgresqlStreams (Function) · 0.80
TestDeleteStreams (Function) · 0.80

Calls 10

setProcessName (Method) · 0.95
labelsSet (Method) · 0.95
compareStreams (Method) · 0.95
updateStreams (Method) · 0.95
createStreams (Method) · 0.95
List (Method) · 0.65
FabricEventStreams (Method) · 0.65
Update (Method) · 0.65
String (Method) · 0.45

Tested by 5

TestSyncStreams (Function) · 0.64
TestUpdateStreams (Function) · 0.64
patchPostgresqlStreams (Function) · 0.64
TestDeleteStreams (Function) · 0.64