hub / github.com/reflex-dev/reflex / create_server_and_test

Function create_server_and_test()

tests/units/utils/test_processes.py:122–135

Source from the content-addressed store, hash-verified

do_close = threading.Event()

def create_server_and_test():
    nonlocal do_close, is_open, shared
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    # Port 0 asks the OS to pick a free ephemeral port.
    server.bind(("", 0))

    server.listen(1)

    # Publish the chosen port to the enclosing scope.
    port = server.getsockname()[1]
    shared = port

    # Signal that the port is open, then hold it until told to close
    # (or until the timeout expires).
    is_open.set()
    do_close.wait(timeout=DEFAULT_TIMEOUT)

    server.close()

thread = threading.Thread(target=create_server_and_test)
thread.start()
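
The listing stops once the thread has started, so the sketch below shows one way the surrounding code might drive the two events. It is only an illustration: `run_check` and `port_is_listening` are hypothetical names, the value of `DEFAULT_TIMEOUT` is assumed, and the actual test in the repository may check something different.

```python
import socket
import threading

DEFAULT_TIMEOUT = 10  # assumed value; the real constant is defined elsewhere


def port_is_listening(port: int) -> bool:
    """Hypothetical helper: True if a TCP connect to localhost:port succeeds."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as probe:
        probe.settimeout(1)
        return probe.connect_ex(("127.0.0.1", port)) == 0


def run_check() -> tuple[bool, bool]:
    """Hypothetical driver: probe the port while held and after release."""
    is_open = threading.Event()
    do_close = threading.Event()
    shared = None

    def create_server_and_test():
        nonlocal shared
        server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        server.bind(("", 0))  # port 0: let the OS choose a free port
        server.listen(1)
        shared = server.getsockname()[1]
        is_open.set()  # the port is now open
        do_close.wait(timeout=DEFAULT_TIMEOUT)
        server.close()

    thread = threading.Thread(target=create_server_and_test)
    thread.start()
    try:
        # Wait until the worker has bound the socket and published the port.
        if not is_open.wait(timeout=DEFAULT_TIMEOUT):
            raise TimeoutError("server thread did not open a port in time")
        while_open = port_is_listening(shared)
    finally:
        # Always release the worker so the thread can exit.
        do_close.set()
        thread.join()
    after_close = port_is_listening(shared)
    return while_open, after_close
```

Calling `run_check()` should report the port as reachable while the thread holds it and unreachable once the socket is closed. Setting `do_close` inside `finally` keeps a failed check from leaving the worker blocked until its timeout.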

Callers

nothing calls this directly; it is passed as the target of threading.Thread and runs when that thread starts

Calls 4

bind · Method · 0.80
listen · Method · 0.80
set · Method · 0.45
close · Method · 0.45

Tested by

no test coverage detected