MCPcopy
hub / github.com/bugy/script-server / __init__

Method __init__

src/react/observable.py:183–196  ·  view source on GitHub ↗
(self, source_observable: ObservableBase, period_millis, aggregate_function=None)

Source from the content-addressed store, hash-verified

181
182class _TimeBufferedPipe(PipedObservable):
183 def __init__(self, source_observable: ObservableBase, period_millis, aggregate_function=None):
184 super().__init__(source_observable)
185
186 self.period_millis = period_millis
187 self.buffer_chunks = []
188 self.aggregate_function = aggregate_function
189 self.buffer_lock = threading.RLock()
190 self.subscriber_lock = threading.RLock()
191 self.source_closed = False
192
193 self.flushing_thread = threading.Thread(target=self.flush_buffer)
194 self.flushing_thread.start()
195
196 source_observable.subscribe(self)
197
198 def on_next(self, data):
199 with self.buffer_lock:

Callers

nothing calls this directly

Calls 3

__init__Method · 0.45
startMethod · 0.45
subscribeMethod · 0.45

Tested by

no test coverage detected