| 30 | |
| 31 | |
| 32 | class WriteBuf: |
| 33 | def __init__(self, data: Optional[bytes] = None) -> None: |
| 34 | super(WriteBuf, self).__init__() |
| 35 | self._wbuf = io.BytesIO(data) if data is not None else io.BytesIO() |
| 36 | |
| 37 | def write(self, data: bytes) -> 'WriteBuf': |
| 38 | self._wbuf.write(data) |
| 39 | return self |
| 40 | |
| 41 | def write_byte(self, v: int) -> 'WriteBuf': |
| 42 | return self.write(struct.pack('B', v)) |
| 43 | |
| 44 | def write_bool(self, v: bool) -> 'WriteBuf': |
| 45 | return self.write_byte(1 if v else 0) |
| 46 | |
| 47 | def write_int(self, v: int) -> 'WriteBuf': |
| 48 | return self.write(struct.pack('>I', v)) |
| 49 | |
| 50 | def write_string(self, v: Union[bytes, str]) -> 'WriteBuf': |
| 51 | if not isinstance(v, bytes): |
| 52 | v = bytes(bytearray(v, 'utf-8')) |
| 53 | self.write_int(len(v)) |
| 54 | return self.write(v) |
| 55 | |
| 56 | def write_list(self, v: List[str]) -> 'WriteBuf': |
| 57 | return self.write_string(','.join(v)) |
| 58 | |
| 59 | @classmethod |
| 60 | def _bitlength(cls, n: int) -> int: |
| 61 | try: |
| 62 | return n.bit_length() |
| 63 | except AttributeError: |
| 64 | return len(bin(n)) - (2 if n > 0 else 3) |
| 65 | |
| 66 | @classmethod |
| 67 | def _create_mpint(cls, n: int, signed: bool = True, bits: Optional[int] = None) -> bytes: |
| 68 | if bits is None: |
| 69 | bits = cls._bitlength(n) |
| 70 | length = bits // 8 + (1 if n != 0 else 0) |
| 71 | ql = (length + 7) // 8 |
| 72 | fmt, v2 = '>{}Q'.format(ql), [0] * ql |
| 73 | for i in range(ql): |
| 74 | v2[ql - i - 1] = n & 0xffffffffffffffff |
| 75 | n >>= 64 |
| 76 | data = bytes(struct.pack(fmt, *v2)[-length:]) |
| 77 | if not signed: |
| 78 | data = data.lstrip(b'\x00') |
| 79 | elif data.startswith(b'\xff\x80'): |
| 80 | data = data[1:] |
| 81 | return data |
| 82 | |
| 83 | def write_mpint1(self, n: int) -> 'WriteBuf': |
| 84 | # NOTE: Data Type Enc @ http://www.snailbook.com/docs/protocol-1.5.txt |
| 85 | bits = self._bitlength(n) |
| 86 | data = self._create_mpint(n, False, bits) |
| 87 | self.write(struct.pack('>H', bits)) |
| 88 | return self.write(data) |
| 89 |
no outgoing calls
no test coverage detected