hub / github.com/python-websockets/websockets / test_one_input

Function test_one_input

fuzzing/fuzz_websocket_parser.py:12–42
(data)


def test_one_input(data):
    fdp = atheris.FuzzedDataProvider(data)
    mask = fdp.ConsumeBool()
    max_size_enabled = fdp.ConsumeBool()
    max_size = fdp.ConsumeInt(4)
    payload = fdp.ConsumeBytes(atheris.ALL_REMAINING)

    reader = StreamReader()
    reader.feed_data(payload)
    reader.feed_eof()

    parser = Frame.parse(
        reader.read_exact,
        mask=mask,
        max_size=max_size if max_size_enabled else None,
    )

    try:
        next(parser)
    except StopIteration as exc:
        assert isinstance(exc.value, Frame)
        return  # input accepted
    except (
        EOFError,  # connection is closed without a full WebSocket frame
        UnicodeDecodeError,  # frame contains invalid UTF-8
        PayloadTooBig,  # frame's payload size exceeds ``max_size``
        ProtocolError,  # frame contains incorrect values
    ):
        return  # input rejected with a documented exception

    raise RuntimeError("parsing didn't complete")
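The control flow above relies on the parser being a generator: a single `next()` call either finishes the parse (the result arrives as `StopIteration.value`), raises an exception, or yields because more data is needed. Since EOF has already been fed, a yield should be impossible, which is why falling through to `RuntimeError` signals a bug. A minimal, self-contained sketch of that pattern follows. `ToyReader`, `parse_record`, and `run_once` are hypothetical stand-ins written for illustration; they are not the websockets `StreamReader` or `Frame.parse` implementations, and the one-length-byte record format is an assumption made for the example.

```python
class ToyReader:
    """Hypothetical buffered reader; NOT websockets' StreamReader."""

    def __init__(self):
        self.buffer = bytearray()
        self.eof = False

    def feed_data(self, data):
        self.buffer += data

    def feed_eof(self):
        self.eof = True

    def read_exact(self, n):
        # Generator: yield while more data could still arrive,
        # raise EOFError once the stream has ended short of n bytes.
        while len(self.buffer) < n:
            if self.eof:
                raise EOFError(
                    f"stream ends after {len(self.buffer)} bytes, expected {n} bytes"
                )
            yield
        chunk = bytes(self.buffer[:n])
        del self.buffer[:n]
        return chunk


def parse_record(read_exact):
    """Toy format (assumed): one length byte, then that many payload bytes."""
    (length,) = yield from read_exact(1)
    payload = yield from read_exact(length)
    return payload


def run_once(data):
    """Mirror the harness: feed everything, signal EOF, step the parser once."""
    reader = ToyReader()
    reader.feed_data(data)
    reader.feed_eof()
    parser = parse_record(reader.read_exact)
    try:
        next(parser)
    except StopIteration as exc:
        return ("accepted", exc.value)  # the generator's return value
    except EOFError:
        return ("rejected", None)  # truncated input, an expected failure
    # Reaching this point means the parser yielded despite EOF.
    raise RuntimeError("parsing didn't complete")
```

With this sketch, `run_once(b"\x03abc")` takes the accepted path and `run_once(b"\x03ab")` takes the rejected path, because the second input ends one byte short of the declared length.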

Callers

nothing calls this directly

Calls 4

feed_data · Method · 0.95
feed_eof · Method · 0.95
StreamReader · Class · 0.90
parse · Method · 0.45

Tested by

no test coverage detected
