(self, proto, src_addr, src_port)
| 201 | |
@synchronized_method("rlock")
def remove(self, proto, src_addr, src_port):
    """Drop the tracked connection matching a source tuple and return it.

    The lookup key is the packed ``(proto, IP version, packed address,
    port)`` prefix. The first used slot whose stored record starts with
    that prefix is unpacked, blanked out, and released from ``used_slots``.

    Args:
        proto: Protocol of the connection; packed into the lookup key and
            reported in the log via ``proto.name``.
        src_addr: Source address, converted with ``ip_address``.
        src_port: Source port of the connection.

    Returns:
        The unpacked record of the removed connection, or ``None`` when no
        used slot matches.

    Raises:
        RuntimeError: If this instance is not the owner.
    """
    if not self.is_owner:
        raise RuntimeError("Only owner can mutate ConnTrack")

    src_addr = ip_address(src_addr)
    src_key = self.struct_src_tuple.pack(
        proto, src_addr.version, src_addr.packed, src_port
    )

    for i in self.used_slots:
        if not self.shm_list[i].startswith(src_key):
            continue

        conn = self._unpack(self.shm_list[i])
        self.shm_list[i] = b""
        # Mutating used_slots mid-iteration is fine here: we return right
        # away, so the iterator is never advanced again.
        self.used_slots.remove(i)
        debug3(
            f"ConnTrack: removed ({proto.name} src={src_addr}:{src_port} state={conn.state.name}) from slot={i} | "
            f"#ActiveConn={len(self.used_slots)}"
        )
        return conn

    # Loop exhausted without a match.
    debug3(
        f"ConnTrack: ({proto.name} src={src_addr}:{src_port}) is not found to remove |"
        f" #ActiveConn={len(self.used_slots)}"
    )
    return None
| 223 | |
| 224 | def get(self, proto, src_addr, src_port): |
| 225 | src_addr = ip_address(src_addr) |
no test coverage detected