MCPcopy
hub / github.com/reflex-dev/reflex / test_upload_file

Function test_upload_file

tests/units/test_app.py:993–1056  ·  view source on GitHub ↗

Test that file upload works correctly. Args: tmp_path: Temporary path. state: The state class. delta: Expected delta after processing all files. token: a Token. mocker: pytest mocker object. attached_mock_base_state_event_processor: BaseStateEvent

(
    tmp_path: Path,
    state,
    delta,
    token: str,
    mocker: MockerFixture,
    attached_mock_base_state_event_processor: BaseStateEventProcessor,
    mock_root_event_context: EventContext,
    clean_registration_context: RegistrationContext,
)

Source from the content-addressed store, hash-verified

991 ],
992)
993async def test_upload_file(
994 tmp_path: Path,
995 state,
996 delta,
997 token: str,
998 mocker: MockerFixture,
999 attached_mock_base_state_event_processor: BaseStateEventProcessor,
1000 mock_root_event_context: EventContext,
1001 clean_registration_context: RegistrationContext,
1002):
1003 """Test that file upload works correctly.
1004
1005 Args:
1006 tmp_path: Temporary path.
1007 state: The state class.
1008 delta: Expected delta after processing all files.
1009 token: a Token.
1010 mocker: pytest mocker object.
1011 attached_mock_base_state_event_processor: BaseStateEventProcessor Fixture attached to the app instance to capture emitted events.
1012 mock_root_event_context: The mocked root event context, for accessing state_manager.
1013 clean_registration_context: Fixture to ensure clean registration context for each test, preventing cross-test contamination of state subclasses.
1014 """
1015 clean_registration_context.register_base_state(state)
1016 app = Mock(
1017 event_processor=attached_mock_base_state_event_processor,
1018 )
1019 async with mock_root_event_context.state_manager.modify_state(
1020 BaseStateToken(ident=token, cls=state)
1021 ) as root_state:
1022 (await root_state.get_state(state))._tmp_path = tmp_path
1023 data = b"This is binary data"
1024
1025 request_mock = unittest.mock.Mock()
1026 request_mock.headers = {
1027 "reflex-client-token": token,
1028 "reflex-event-handler": f"{state.get_full_name()}.multi_handle_upload",
1029 }
1030
1031 file1 = UploadFile(
1032 filename="image1.jpg",
1033 file=io.BytesIO(data),
1034 )
1035 file2 = UploadFile(
1036 filename="image2.jpg",
1037 file=io.BytesIO(data),
1038 )
1039
1040 async def form(): # noqa: RUF029
1041 return FormData([("files", file1), ("files", file2)])
1042
1043 request_mock.form = form
1044
1045 upload_fn = upload(app)
1046 streaming_response = await upload_fn(request_mock)
1047 assert isinstance(streaming_response, StreamingResponse)
1048 # Handler yields after each file, producing intermediate + final updates.
1049 updates = []
1050 async for state_update in streaming_response.body_iterator:

Callers

nothing calls this directly

Calls 7

BaseStateTokenClass · 0.90
uploadFunction · 0.90
register_base_stateMethod · 0.80
get_full_nameMethod · 0.80
UploadFileFunction · 0.50
modify_stateMethod · 0.45
get_stateMethod · 0.45

Tested by

no test coverage detected