Test creating multiple sockets simultaneously.
(selenium_nodesock)
| 325 | |
| 326 | |
| 327 | def test_socket_create_multiple(selenium_nodesock): |
| 328 | """Test creating multiple sockets simultaneously.""" |
| 329 | |
| 330 | def echo_handler(conn, _addr): |
| 331 | data = conn.recv(1024) |
| 332 | _, sport = conn.getsockname() |
| 333 | conn.sendall(f"Server{sport}:{data.decode()}".encode()) |
| 334 | |
| 335 | @run_in_pyodide |
| 336 | def run(selenium, host1, port1, host2, port2): |
| 337 | import socket |
| 338 | |
| 339 | s1 = socket.socket(socket.AF_INET, socket.SOCK_STREAM) |
| 340 | s2 = socket.socket(socket.AF_INET, socket.SOCK_STREAM) |
| 341 | |
| 342 | s1.connect((host1, port1)) |
| 343 | s2.connect((host2, port2)) |
| 344 | |
| 345 | s1.sendall(b"Hello1") |
| 346 | s2.sendall(b"Hello2") |
| 347 | |
| 348 | r1 = s1.recv(1024) |
| 349 | r2 = s2.recv(1024) |
| 350 | |
| 351 | s1.close() |
| 352 | s2.close() |
| 353 | |
| 354 | return [r1.decode(), r2.decode()] |
| 355 | |
| 356 | with ( |
| 357 | tcp_server(echo_handler) as (host1, port1), |
| 358 | tcp_server(echo_handler) as (host2, port2), |
| 359 | ): |
| 360 | result = run(selenium_nodesock, host1, port1, host2, port2) |
| 361 | assert result == [f"Server{port1}:Hello1", f"Server{port2}:Hello2"] |
| 362 | |
| 363 | |
| 364 | def test_socket_recv_eof(selenium_nodesock): |
nothing calls this directly
no test coverage detected
searching dependent graphs…