hub / github.com/PaddlePaddle/FastDeploy / put

Method put

fastdeploy/inter_communicator/fmq.py:219–247

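Send data to the queue.

Args:
  data: The data to send. Can be any serializable object or bytes.
  shm_threshold: Size threshold in bytes. If the data is of type bytes and its size is greater than or equal to this threshold, shared memory will be used to send the message. Default is 1MB (1024 * 1024 bytes).

Raises:
  PermissionError: If called by a non-producer role.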

(self, data: Any, shm_threshold: int = 1024 * 1024)

Source from the content-addressed store, hash-verified

        fmq_logger.info(f"Queue {name}({role}) initialized on {full_ep}")

    async def put(self, data: Any, shm_threshold: int = 1024 * 1024):
        """
        Send data to the queue.

        Args:
            data: The data to send. Can be any serializable object or bytes.
            shm_threshold: Size threshold in bytes. If the data is of type bytes and its size is
                greater than or equal to this threshold, shared memory will be used to send the message.
                Default is 1MB (1024 * 1024 bytes).

        Raises:
            PermissionError: If called by a non-producer role.
        """
        if self.role != Role.PRODUCER:
            raise PermissionError("Only producers can send messages.")

        desc = None
        payload = data

        if isinstance(data, bytes) and len(data) >= shm_threshold:
            desc = Descriptor.create(data)
            payload = None

        msg = Message(msg_id=self._msg_id, payload=payload, descriptor=desc)
        raw = msg.serialize()

        async with self.lock:
            await self.socket.send(raw, copy=self.copy)
            self._msg_id += 1

    async def get(self, timeout: int = None) -> Optional[Message]:
        # Receive data from queue
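
The control flow of put can be tried out in isolation with a small in-memory stand-in. This is only a sketch under stated assumptions: SketchQueue and SketchMessage are hypothetical names, the list `sent` plays the part of the socket, the string descriptor is a placeholder for whatever Descriptor.create(data) returns, and the counter increment is assumed to sit inside the lock. None of this is FastDeploy's actual implementation; it only mirrors the role check, the threshold decision, and the locked send shown in the listing.

```python
import asyncio
from dataclasses import dataclass
from enum import Enum
from typing import Any, List, Optional


class Role(Enum):
    PRODUCER = "producer"
    CONSUMER = "consumer"


@dataclass
class SketchMessage:
    # Hypothetical stand-in for Message (the real class also serializes itself).
    msg_id: int
    payload: Any
    descriptor: Optional[str]


class SketchQueue:
    """Hypothetical in-memory stand-in that mirrors the control flow of put()."""

    def __init__(self, role: Role) -> None:
        self.role = role
        self.lock = asyncio.Lock()
        self.sent: List[SketchMessage] = []  # plays the part of the socket
        self._msg_id = 0

    async def put(self, data: Any, shm_threshold: int = 1024 * 1024) -> None:
        if self.role != Role.PRODUCER:
            raise PermissionError("Only producers can send messages.")

        desc, payload = None, data
        # Only bytes at or above the threshold take the descriptor path.
        if isinstance(data, bytes) and len(data) >= shm_threshold:
            desc = f"shm:{len(data)}"  # placeholder for Descriptor.create(data)
            payload = None

        msg = SketchMessage(self._msg_id, payload, desc)
        async with self.lock:
            self.sent.append(msg)  # placeholder for socket.send(...)
            self._msg_id += 1


async def demo() -> List[SketchMessage]:
    queue = SketchQueue(Role.PRODUCER)
    await queue.put({"step": 1})                  # non-bytes: always inline
    await queue.put(b"x" * 8, shm_threshold=16)   # small bytes: inline
    await queue.put(b"x" * 16, shm_threshold=16)  # at threshold: descriptor
    return queue.sent


if __name__ == "__main__":
    for message in asyncio.run(demo()):
        print(message)
```

Note that the comparison is >=, so a bytes payload exactly at the threshold already takes the descriptor path, and non-bytes objects are sent inline regardless of size.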

Callers 12

pre_compile_from_config · Function · 0.95
run_load_test · Function · 0.95
consume_signals · Method · 0.45
control_task · Method · 0.45
warmup · Method · 0.45
upload_data · Function · 0.45
_listen_connection · Method · 0.45
run_control_method · Method · 0.45
producer_task · Function · 0.45
consumer_task · Function · 0.45

Calls 4

serialize · Method · 0.95
Message · Class · 0.85
create · Method · 0.45
send · Method · 0.45

Tested by 1