App for testing file uploads.
()
| 22 | |
| 23 | |
| 24 | def UploadFile(): |
| 25 | """App for testing dynamic routes.""" |
| 26 | import shutil |
| 27 | |
| 28 | import reflex as rx |
| 29 | |
| 30 | LARGE_DATA = "DUMMY" * 1024 * 512 |
| 31 | |
| 32 | class UploadState(rx.State): |
| 33 | upload_done: rx.Field[bool] = rx.field(False) |
| 34 | event_order: rx.Field[list[str]] = rx.field([]) |
| 35 | progress_dicts: rx.Field[list[dict]] = rx.field([]) |
| 36 | stream_progress_dicts: rx.Field[list[dict]] = rx.field([]) |
| 37 | disabled: rx.Field[bool] = rx.field(False) |
| 38 | large_data: rx.Field[str] = rx.field("") |
| 39 | quaternary_names: rx.Field[list[str]] = rx.field([]) |
| 40 | quaternary_field: rx.Field[str] = rx.field("") |
| 41 | stream_chunk_records: rx.Field[list[str]] = rx.field([]) |
| 42 | stream_completed_files: rx.Field[list[str]] = rx.field([]) |
| 43 | stream_field: rx.Field[str] = rx.field("") |
| 44 | |
@rx.event
async def handle_upload(self, files: list[rx.UploadFile]):
    """Write every named uploaded file into the upload directory.

    ``upload_done`` is reset to ``False`` on entry and set to ``True``
    once all entries in ``files`` have been processed.

    Args:
        files: The uploaded files to persist. Entries with a falsy
            ``name`` are read but not written.
    """
    self.upload_done = False
    for uploaded in files:
        # The content is read before the name check, so nameless
        # entries are still consumed.
        contents = await uploaded.read()
        if uploaded.name:
            destination = rx.get_upload_dir() / uploaded.name
            # The name may contain sub-directories; create them on demand.
            destination.parent.mkdir(parents=True, exist_ok=True)
            destination.write_bytes(contents)
    # NOTE(review): assumed to run once after the loop — confirm against
    # the original indentation.
    self.upload_done = True
| 56 | |
@rx.event
async def handle_upload_secondary(self, files: list[rx.UploadFile]):
    """Store the uploaded files, then hand off to ``chain_event``.

    ``upload_done`` is cleared here and is not set again by this
    handler. After the files are written, ``large_data`` is filled with
    ``LARGE_DATA`` and ``UploadState.chain_event`` is yielded.

    Args:
        files: The uploaded files to persist. Entries with a falsy
            ``name`` are read but not written.

    Yields:
        The ``UploadState.chain_event`` handler.
    """
    self.upload_done = False
    for uploaded in files:
        # The content is read before the name check, so nameless
        # entries are still consumed.
        payload = await uploaded.read()
        if uploaded.name:
            target = rx.get_upload_dir() / uploaded.name
            # The name may contain sub-directories; create them on demand.
            target.parent.mkdir(parents=True, exist_ok=True)
            target.write_bytes(payload)
    # NOTE(review): assumed to run once after the loop — confirm against
    # the original indentation.
    self.large_data = LARGE_DATA
    yield UploadState.chain_event
| 69 | |
@rx.event
def upload_progress(self, progress):
    """Append a progress payload to ``progress_dicts``.

    Args:
        progress: The progress payload; it must be truthy.
    """
    # A falsy payload fails the handler via AssertionError.
    assert progress
    # Prints the event order recorded so far.
    print(self.event_order)
    self.progress_dicts.append(progress)
| 75 | |
@rx.event
def chain_event(self):
    """Check and clear ``large_data``, then mark the upload as done.

    Asserts that ``large_data`` equals ``LARGE_DATA``, resets it to an
    empty string, sets ``upload_done`` and appends ``"chain_event"`` to
    ``event_order``.
    """
    # The state must still hold the full payload when this handler runs.
    assert self.large_data == LARGE_DATA
    self.large_data = ""
    self.upload_done = True
    self.event_order.append("chain_event")
no test coverage detected