(knowledge_id, user_id, source_url_list: List[str], selector: str)
| 64 | |
@celery_app.task(name="celery:sync_web_document")
def sync_web_document(knowledge_id, user_id, source_url_list: List[str], selector: str):
    """Process each source URL and pass the outcome to the sync handler.

    For every URL in ``source_url_list`` a ``Fork`` is built from the URL and
    the space-separated parts of ``selector``, its ``fork()`` result is
    obtained, and the handler returned by
    ``get_sync_web_document_handler(knowledge_id, user_id)`` is called with
    ``(source_url, selector, result)``.

    Processing is best-effort: a failure on one URL is logged and the loop
    moves on to the next URL instead of aborting the whole task.

    Args:
        knowledge_id: Forwarded to ``get_sync_web_document_handler``.
        user_id: Forwarded to ``get_sync_web_document_handler``.
        source_url_list: URLs to process, one ``Fork`` per URL.
        selector: Selector string; split on single spaces to form the
            ``selector_list`` given to ``Fork`` and also passed unchanged
            to the handler.
    """
    import logging

    # Imported at call time, as in the original.
    # NOTE(review): presumably this avoids a circular import - confirm.
    from knowledge.task.handler import get_sync_web_document_handler

    logger = logging.getLogger(__name__)
    handler = get_sync_web_document_handler(knowledge_id, user_id)
    for source_url in source_url_list:
        try:
            # The split stays inside the try so that a bad selector is
            # handled per URL, exactly as before, rather than crashing the task.
            result = Fork(base_fork_url=source_url, selector_list=selector.split(" ")).fork()
            handler(source_url, selector, result)
        except Exception:
            # Keep going with the remaining URLs, but record the failure and
            # its traceback instead of discarding it silently.
            logger.exception("Failed to sync web document from %s", source_url)
nothing calls this directly
no test coverage detected