| 113 | |
| 114 | |
| 115 | class ConnTrack: |
| 116 | |
| 117 | _instance = None |
| 118 | |
| 119 | def __new__(cls, *args, **kwargs): |
| 120 | if not cls._instance: |
| 121 | cls._instance = object.__new__(cls) |
| 122 | return cls._instance |
| 123 | raise RuntimeError("ConnTrack can not be instantiated multiple times") |
| 124 | |
| 125 | def __init__(self, name, max_connections=0) -> None: |
| 126 | self.struct_full_tuple = Struct(">" + "".join(("B", "B", "16s", "H", "16s", "H", "L", "B"))) |
| 127 | self.struct_src_tuple = Struct(">" + "".join(("B", "B", "16s", "H"))) |
| 128 | self.struct_state_tuple = Struct(">" + "".join(("L", "B"))) |
| 129 | |
| 130 | try: |
| 131 | self.max_connections = max_connections |
| 132 | self.shm_list = shared_memory.ShareableList( |
| 133 | [bytes(self.struct_full_tuple.size) for _ in range(max_connections)], name=name |
| 134 | ) |
| 135 | self.is_owner = True |
| 136 | self.next_slot = 0 |
| 137 | self.used_slots = set() |
| 138 | self.rlock = threading.RLock() |
| 139 | except FileExistsError: |
| 140 | self.is_owner = False |
| 141 | self.shm_list = shared_memory.ShareableList(name=name) |
| 142 | self.max_connections = len(self.shm_list) |
| 143 | |
| 144 | debug2( |
| 145 | f"ConnTrack: is_owner={self.is_owner} cap={len(self.shm_list)} item_sz={self.struct_full_tuple.size}B" |
| 146 | f"shm_name={self.shm_list.shm.name} shm_sz={self.shm_list.shm.size}B" |
| 147 | ) |
| 148 | |
| 149 | @synchronized_method("rlock") |
| 150 | def add(self, proto, src_addr, src_port, dst_addr, dst_port, state): |
| 151 | if not self.is_owner: |
| 152 | raise RuntimeError("Only owner can mutate ConnTrack") |
| 153 | if len(self.used_slots) >= self.max_connections: |
| 154 | raise RuntimeError(f"No slot available in ConnTrack {len(self.used_slots)}/{self.max_connections}") |
| 155 | |
| 156 | if self.get(proto, src_addr, src_port): |
| 157 | return |
| 158 | |
| 159 | for _ in range(self.max_connections): |
| 160 | if self.next_slot not in self.used_slots: |
| 161 | break |
| 162 | self.next_slot = (self.next_slot + 1) % self.max_connections |
| 163 | else: |
| 164 | raise RuntimeError("No slot available in ConnTrack") # should not be here |
| 165 | |
| 166 | src_addr = ip_address(src_addr) |
| 167 | dst_addr = ip_address(dst_addr) |
| 168 | assert src_addr.version == dst_addr.version |
| 169 | ip_version = src_addr.version |
| 170 | state_epoch = int(time.time()) |
| 171 | entry = (proto, ip_version, src_addr.packed, src_port, dst_addr.packed, dst_port, state_epoch, state) |
| 172 | packed = self.struct_full_tuple.pack(*entry) |
no outgoing calls
no test coverage detected