(hostname: str, timeout: int = 5)
| 164 | |
| 165 | |
| 166 | def ping(hostname: str, timeout: int = 5) -> int: |
| 167 | watchdog = select.epoll() |
| 168 | started = time.monotonic() |
| 169 | random_identifier = f'archinstall-{random.randint(1000, 9999)}'.encode() |
| 170 | |
| 171 | # Create a raw socket (requires root, which should be fine on archiso) |
| 172 | icmp_socket = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_ICMP) |
| 173 | watchdog.register(icmp_socket, select.EPOLLIN | select.EPOLLHUP) |
| 174 | |
| 175 | icmp_packet = build_icmp(random_identifier) |
| 176 | |
| 177 | # Send the ICMP packet |
| 178 | icmp_socket.sendto(icmp_packet, (hostname, 0)) |
| 179 | latency = -1 |
| 180 | |
| 181 | # Gracefully wait for X amount of time |
| 182 | # for a ICMP response or exit with no latency |
| 183 | while latency == -1 and time.monotonic() - started < timeout: |
| 184 | try: |
| 185 | for _fileno, _event in watchdog.poll(0.1): |
| 186 | response, _ = icmp_socket.recvfrom(1024) |
| 187 | icmp_type = struct.unpack('!B', response[20:21])[0] |
| 188 | |
| 189 | # Check if it's an Echo Reply (ICMP type 0) |
| 190 | if icmp_type == 0 and response[-len(random_identifier) :] == random_identifier: |
| 191 | latency = round((time.monotonic() - started) * 1000) |
| 192 | break |
| 193 | except OSError as e: |
| 194 | debug(f'Error: {e}') |
| 195 | break |
| 196 | |
| 197 | icmp_socket.close() |
| 198 | return latency |
no test coverage detected