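Send data to the queue. Args: data: The data to send. Can be any serializable object or bytes. shm_threshold: Size threshold in bytes. If the data is of type bytes and its size is greater than or equal to this threshold, shared memory will be used to send the message. Default is 1MB (1024 * 1024 bytes). Raises: PermissionError: If called by a non-producer role.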
(self, data: Any, shm_threshold: int = 1024 * 1024)
| 217 | fmq_logger.info(f"Queue {name}({role}) initialized on {full_ep}") |
| 218 | |
async def put(self, data: Any, shm_threshold: int = 1024 * 1024) -> None:
    """
    Send data to the queue.

    Args:
        data: The data to send. Can be any serializable object or bytes.
        shm_threshold: Size threshold in bytes. If the data is of type bytes and its size is
            greater than or equal to this threshold, shared memory will be used to send the
            message (the message then carries a descriptor and a ``None`` payload).
            Default is 1MB (1024 * 1024 bytes).

    Raises:
        PermissionError: If called by a non-producer role.
    """
    if self.role != Role.PRODUCER:
        raise PermissionError("Only producers can send messages.")

    desc = None
    payload = data

    # Only raw ``bytes`` at or above the threshold take the descriptor path;
    # everything else is carried inline as the message payload.
    if isinstance(data, bytes) and len(data) >= shm_threshold:
        desc = Descriptor.create(data)
        payload = None

    async with self.lock:
        # The id is read, used and advanced while the lock is held. Reading it
        # before acquiring the lock would let a second put() running while this
        # one is suspended in send() build its message with the same id.
        msg = Message(msg_id=self._msg_id, payload=payload, descriptor=desc)
        await self.socket.send(msg.serialize(), copy=self.copy)
        # Advance only after a successful send, so a failed send does not
        # consume an id.
        self._msg_id += 1
| 248 | |
| 249 | async def get(self, timeout: int = None) -> Optional[Message]: |
| 250 | # Receive data from queue |