(self, str_send, wait=True, crop=True, binary=False)
| 17 | # -------------------------------------------------------------------- |
| 18 | # send PJL command to printer, optionally receive response |
| 19 | def cmd(self, str_send, wait=True, crop=True, binary=False): |
| 20 | str_recv = "" # response buffer |
| 21 | str_stat = "" # status buffer |
| 22 | token = c.DELIMITER + str(random.randrange(2**16)) # unique delimiter |
| 23 | status = "@PJL INFO STATUS" + c.EOL if self.status and wait else "" |
| 24 | footer = "@PJL ECHO " + token + c.EOL + c.EOL if wait else "" |
| 25 | # send command to printer device |
| 26 | try: |
| 27 | cmd_send = c.UEL + str_send + c.EOL + status + footer + c.UEL |
| 28 | # write to logfile |
| 29 | log().write(self.logfile, str_send + os.linesep) |
| 30 | # sent to printer |
| 31 | self.send(cmd_send) |
| 32 | # for commands that expect a response |
| 33 | if wait: |
| 34 | # use random token as delimiter PJL responses |
| 35 | str_recv = self.recv( |
| 36 | "(@PJL ECHO\s+)?" + token + ".*$", wait, True, binary |
| 37 | ) |
| 38 | if self.status: |
| 39 | # get status messages and remove them from received buffer |
| 40 | str_stat = item( |
| 41 | re.findall("@PJL INFO STATUS.*", str_recv, re.DOTALL) |
| 42 | ) |
| 43 | str_recv = re.compile("\x0c?@PJL INFO STATUS.*", re.DOTALL).sub( |
| 44 | "", str_recv |
| 45 | ) |
| 46 | if crop: |
| 47 | # crop very first PJL line which is echoed by most interpreters |
| 48 | str_recv = re.sub( |
| 49 | r"^\x04?(\x00+)?@PJL.*" + c.EOL, "", str_recv) |
| 50 | return self.pjl_err(str_recv, str_stat) |
| 51 | |
| 52 | # handle CTRL+C and exceptions |
| 53 | except (KeyboardInterrupt, Exception) as e: |
| 54 | if not self.fuzz or not str(e): |
| 55 | self.reconnect(str(e)) |
| 56 | return "" |
| 57 | |
| 58 | # handle error messages from PJL interpreter |
| 59 | def pjl_err(self, str_recv, str_stat): |
no test coverage detected