Process each packet: matches the TCP seq/ack numbers to follow the TCP streams, and orders the fragments.
(self,
pkt: Packet,
cls: Optional[Type[Packet]] = None)
| 260 | return None |
| 261 | |
| 262 | def process(self, |
| 263 | pkt: Packet, |
| 264 | cls: Optional[Type[Packet]] = None) -> Optional[Packet]: |
| 265 | """Process each packet: matches the TCP seq/ack numbers |
| 266 | to follow the TCP streams, and orders the fragments. |
| 267 | """ |
| 268 | packet = None # type: Optional[Packet] |
| 269 | if self.app: |
| 270 | # Special mode: Application layer. Use on top of TCP |
| 271 | self.data.append(bytes(pkt)) |
| 272 | if cls is None and not isinstance(pkt, bytes): |
| 273 | cls = pkt.__class__ |
| 274 | if "tcp_reassemble" in self.metadata: |
| 275 | tcp_reassemble = self.metadata["tcp_reassemble"] |
| 276 | elif cls is not None: |
| 277 | self.metadata["tcp_reassemble"] = tcp_reassemble = streamcls(cls) |
| 278 | else: |
| 279 | return None |
| 280 | if self.data.full(): |
| 281 | packet = tcp_reassemble( |
| 282 | bytes(self.data), |
| 283 | self.metadata, |
| 284 | self.session, |
| 285 | ) |
| 286 | if packet: |
| 287 | padding = self._strip_padding(packet) |
| 288 | if padding: |
| 289 | # There is remaining data for the next payload. |
| 290 | self.data.shiftleft(len(self.data) - len(padding)) |
| 291 | # Skip full-padding |
| 292 | if isinstance(packet, conf.padding_layer): |
| 293 | return None |
| 294 | else: |
| 295 | # No padding (data) left. Clear |
| 296 | self.data.clear() |
| 297 | self.metadata.clear() |
| 298 | return packet |
| 299 | return None |
| 300 | |
| 301 | _pkt = super(TCPSession, self).process(pkt) |
| 302 | if _pkt is None: |
| 303 | return None |
| 304 | else: # Python 3.8 := would be nice |
| 305 | pkt = _pkt |
| 306 | |
| 307 | from scapy.layers.inet import IP, TCP |
| 308 | if not pkt: |
| 309 | return None |
| 310 | if TCP not in pkt: |
| 311 | return pkt |
| 312 | pay = pkt[TCP].payload |
| 313 | new_data = pay.original |
| 314 | # Match packets by a unique TCP identifier |
| 315 | ident = self._get_ident(pkt) |
| 316 | data, metadata = self.tcp_frags[ident] |
| 317 | tcp_session = self.tcp_sessions[self._get_ident(pkt, True)] |
| 318 | # Handle TCP sequence numbers |
| 319 | seq = pkt[TCP].seq |
no test coverage detected