(opts ...Option)
| 32 | } |
| 33 | |
| 34 | func (i *createIterable) Observe(opts ...Option) <-chan Item { |
| 35 | mergedOptions := append(i.opts, opts...) |
| 36 | option := parseOptions(mergedOptions...) |
| 37 | |
| 38 | if !option.isConnectable() { |
| 39 | return i.next |
| 40 | } |
| 41 | |
| 42 | if option.isConnectOperation() { |
| 43 | i.connect(option.buildContext(emptyContext)) |
| 44 | return nil |
| 45 | } |
| 46 | |
| 47 | ch := option.buildChannel() |
| 48 | i.mutex.Lock() |
| 49 | i.subscribers = append(i.subscribers, ch) |
| 50 | i.mutex.Unlock() |
| 51 | return ch |
| 52 | } |
| 53 | |
| 54 | func (i *createIterable) connect(ctx context.Context) { |
| 55 | i.mutex.Lock() |
nothing calls this directly
no test coverage detected