(
dataset: str,
code: str,
inputs: List[Any],
entry_point: str,
expected,
atol,
ref_time: List[float],
fast_check: bool = False,
min_time_limit: float = DEFAULT_MIN_TIME_LIMIT,
gt_time_limit_factor: float = DEFAULT_GT_TIME_LIMIT_FACTOR,
)
| 221 | |
| 222 | |
| 223 | def untrusted_check( |
| 224 | dataset: str, |
| 225 | code: str, |
| 226 | inputs: List[Any], |
| 227 | entry_point: str, |
| 228 | expected, |
| 229 | atol, |
| 230 | ref_time: List[float], |
| 231 | fast_check: bool = False, |
| 232 | min_time_limit: float = DEFAULT_MIN_TIME_LIMIT, |
| 233 | gt_time_limit_factor: float = DEFAULT_GT_TIME_LIMIT_FACTOR, |
| 234 | ) -> Tuple[str, np.ndarray]: |
| 235 | time_limits = [max(min_time_limit, gt_time_limit_factor * t) for t in ref_time] |
| 236 | timeout = min(os.getenv("EVALPLUS_TIMEOUT_PER_TASK", 60), sum(time_limits)) + 1 |
| 237 | if not fast_check: |
| 238 | timeout += 1 # extra time for data collection |
| 239 | |
| 240 | # shared memory objects |
| 241 | progress = Value("i", 0) |
| 242 | stat = Value("i", _UNKNOWN) |
| 243 | details = Array("b", [False for _ in range(len(inputs))]) |
| 244 | |
| 245 | p = multiprocessing.Process( |
| 246 | target=unsafe_execute, |
| 247 | args=( |
| 248 | dataset, |
| 249 | entry_point, |
| 250 | code, |
| 251 | inputs, |
| 252 | expected, |
| 253 | time_limits, |
| 254 | atol, |
| 255 | fast_check, |
| 256 | # return values |
| 257 | stat, |
| 258 | details, |
| 259 | progress, |
| 260 | ), |
| 261 | ) |
| 262 | p.start() |
| 263 | p.join(timeout=timeout + 1) |
| 264 | if p.is_alive(): |
| 265 | p.terminate() |
| 266 | time.sleep(0.1) |
| 267 | if p.is_alive(): |
| 268 | p.kill() |
| 269 | time.sleep(0.1) |
| 270 | |
| 271 | stat = _mapping[stat.value] |
| 272 | details = details[: progress.value] |
| 273 | |
| 274 | if not stat: |
| 275 | stat = TIMEOUT |
| 276 | |
| 277 | if stat == PASS: |
| 278 | if len(details) != len(inputs) or not all(details): |
| 279 | stat = FAIL |
| 280 |
no outgoing calls
no test coverage detected