MCPcopy
hub / github.com/stemdeckapp/stemdeck / _run_async

Function _run_async

app/pipeline/runner.py:195–226  ·  view source on GitHub ↗

Common async wrapper: acquires the pipeline lock, runs blocking_fn in a thread, then handles success / cancel / error outcomes uniformly.

(
    job: Job,
    job_dir: Path,
    jobs_dir: Path,
    blocking_fn,
    *fn_args: object,
    error_msg: str = "Audio processing failed. Please try again.",
)

Source from the content-addressed store, hash-verified

193
194
195async def _run_async(
196 job: Job,
197 job_dir: Path,
198 jobs_dir: Path,
199 blocking_fn,
200 *fn_args: object,
201 error_msg: str = "Audio processing failed. Please try again.",
202) -> None:
203 """Common async wrapper: acquires the pipeline lock, runs blocking_fn in a
204 thread, then handles success / cancel / error outcomes uniformly."""
205 try:
206 async with _pipeline_lock:
207 await asyncio.to_thread(blocking_fn, job, *fn_args, job_dir)
208 except Exception as e:
209 if not isinstance(e, JobCancelled) and not job.cancel_requested:
210 logger.exception("pipeline failed for job %s: %s", job.id, e)
211 _set(job, status="error", stage="Error: Processing failed", error=error_msg)
212 persist_registry(jobs_dir)
213 _rmtree(job_dir)
214 return
215 logger.info(
216 "pipeline cancelled%s for job %s",
217 " (wrapped)" if not isinstance(e, JobCancelled) else "",
218 job.id,
219 )
220 _set(job, status="cancelled", stage="Cancelled")
221 persist_registry(jobs_dir)
222 _rmtree(job_dir)
223 return
224 _set(job, status="done", progress=1.0, stage="Done")
225 _write_metadata(job, job_dir)
226 persist_registry(jobs_dir)
227
228
229async def run_pipeline(job: Job, url: str, jobs_dir: Path) -> None:

Callers 2

run_pipelineFunction · 0.85
run_local_pipelineFunction · 0.85

Calls 3

_setFunction · 0.90
_write_metadataFunction · 0.85
_rmtreeFunction · 0.70

Tested by

no test coverage detected