Accept connection from the other places. Parameters ---------- listen_sock: Socket The socket used by listening process. tracker_conn : connection to tracker Tracker connection ping_period : float, optional ping tracker e
(listen_sock, tracker_conn, ping_period=2)
| 188 | """Listening loop of the server.""" |
| 189 | |
| 190 | def _accept_conn(listen_sock, tracker_conn, ping_period=2): |
| 191 | """Accept connection from the other places. |
| 192 | |
| 193 | Parameters |
| 194 | ---------- |
| 195 | listen_sock: Socket |
| 196 | The socket used by listening process. |
| 197 | |
| 198 | tracker_conn : connection to tracker |
| 199 | Tracker connection |
| 200 | |
| 201 | ping_period : float, optional |
| 202 | ping tracker every k seconds if no connection is accepted. |
| 203 | """ |
| 204 | old_keyset = set() |
| 205 | # Report resource to tracker |
| 206 | if tracker_conn: |
| 207 | matchkey = base.random_key(rpc_key) |
| 208 | base.sendjson(tracker_conn, [TrackerCode.PUT, rpc_key, (port, matchkey), custom_addr]) |
| 209 | assert base.recvjson(tracker_conn) == TrackerCode.SUCCESS |
| 210 | else: |
| 211 | matchkey = rpc_key |
| 212 | |
| 213 | unmatch_period_count = 0 |
| 214 | unmatch_timeout = 4 |
| 215 | # Wait until we get a valid connection |
| 216 | while True: |
| 217 | if tracker_conn: |
| 218 | trigger = select.select([listen_sock], [], [], ping_period) |
| 219 | if listen_sock not in trigger[0]: |
| 220 | base.sendjson(tracker_conn, [TrackerCode.GET_PENDING_MATCHKEYS]) |
| 221 | pending_keys = base.recvjson(tracker_conn) |
| 222 | old_keyset.add(matchkey) |
| 223 | # if match key not in pending key set |
| 224 | # it means the key is acquired by a client but not used. |
| 225 | if matchkey not in pending_keys: |
| 226 | unmatch_period_count += 1 |
| 227 | else: |
| 228 | unmatch_period_count = 0 |
| 229 | # regenerate match key if key is acquired but not used for a while |
| 230 | if unmatch_period_count * ping_period > unmatch_timeout + ping_period: |
| 231 | logger.info("no incoming connections, regenerate key ...") |
| 232 | matchkey = base.random_key(rpc_key, cmap=old_keyset) |
| 233 | base.sendjson( |
| 234 | tracker_conn, [TrackerCode.PUT, rpc_key, (port, matchkey), custom_addr] |
| 235 | ) |
| 236 | assert base.recvjson(tracker_conn) == TrackerCode.SUCCESS |
| 237 | unmatch_period_count = 0 |
| 238 | continue |
| 239 | conn, addr = listen_sock.accept() |
| 240 | magic = struct.unpack("<i", base.recvall(conn, 4))[0] |
| 241 | if magic != base.RPC_MAGIC: |
| 242 | conn.close() |
| 243 | continue |
| 244 | keylen = struct.unpack("<i", base.recvall(conn, 4))[0] |
| 245 | key = (base.recvall(conn, keylen)).decode("utf-8") |
| 246 | arr = key.split() |
| 247 | expect_header = "client:" + matchkey |
no test coverage detected
searching dependent graphs…