MCPcopy
hub / github.com/RUB-NDS/PRET / conn

Class conn

helper.py:412–587  ·  view source on GitHub ↗

Source from the content-addressed store, hash-verified

410
411
412class conn(object):
413 # create debug connection object
414 def __init__(self, mode, debug, quiet):
415 self.mode = mode
416 self.debug = debug
417 self.quiet = quiet
418 self._file = None
419 self._sock = socket()
420
421 # open connection
422 def open(self, target, port=9100):
423 # target is a character device
424 if os.path.exists(target) \
425 and stat.S_ISCHR(os.stat(target).st_mode):
426 self._file = os.open(target, os.O_RDWR)
427 # treat target as ipv4 socket
428 else:
429 m = re.search('^(.+?):([0-9]+)$', target)
430 if m:
431 [target, port] = m.groups()
432 port = int(port)
433
434 self._sock.connect((target, port))
435
436 # close connection
437 def close(self, *arg):
438 # close file descriptor
439 if self._file:
440 os.close(self._file)
441 # close inet socket
442 else:
443 self._sock.close()
444
445 # set timeout
446 def timeout(self, *arg):
447 self._sock.settimeout(*arg)
448
449 # send data
450 def send(self, data):
451 if self.debug:
452 output().send(self.beautify(data), self.debug)
453 # send data to device
454 if self._file:
455 return os.write(self._file, data)
456 # send data to socket
457 elif self._sock:
458 if not isinstance(data, bytes):
459 data = data.encode()
460 try:
461 return self._sock.sendall(data)
462 except:
463 print(traceback.print_exc())
464
465 # receive data
466 def recv(self, bytes):
467 # receive data from device
468 if self._file:
469 data = os.read(self._file, bytes).decode()

Callers 1

do_openMethod · 0.90

Calls

no outgoing calls

Tested by

no test coverage detected