MCPcopy Index your code
hub / github.com/praw-dev/praw / test_stream__exception_handler

Method test_stream__exception_handler

tests/unit/models/test_util.py:147–165  ·  view source on GitHub ↗
(self, monkeypatch)

Source from the content-addressed store, hash-verified

145 seen.add(thing)
146
def test_stream__exception_handler(self, monkeypatch):
    """Check that a handled fetch error does not stop the stream.

    The fake fetch function raises ``RuntimeError("boom")`` on its first
    call and returns one single-item batch on each of the next two calls.
    The test asserts that both items are yielded in order and that the
    ``exception_handler`` callback received exactly that one exception.
    """
    # Replace the sleep call reached through praw.models.util with a no-op.
    monkeypatch.setattr("praw.models.util.time.sleep", lambda *_: None)
    Thing = namedtuple("Thing", ["fullname"])
    # Scripted fetch outcomes, consumed front to back by ``generate``.
    queued = [RuntimeError("boom"), [Thing(1)], [Thing(2)]]
    caught = []

    def generate(limit, **kwargs):
        outcome = queued.pop(0)
        if not isinstance(outcome, Exception):
            return outcome
        raise outcome

    stream = stream_generator(generate, exception_handler=caught.append)

    # The failing first fetch goes to the handler; the items below come from
    # the two fetches that follow it.
    first = next(stream)
    assert first.fullname == 1
    second = next(stream)
    assert second.fullname == 2

    # Exactly one exception reached the handler, and it is the scripted one.
    assert [str(error) for error in caught] == ["boom"]
166
167 def test_stream__exception_handler__reraise(self):
168 def generate(limit, **kwargs):

Callers

nothing calls this directly

Calls 1

stream_generator · Function · 0.90

Tested by

no test coverage detected