MCPcopy
hub / github.com/reflex-dev/reflex / test_upcast_event_handler_arg

Function test_upcast_event_handler_arg

tests/units/test_model.py:236–260  ·  view source on GitHub ↗

Test that upcast event handler args work correctly. Args: handler: The handler to test. payload: The payload to test. mock_base_state_event_processor: Fixture for processing events with a BaseState. emitted_deltas: List to store emitted deltas.

(
    handler, payload, mock_base_state_event_processor, emitted_deltas
)

Source from the content-addressed store, hash-verified

234 ],
235)
236async def test_upcast_event_handler_arg(
237 handler, payload, mock_base_state_event_processor, emitted_deltas
238):
239 """Test that upcast event handler args work correctly.
240
241 Args:
242 handler: The handler to test.
243 payload: The payload to test.
244 mock_base_state_event_processor: Fixture for processing events with a BaseState.
245 emitted_deltas: List to store emitted deltas.
246 """
247 async with mock_base_state_event_processor as processor:
248 await processor.enqueue(
249 "test_token", Event.from_event_type(handler(**payload))[0]
250 )
251 assert emitted_deltas == [
252 (
253 "test_token",
254 {
255 UpcastStateWithSqlAlchemy.get_full_name(): {
256 "passed" + FIELD_MARKER: True
257 }
258 },
259 ),
260 ]
261
262
263def test_no_rebind_mutable_proxy_for_instrumented_functions():

Callers

nothing calls this directly

Calls 3

from_event_typeMethod · 0.80
get_full_nameMethod · 0.80
enqueueMethod · 0.45

Tested by

no test coverage detected