| 38 | |
| 39 | |
| 40 | class HttpServerManager: |
| 41 | def __init__( |
| 42 | self, |
| 43 | args, |
| 44 | router_port, |
| 45 | cache_port, |
| 46 | detokenization_pub_port, |
| 47 | visual_port, |
| 48 | metric_port, |
| 49 | enable_multimodal, |
| 50 | ): |
| 51 | self.args = args |
| 52 | context = zmq.asyncio.Context(2) |
| 53 | self.send_to_router = context.socket(zmq.PUSH) |
| 54 | self.send_to_router.connect(f"{args.zmq_mode}127.0.0.1:{router_port}") |
| 55 | |
| 56 | self.multinode_req_manager = None |
| 57 | self.nnodes = args.nnodes |
| 58 | self._shm_lock_pool = AtomicShmArrayLock(f"{get_unique_server_name()}_lightllm_resource_lock", 1) |
| 59 | self._resource_lock = AsyncLock(self._shm_lock_pool.get_lock_context(0)) |
| 60 | self.node_rank = args.node_rank |
| 61 | self.disable_abort = args.nnodes > 1 and args.dp == 1 # multinode dp=1 mode, disable abort |
| 62 | self.is_multinode_tp = args.dp == 1 and args.nnodes > 1 |
| 63 | self.is_multinode_tp_master = args.dp == 1 and args.nnodes > 1 and args.node_rank == 0 |
| 64 | self.is_multinode_tp_slave = args.dp == 1 and args.nnodes > 1 and args.node_rank > 0 |
| 65 | if self.is_multinode_tp: |
| 66 | if args.node_rank == 0: |
| 67 | self.multinode_req_manager = [] |
| 68 | for child_ip in args.child_ips: |
| 69 | context = zmq.asyncio.Context(2) |
| 70 | self.multinode_req_manager.append(context.socket(zmq.PUSH)) |
| 71 | self.multinode_req_manager[-1].connect(f"tcp://{child_ip}:{args.multinode_httpmanager_port}") |
| 72 | logger.info( |
| 73 | f"HttpServerManager connected to child node at {child_ip}:{args.multinode_httpmanager_port}" |
| 74 | ) |
| 75 | else: |
| 76 | context = zmq.asyncio.Context(2) |
| 77 | self.multinode_req_manager = context.socket(zmq.PULL) |
| 78 | self.multinode_req_manager.bind(f"tcp://*:{args.multinode_httpmanager_port}") |
| 79 | logger.info( |
| 80 | f"HttpServerManager listening for child node requests on *:{args.multinode_httpmanager_port}" |
| 81 | ) |
| 82 | |
| 83 | self.enable_multimodal = enable_multimodal |
| 84 | if self.enable_multimodal: |
| 85 | self.cache_client = rpyc.connect("localhost", cache_port, config={"allow_pickle": True}) |
| 86 | self.send_to_visual = context.socket(zmq.PUSH) |
| 87 | self.send_to_visual.connect(f"{args.zmq_mode}127.0.0.1:{visual_port}") |
| 88 | |
| 89 | self.shm_req_manager = ShmReqManager() |
| 90 | |
| 91 | self.recv_from_detokenization = context.socket(zmq.SUB) |
| 92 | self.recv_from_detokenization.connect(f"{args.zmq_mode}127.0.0.1:{detokenization_pub_port}") |
| 93 | self.recv_from_detokenization.setsockopt(zmq.SUBSCRIBE, b"") |
| 94 | |
| 95 | self.tokenizer = get_tokenizer(args.model_dir, args.tokenizer_mode, trust_remote_code=args.trust_remote_code) |
| 96 | |
| 97 | self.req_id_to_out_inf: Dict[int, ReqStatus] = {} # value type (out_str, metadata, finished, event) |