(self,
count=0, # type: int
store=True, # type: bool
offline=None, # type: Any
quiet=False, # type: bool
prn=None, # type: Optional[Callable[[Packet], Any]]
lfilter=None, # type: Optional[Callable[[Packet], bool]]
L2socket=None, # type: Optional[Type[SuperSocket]]
timeout=None, # type: Optional[int]
opened_socket=None, # type: Optional[SuperSocket]
stop_filter=None, # type: Optional[Callable[[Packet], bool]]
iface=None, # type: Optional[_GlobInterfaceType]
started_callback=None, # type: Optional[Callable[[], Any]]
session=None, # type: Optional[_GlobSessionType]
chainCC=False, # type: bool
**karg # type: Any
)
| 1176 | self.thread.daemon = True |
| 1177 | |
| 1178 | def _run(self, |
| 1179 | count=0, # type: int |
| 1180 | store=True, # type: bool |
| 1181 | offline=None, # type: Any |
| 1182 | quiet=False, # type: bool |
| 1183 | prn=None, # type: Optional[Callable[[Packet], Any]] |
| 1184 | lfilter=None, # type: Optional[Callable[[Packet], bool]] |
| 1185 | L2socket=None, # type: Optional[Type[SuperSocket]] |
| 1186 | timeout=None, # type: Optional[int] |
| 1187 | opened_socket=None, # type: Optional[SuperSocket] |
| 1188 | stop_filter=None, # type: Optional[Callable[[Packet], bool]] |
| 1189 | iface=None, # type: Optional[_GlobInterfaceType] |
| 1190 | started_callback=None, # type: Optional[Callable[[], Any]] |
| 1191 | session=None, # type: Optional[_GlobSessionType] |
| 1192 | chainCC=False, # type: bool |
| 1193 | **karg # type: Any |
| 1194 | ): |
| 1195 | # type: (...) -> None |
| 1196 | self.running = True |
| 1197 | self.count = 0 |
| 1198 | lst = [] |
| 1199 | # Start main thread |
| 1200 | # instantiate session |
| 1201 | if not isinstance(session, DefaultSession): |
| 1202 | session = session or DefaultSession |
| 1203 | session = session() |
| 1204 | # sniff_sockets follows: {socket: label} |
| 1205 | sniff_sockets = {} # type: Dict[SuperSocket, _GlobInterfaceType] |
| 1206 | if opened_socket is not None: |
| 1207 | if isinstance(opened_socket, list): |
| 1208 | sniff_sockets.update( |
| 1209 | (s, "socket%d" % i) |
| 1210 | for i, s in enumerate(opened_socket) |
| 1211 | ) |
| 1212 | elif isinstance(opened_socket, dict): |
| 1213 | sniff_sockets.update( |
| 1214 | (s, label) |
| 1215 | for s, label in opened_socket.items() |
| 1216 | ) |
| 1217 | else: |
| 1218 | sniff_sockets[opened_socket] = "socket0" |
| 1219 | if offline is not None: |
| 1220 | flt = karg.get('filter') |
| 1221 | |
| 1222 | if isinstance(offline, str): |
| 1223 | # Single file |
| 1224 | offline = [offline] |
| 1225 | if isinstance(offline, list) and \ |
| 1226 | all(isinstance(elt, str) for elt in offline): |
| 1227 | # List of files |
| 1228 | sniff_sockets.update((PcapReader( # type: ignore |
| 1229 | fname if flt is None else |
| 1230 | tcpdump(fname, |
| 1231 | args=["-w", "-"], |
| 1232 | flt=flt, |
| 1233 | getfd=True, |
| 1234 | quiet=quiet) |
| 1235 | ), fname) for fname in offline) |
no test coverage detected