(self, path:str)
| 16 | self.isNeedUpdate = False |
| 17 | |
def update(self, path: str) -> Tuple[bool, List[Rule]]:
    """Download every rule concurrently and refresh its update status.

    Each entry of ``self.ruleList`` is handed to ``self.__Download`` together
    with *path*, and all of those downloads are awaited at once. The
    ``latest`` and ``update`` values of every result are then copied onto
    the first rule in ``self.ruleList`` that carries the same ``name``.

    The coroutine is driven with ``asyncio.run``, so this method has to be
    called from synchronous code (``asyncio.run`` refuses to start while
    another event loop is already running in the same thread).

    Args:
        path: Passed through unchanged to ``self.__Download`` for each rule.

    Returns:
        The tuple ``(self.isNeedUpdate, self.ruleList)``. The flag is
        switched to ``True`` as soon as one refreshed rule has a truthy
        ``update`` value.
    """
    # NOTE(review): the flag is only ever raised in this method, never
    # cleared, so a True left by an earlier call is returned again --
    # confirm whether that stickiness is intended.

    async def _refresh_all():
        # Start all downloads together and wait for the complete result list.
        fetched_rules = await asyncio.gather(
            *(self.__Download(entry, path) for entry in self.ruleList)
        )

        # Copy each result's state onto the first rule sharing its name.
        for fetched in fetched_rules:
            target = next(
                (
                    known
                    for known in self.ruleList
                    if fetched.name == known.name
                ),
                None,
            )
            if target is None:
                # No rule with that name: nothing to refresh for this result.
                continue
            target.latest = fetched.latest
            target.update = fetched.update
            if target.update:
                # A single rule needing an update is enough to raise the flag.
                self.isNeedUpdate = True
        return self.isNeedUpdate, self.ruleList

    return asyncio.run(_refresh_all())
| 36 | |
| 37 | def __CalcFileSha256(self, filename): |
| 38 | with open(filename, "rb") as f: |
no outgoing calls
no test coverage detected