| 71 | assert len(list(r.stream())) |
| 72 | |
def test_error(self):
    """Check the error paths of FlowReader and FlowReadException.

    Covers three things: input that is not serialized flow data, a
    serialized flow whose b"dummy" marker was rewritten, and the string
    form of FlowReadException.
    """
    # Arbitrary bytes must be rejected as an invalid data format.
    garbage = io.BytesIO(b"bogus")
    reader = mitmproxy.io.FlowReader(garbage)
    with pytest.raises(FlowReadException, match="Invalid data format"):
        list(reader.stream())

    # Serialize a dummy flow, then rewrite every b"dummy" occurrence to the
    # same-length b"nknwn" before reading it back.
    # NOTE(review): presumably b"dummy" is the flow's type tag -- confirm.
    serialized = io.BytesIO()
    flow = tflow.tdummyflow()
    writer = mitmproxy.io.FlowWriter(serialized)
    writer.add(flow)

    tampered = serialized.getvalue().replace(b"dummy", b"nknwn")
    reader = mitmproxy.io.FlowReader(io.BytesIO(tampered))
    with pytest.raises(FlowReadException, match="Unknown flow type"):
        list(reader.stream())

    # The exception's string form is the message it was constructed with.
    exc = FlowReadException("foo")
    assert str(exc) == "foo"
| 93 | |
| 94 | def test_versioncheck(self): |
| 95 | f = tflow.tflow() |