(self, input: dict, request: Request)
| 7 | |
class ImportKnowledge(ApiHandler):
    """API handler that saves uploaded files into a context's knowledge folder."""

    async def process(self, input: dict, request: Request) -> dict | Response:
        """Save the uploaded ``files[]`` and reload the agent's memory.

        The files are read from ``request.files`` under the ``files[]`` key and
        the context id from the ``ctxid`` form field. ``input`` is not used.

        Returns:
            A dict with a ``"message"`` string and ``"filenames"``, which holds
            at most the first five names that were saved.

        Raises:
            Exception: if the request has no ``files[]`` part, if ``ctxid`` is
                missing or empty, or if the knowledge folder cannot be created
                or is not writable.
        """
        if "files[]" not in request.files:
            raise Exception("No files part")

        ctxid = request.form.get("ctxid", "")
        if not ctxid:
            raise Exception("No context id provided")

        context = self.use_context(ctxid)

        file_list = request.files.getlist("files[]")
        knowledge_folder = files.get_abs_path(
            get_custom_knowledge_subdir_abs(context.agent0), "main"
        )

        # Ensure knowledge folder exists (create if missing).
        # PermissionError is a subclass of OSError, so one clause covers both.
        try:
            os.makedirs(knowledge_folder, exist_ok=True)
        except OSError as e:
            # Chain the original error so the root cause stays in the traceback.
            raise Exception(
                f"Failed to create knowledge folder {knowledge_folder}: {e}"
            ) from e

        # Verify the directory is accessible
        if not os.access(knowledge_folder, os.W_OK):
            raise Exception(f"Knowledge folder {knowledge_folder} is not writable")

        saved_filenames = []

        for file in file_list:
            # Skip empty upload slots and entries without a filename.
            if not file or not file.filename:
                continue
            filename = safe_filename(file.filename)
            # Skip names that are empty after sanitizing.
            if not filename:
                continue
            file.save(os.path.join(knowledge_folder, filename))
            saved_filenames.append(filename)

        # Reload memory to re-import knowledge
        await Memory.reload(context.agent0)
        context.log.set_initial_progress()

        return {
            "message": "Knowledge Imported",
            # Only the first five saved names are reported back.
            "filenames": saved_filenames[:5],
        }
nothing calls this directly
no test coverage detected