(cls, f, without_time=False)
| 38 | |
| 39 | @classmethod |
| 40 | def stream_deserialize(cls, f, without_time=False): |
| 41 | c = cls() |
| 42 | if c.protover >= CADDR_TIME_VERSION and not without_time: |
| 43 | c.nTime = struct.unpack(b"<I", ser_read(f, 4))[0] |
| 44 | c.nServices = struct.unpack(b"<Q", ser_read(f, 8))[0] |
| 45 | |
| 46 | packedIP = ser_read(f, 16) |
| 47 | |
| 48 | if bytes(packedIP[0:12]) == IPV4_COMPAT: # IPv4 |
| 49 | c.ip = socket.inet_ntop(socket.AF_INET, packedIP[12:16]) |
| 50 | else: #IPv6 |
| 51 | c.ip = socket.inet_ntop(socket.AF_INET6, packedIP) |
| 52 | |
| 53 | c.port = struct.unpack(b">H", ser_read(f, 2))[0] |
| 54 | return c |
| 55 | |
| 56 | def stream_serialize(self, f, without_time=False): |
| 57 | if self.protover >= CADDR_TIME_VERSION and not without_time: |
nothing calls this directly
no test coverage detected