Shared buffer.
| 93 | |
| 94 | |
| 95 | class SharedBuffer: |
| 96 | """Shared buffer.""" |
| 97 | |
| 98 | def __init__(self, proc_id: int, notifier: Notifier, name: str = None): |
| 99 | self.proc_id = proc_id |
| 100 | self.notifier = notifier |
| 101 | self.is_create = name is None |
| 102 | if self.is_create: |
| 103 | # double buffer |
| 104 | self.shm = shared_memory.SharedMemory(create=True, size=SHARED_BLOCK_REAL_SIZE * NUM_SHARED_BLOCK) |
| 105 | else: |
| 106 | self.shm = shared_memory.SharedMemory(name=name) |
| 107 | self._buf_id = 0 |
| 108 | |
| 109 | if proc_id >= 0: |
| 110 | self.proc_mask = 1 << proc_id |
| 111 | else: |
| 112 | self.proc_mask = 0 |
| 113 | |
| 114 | self.is_closed = False |
| 115 | |
| 116 | @contextmanager |
| 117 | def acquire_buf(self): |
| 118 | buf = self.shm.buf |
| 119 | assert buf is not None |
| 120 | buf_start = self._buf_id * SHARED_BLOCK_REAL_SIZE |
| 121 | out_buf = buf[buf_start:buf_start + SHARED_BLOCK_REAL_SIZE] |
| 122 | yield out_buf |
| 123 | self._buf_id = (self._buf_id + 1) % NUM_SHARED_BLOCK |
| 124 | |
| 125 | def name(self): |
| 126 | return self.shm.name |
| 127 | |
| 128 | def pack_data(self, data, receiver_mask): |
| 129 | """Pack data.""" |
| 130 | dumped_data = pickle.dumps(data, protocol=pickle.HIGHEST_PROTOCOL) |
| 131 | data_size = len(dumped_data) |
| 132 | |
| 133 | num_packs = get_num_packages(data_size) |
| 134 | head = struct.pack('II', data_size, receiver_mask) |
| 135 | |
| 136 | for _ in range(num_packs): |
| 137 | with self.acquire_buf() as buf: |
| 138 | pac_size = min(len(dumped_data), SHARED_BLOCK_SIZE) |
| 139 | packed_data = head + dumped_data[:pac_size] |
| 140 | buf[:HEAD_SIZE + pac_size] = packed_data |
| 141 | dumped_data = dumped_data[pac_size:] |
| 142 | yield buf |
| 143 | |
| 144 | def send(self, data, receiver_mask: int = 0xff): |
| 145 | """Pack data.""" |
| 146 | for _ in self.pack_data(data, receiver_mask): |
| 147 | self.notifier.set() |
| 148 | |
| 149 | async def send_async(self, data, receiver_mask: int = 0xff): |
| 150 | """Async pack data.""" |
| 151 | for _ in self.pack_data(data, receiver_mask): |
| 152 | await self.notifier.set_async() |
no outgoing calls
no test coverage detected