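Test that file upload works correctly. Args: tmp_path: Temporary path. state: The state class. delta: Expected delta after processing all files. token: The client token string. mocker: pytest mocker object. attached_mock_base_state_event_processor: BaseStateEventProcessor fixture attached to the app instance to capture emitted events. mock_root_event_context: The mocked root event context, for accessing state_manager. clean_registration_context: Fixture to ensure a clean registration context for each test, preventing cross-test contamination of state subclasses.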
(
tmp_path: Path,
state,
delta,
token: str,
mocker: MockerFixture,
attached_mock_base_state_event_processor: BaseStateEventProcessor,
mock_root_event_context: EventContext,
clean_registration_context: RegistrationContext,
)
| 991 | ], |
| 992 | ) |
| 993 | async def test_upload_file( |
| 994 | tmp_path: Path, |
| 995 | state, |
| 996 | delta, |
| 997 | token: str, |
| 998 | mocker: MockerFixture, |
| 999 | attached_mock_base_state_event_processor: BaseStateEventProcessor, |
| 1000 | mock_root_event_context: EventContext, |
| 1001 | clean_registration_context: RegistrationContext, |
| 1002 | ): |
| 1003 | """Test that file upload works correctly. |
| 1004 | |
| 1005 | Args: |
| 1006 | tmp_path: Temporary path. |
| 1007 | state: The state class. |
| 1008 | delta: Expected delta after processing all files. |
| 1009 | token: a Token. |
| 1010 | mocker: pytest mocker object. |
| 1011 | attached_mock_base_state_event_processor: BaseStateEventProcessor Fixture attached to the app instance to capture emitted events. |
| 1012 | mock_root_event_context: The mocked root event context, for accessing state_manager. |
| 1013 | clean_registration_context: Fixture to ensure clean registration context for each test, preventing cross-test contamination of state subclasses. |
| 1014 | """ |
| 1015 | clean_registration_context.register_base_state(state) |
| 1016 | app = Mock( |
| 1017 | event_processor=attached_mock_base_state_event_processor, |
| 1018 | ) |
| 1019 | async with mock_root_event_context.state_manager.modify_state( |
| 1020 | BaseStateToken(ident=token, cls=state) |
| 1021 | ) as root_state: |
| 1022 | (await root_state.get_state(state))._tmp_path = tmp_path |
| 1023 | data = b"This is binary data" |
| 1024 | |
| 1025 | request_mock = unittest.mock.Mock() |
| 1026 | request_mock.headers = { |
| 1027 | "reflex-client-token": token, |
| 1028 | "reflex-event-handler": f"{state.get_full_name()}.multi_handle_upload", |
| 1029 | } |
| 1030 | |
| 1031 | file1 = UploadFile( |
| 1032 | filename="image1.jpg", |
| 1033 | file=io.BytesIO(data), |
| 1034 | ) |
| 1035 | file2 = UploadFile( |
| 1036 | filename="image2.jpg", |
| 1037 | file=io.BytesIO(data), |
| 1038 | ) |
| 1039 | |
| 1040 | async def form(): # noqa: RUF029 |
| 1041 | return FormData([("files", file1), ("files", file2)]) |
| 1042 | |
| 1043 | request_mock.form = form |
| 1044 | |
| 1045 | upload_fn = upload(app) |
| 1046 | streaming_response = await upload_fn(request_mock) |
| 1047 | assert isinstance(streaming_response, StreamingResponse) |
| 1048 | # Handler yields after each file, producing intermediate + final updates. |
| 1049 | updates = [] |
| 1050 | async for state_update in streaming_response.body_iterator: |
Nothing calls this function directly.
No test coverage was detected for it.