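Spawn a TCP server that listens for connections and starts the automaton for each new client. :param port: the port to listen on :param bg: background mode? (default: False) Note that in background mode, you must close the TCP server yourself, as follows: first `srv = MyAutomaton.spawn(8080, bg=True)`, then `srv.shutdown(socket.SHUT_RDWR)` (important), and finally `srv.close()`.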
(cls,
port: int,
iface: Optional[_GlobInterfaceType] = None,
local_ip: Optional[str] = None,
bg: bool = False,
**kwargs: Any)
| 968 | |
| 969 | @classmethod |
| 970 | def spawn(cls, |
| 971 | port: int, |
| 972 | iface: Optional[_GlobInterfaceType] = None, |
| 973 | local_ip: Optional[str] = None, |
| 974 | bg: bool = False, |
| 975 | **kwargs: Any) -> Optional[socket.socket]: |
| 976 | """ |
| 977 | Spawn a TCP server that listens for connections and start the automaton |
| 978 | for each new client. |
| 979 | |
| 980 | :param port: the port to listen to |
| 981 | :param bg: background mode? (default: False) |
| 982 | |
| 983 | Note that in background mode, you shall close the TCP server as such:: |
| 984 | |
| 985 | srv = MyAutomaton.spawn(8080, bg=True) |
| 986 | srv.shutdown(socket.SHUT_RDWR) # important |
| 987 | srv.close() |
| 988 | """ |
| 989 | from scapy.arch import get_if_addr |
| 990 | # create server sock and bind it |
| 991 | ssock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) |
| 992 | if local_ip is None: |
| 993 | local_ip = get_if_addr(iface or conf.iface) |
| 994 | try: |
| 995 | ssock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) |
| 996 | except OSError: |
| 997 | pass |
| 998 | ssock.bind((local_ip, port)) |
| 999 | ssock.listen(5) |
| 1000 | clients = [] |
| 1001 | if kwargs.get("verb", True): |
| 1002 | print(conf.color_theme.green( |
| 1003 | "Server %s started listening on %s" % ( |
| 1004 | cls.__name__, |
| 1005 | (local_ip, port), |
| 1006 | ) |
| 1007 | )) |
| 1008 | |
| 1009 | def _run() -> None: |
| 1010 | # Wait for clients forever |
| 1011 | try: |
| 1012 | while True: |
| 1013 | atmt_server = None |
| 1014 | clientsocket, address = ssock.accept() |
| 1015 | if kwargs.get("verb", True): |
| 1016 | print(conf.color_theme.gold( |
| 1017 | "\u2503 Connection received from %s" % repr(address) |
| 1018 | )) |
| 1019 | try: |
| 1020 | # Start atmt class with socket |
| 1021 | if cls.socketcls is not None: |
| 1022 | sock = cls.socketcls(clientsocket, cls.pkt_cls) |
| 1023 | else: |
| 1024 | sock = clientsocket |
| 1025 | atmt_server = cls( |
| 1026 | sock=sock, |
| 1027 | iface=iface, **kwargs |
nothing calls this directly
no test coverage detected