()
| 20 | code: str, inputs: List, warmups: List, entry_point: str |
| 21 | ) -> Union[str, float]: |
| 22 | def unsafe_execute(): |
| 23 | with create_tempdir(): |
| 24 | # These system calls are needed when cleaning up tempdir. |
| 25 | import os |
| 26 | import shutil |
| 27 | |
| 28 | rmtree = shutil.rmtree |
| 29 | rmdir = os.rmdir |
| 30 | chdir = os.chdir |
| 31 | # Disable functionalities that can make destructive changes to the test. |
| 32 | reliability_guard() |
| 33 | # load functions |
| 34 | exec_globals = {} |
| 35 | exec(code, exec_globals) |
| 36 | fn = exec_globals[entry_point] |
| 37 | try: |
| 38 | # warmup calls |
| 39 | for warmup in warmups: |
| 40 | with swallow_io(): |
| 41 | fn(*warmup) |
| 42 | |
| 43 | start_time = time.time() |
| 44 | # real call |
| 45 | with swallow_io(): |
| 46 | with time_limit(3): |
| 47 | fn(*inputs) |
| 48 | duration = time.time() - start_time |
| 49 | |
| 50 | result.append(duration) |
| 51 | except TimeoutException: |
| 52 | result.append("timed out") |
| 53 | except BaseException as e: |
| 54 | result.append("thrown exception") |
| 55 | # Needed for cleaning up. |
| 56 | shutil.rmtree = rmtree |
| 57 | os.rmdir = rmdir |
| 58 | os.chdir = chdir |
| 59 | |
| 60 | manager = multiprocessing.Manager() |
| 61 | result = manager.list() |
nothing calls this directly
no test coverage detected