(sub *dataTrackSubscription)
| 1049 | } |
| 1050 | |
| 1051 | func (m *SubscriptionManager) subscribeDataTrack(sub *dataTrackSubscription) error { |
| 1052 | sub.logger.Debugw("executing subscribe") |
| 1053 | |
| 1054 | if !m.params.Participant.CanSubscribe() { |
| 1055 | return ErrNoSubscribePermission |
| 1056 | } |
| 1057 | |
| 1058 | trackID := sub.trackID |
| 1059 | res := m.params.DataTrackResolver(m.params.Participant, trackID) |
| 1060 | sub.logger.Debugw("resolved data track", "result", res) |
| 1061 | |
| 1062 | if res.TrackChangedNotifier != nil && sub.setChangedNotifier(res.TrackChangedNotifier) { |
| 1063 | // set callback only when we haven't done it before |
| 1064 | // we set the observer before checking for existence of track, so that we may get notified |
| 1065 | // when the track becomes available |
| 1066 | res.TrackChangedNotifier.AddObserver(string(sub.subscriberID), func() { |
| 1067 | m.queueReconcileDataTrack(trackID) |
| 1068 | }) |
| 1069 | } |
| 1070 | if res.TrackRemovedNotifier != nil && sub.setRemovedNotifier(res.TrackRemovedNotifier) { |
| 1071 | res.TrackRemovedNotifier.AddObserver(string(sub.subscriberID), func() { |
| 1072 | // re-resolve the track in case the same track had been re-published |
| 1073 | res := m.params.DataTrackResolver(m.params.Participant, trackID) |
| 1074 | if res.DataTrack != nil { |
| 1075 | // do not unsubscribe, track is still available |
| 1076 | return |
| 1077 | } |
| 1078 | m.handleSourceDataTrackRemoved(trackID) |
| 1079 | }) |
| 1080 | } |
| 1081 | |
| 1082 | dataTrack := res.DataTrack |
| 1083 | if dataTrack == nil { |
| 1084 | return ErrTrackNotFound |
| 1085 | } |
| 1086 | |
| 1087 | sub.setPublisher(res.PublisherIdentity, res.PublisherID) |
| 1088 | |
| 1089 | permChanged := sub.setHasPermission(res.HasPermission) |
| 1090 | if permChanged { |
| 1091 | m.params.Participant.SendSubscriptionPermissionUpdate(sub.getPublisherID(), trackID, res.HasPermission) |
| 1092 | } |
| 1093 | if !res.HasPermission { |
| 1094 | return ErrNoTrackPermission |
| 1095 | } |
| 1096 | |
| 1097 | dataDownTrack, err := dataTrack.AddSubscriber(m.params.Participant) |
| 1098 | if err != nil && !errors.Is(err, errAlreadySubscribed) { |
| 1099 | return err |
| 1100 | } |
| 1101 | if err == errAlreadySubscribed { |
| 1102 | sub.logger.Debugw("already subscribed to data track") |
| 1103 | } |
| 1104 | if err == nil && dataDownTrack != nil { // subTrack could be nil if already subscribed |
| 1105 | dataDownTrack.OnClose(func() { |
| 1106 | m.handleDataDownTrackClose(sub) |
| 1107 | }) |
| 1108 | sub.setDataDownTrack(dataDownTrack) |
no test coverage detected