MCPcopy Index your code
hub / github.com/python-websockets/websockets / read_line

Method read_line

src/websockets/streams.py:20–52  ·  view source on GitHub ↗

Read a LF-terminated line from the stream. This is a generator-based coroutine. The return value includes the LF character. Args: m: Maximum number bytes to read; this is a security limit. Raises: EOFError: If the stream ends witho

(self, m: int)

Source from the content-addressed store, hash-verified

18 self.eof = False
19
20 def read_line(self, m: int) -> Generator[None, None, bytearray]:
21 """
22 Read a LF-terminated line from the stream.
23
24 This is a generator-based coroutine.
25
26 The return value includes the LF character.
27
28 Args:
29 m: Maximum number bytes to read; this is a security limit.
30
31 Raises:
32 EOFError: If the stream ends without a LF.
33 RuntimeError: If the stream ends in more than ``m`` bytes.
34
35 """
36 n = 0 # number of bytes to read
37 p = 0 # number of bytes without a newline
38 while True:
39 n = self.buffer.find(b"\n", p) + 1
40 if n > 0:
41 break
42 p = len(self.buffer)
43 if p > m:
44 raise RuntimeError(f"read {p} bytes, expected no more than {m} bytes")
45 if self.eof:
46 raise EOFError(f"stream ends after {p} bytes, before end of line")
47 yield
48 if n > m:
49 raise RuntimeError(f"read {n} bytes, expected no more than {m} bytes")
50 r = self.buffer[:n]
51 del self.buffer[:n]
52 return r
53
54 def read_exact(self, n: int) -> Generator[None, None, bytearray]:
55 """

Callers 6

parseMethod · 0.80
test_read_lineMethod · 0.80

Calls

no outgoing calls