MCPcopy Index your code
hub / github.com/ModelTC/LightLLM / HttpServerManager

Class HttpServerManager

lightllm/server/httpserver/manager.py:40–736  ·  view source on GitHub ↗

Source from the content-addressed store, hash-verified

38
39
40class HttpServerManager:
41 def __init__(
42 self,
43 args,
44 router_port,
45 cache_port,
46 detokenization_pub_port,
47 visual_port,
48 metric_port,
49 enable_multimodal,
50 ):
51 self.args = args
52 context = zmq.asyncio.Context(2)
53 self.send_to_router = context.socket(zmq.PUSH)
54 self.send_to_router.connect(f"{args.zmq_mode}127.0.0.1:{router_port}")
55
56 self.multinode_req_manager = None
57 self.nnodes = args.nnodes
58 self._shm_lock_pool = AtomicShmArrayLock(f"{get_unique_server_name()}_lightllm_resource_lock", 1)
59 self._resource_lock = AsyncLock(self._shm_lock_pool.get_lock_context(0))
60 self.node_rank = args.node_rank
61 self.disable_abort = args.nnodes > 1 and args.dp == 1 # mulitnode dp=1 mode, disable abort
62 self.is_multinode_tp = args.dp == 1 and args.nnodes > 1
63 self.is_multinode_tp_master = args.dp == 1 and args.nnodes > 1 and args.node_rank == 0
64 self.is_multinode_tp_slave = args.dp == 1 and args.nnodes > 1 and args.node_rank > 0
65 if self.is_multinode_tp:
66 if args.node_rank == 0:
67 self.multinode_req_manager = []
68 for child_ip in args.child_ips:
69 context = zmq.asyncio.Context(2)
70 self.multinode_req_manager.append(context.socket(zmq.PUSH))
71 self.multinode_req_manager[-1].connect(f"tcp://{child_ip}:{args.multinode_httpmanager_port}")
72 logger.info(
73 f"HttpServerManager connected to child node at {child_ip}:{args.multinode_httpmanager_port}"
74 )
75 else:
76 context = zmq.asyncio.Context(2)
77 self.multinode_req_manager = context.socket(zmq.PULL)
78 self.multinode_req_manager.bind(f"tcp://*:{args.multinode_httpmanager_port}")
79 logger.info(
80 f"HttpServerManager listening for child node requests on *:{args.multinode_httpmanager_port}"
81 )
82
83 self.enable_multimodal = enable_multimodal
84 if self.enable_multimodal:
85 self.cache_client = rpyc.connect("localhost", cache_port, config={"allow_pickle": True})
86 self.send_to_visual = context.socket(zmq.PUSH)
87 self.send_to_visual.connect(f"{args.zmq_mode}127.0.0.1:{visual_port}")
88
89 self.shm_req_manager = ShmReqManager()
90
91 self.recv_from_detokenization = context.socket(zmq.SUB)
92 self.recv_from_detokenization.connect(f"{args.zmq_mode}127.0.0.1:{detokenization_pub_port}")
93 self.recv_from_detokenization.setsockopt(zmq.SUBSCRIBE, b"")
94
95 self.tokenizer = get_tokenizer(args.model_dir, args.tokenizer_mode, trust_remote_code=args.trust_remote_code)
96
97 self.req_id_to_out_inf: Dict[int, ReqStatus] = {} # value type (out_str, metadata, finished, event)

Callers 1

set_args · Method · 0.85

Calls

no outgoing calls

Tested by

no test coverage detected