(self, proto, src_addr, src_port, dst_addr, dst_port, state)
| 148 | |
| 149 | @synchronized_method("rlock") |
| 150 | def add(self, proto, src_addr, src_port, dst_addr, dst_port, state): |
| 151 | if not self.is_owner: |
| 152 | raise RuntimeError("Only owner can mutate ConnTrack") |
| 153 | if len(self.used_slots) >= self.max_connections: |
| 154 | raise RuntimeError(f"No slot available in ConnTrack {len(self.used_slots)}/{self.max_connections}") |
| 155 | |
| 156 | if self.get(proto, src_addr, src_port): |
| 157 | return |
| 158 | |
| 159 | for _ in range(self.max_connections): |
| 160 | if self.next_slot not in self.used_slots: |
| 161 | break |
| 162 | self.next_slot = (self.next_slot + 1) % self.max_connections |
| 163 | else: |
| 164 | raise RuntimeError("No slot available in ConnTrack") # should not be here |
| 165 | |
| 166 | src_addr = ip_address(src_addr) |
| 167 | dst_addr = ip_address(dst_addr) |
| 168 | assert src_addr.version == dst_addr.version |
| 169 | ip_version = src_addr.version |
| 170 | state_epoch = int(time.time()) |
| 171 | entry = (proto, ip_version, src_addr.packed, src_port, dst_addr.packed, dst_port, state_epoch, state) |
| 172 | packed = self.struct_full_tuple.pack(*entry) |
| 173 | self.shm_list[self.next_slot] = packed |
| 174 | self.used_slots.add(self.next_slot) |
| 175 | proto = IPProtocol(proto) |
| 176 | debug3( |
| 177 | f"ConnTrack: added ({proto.name} {src_addr}:{src_port}->{dst_addr}:{dst_port} @{state_epoch}:{state.name}) to " |
| 178 | f"slot={self.next_slot} | #ActiveConn={len(self.used_slots)}" |
| 179 | ) |
| 180 | |
| 181 | @synchronized_method("rlock") |
| 182 | def update(self, proto, src_addr, src_port, state): |
no test coverage detected