MCPcopy Index your code
hub / github.com/ReactiveX/RxGo / WindowWithTime

Method WindowWithTime

observable_operator.go:2775–2857  ·  view source on GitHub ↗

WindowWithTime periodically subdivides items from an Observable into Observables based on timed windows and emits them rather than emitting the items one at a time.

(timespan Duration, opts ...Option)

Source from the content-addressed store, hash-verified

2773// WindowWithTime periodically subdivides items from an Observable into Observables based on timed windows
2774// and emit them rather than emitting the items one at a time.
2775func (o *ObservableImpl) WindowWithTime(timespan Duration, opts ...Option) Observable {
2776 if timespan == nil {
2777 return Thrown(IllegalInputError{error: "timespan must no be nil"})
2778 }
2779
2780 f := func(ctx context.Context, next chan Item, option Option, opts ...Option) {
2781 observe := o.Observe(opts...)
2782 ch := option.buildChannel()
2783 done := make(chan struct{})
2784 empty := true
2785 mutex := sync.Mutex{}
2786 if !Of(FromChannel(ch)).SendContext(ctx, next) {
2787 return
2788 }
2789
2790 go func() {
2791 defer func() {
2792 mutex.Lock()
2793 close(ch)
2794 mutex.Unlock()
2795 }()
2796 defer close(next)
2797 for {
2798 select {
2799 case <-ctx.Done():
2800 return
2801 case <-done:
2802 return
2803 case <-time.After(timespan.duration()):
2804 mutex.Lock()
2805 if empty {
2806 mutex.Unlock()
2807 continue
2808 }
2809 close(ch)
2810 empty = true
2811 ch = option.buildChannel()
2812 if !Of(FromChannel(ch)).SendContext(ctx, next) {
2813 close(done)
2814 return
2815 }
2816 mutex.Unlock()
2817 }
2818 }
2819 }()
2820
2821 for {
2822 select {
2823 case <-ctx.Done():
2824 return
2825 case <-done:
2826 return
2827 case item, ok := <-observe:
2828 if !ok {
2829 close(done)
2830 return
2831 }
2832 if item.Error() {

Callers

nothing calls this directly

Calls 10

ObserveMethod · 0.95
ThrownFunction · 0.85
OfFunction · 0.85
FromChannelFunction · 0.85
customObservableOperatorFunction · 0.85
SendContextMethod · 0.80
buildChannelMethod · 0.65
durationMethod · 0.65
ErrorMethod · 0.65
getErrorStrategyMethod · 0.65

Tested by

no test coverage detected