Common async wrapper: acquires the pipeline lock, runs blocking_fn in a thread, then handles success / cancel / error outcomes uniformly.
(
job: Job,
job_dir: Path,
jobs_dir: Path,
blocking_fn,
*fn_args: object,
error_msg: str = "Audio processing failed. Please try again.",
)
| 193 | |
| 194 | |
async def _run_async(
    job: Job,
    job_dir: Path,
    jobs_dir: Path,
    blocking_fn,
    *fn_args: object,
    error_msg: str = "Audio processing failed. Please try again.",
) -> None:
    """Run ``blocking_fn`` in a worker thread and record the job's outcome.

    The call is made while holding ``_pipeline_lock`` and is dispatched with
    ``asyncio.to_thread`` as ``blocking_fn(job, *fn_args, job_dir)``.

    Outcomes:
      * Success: the job is marked "done", its metadata is written to
        ``job_dir`` and the registry is persisted.
      * Cancel: a ``JobCancelled`` exception, or any other ``Exception``
        raised while ``job.cancel_requested`` is truthy, marks the job
        "cancelled".
      * Error: any other ``Exception`` is logged with its traceback and the
        job is marked "error" with ``error_msg`` as the stored error text.

    For both cancel and error the registry is persisted and ``_rmtree`` is
    called on ``job_dir``. Exceptions derived from ``Exception`` are never
    propagated out of the blocking call.
    """
    try:
        async with _pipeline_lock:
            await asyncio.to_thread(blocking_fn, job, *fn_args, job_dir)
    except Exception as exc:
        explicit_cancel = isinstance(exc, JobCancelled)
        if explicit_cancel or job.cancel_requested:
            # A non-JobCancelled failure during a requested cancel is treated
            # as a cancellation and tagged "(wrapped)" in the log line.
            wrapped_note = "" if explicit_cancel else " (wrapped)"
            logger.info("pipeline cancelled%s for job %s", wrapped_note, job.id)
            _set(job, status="cancelled", stage="Cancelled")
        else:
            logger.exception("pipeline failed for job %s: %s", job.id, exc)
            _set(
                job,
                status="error",
                stage="Error: Processing failed",
                error=error_msg,
            )
        # Shared tail for both failure modes: save state, then clean up.
        persist_registry(jobs_dir)
        _rmtree(job_dir)
    else:
        _set(job, status="done", progress=1.0, stage="Done")
        _write_metadata(job, job_dir)
        persist_registry(jobs_dir)
| 227 | |
| 228 | |
| 229 | async def run_pipeline(job: Job, url: str, jobs_dir: Path) -> None: |
no test coverage detected