hub / github.com/mitmproxy/mitmproxy / load_flows

Method load_flows

mitmproxy/addons/readfile.py:43–59
(self, fo: BinaryIO)

Source from the content-addressed store, hash-verified

41          self.filter = None
42
43      async def load_flows(self, fo: BinaryIO) -> int:
44          cnt = 0
45          freader = io.FlowReader(fo)
46          try:
47              for flow in freader.stream():
48                  if self.filter and not self.filter(flow):
49                      continue
50                  await ctx.master.load_flow(flow)
51                  cnt += 1
52          except (OSError, exceptions.FlowReadException) as e:
53              if cnt:
54                  logging.warning("Flow file corrupted - loaded %i flows." % cnt)
55              else:
56                  logging.error("Flow file corrupted.")
57              raise exceptions.FlowReadException(str(e)) from e
58          else:
59              return cnt
60
61      async def load_flows_from_path(self, path: str) -> int:
62          path = os.path.expanduser(path)
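
The control flow of load_flows (filter, load, count, then re-raise on a corrupt stream while reporting how many flows were already loaded) can be tried without mitmproxy installed. The following is a minimal sketch under stated assumptions, not the addon itself: FlowReadError, stream, and Loader are hypothetical stand-ins for exceptions.FlowReadException, FlowReader.stream(), and the addon with ctx.master, and flows are modelled as plain strings.

```python
import asyncio
import logging
from typing import Callable, Iterable, Iterator, Optional


class FlowReadError(Exception):
    """Hypothetical stand-in for mitmproxy's exceptions.FlowReadException."""


def stream(records: Iterable[str]) -> Iterator[str]:
    """Hypothetical stand-in for FlowReader.stream(): fails on a corrupt record."""
    for record in records:
        if record == "<corrupt>":
            raise FlowReadError("invalid data")
        yield record


class Loader:
    """Hypothetical stand-in for the addon; collects flows in a list."""

    def __init__(self, flt: Optional[Callable[[str], bool]] = None) -> None:
        self.filter = flt
        self.loaded = []

    async def load_flow(self, flow: str) -> None:
        # Stand-in for ctx.master.load_flow(flow).
        self.loaded.append(flow)

    async def load_flows(self, records: Iterable[str]) -> int:
        cnt = 0
        try:
            for flow in stream(records):
                # Skip flows rejected by the optional filter.
                if self.filter and not self.filter(flow):
                    continue
                await self.load_flow(flow)
                cnt += 1
        except (OSError, FlowReadError) as e:
            # Flows loaded before the failure stay loaded; only the log level differs.
            if cnt:
                logging.warning("Flow file corrupted - loaded %i flows.", cnt)
            else:
                logging.error("Flow file corrupted.")
            raise FlowReadError(str(e)) from e
        else:
            # Reached only when the whole stream was read without error.
            return cnt


loader = Loader(flt=lambda flow: flow.startswith("GET"))
count = asyncio.run(loader.load_flows(["GET /a", "POST /b", "GET /c"]))
print(count)  # prints 2: the POST record is filtered out
```

In this sketch, as in the method above, a corrupt record in the middle of the input does not roll anything back: flows loaded before the error remain loaded and the caller still receives the exception, so the count is only returned for a fully readable stream.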

Callers 4

test_corrupt · Method · 0.95
load_flows_from_path · Method · 0.95
test_stdin · Method · 0.45
load_flows_from_path · Method · 0.45

Calls 4

stream · Method · 0.95
filter · Method · 0.45
load_flow · Method · 0.45
error · Method · 0.45

Tested by 2

test_corrupt · Method · 0.76
test_stdin · Method · 0.36