| 200 | |
| 201 | |
| 202 | class IPNetwork(object): |
| 203 | ADDRLENGTH = {socket.AF_INET: 32, socket.AF_INET6: 128, False: 0} |
| 204 | |
| 205 | def __init__(self, addrs): |
| 206 | self._network_list_v4 = [] |
| 207 | self._network_list_v6 = [] |
| 208 | if type(addrs) == str: |
| 209 | addrs = addrs.split(',') |
| 210 | list(map(self.add_network, addrs)) |
| 211 | |
| 212 | def add_network(self, addr): |
| 213 | if addr is "": |
| 214 | return |
| 215 | block = addr.split('/') |
| 216 | addr_family = is_ip(block[0]) |
| 217 | addr_len = IPNetwork.ADDRLENGTH[addr_family] |
| 218 | if addr_family is socket.AF_INET: |
| 219 | ip, = struct.unpack("!I", socket.inet_aton(block[0])) |
| 220 | elif addr_family is socket.AF_INET6: |
| 221 | hi, lo = struct.unpack("!QQ", inet_pton(addr_family, block[0])) |
| 222 | ip = (hi << 64) | lo |
| 223 | else: |
| 224 | raise Exception("Not a valid CIDR notation: %s" % addr) |
| 225 | if len(block) is 1: |
| 226 | prefix_size = 0 |
| 227 | while (ip & 1) == 0 and ip is not 0: |
| 228 | ip >>= 1 |
| 229 | prefix_size += 1 |
| 230 | logging.warn("You did't specify CIDR routing prefix size for %s, " |
| 231 | "implicit treated as %s/%d" % (addr, addr, addr_len)) |
| 232 | elif block[1].isdigit() and int(block[1]) <= addr_len: |
| 233 | prefix_size = addr_len - int(block[1]) |
| 234 | ip >>= prefix_size |
| 235 | else: |
| 236 | raise Exception("Not a valid CIDR notation: %s" % addr) |
| 237 | if addr_family is socket.AF_INET: |
| 238 | self._network_list_v4.append((ip, prefix_size)) |
| 239 | else: |
| 240 | self._network_list_v6.append((ip, prefix_size)) |
| 241 | |
| 242 | def __contains__(self, addr): |
| 243 | addr_family = is_ip(addr) |
| 244 | if addr_family is socket.AF_INET: |
| 245 | ip, = struct.unpack("!I", socket.inet_aton(addr)) |
| 246 | return any(map(lambda n_ps: n_ps[0] == ip >> n_ps[1], |
| 247 | self._network_list_v4)) |
| 248 | elif addr_family is socket.AF_INET6: |
| 249 | hi, lo = struct.unpack("!QQ", inet_pton(addr_family, addr)) |
| 250 | ip = (hi << 64) | lo |
| 251 | return any(map(lambda n_ps: n_ps[0] == ip >> n_ps[1], |
| 252 | self._network_list_v6)) |
| 253 | else: |
| 254 | return False |
| 255 | |
| 256 | |
| 257 | def test_inet_conv(): |
no outgoing calls