(self, addr: object)
| 277 | ################################################################ |
| 278 | |
| 279 | async def bind(self, addr: object) -> None: |
| 280 | self._check_closed() |
| 281 | if self._binding is not None: |
| 282 | _fake_err(errno.EINVAL) |
| 283 | await trio.lowlevel.checkpoint() |
| 284 | ip_str, port, *_ = await self._resolve_address_nocp(addr, local=True) |
| 285 | assert _ == [], "TODO: handle other values?" |
| 286 | |
| 287 | ip = ipaddress.ip_address(ip_str) |
| 288 | assert _family_for(ip) == self.family |
| 289 | # We convert binds to INET_ANY into binds to localhost |
| 290 | if ip == ipaddress.ip_address("0.0.0.0"): |
| 291 | ip = ipaddress.ip_address("127.0.0.1") |
| 292 | elif ip == ipaddress.ip_address("::"): |
| 293 | ip = ipaddress.ip_address("::1") |
| 294 | if port == 0: |
| 295 | port = next(self._fake_net._auto_port_iter) |
| 296 | binding = UDPBinding(local=UDPEndpoint(ip, port)) |
| 297 | self._fake_net._bind(binding, self) |
| 298 | self._binding = binding |
| 299 | |
| 300 | async def connect(self, peer: object) -> NoReturn: |
| 301 | raise NotImplementedError("FakeNet does not (yet) support connected sockets") |
no test coverage detected