| 165 | |
| 166 | |
class _MappedPipe(PipedObservable):
    """Pipe stage that transforms each incoming item with ``map_function``.

    The instance subscribes itself to ``source_observable`` on construction.
    Each item it receives is passed through ``map_function`` and the result
    is handed to ``self._push``.
    """

    def __init__(self, source_observable, map_function):
        """Store the mapping callable and subscribe to the source.

        Args:
            source_observable: Object exposing ``subscribe``; this pipe
                registers itself on it as the subscriber.
            map_function: Callable invoked with every incoming item; its
                return value is what gets pushed onward.
        """
        super().__init__(source_observable)

        # Assign the callable before subscribing so it is already in place
        # if the source delivers an item during the subscribe() call.
        self.map_function = map_function
        source_observable.subscribe(self)

    def on_next(self, data):
        """Apply ``map_function`` to ``data`` and push the result."""
        self._push(self.map_function(data))

    def on_close(self):
        """Forward the close notification by calling ``self._close()``."""
        self._close()
| 180 | |
| 181 | |
| 182 | class _TimeBufferedPipe(PipedObservable): |