MCPcopy Index your code
hub / github.com/sshuttle/sshuttle / add

Method add

sshuttle/methods/windivert.py:150–179  ·  view source on GitHub ↗
(self, proto, src_addr, src_port, dst_addr, dst_port, state)

Source from the content-addressed store, hash-verified

148
@synchronized_method("rlock")
def add(self, proto, src_addr, src_port, dst_addr, dst_port, state):
    """Record a connection in the next free ConnTrack slot.

    The connection is packed with ``struct_full_tuple`` together with the
    current epoch second and stored in ``shm_list``. If
    ``get(proto, src_addr, src_port)`` already finds an entry, the call is
    a no-op, including when the table is full.

    Args:
        proto: IP protocol number; converted with ``IPProtocol(proto)``
            before anything is stored.
        src_addr: Source address, in any form ``ip_address()`` accepts.
        src_port: Source port.
        dst_addr: Destination address; must have the same IP version as
            ``src_addr``.
        dst_port: Destination port.
        state: Connection state; packed as the last field of the entry and
            logged through its ``.name`` attribute.

    Raises:
        RuntimeError: If this instance is not the owner, or if no slot is
            free for a new connection.
        ValueError: If an address cannot be parsed or the two addresses
            have different IP versions.
    """
    if not self.is_owner:
        raise RuntimeError("Only owner can mutate ConnTrack")

    # Look for an existing entry before checking capacity, so re-adding an
    # already tracked connection stays a no-op even when the table is full.
    if self.get(proto, src_addr, src_port):
        return

    if len(self.used_slots) >= self.max_connections:
        raise RuntimeError(f"No slot available in ConnTrack {len(self.used_slots)}/{self.max_connections}")

    # Validate every input before mutating shared state, so a bad argument
    # can never leave a stored entry behind. An explicit raise is used
    # instead of ``assert`` because asserts are stripped under ``python -O``.
    src_ip = ip_address(src_addr)
    dst_ip = ip_address(dst_addr)
    if src_ip.version != dst_ip.version:
        raise ValueError(
            f"IP version mismatch between src_addr (IPv{src_ip.version}) "
            f"and dst_addr (IPv{dst_ip.version})"
        )
    proto_enum = IPProtocol(proto)

    # Advance next_slot (wrapping around) until it points at a free slot.
    for _ in range(self.max_connections):
        if self.next_slot not in self.used_slots:
            break
        self.next_slot = (self.next_slot + 1) % self.max_connections
    else:
        raise RuntimeError("No slot available in ConnTrack")  # should not be here

    state_epoch = int(time.time())
    entry = (proto, src_ip.version, src_ip.packed, src_port, dst_ip.packed, dst_port, state_epoch, state)
    self.shm_list[self.next_slot] = self.struct_full_tuple.pack(*entry)
    self.used_slots.add(self.next_slot)
    debug3(
        f"ConnTrack: added ({proto_enum.name} {src_ip}:{src_port}->{dst_ip}:{dst_port} @{state_epoch}:{state.name}) to "
        f"slot={self.next_slot} | #ActiveConn={len(self.used_slots)}"
    )
180
181 @synchronized_method("rlock")
182 def update(self, proto, src_addr, src_port, state):

Callers 2

windows_nameserversFunction · 0.80
_egress_divertMethod · 0.80

Calls 3

getMethod · 0.95
debug3Function · 0.90
IPProtocolClass · 0.85

Tested by

no test coverage detected