MCPcopy Index your code
hub / github.com/sshuttle/sshuttle / ConnTrack

Class ConnTrack

sshuttle/methods/windivert.py:115–285  ·  view source on GitHub ↗

Source from the content-addressed store, hash-verified

113
114
115class ConnTrack:
116
117 _instance = None
118
119 def __new__(cls, *args, **kwargs):
120 if not cls._instance:
121 cls._instance = object.__new__(cls)
122 return cls._instance
123 raise RuntimeError("ConnTrack can not be instantiated multiple times")
124
125 def __init__(self, name, max_connections=0) -> None:
126 self.struct_full_tuple = Struct(">" + "".join(("B", "B", "16s", "H", "16s", "H", "L", "B")))
127 self.struct_src_tuple = Struct(">" + "".join(("B", "B", "16s", "H")))
128 self.struct_state_tuple = Struct(">" + "".join(("L", "B")))
129
130 try:
131 self.max_connections = max_connections
132 self.shm_list = shared_memory.ShareableList(
133 [bytes(self.struct_full_tuple.size) for _ in range(max_connections)], name=name
134 )
135 self.is_owner = True
136 self.next_slot = 0
137 self.used_slots = set()
138 self.rlock = threading.RLock()
139 except FileExistsError:
140 self.is_owner = False
141 self.shm_list = shared_memory.ShareableList(name=name)
142 self.max_connections = len(self.shm_list)
143
144 debug2(
145 f"ConnTrack: is_owner={self.is_owner} cap={len(self.shm_list)} item_sz={self.struct_full_tuple.size}B"
146 f"shm_name={self.shm_list.shm.name} shm_sz={self.shm_list.shm.size}B"
147 )
148
149 @synchronized_method("rlock")
150 def add(self, proto, src_addr, src_port, dst_addr, dst_port, state):
151 if not self.is_owner:
152 raise RuntimeError("Only owner can mutate ConnTrack")
153 if len(self.used_slots) >= self.max_connections:
154 raise RuntimeError(f"No slot available in ConnTrack {len(self.used_slots)}/{self.max_connections}")
155
156 if self.get(proto, src_addr, src_port):
157 return
158
159 for _ in range(self.max_connections):
160 if self.next_slot not in self.used_slots:
161 break
162 self.next_slot = (self.next_slot + 1) % self.max_connections
163 else:
164 raise RuntimeError("No slot available in ConnTrack") # should not be here
165
166 src_addr = ip_address(src_addr)
167 dst_addr = ip_address(dst_addr)
168 assert src_addr.version == dst_addr.version
169 ip_version = src_addr.version
170 state_epoch = int(time.time())
171 entry = (proto, ip_version, src_addr.packed, src_port, dst_addr.packed, dst_port, state_epoch, state)
172 packed = self.struct_full_tuple.pack(*entry)

Callers 2

get_tcp_dstipMethod · 0.85

Calls

no outgoing calls

Tested by

no test coverage detected