MCPcopy Index your code
hub / github.com/mitmproxy/mitmproxy / ASWBXMLByteQueue

Class ASWBXMLByteQueue

mitmproxy/contrib/wbxml/ASWBXMLByteQueue.py:33–102  ·  view source on GitHub ↗

Source from the content-addressed store, hash-verified

31import logging
32
33class ASWBXMLByteQueue(Queue):
34
35 def __init__(self, wbxmlBytes):
36
37 self.bytesDequeued = 0
38 self.bytesEnqueued = 0
39
40 Queue.__init__(self)
41
42 for byte in wbxmlBytes:
43 self.put(byte)
44 self.bytesEnqueued += 1
45
46
47 logging.debug("Array byte count: %d, enqueued: %d" % (self.qsize(), self.bytesEnqueued))
48
49 """
50 Created to debug the dequeueing of bytes
51 """
52 def dequeueAndLog(self):
53 singleByte = self.get()
54 self.bytesDequeued += 1
55 logging.debug("Dequeued byte 0x{0:X} ({1} total)".format(singleByte, self.bytesDequeued))
56 return singleByte
57
58 """
59 Return true if the continuation bit is set in the byte
60 """
61 def checkContinuationBit(self, byteval):
62 continuationBitmask = 0x80
63 return (continuationBitmask & byteval) != 0
64
65 def dequeueMultibyteInt(self):
66 iReturn = 0
67 singleByte = 0xFF
68
69 while True:
70 iReturn <<= 7
71 if (self.qsize() == 0):
72 break
73 else:
74 singleByte = self.dequeueAndLog()
75 iReturn += int(singleByte & 0x7F)
76 if not self.checkContinuationBit(singleByte):
77 return iReturn
78
79 def dequeueString(self, length=None):
80 if ( length != None):
81 currentByte = 0x00
82 strReturn = ""
83 for i in range(0, length):
84 # TODO: Improve this handling. We are technically UTF-8, meaning
85 # that characters could be more than one byte long. This will fail if we have
86 # characters outside of the US-ASCII range
87 if ( self.qsize() == 0 ):
88 break
89 currentByte = self.dequeueAndLog()
90 strReturn += chr(currentByte)

Callers 1

loadBytesMethod · 0.90

Calls

no outgoing calls

Tested by

no test coverage detected

Used in the wild real call sites across dependent graphs

searching dependent graphs…