(job: Job, source: Path, job_dir: Path)
| 22 | |
| 23 | |
| 24 | def separate(job: Job, source: Path, job_dir: Path) -> Path: |
| 25 | _set(job, status="separating", progress=0.0, stage="Separating stems...") |
| 26 | |
| 27 | cmd = [ |
| 28 | sys.executable, |
| 29 | "-m", |
| 30 | "demucs", |
| 31 | "-n", |
| 32 | DEMUCS_MODEL, |
| 33 | "-d", |
| 34 | DEMUCS_DEVICE, |
| 35 | "-o", |
| 36 | str(job_dir), |
| 37 | str(source), |
| 38 | ] |
| 39 | env = os.environ.copy() |
| 40 | try: |
| 41 | import certifi |
| 42 | |
| 43 | env.setdefault("SSL_CERT_FILE", certifi.where()) |
| 44 | env.setdefault("REQUESTS_CA_BUNDLE", certifi.where()) |
| 45 | except ModuleNotFoundError: |
| 46 | pass |
| 47 | |
| 48 | proc = subprocess.Popen( |
| 49 | cmd, |
| 50 | stdout=subprocess.DEVNULL, |
| 51 | stderr=subprocess.PIPE, |
| 52 | text=True, |
| 53 | bufsize=0, |
| 54 | env=env, |
| 55 | ) |
| 56 | if proc.stderr is None: |
| 57 | raise RuntimeError("demucs subprocess has no stderr pipe") |
| 58 | set_proc(job.id, proc) |
| 59 | |
| 60 | # tqdm uses \r to redraw -- read char-by-char and split on \r or \n. |
| 61 | # Keep the last few non-progress lines so we can surface them if demucs |
| 62 | # exits non-zero (otherwise the only signal would be a bare exit code). |
| 63 | buf = "" |
| 64 | tail: list[str] = [] |
| 65 | last_output: list[float] = [time.monotonic()] |
| 66 | # Event set by the reader loop when the process exits normally so the |
| 67 | # watchdog can wake up immediately instead of waiting out its 30 s sleep. |
| 68 | _done_evt = threading.Event() |
| 69 | |
def _watchdog() -> None:
    """Terminate the demucs subprocess if it stays silent for too long.

    Wakes up every 30 s, or immediately once ``_done_evt`` is set, and
    stops in one of three ways: the event is set, the process has
    already exited, or the time elapsed since ``last_output[0]`` exceeds
    ``TIMEOUT_DEMUCS_STALL`` (in which case the process is terminated
    after logging a warning).
    """
    while True:
        # Event.wait() returns True as soon as the event is set, so a
        # finished run ends the watchdog without waiting out the sleep.
        if _done_evt.wait(timeout=30):
            break

        # The process exited on its own; nothing left to supervise.
        if proc.poll() is not None:
            break

        silent_for = time.monotonic() - last_output[0]
        if silent_for > TIMEOUT_DEMUCS_STALL:
            logger.warning(
                "demucs stalled for %ss with no output, terminating job %s",
                TIMEOUT_DEMUCS_STALL,
                job.id,
            )
            proc.terminate()
            break
no test coverage detected