hub / github.com/pyodide/pyodide / tcp_server

Function tcp_server

src/tests/test_python_socket.py:18–55

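Start a TCP server on an OS-assigned port in a background thread. Yields the (host, port) the server is listening on. *handler* is called with ``(conn, addr)`` for each accepted connection. As written, the server accepts one connection, runs *handler* on it, and then closes the listening socket.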

(handler, *, timeout=5.0)
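A minimal usage sketch, assuming an ordinary host-side client socket stands in for the Pyodide-side client that the real tests use. The function body is copied from the source shown on this page so the example runs on its own; `echo_handler` and `echo_once` are hypothetical names introduced only for illustration.

```python
import contextlib
import socket
import threading


@contextlib.contextmanager
def tcp_server(handler, *, timeout=5.0):
    # Copy of the function documented on this page, included so the sketch
    # is self-contained.
    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server_socket.bind(("127.0.0.1", 0))  # port 0: let the OS pick a free port
    server_socket.listen(1)
    server_socket.settimeout(timeout)
    host, port = server_socket.getsockname()

    errors = []
    ready = threading.Event()

    def _serve():
        ready.set()
        try:
            conn, addr = server_socket.accept()
            try:
                handler(conn, addr)
            finally:
                conn.close()
        except Exception as e:
            errors.append(str(e))
        finally:
            server_socket.close()

    thread = threading.Thread(target=_serve, daemon=True)
    thread.start()
    ready.wait(timeout=timeout)

    try:
        yield host, port
    finally:
        thread.join(timeout=timeout)
        assert not errors, f"Server error: {errors[0]}"


def echo_handler(conn, addr):
    # Hypothetical handler: send back the first chunk the client sends.
    conn.sendall(conn.recv(1024))


def echo_once(message):
    # Hypothetical helper: start the server, connect once, return the reply.
    with tcp_server(echo_handler) as (host, port):
        with socket.create_connection((host, port), timeout=5.0) as client:
            client.sendall(message)
            return client.recv(1024)


reply = echo_once(b"ping")
print(reply)
```

Because the port is assigned by the OS, the caller has to read it from the yielded tuple rather than hard-coding it. Leaving the `with` block joins the server thread and raises `AssertionError` if the handler or `accept()` failed.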

Source from the content-addressed store, hash-verified

@contextlib.contextmanager
def tcp_server(handler, *, timeout=5.0):
    """Start a TCP server on an OS-assigned port in a background thread.

    Yields the (host, port) the server is listening on.
    *handler* is called with ``(conn, addr)`` for each accepted connection.
    """
    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server_socket.bind(("127.0.0.1", 0))
    server_socket.listen(1)
    server_socket.settimeout(timeout)
    host, port = server_socket.getsockname()

    errors: list[str] = []
    ready = threading.Event()

    def _serve():
        ready.set()
        try:
            conn, addr = server_socket.accept()
            try:
                handler(conn, addr)
            finally:
                conn.close()
        except Exception as e:
            errors.append(str(e))
        finally:
            server_socket.close()

    thread = threading.Thread(target=_serve, daemon=True)
    thread.start()
    ready.wait(timeout=timeout)

    try:
        yield host, port
    finally:
        thread.join(timeout=timeout)
        assert not errors, f"Server error: {errors[0]}"
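The `errors` list in the source is there because an exception raised inside a thread does not propagate to the thread that started it. The sketch below isolates that collect-then-check pattern without any sockets. `run_and_collect` and `failing_work` are hypothetical names, not part of the Pyodide test suite.

```python
import threading


def run_and_collect(work, *, timeout=5.0):
    """Run *work* in a daemon thread and return the error messages it raised."""
    errors = []

    def _target():
        try:
            work()
        except Exception as e:
            # Record the failure so the starting thread can inspect it later.
            errors.append(str(e))

    thread = threading.Thread(target=_target, daemon=True)
    thread.start()
    # Wait for the worker, but never longer than *timeout* seconds.
    thread.join(timeout=timeout)
    return errors


def failing_work():
    raise ValueError("boom")


collected = run_and_collect(failing_work)
clean = run_and_collect(lambda: None)
print(collected, clean)
```

In `tcp_server` the same list is checked with an `assert` after `thread.join(...)`, so a failing handler surfaces as a test failure in the calling thread instead of being silently lost.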

Callers 15

test_socket_connect · Function · 0.85
test_socket_getpeername · Function · 0.85
test_socket_getsockname · Function · 0.85
test_socket_fileno · Function · 0.85
test_socket_recv_eof · Function · 0.85
test_socket_makefile · Function · 0.85

Calls 2

bind · Method · 0.80
join · Method · 0.80

Tested by

no test coverage detected
