()
| 415 | """ |
| 416 | |
def execute() -> dict:
    """Run the prepared callback and report the outcome as a status dict.

    Returns:
        ``{"status": "ok", "data": ...}`` holding the JSON-decoded callback
        response on success; ``{"status": "prevent_update"}`` when
        ``PreventUpdate`` or ``WebsocketDisconnected`` is raised; and
        ``{"status": "error", "message": ...}`` for any other exception,
        after its traceback has been printed.
    """
    try:
        callback_ctx = create_ws_context(payload, response_adapter, ws_callback)
        # pylint: disable=protected-access
        callback_fn = dash_app._prepare_callback(callback_ctx, payload)
        callback_args = dash_app._inputs_to_vals(
            callback_ctx.inputs_list + callback_ctx.states_list
        )

        # Snapshot the context only after the callback has been prepared,
        # then build the deferred invocation that will run inside it.
        snapshot = copy_context()
        invoke = dash_app._execute_callback(
            callback_fn, callback_args, callback_ctx.outputs_list, callback_ctx
        )

        def resolve():
            # Sync callbacks hand back their result directly; async ones hand
            # back a coroutine, which is driven to completion on a new event
            # loop.
            outcome = invoke()
            return asyncio.run(outcome) if inspect.iscoroutine(outcome) else outcome

        raw_response = snapshot.run(resolve)
        return {"status": "ok", "data": json.loads(raw_response)}

    except (PreventUpdate, WebsocketDisconnected):
        # Both cases are reported to the caller as "nothing to update".
        return {"status": "prevent_update"}
    except Exception as exc:  # pylint: disable=broad-exception-caught
        traceback.print_exc()
        return {"status": "error", "message": str(exc)}
| 450 | |
| 451 | return executor.submit(execute) |
nothing calls this directly
no test coverage detected
searching dependent graphs…