| 31 | import logging |
| 32 | |
| 33 | class ASWBXMLByteQueue(Queue): |
| 34 | |
| 35 | def __init__(self, wbxmlBytes): |
| 36 | |
| 37 | self.bytesDequeued = 0 |
| 38 | self.bytesEnqueued = 0 |
| 39 | |
| 40 | Queue.__init__(self) |
| 41 | |
| 42 | for byte in wbxmlBytes: |
| 43 | self.put(byte) |
| 44 | self.bytesEnqueued += 1 |
| 45 | |
| 46 | |
| 47 | logging.debug("Array byte count: %d, enqueued: %d" % (self.qsize(), self.bytesEnqueued)) |
| 48 | |
| 49 | """ |
| 50 | Created to debug the dequeueing of bytes |
| 51 | """ |
def dequeueAndLog(self):
    """Take the next item off the queue, count it, and log it at DEBUG level.

    The log message renders the item in hexadecimal, so the item must
    be an integer.

    Returns:
        The item returned by ``self.get()``.
    """
    dequeued = self.get()
    self.bytesDequeued = self.bytesDequeued + 1
    message = "Dequeued byte 0x{0:X} ({1} total)".format(dequeued, self.bytesDequeued)
    logging.debug(message)
    return dequeued
| 57 | |
| 58 | """ |
| 59 | Return true if the continuation bit is set in the byte |
| 60 | """ |
def checkContinuationBit(self, byteval):
    """Tell whether the high bit (0x80) of ``byteval`` is set.

    Args:
        byteval: Integer value to test.

    Returns:
        True when bit 0x80 is set in ``byteval``, False otherwise.
    """
    return bool(byteval & 0x80)
| 64 | |
def dequeueMultibyteInt(self):
    """Decode a variable-length integer from the front of the queue.

    Each dequeued byte contributes its low seven bits, most significant
    group first. A byte whose continuation bit is set means another
    byte follows; the first byte without it ends the integer.

    Returns:
        The decoded integer, or None when the queue runs empty before a
        terminating byte has been read. Bytes consumed up to that point
        are not put back.
    """
    value = 0
    while self.qsize() != 0:
        octet = self.dequeueAndLog()
        # Make room for the next 7-bit group, then append it.
        value = (value << 7) + int(octet & 0x7F)
        if not self.checkContinuationBit(octet):
            return value
    # Input ended mid-integer (or the queue was already empty).
    return None
| 78 | |
| 79 | def dequeueString(self, length=None): |
| 80 | if ( length != None): |
| 81 | currentByte = 0x00 |
| 82 | strReturn = "" |
| 83 | for i in range(0, length): |
| 84 | # TODO: Improve this handling. We are technically UTF-8, meaning |
| 85 | # that characters could be more than one byte long. This will fail if we have |
| 86 | # characters outside of the US-ASCII range |
| 87 | if ( self.qsize() == 0 ): |
| 88 | break |
| 89 | currentByte = self.dequeueAndLog() |
| 90 | strReturn += chr(currentByte) |
no outgoing calls
no test coverage detected
searching dependent graphs…