hub / github.com/bugy/script-server / _MappedPipe

Class _MappedPipe

src/react/observable.py:167–179


class _MappedPipe(PipedObservable):
    def __init__(self, source_observable, map_function):
        super().__init__(source_observable)

        self.map_function = map_function
        source_observable.subscribe(self)

    def on_next(self, data):
        mapped_data = self.map_function(data)
        self._push(mapped_data)

    def on_close(self):
        self._close()
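
The class above subscribes to its source on construction, applies map_function to each incoming item, and forwards the result and the close signal to its own subscribers. A minimal, self-contained sketch of that flow follows. SketchObservable, SketchMappedPipe, and Collector are hypothetical stand-ins written for this illustration, not the library's PipedObservable or its real base class, whose internals are not shown here.

```python
class SketchObservable:
    """Hypothetical stand-in for the library's observable base class."""

    def __init__(self):
        self.observers = []
        self.closed = False

    def subscribe(self, observer):
        self.observers.append(observer)

    def _push(self, data):
        # Deliver one item to every subscribed observer.
        for observer in self.observers:
            observer.on_next(data)

    def _close(self):
        # Mark this observable closed and notify observers.
        self.closed = True
        for observer in self.observers:
            observer.on_close()


class SketchMappedPipe(SketchObservable):
    """Mirrors the shape of _MappedPipe: transform each item, then forward it."""

    def __init__(self, source_observable, map_function):
        super().__init__()
        self.map_function = map_function
        # The pipe registers itself as an observer of its source.
        source_observable.subscribe(self)

    def on_next(self, data):
        self._push(self.map_function(data))

    def on_close(self):
        self._close()


class Collector:
    """Hypothetical observer that records what it receives."""

    def __init__(self):
        self.items = []
        self.closed = False

    def on_next(self, data):
        self.items.append(data)

    def on_close(self):
        self.closed = True


source = SketchObservable()
pipe = SketchMappedPipe(source, str.upper)
collector = Collector()
pipe.subscribe(collector)

source._push("hello")
source._push("world")
source._close()

print(collector.items)   # ['HELLO', 'WORLD']
print(collector.closed)  # True
```

Because the pipe subscribes inside its constructor, items pushed by the source before the pipe is created are not mapped in this sketch. Whether the real base class replays earlier items is not visible from the excerpt above.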

Callers 1

mapMethod · 0.85

Calls

no outgoing calls

Tested by

no test coverage detected