| 181 | |
| 182 | class _TimeBufferedPipe(PipedObservable): |
def __init__(self, source_observable: ObservableBase, period_millis, aggregate_function=None):
    """Set up buffering state, start the flushing thread and subscribe to the source.

    Args:
        source_observable: Upstream observable. It is handed to the parent
            constructor and this pipe subscribes itself to it as the last
            step of construction.
        period_millis: Stored as ``self.period_millis``. Presumably the
            interval, in milliseconds, between buffer flushes -- confirm
            against ``flush_buffer``.
        aggregate_function: Optional callable stored as
            ``self.aggregate_function``; defaults to ``None``. How it is
            applied is not visible in this method.
    """
    super().__init__(source_observable)

    # Caller-supplied configuration.
    self.period_millis = period_millis
    self.aggregate_function = aggregate_function

    # Mutable pipe state: nothing buffered yet, source not yet closed.
    self.buffer_chunks = []
    self.source_closed = False

    # Re-entrant locks; ``buffer_lock`` is the one ``on_next`` acquires.
    self.buffer_lock = threading.RLock()
    self.subscriber_lock = threading.RLock()

    # All attributes above are assigned before the worker thread starts,
    # so ``flush_buffer`` never observes a partially initialised pipe.
    flusher = threading.Thread(target=self.flush_buffer)
    self.flushing_thread = flusher
    flusher.start()

    # Subscribe last: the source may begin delivering items immediately.
    source_observable.subscribe(self)
| 197 | |
| 198 | def on_next(self, data): |
| 199 | with self.buffer_lock: |