get_pd_master_objs 主要负责从 pd master 获取所有的pd master对象。
(args)
| 127 | |
| 128 | |
| 129 | async def _get_pd_master_objs(args) -> Optional[Dict[int, PD_Master_Obj]]: |
| 130 | """ |
| 131 | get_pd_master_objs 主要负责从 pd master 获取所有的pd master对象。 |
| 132 | """ |
| 133 | use_config_server = args.config_server_host and args.config_server_port |
| 134 | |
| 135 | # 如果不使用config_server服务来发现所有的 pd_master, 则需要使用启动参数中的 |
| 136 | # --pd_master_ip 和--pd_master_port 设置的唯一pd_master来进行连接, 其默认 |
| 137 | # node_id 为 0 |
| 138 | if not use_config_server: |
| 139 | ans = dict() |
| 140 | ans[0] = PD_Master_Obj(node_id=0, host_ip_port=f"{args.pd_master_ip}:{args.pd_master_port}") |
| 141 | return ans |
| 142 | |
| 143 | # 使用 config_server 服务来发现所有的 pd_master 节点。 |
| 144 | uri = f"ws://{args.config_server_host}:{args.config_server_port}/registered_objects" |
| 145 | |
| 146 | try: |
| 147 | async with httpx.AsyncClient() as client: |
| 148 | response = await client.get(uri) |
| 149 | if response.status_code == 200: |
| 150 | base64data = response.json()["data"] |
| 151 | id_to_pd_master_obj = pickle.loads(base64.b64decode(base64data)) |
| 152 | return id_to_pd_master_obj |
| 153 | else: |
| 154 | logger.error(f"get pd_master_objs error {response.status_code}") |
| 155 | return None |
| 156 | except Exception as e: |
| 157 | logger.exception(str(e)) |
| 158 | await asyncio.sleep(10) |
| 159 | return None |
| 160 | |
| 161 | |
| 162 | # 触发推理的task |
no test coverage detected