扫描所有任务文件夹,同步图片列表 批量扫描 history 目录下的所有任务文件夹, 同步图片列表并更新记录状态。 Returns: Dict[str, Any]: 扫描结果统计 - success: 是否成功 - total_tasks: 扫描的任务总数 - synced: 成功同步的任务数 - failed: 失败的任务数 - orphan
(self)
| 642 | } |
| 643 | |
| 644 | def scan_all_tasks(self) -> Dict[str, Any]: |
| 645 | """ |
| 646 | 扫描所有任务文件夹,同步图片列表 |
| 647 | |
| 648 | 批量扫描 history 目录下的所有任务文件夹, |
| 649 | 同步图片列表并更新记录状态。 |
| 650 | |
| 651 | Returns: |
| 652 | Dict[str, Any]: 扫描结果统计 |
| 653 | - success: 是否成功 |
| 654 | - total_tasks: 扫描的任务总数 |
| 655 | - synced: 成功同步的任务数 |
| 656 | - failed: 失败的任务数 |
| 657 | - orphan_tasks: 孤立任务列表(有图片但无记录) |
| 658 | - results: 详细结果列表 |
| 659 | - error: 错误信息(失败时) |
| 660 | """ |
| 661 | if not os.path.exists(self.history_dir): |
| 662 | return { |
| 663 | "success": False, |
| 664 | "error": "历史记录目录不存在" |
| 665 | } |
| 666 | |
| 667 | try: |
| 668 | synced_count = 0 |
| 669 | failed_count = 0 |
| 670 | orphan_tasks = [] # 没有关联记录的任务 |
| 671 | results = [] |
| 672 | |
| 673 | # 遍历 history 目录 |
| 674 | for item in os.listdir(self.history_dir): |
| 675 | item_path = os.path.join(self.history_dir, item) |
| 676 | |
| 677 | # 只处理目录(任务文件夹) |
| 678 | if not os.path.isdir(item_path): |
| 679 | continue |
| 680 | |
| 681 | # 假设任务文件夹名就是 task_id |
| 682 | task_id = item |
| 683 | |
| 684 | # 扫描并同步 |
| 685 | result = self.scan_and_sync_task_images(task_id) |
| 686 | results.append(result) |
| 687 | |
| 688 | if result.get("success"): |
| 689 | if result.get("no_record"): |
| 690 | orphan_tasks.append(task_id) |
| 691 | else: |
| 692 | synced_count += 1 |
| 693 | else: |
| 694 | failed_count += 1 |
| 695 | |
| 696 | return { |
| 697 | "success": True, |
| 698 | "total_tasks": len(results), |
| 699 | "synced": synced_count, |
| 700 | "failed": failed_count, |
| 701 | "orphan_tasks": orphan_tasks, |
no test coverage detected