(self, items, md5sums, token_nums, datas)
| 115 | return |
| 116 | |
async def _alloc_resource(self, items, md5sums, token_nums, datas):
    """Allocate cache records for ``items`` and register payloads the cache lacks.

    The cache client is asked to allocate records for ``md5sums`` /
    ``token_nums``; while it answers ``None`` the coroutine sleeps for 0.1 s
    and asks again. Once records arrive, each item receives the ``id``,
    ``token_id`` and ``token_num`` of its positional record. For every id the
    cache reports as not ready, ``create_shm`` is called with the name derived
    from that id and the matching entry of ``datas``; those ids are then
    reported back through ``set_items_data``.

    Args:
        items: Objects that receive ``uuid``, ``token_id`` and ``token_num``.
        md5sums: Forwarded unchanged to the cache client's ``alloc``.
        token_nums: Forwarded unchanged to the cache client's ``alloc``.
        datas: Payloads, paired positionally with ``items``.

    Returns:
        None.
    """
    # A ``None`` answer means no records were handed out; wait briefly and
    # ask again. (Presumably the cache is out of capacity - not visible here.)
    records = obtain(self.cache_client.root.alloc(md5sums, token_nums))
    while records is None:
        await asyncio.sleep(0.1)
        records = obtain(self.cache_client.root.alloc(md5sums, token_nums))

    # Copy the allocation result onto each item and remember the ids in order.
    uids = []
    for item, record in zip(items, records):
        uid = record["id"]
        item.uuid = uid
        item.token_id = record["token_id"]
        item.token_num = record["token_num"]
        uids.append(uid)

    ready_flags = obtain(self.cache_client.root.get_items_data(uids))

    # Only entries the cache reports as not ready get their payload written.
    pending_uids = []
    for uid, is_ready, payload in zip(uids, ready_flags, datas):
        if is_ready:
            continue
        create_shm(get_shm_name_data(uid), payload)
        pending_uids.append(uid)

    # Tell the cache which ids were just written, if any.
    if pending_uids:
        self.cache_client.root.set_items_data(pending_uids)
    return
| 144 | |
| 145 | async def _alloc_multimodal_resources(self, multimodal_params: MultimodalParams, sampling_params: SamplingParams): |
| 146 | # 只有 P 和 NORMAL 节点需要真的管理多模态资源 |
no test coverage detected