(manager: HttpServerManager)
| 26 | |
| 27 | |
async def pd_handle_loop(manager: HttpServerManager):
    """Keep one ``_pd_handle_task`` running per currently known pd master.

    On start-up the routable host ip is stored on ``manager.host_ip``: when
    ``manager.args.host`` is ``"0.0.0.0"`` it is resolved through
    ``get_hostname_ip()``, otherwise the configured host is used as is.
    A ``timer_log`` task is started as well.

    The loop then runs forever:

    * fetch the current pd masters with ``_get_pd_master_objs``;
    * cancel and forget handle tasks whose node id is no longer listed;
    * create a handle task for every node id that has none yet;
    * sleep 30 seconds before the next poll.

    Any ``Exception`` raised during an iteration is logged and the loop
    retries after 10 seconds.

    Args:
        manager: server manager; ``manager.args.host`` is read and
            ``manager.host_ip`` is written.

    Raises:
        AssertionError: if ``manager.args.host`` is a loopback address
            (``"127.0.0.1"`` or ``"localhost"``).
    """
    assert manager.args.host not in ["127.0.0.1", "localhost"], "pd mode must specify host ip"
    if manager.args.host in ["0.0.0.0"]:
        manager.host_ip = get_hostname_ip()
    else:
        manager.host_ip = manager.args.host

    # Keep a strong reference for the lifetime of this coroutine: the event
    # loop only holds weak references to tasks, so an unreferenced task may be
    # garbage-collected before it finishes.
    _timer_task = asyncio.create_task(timer_log(manager))  # noqa: F841

    id_to_handle_task: Dict[int, asyncio.Task] = {}

    while True:
        try:
            id_to_pd_master_obj = await _get_pd_master_objs(manager.args)
            logger.info(f"get pd_master_objs {id_to_pd_master_obj}")

            if id_to_pd_master_obj is not None:
                # Iterate over a snapshot: entries are removed from
                # id_to_handle_task inside the loop, and mutating a dict while
                # iterating its live view raises RuntimeError.
                for node_id, handle_task in list(id_to_handle_task.items()):
                    if node_id not in id_to_pd_master_obj:
                        handle_task.cancel()
                        id_to_handle_task.pop(node_id, None)
                        logger.info(f"pd_handle_task {handle_task} cancelled")

                for node_id, pd_master_obj in id_to_pd_master_obj.items():
                    if node_id not in id_to_handle_task:
                        id_to_handle_task[node_id] = asyncio.create_task(_pd_handle_task(manager, pd_master_obj))

            await asyncio.sleep(30)

        except Exception as e:
            logger.exception(str(e))
            await asyncio.sleep(10)
| 60 | |
| 61 | |
| 62 | async def _pd_handle_task(manager: HttpServerManager, pd_master_obj: PD_Master_Obj): |
no test coverage detected