hub / github.com/praw-dev/praw / test_stream

Method test_stream

tests/unit/models/test_util.py:126–145
(self)

Source from the content-addressed store, hash-verified

124 expected_fullname += 1
125
126     def test_stream(
127         self,
128     ):
129         Thing = namedtuple("Thing", ["fullname"])
130         initial_things = [Thing(n) for n in reversed(range(100))]
131         counter = 99
132
133         def generate(limit, **kwargs):
134             nonlocal counter
135             counter += 1
136             if counter % 2 == 0:
137                 return initial_things
138             return [Thing(counter)] + initial_things[:-1]
139
140         stream = stream_generator(generate)
141         seen = set()
142         for _ in range(400):
143             thing = next(stream)
144             assert thing not in seen
145             seen.add(thing)
146
147     def test_stream__exception_handler(self, monkeypatch):
148         monkeypatch.setattr("praw.models.util.time.sleep", lambda *_: None)
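
The property this test checks (400 items pulled from the stream, none repeated, even though most fetches return items already seen) can be tried out without PRAW installed. The sketch below is a simplified stand-in, not PRAW's stream_generator: the name dedup_stream is hypothetical, it keeps an unbounded set of seen fullnames, and it does not sleep or back off between fetches. The generate function follows the one in the test, with itertools.count replacing the nonlocal counter.

```python
import itertools
from collections import namedtuple


def dedup_stream(fetch):
    """Hypothetical stand-in: yield each fetched item once, oldest first."""
    seen = set()  # unbounded for simplicity; a long-running stream would bound this
    while True:
        # fetch() returns newest-first, so reverse it to yield oldest first
        for item in reversed(fetch(limit=100)):
            if item.fullname in seen:
                continue  # already yielded by an earlier fetch
            seen.add(item.fullname)
            yield item


Thing = namedtuple("Thing", ["fullname"])
initial_things = [Thing(n) for n in reversed(range(100))]
calls = itertools.count(100)  # first call sees 100, matching counter = 99 then += 1


def generate(limit, **kwargs):
    n = next(calls)
    if n % 2 == 0:
        return initial_things  # nothing new on even-numbered calls
    return [Thing(n)] + initial_things[:-1]  # one new item on odd-numbered calls


stream = dedup_stream(generate)
emitted = [next(stream) for _ in range(400)]

# The first fetch yields fullnames 0..99; after that only odd-numbered
# fetches contribute a new item: 101, 103, ...
assert len(set(emitted)) == 400
```

Every even-numbered fetch returns only items that were already yielded, so the sketch simply loops to the next fetch; this is the case the original test exercises by alternating the two return values.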

Callers

nothing calls this directly

Calls 2

stream_generator · Function · 0.90
add · Method · 0.45

Tested by

no test coverage detected