MCPcopy
hub / github.com/livekit/livekit / subscribeDataTrack

Method subscribeDataTrack

pkg/rtc/subscriptionmanager.go:1051–1114  ·  view source on GitHub ↗
(sub *dataTrackSubscription)

Source from the content-addressed store, hash-verified

1049}
1050
1051func (m *SubscriptionManager) subscribeDataTrack(sub *dataTrackSubscription) error {
1052 sub.logger.Debugw("executing subscribe")
1053
1054 if !m.params.Participant.CanSubscribe() {
1055 return ErrNoSubscribePermission
1056 }
1057
1058 trackID := sub.trackID
1059 res := m.params.DataTrackResolver(m.params.Participant, trackID)
1060 sub.logger.Debugw("resolved data track", "result", res)
1061
1062 if res.TrackChangedNotifier != nil && sub.setChangedNotifier(res.TrackChangedNotifier) {
1063 // set callback only when we haven't done it before
1064 // we set the observer before checking for existence of track, so that we may get notified
1065 // when the track becomes available
1066 res.TrackChangedNotifier.AddObserver(string(sub.subscriberID), func() {
1067 m.queueReconcileDataTrack(trackID)
1068 })
1069 }
1070 if res.TrackRemovedNotifier != nil && sub.setRemovedNotifier(res.TrackRemovedNotifier) {
1071 res.TrackRemovedNotifier.AddObserver(string(sub.subscriberID), func() {
1072 // re-resolve the track in case the same track had been re-published
1073 res := m.params.DataTrackResolver(m.params.Participant, trackID)
1074 if res.DataTrack != nil {
1075 // do not unsubscribe, track is still available
1076 return
1077 }
1078 m.handleSourceDataTrackRemoved(trackID)
1079 })
1080 }
1081
1082 dataTrack := res.DataTrack
1083 if dataTrack == nil {
1084 return ErrTrackNotFound
1085 }
1086
1087 sub.setPublisher(res.PublisherIdentity, res.PublisherID)
1088
1089 permChanged := sub.setHasPermission(res.HasPermission)
1090 if permChanged {
1091 m.params.Participant.SendSubscriptionPermissionUpdate(sub.getPublisherID(), trackID, res.HasPermission)
1092 }
1093 if !res.HasPermission {
1094 return ErrNoTrackPermission
1095 }
1096
1097 dataDownTrack, err := dataTrack.AddSubscriber(m.params.Participant)
1098 if err != nil && !errors.Is(err, errAlreadySubscribed) {
1099 return err
1100 }
1101 if err == errAlreadySubscribed {
1102 sub.logger.Debugw("already subscribed to data track")
1103 }
1104 if err == nil && dataDownTrack != nil { // subTrack could be nil if already subscribed
1105 dataDownTrack.OnClose(func() {
1106 m.handleDataDownTrackClose(sub)
1107 })
1108 sub.setDataDownTrack(dataDownTrack)

Callers 1

Calls 15

markSubscribedTo · Method · 0.95
setChangedNotifier · Method · 0.80
setRemovedNotifier · Method · 0.80
setPublisher · Method · 0.80
setHasPermission · Method · 0.80
getPublisherID · Method · 0.80
setDataDownTrack · Method · 0.80
CanSubscribe · Method · 0.65
AddObserver · Method · 0.65

Tested by

no test coverage detected