(
self,
buffers: Iterable[Buffer],
ancbufsize: int = 0,
flags: int = 0,
)
| 345 | sendmsg = _sendmsg |
| 346 | |
async def _recvmsg_into(
    self,
    buffers: Iterable[Buffer],
    ancbufsize: int = 0,
    flags: int = 0,
) -> tuple[
    int,
    list[tuple[int, int, bytes]],
    int,
    tuple[str, int] | tuple[str, int, int, int],
]:
    """Receive one fake packet and copy its payload into ``buffers``.

    Args:
        buffers: Destination buffers handed to ``_scatter`` together with
            the packet payload.
        ancbufsize: Must be 0; ancillary data is not supported.
        flags: Must be 0; no receive flags are supported.

    Returns:
        A 4-tuple ``(nbytes, ancdata, msg_flags, address)``: the count
        reported by ``_scatter``, an always-empty ancillary-data list,
        ``MSG_TRUNC`` if that count is smaller than the payload length
        (otherwise 0), and the packet source as a Python sockaddr.

    Raises:
        NotImplementedError: If ``ancbufsize`` or ``flags`` is non-zero, or
            if the socket has no binding.
    """
    if ancbufsize != 0:
        raise NotImplementedError("FakeNet doesn't support ancillary data")
    if flags != 0:
        raise NotImplementedError("FakeNet doesn't support any recv flags")
    if self._binding is None:
        # Guard rail: this was an easy mistake to make while writing tests.
        # No existing test relies on receiving while unbound, though that
        # could conceivably be intentional in some scenario.
        raise NotImplementedError(
            "The code will most likely hang if you try to receive on a fakesocket "
            "without a binding. If that is not the case, or you explicitly want to "
            "test that, remove this warning.",
        )

    # The closed check deliberately runs after argument/binding validation,
    # matching the original ordering.
    self._check_closed()

    datagram = await self._packet_receiver.receive()
    sender = datagram.source.as_python_sockaddr()
    # NOTE(review): _scatter presumably copies the payload into the buffers
    # and returns how many bytes fit -- confirm against its definition.
    copied = _scatter(datagram.payload, buffers)

    # Report truncation when fewer bytes were stored than the packet held.
    truncated = copied < len(datagram.payload)
    out_flags = trio.socket.MSG_TRUNC if truncated else 0

    no_ancdata: list[tuple[int, int, bytes]] = []
    return copied, no_ancdata, out_flags, sender
| 382 | |
| 383 | if sys.platform != "win32" or ( |
| 384 | not TYPE_CHECKING and hasattr(socket.socket, "sendmsg") |
no test coverage detected