MCPcopy Index your code
hub / github.com/mitmproxy/mitmproxy / next_flow

Method next_flow

mitmproxy/addons/serverplayback.py:225–248  ·  view source on GitHub ↗

Returns the next flow object, or None if no matching flow was found.

(self, flow: http.HTTPFlow)

Source from the content-addressed store, hash-verified

223 return hashlib.sha256(repr(key).encode("utf8", "surrogateescape")).digest()
224
def next_flow(self, flow: http.HTTPFlow) -> http.HTTPFlow | None:
    """
    Return the next recorded flow matching ``flow``, or None if there is none.

    Recorded flows are looked up in ``self.flowmap`` under ``self._hash(flow)``.
    Only flows that carry a response are ever returned.

    If ``server_replay_reuse`` or ``server_replay_nopop`` is set, the first
    recorded flow with a response is returned and ``self.flowmap`` is left
    untouched. Otherwise flows are popped from the front of the recorded list
    until one with a response is found; response-less flows popped along the
    way are dropped, and the ``self.flowmap`` entry is deleted once its list
    is empty.
    """
    key = self._hash(flow)
    if key not in self.flowmap:
        return None

    if ctx.options.server_replay_reuse or ctx.options.server_replay_nopop:
        # Non-consuming lookup: scan in recorded order, leave the list as is.
        for candidate in self.flowmap[key]:
            if candidate.response:
                return candidate
        return None

    # Consuming lookup: take flows from the front until one has a response.
    recorded = self.flowmap[key]
    candidate = recorded.pop(0)
    while not candidate.response:
        if not recorded:
            # Nothing replayable is left under this key.
            del self.flowmap[key]
            return None
        candidate = recorded.pop(0)

    # Do not keep an empty list behind once the last flow has been served.
    if not recorded:
        del self.flowmap[key]
    return candidate
249
250 def configure(self, updated):
251 if ctx.options.server_replay_kill_extra:

Callers 6

test_server_playbackFunction · 0.95
test_add_flowsFunction · 0.95
test_loadFunction · 0.95
requestMethod · 0.95

Calls 2

_hashMethod · 0.95
popMethod · 0.45

Tested by 5

test_server_playbackFunction · 0.76
test_add_flowsFunction · 0.76
test_loadFunction · 0.76