(self, x=MTU, **kwargs)
| 159 | return super(UnstableSocket, self).send(x) |
| 160 | |
def recv(self, x=MTU, **kwargs):
    # type: (int, **Any) -> Optional[Packet]
    """Receive through the parent socket, occasionally injecting a failure.

    While ``self.no_error_for_x_tx_pkts`` is positive it is decremented and
    the call is simply delegated to the parent class. Once it has reached
    zero, up to four independent ``random.randint(0, 1000)`` draws are made,
    in order; the first draw that equals its trigger value resets the
    counter to 10 and produces the matching failure.

    :param x: forwarded unchanged to the parent ``recv``.
    :param kwargs: forwarded unchanged to the parent ``recv``.
    :returns: whatever the parent ``recv`` returns, or ``None`` when the
        injected "no packet" failure fires.
    :raises OSError: injected failure (trigger value 42).
    :raises Scapy_Exception: injected failure (trigger value 13).
    :raises ValueError: injected failure (trigger value 7).
    """
    # NOTE(review): the receive path is gated by the *tx*-named counter;
    # confirm against the rest of the class that this is intentional.
    remaining = self.no_error_for_x_tx_pkts
    if remaining > 0:
        # Still inside the failure-free window: consume one slot.
        self.no_error_for_x_tx_pkts = remaining - 1
    elif remaining == 0:
        # (trigger value, exception type to raise); ``None`` means the
        # failure is reported by returning ``None`` instead of raising.
        injected_failures = (
            (42, OSError),
            (13, Scapy_Exception),
            (7, ValueError),
            (113, None),
        )
        for trigger, failure in injected_failures:
            # One fresh draw per entry, evaluated in order.
            if random.randint(0, 1000) != trigger:
                continue
            # Open a new failure-free window before failing.
            self.no_error_for_x_tx_pkts = 10
            if failure is None:
                return None
            raise failure("Socket closed")
    return super(UnstableSocket, self).recv(x, **kwargs)
| 179 | |
| 180 | |
| 181 | def cleanup_testsockets(): |
no test coverage detected