Execute a callback via the framework. Must be called from inside an active request handler; the backend's request adapter reads cookies/headers/args from the current request.
(
callback: CallbackAdapter, kwargs: dict[str, Any]
)
| 15 | |
| 16 | |
def run_callback(
    callback: CallbackAdapter, kwargs: dict[str, Any]
) -> CallbackExecutionResponse:
    """Dispatch ``callback`` through the framework and decode its response.

    This only works while a request handler is active, because the backend's
    request adapter pulls cookies/headers/args from the current request.

    Args:
        callback: Adapter that builds the callback request body via
            ``as_callback_body`` and supplies ``output_id`` for error text.
        kwargs: Values handed to ``callback.as_callback_body``.

    Returns:
        The framework's response text parsed with ``json.loads``.

    Raises:
        CallbackExecutionError: If any of the framework steps raises; the
            original exception is chained as the cause. Failures while
            building the body, fetching the app, or decoding the JSON
            response are not wrapped and propagate unchanged.
    """
    payload = callback.as_callback_body(kwargs)
    application = get_app()

    try:
        # pylint: disable=protected-access
        callback_ctx = application._initialize_context(payload)
        handler = application._prepare_callback(callback_ctx, payload)
        component_refs = callback_ctx.inputs_list + callback_ctx.states_list
        call_values = application._inputs_to_vals(component_refs)
        # The context snapshot is taken after the preparation steps above and
        # before the executable is built; keep this ordering.
        run_ctx = copy_context()
        bound_call = application._execute_callback(
            handler, call_values, callback_ctx.outputs_list, callback_ctx
        )
        raw_response = run_ctx.run(bound_call)
    except Exception as err:
        raise CallbackExecutionError(
            f"Callback {callback.output_id} failed: {err}"
        ) from err
    else:
        # Decoding happens outside the guarded region, so a malformed
        # response surfaces as the JSON error itself.
        return json.loads(raw_response)
no test coverage detected
searching dependent graphs…