MCPcopy Index your code
hub / github.com/InternLM/lmdeploy / SharedBuffer

Class SharedBuffer

lmdeploy/pytorch/engine/executor/mp_executor.py:95–204  ·  view source on GitHub ↗

Shared buffer.

Source from the content-addressed store, hash-verified

93
94
95class SharedBuffer:
96 """Shared buffer."""
97
def __init__(self, proc_id: int, notifier: Notifier, name: str = None):
    """Create a new shared memory segment or attach to an existing one.

    Args:
        proc_id: Index of the owning process. A non-negative value gets the
            one-hot mask ``1 << proc_id``; a negative value gets mask ``0``.
        notifier: Object stored on the instance and used by the send
            methods to signal after each package.
        name: Name of an existing segment to attach to. When ``None`` a new
            segment is created instead.
    """
    creating = name is None

    self.proc_id = proc_id
    self.notifier = notifier
    self.is_create = creating

    if creating:
        # One contiguous segment holding every block (the original comment
        # calls this a "double buffer"; NUM_SHARED_BLOCK is defined elsewhere).
        total_size = SHARED_BLOCK_REAL_SIZE * NUM_SHARED_BLOCK
        self.shm = shared_memory.SharedMemory(create=True, size=total_size)
    else:
        self.shm = shared_memory.SharedMemory(name=name)

    # Index of the block handed out by the next acquire_buf() call.
    self._buf_id = 0

    self.proc_mask = (1 << proc_id) if proc_id >= 0 else 0

    self.is_closed = False
115
@contextmanager
def acquire_buf(self):
    """Yield the current block of the shared memory, then move to the next.

    The yielded object is a slice of ``self.shm.buf`` that is
    ``SHARED_BLOCK_REAL_SIZE`` bytes long and starts at the block selected
    by ``self._buf_id``.
    """
    whole = self.shm.buf
    assert whole is not None

    begin = self._buf_id * SHARED_BLOCK_REAL_SIZE
    end = begin + SHARED_BLOCK_REAL_SIZE
    block = whole[begin:end]
    yield block

    # There is no try/finally around the yield, so the index only advances
    # when the ``with`` body finishes without raising.
    self._buf_id = (self._buf_id + 1) % NUM_SHARED_BLOCK
124
def name(self):
    """Return the name of the underlying shared memory segment."""
    shm_name = self.shm.name
    return shm_name
127
def pack_data(self, data, receiver_mask):
    """Pickle ``data`` and write it into the shared blocks, package by package.

    Each package is a header, ``struct.pack('II', data_size, receiver_mask)``,
    followed by at most ``SHARED_BLOCK_SIZE`` bytes of the pickled payload.
    The header carries the size of the whole pickled payload, not of the
    individual chunk, and is identical for every package.

    This is a generator. A block is yielded while it is still acquired, so
    the consumer can act on it (e.g. signal a receiver) before the generator
    is resumed and the block index is rotated.

    Args:
        data: Any picklable object.
        receiver_mask: Integer written into the second header field.

    Yields:
        The block buffer that has just been filled.
    """
    dumped_data = pickle.dumps(data, protocol=pickle.HIGHEST_PROTOCOL)
    data_size = len(dumped_data)

    num_packs = get_num_packages(data_size)
    head = struct.pack('II', data_size, receiver_mask)

    # Walk the payload through a zero-copy view with a running offset.
    # Re-slicing the bytes object after every package would copy the whole
    # remaining payload each time, which is quadratic for large messages.
    payload = memoryview(dumped_data)
    offset = 0

    for _ in range(num_packs):
        with self.acquire_buf() as buf:
            pac_size = min(data_size - offset, SHARED_BLOCK_SIZE)
            # Header first, then the chunk directly behind it.
            buf[:HEAD_SIZE] = head
            buf[HEAD_SIZE:HEAD_SIZE + pac_size] = payload[offset:offset + pac_size]
            offset += pac_size
            yield buf
143
def send(self, data, receiver_mask: int = 0xff):
    """Send ``data`` and signal the notifier once per package.

    Args:
        data: Object handed to ``pack_data``.
        receiver_mask: Mask handed to ``pack_data``; defaults to ``0xff``.
    """
    packages = self.pack_data(data, receiver_mask)
    for _package in packages:
        # One notification for every package that has been written.
        self.notifier.set()
148
async def send_async(self, data, receiver_mask: int = 0xff):
    """Send ``data`` and await the notifier once per package.

    Args:
        data: Object handed to ``pack_data``.
        receiver_mask: Mask handed to ``pack_data``; defaults to ``0xff``.
    """
    packages = self.pack_data(data, receiver_mask)
    for _package in packages:
        # One awaited notification for every package that has been written.
        await self.notifier.set_async()

Callers 2

__init__Method · 0.85
_main_loopMethod · 0.85

Calls

no outgoing calls

Tested by

no test coverage detected