MCPcopy Index your code
hub / github.com/secdev/scapy / __init__

Method __init__

scapy/automaton.py:901–958  ·  view source on GitHub ↗
(self, *args, **kargs)

Source from the content-addressed store, hash-verified

899
900 # Internals
def __init__(self, *args, **kargs):
    # type: (Any, Any) -> None
    """Set up the instance state, build the I/O pipes and call start().

    The following keyword arguments are consumed here:

    - ``external_fd``: mapping from an ioname to either a single
      object or an ``(in, out)`` tuple; a ``None`` side gets a fresh
      ObjectPipe, any other value is wrapped with ``_IO_fdwrapper``.
    - ``sock``: a single bi-directional socket. It is read but NOT
      removed from ``kargs``, so it remains visible in ``init_kargs``.
    - ``ll`` / ``recvsock``: send / receive socket classes, only looked
      at when ``sock`` is absent (defaults: ``conf.L3socket`` and
      ``conf.L2listen``).
    - ``is_atmt_socket``, ``atmt_socket``, ``session``.

    The positional arguments and whatever is left in ``kargs`` are
    stored as ``init_args`` / ``init_kargs``.
    """
    external_fd = kargs.pop("external_fd", {})

    if "sock" not in kargs:
        # Separate sockets for sending and receiving
        self.sock = None
        self.send_sock_class = kargs.pop("ll", conf.L3socket)
        self.recv_sock_class = kargs.pop("recvsock", conf.L2listen)
    else:
        # A single bi-directional socket was supplied
        self.sock = kargs["sock"]

    self.listen_sock = None  # type: Optional[SuperSocket]
    self.send_sock = None  # type: Optional[SuperSocket]
    self.is_atmt_socket = kargs.pop("is_atmt_socket", False)
    self.atmt_socket = kargs.pop("atmt_socket", None)

    # Thread / debugging bookkeeping
    self.started = threading.Lock()
    self.threadid = None  # type: Optional[int]
    self.breakpointed = None
    self.breakpoints = set()  # type: Set[_StateWrapper]
    self.interception_points = set()  # type: Set[_StateWrapper]
    self.intercepted_packet = None  # type: Union[None, Packet]
    self.debug_level = 0

    # init_kargs is the very same dict object as kargs: the "session"
    # pop further down therefore also removes that key from init_kargs.
    self.init_args = args
    self.init_kargs = kargs

    # Two empty namespace classes that receive one attribute per ioname
    self.io = type.__new__(type, "IOnamespace", (), {})
    self.oi = type.__new__(type, "IOnamespace", (), {})
    self.cmdin = ObjectPipe[Message]("cmdin")
    self.cmdout = ObjectPipe[Message]("cmdout")
    self.ioin = {}
    self.ioout = {}
    self.packets = PacketList()  # type: PacketList
    self.atmt_session = kargs.pop("session", None)

    for io_name in self.__class__.ionames:
        fd_spec = external_fd.get(io_name)
        if isinstance(fd_spec, tuple):
            fd_in, fd_out = fd_spec
        else:
            # A single value (or None) is used for both directions
            fd_in = fd_out = fd_spec

        pipe_in = (
            ObjectPipe("ioin") if fd_in is None
            else self._IO_fdwrapper(fd_in, None)
        )
        pipe_out = (
            ObjectPipe("ioout") if fd_out is None
            else self._IO_fdwrapper(None, fd_out)
        )

        self.ioin[io_name] = pipe_in
        self.ioout[io_name] = pipe_out
        pipe_in.ioname = io_name
        pipe_out.ioname = io_name
        # io and oi expose the same pair of endpoints in swapped order
        setattr(self.io, io_name, self._IO_mixer(pipe_out, pipe_in))
        setattr(self.oi, io_name, self._IO_mixer(pipe_in, pipe_out))

    # Replace each state attribute by its _instance_state wrapper
    for state_name in self.states:
        current = getattr(self, state_name)
        setattr(self, state_name, _instance_state(current))

    self.start()

Callers

nothing calls this directly

Calls 7

startMethod · 0.95
PacketListClass · 0.90
ObjectPipeClass · 0.85
_instance_stateClass · 0.85
popMethod · 0.80
__new__Method · 0.45
getMethod · 0.45

Tested by

no test coverage detected