MCPcopy Index your code
hub / github.com/bugy/script-server / _ReplayPipe

Class _ReplayPipe

src/react/observable.py:141–164  ·  view source on GitHub ↗

Source from the content-addressed store, hash-verified

139
140
141class _ReplayPipe(ReplayObservable):
142 def __init__(self, source_observable):
143 super().__init__()
144 self.source = source_observable
145 self.source.subscribe(self)
146
147 def push(self, data: T):
148 raise RuntimeError('Piped observable is read-only')
149
150 def close(self):
151 raise RuntimeError('Piped observable is read-only')
152
153 def on_next(self, data):
154 self._push(data)
155
156 def on_close(self):
157 self._close()
158
159 def dispose(self):
160 self.source.subscribe_on_close(self._defer_dispose)
161
162 def _defer_dispose(self):
163 self.source.unsubscribe(self)
164 super().dispose()
165
166
167class _MappedPipe(PipedObservable):

Callers 1

replayMethod · 0.85

Calls

no outgoing calls

Tested by

no test coverage detected