(
self,
)
| 124 | expected_fullname += 1 |
| 125 | |
def test_stream(self):
    """Check that the first 400 items pulled from the stream are distinct.

    ``stream_generator`` is driven by a fake listing function that
    alternates between two responses: a fixed batch of 100 items, and that
    same batch with one new item placed at the front and the last item
    dropped. Every item taken from the stream must not have been seen
    before.
    """
    Item = namedtuple("Thing", ["fullname"])
    # Fixed batch with fullnames 99 down to 0.
    baseline = [Item(index) for index in range(99, -1, -1)]
    calls = 99

    def generate(limit, **kwargs):
        """Return the next fake listing; arguments are accepted but unused."""
        nonlocal calls
        calls += 1
        if calls % 2:
            # Odd call count: one new item in front, last baseline item
            # dropped so the batch length stays the same.
            return [Item(calls), *baseline[:-1]]
        # Even call count: the unchanged baseline batch.
        return baseline

    stream = stream_generator(generate)
    yielded = set()
    for _ in range(400):
        item = next(stream)
        assert item not in yielded
        yielded.add(item)
| 147 | def test_stream__exception_handler(self, monkeypatch): |
| 148 | monkeypatch.setattr("praw.models.util.time.sleep", lambda *_: None) |
nothing calls this directly
no test coverage detected