WindowWithTime periodically subdivides items from an Observable into Observables based on timed windows and emit them rather than emitting the items one at a time.
(timespan Duration, opts ...Option)
| 2773 | // WindowWithTime periodically subdivides items from an Observable into Observables based on timed windows |
| 2774 | // and emit them rather than emitting the items one at a time. |
| 2775 | func (o *ObservableImpl) WindowWithTime(timespan Duration, opts ...Option) Observable { |
| 2776 | if timespan == nil { |
| 2777 | return Thrown(IllegalInputError{error: "timespan must no be nil"}) |
| 2778 | } |
| 2779 | |
| 2780 | f := func(ctx context.Context, next chan Item, option Option, opts ...Option) { |
| 2781 | observe := o.Observe(opts...) |
| 2782 | ch := option.buildChannel() |
| 2783 | done := make(chan struct{}) |
| 2784 | empty := true |
| 2785 | mutex := sync.Mutex{} |
| 2786 | if !Of(FromChannel(ch)).SendContext(ctx, next) { |
| 2787 | return |
| 2788 | } |
| 2789 | |
| 2790 | go func() { |
| 2791 | defer func() { |
| 2792 | mutex.Lock() |
| 2793 | close(ch) |
| 2794 | mutex.Unlock() |
| 2795 | }() |
| 2796 | defer close(next) |
| 2797 | for { |
| 2798 | select { |
| 2799 | case <-ctx.Done(): |
| 2800 | return |
| 2801 | case <-done: |
| 2802 | return |
| 2803 | case <-time.After(timespan.duration()): |
| 2804 | mutex.Lock() |
| 2805 | if empty { |
| 2806 | mutex.Unlock() |
| 2807 | continue |
| 2808 | } |
| 2809 | close(ch) |
| 2810 | empty = true |
| 2811 | ch = option.buildChannel() |
| 2812 | if !Of(FromChannel(ch)).SendContext(ctx, next) { |
| 2813 | close(done) |
| 2814 | return |
| 2815 | } |
| 2816 | mutex.Unlock() |
| 2817 | } |
| 2818 | } |
| 2819 | }() |
| 2820 | |
| 2821 | for { |
| 2822 | select { |
| 2823 | case <-ctx.Done(): |
| 2824 | return |
| 2825 | case <-done: |
| 2826 | return |
| 2827 | case item, ok := <-observe: |
| 2828 | if !ok { |
| 2829 | close(done) |
| 2830 | return |
| 2831 | } |
| 2832 | if item.Error() { |
nothing calls this directly
no test coverage detected