MCPcopy Index your code
hub / github.com/secdev/scapy / IterSocket

Class IterSocket

scapy/supersocket.py:612–654  ·  view source on GitHub ↗

Source from the content-addressed store, hash-verified

610# More abstract objects
611
class IterSocket(SuperSocket):
    """Socket-like object whose "received" packets come from an iterable.

    ``recv()`` takes the next element of the wrapped iterator and raises
    ``EOFError`` when there is none left. ``select()`` hands back the socket
    list it was given and ``close()`` does nothing.
    """
    desc = "wrapper around an iterable"
    nonblocking_socket = True

    def __init__(self, obj):
        # type: (_PacketIterable) -> None
        """Set ``self.iter`` according to the kind of ``obj``.

        The checks are made in this order:

        * falsy ``obj``: an empty iterator;
        * another ``IterSocket``: its ``iter`` attribute is reused as is, so
          both sockets consume the same iterator;
        * ``SndRcvList``: each (sent, answer) pair is yielded as two
          consecutive packets, sent packet first;
        * ``list`` / ``PacketList``: iterated directly when the first element
          is ``bytes``, otherwise every element is itself iterated and its
          items are yielded in order;
        * anything else: ``obj.__iter__()``.
        """
        if not obj:
            self.iter = iter([])  # type: Iterator[Packet]
            return

        if isinstance(obj, IterSocket):
            self.iter = obj.iter
            return

        if isinstance(obj, SndRcvList):
            def _sent_then_answer(pairs):
                # type: (SndRcvList) -> Iterator[Packet]
                for query, answer in pairs:
                    # When a sent_time is set, it replaces the sent packet's
                    # time before the packet is handed out.
                    if query.sent_time:
                        query.time = query.sent_time
                    yield query
                    yield answer
            self.iter = _sent_then_answer(cast(SndRcvList, obj))
            return

        if isinstance(obj, (list, PacketList)):
            # Only the first element is inspected to decide how to iterate.
            if isinstance(obj[0], bytes):
                self.iter = iter(obj)
            else:
                # One level of flattening: items of each element, in order.
                self.iter = (item for group in obj for item in group)
            return

        self.iter = obj.__iter__()

    @staticmethod
    def select(sockets, remain=None):
        # type: (List[SuperSocket], Any) -> List[SuperSocket]
        """Return ``sockets`` unchanged; ``remain`` is not used."""
        return sockets

    def recv(self, x=None, **kwargs):
        # type: (Optional[int], Any) -> Optional[Packet]
        """Return a fresh copy of the next element of the iterator.

        The element is converted with ``bytes()`` and its own class is called
        on the result, with ``kwargs`` forwarded to that call. ``x`` is
        accepted but not used.

        :raises EOFError: when a ``StopIteration`` is raised while fetching
            or rebuilding the element.
        """
        try:
            current = next(self.iter)
            rebuild = current.__class__
            return rebuild(bytes(current), **kwargs)
        except StopIteration:
            # Nothing left to deliver: report it as end-of-file.
            raise EOFError

    def close(self):
        # type: () -> None
        """Do nothing; this method has no effect."""
        pass

Callers 3

writeMethod · 0.90
writeMethod · 0.90
_runMethod · 0.90

Calls

no outgoing calls

Tested by

no test coverage detected