MCPcopy
hub / github.com/Simple-Efficient/RL-Factory / wrapper_mp

Function wrapper_mp

verl/utils/py_functional.py:104–134  ·  view source on GitHub ↗
(*args, **kwargs)

Source from the content-addressed store, hash-verified

102 # --- Multiprocessing based timeout (existing logic) ---
103 @wraps(func)
104 def wrapper_mp(*args, **kwargs):
105 q = multiprocessing.Queue(maxsize=1)
106 process = multiprocessing.Process(target=_mp_target_wrapper, args=(func, q, args, kwargs))
107 process.start()
108 process.join(timeout=seconds)
109
110 if process.is_alive():
111 process.terminate()
112 process.join(timeout=0.5) # Give it a moment to terminate
113 if process.is_alive():
114 print(f"Warning: Process {process.pid} did not terminate gracefully after timeout.")
115 # Update function name in error message if needed (optional but good practice)
116 raise TimeoutError(f"Function {func.__name__} timed out after {seconds} seconds (multiprocessing)!")
117
118 try:
119 success, result_or_exc = q.get(timeout=0.1) # Small timeout for queue read
120 if success:
121 return result_or_exc
122 else:
123 raise result_or_exc # Reraise exception from child
124 except queue.Empty as err:
125 exitcode = process.exitcode
126 if exitcode is not None and exitcode != 0:
127 raise RuntimeError(f"Child process exited with error (exitcode: {exitcode}) before returning result.") from err
128 else:
129 # Should have timed out if queue is empty after join unless process died unexpectedly
130 # Update function name in error message if needed (optional but good practice)
131 raise TimeoutError(f"Operation timed out or process finished unexpectedly without result (exitcode: {exitcode}).") from err
132 finally:
133 q.close()
134 q.join_thread()
135
136 return wrapper_mp
137

Callers

nothing calls this directly

Calls 2

startMethod · 0.80
getMethod · 0.45

Tested by

no test coverage detected