| 7 | |
class _Handler(socketserver.StreamRequestHandler):
    """Request handler for a single fake storage node named ``node0``.

    Commands arrive one per line; some are followed by a binary payload:

    * ``INFO`` -- reply with a JSON object mapping ``node0`` to its name and
      ``127.0.0.1:<port of this server>``, terminated by CRLF.
    * ``STORE_IN <n>`` -- read ``n`` payload bytes, cut them into
      ``Tensor.CHUNK_SIZE`` pieces (the last one zero-padded), remember each
      piece in ``_chunks`` under its ``_python_hash_1mb`` hash, and reply with
      the hashes back to back.
    * ``LOAD_IN <n>`` -- read ``n`` bytes of hashes and reply with a JSON list
      holding ``"node0"`` once per 16 bytes received, terminated by CRLF.
    * ``CHUNK_OUT <size>`` -- read a 16-byte key and reply with the first
      ``size`` bytes of the stored chunk, or ``size`` zero bytes when the key
      is unknown.
    """

    def handle(self):
        """Serve commands until the peer closes the connection."""
        reader, writer = self.rfile, self.wfile
        # readline() returns b"" at EOF, which ends the iteration.
        for raw in iter(reader.readline, b""):
            cmd = raw.decode().strip()
            if cmd == "INFO":
                port = self.server.server_address[1]
                topology = {"node0": ["node0", f"127.0.0.1:{port}"]}
                writer.write(json.dumps(topology).encode() + b"\r\n")
            elif cmd.startswith("STORE_IN"):
                payload = reader.read(int(cmd.split()[1]))
                step = Tensor.CHUNK_SIZE
                digests = bytearray()
                for start in range(0, len(payload), step):
                    # Pad the tail so every stored chunk is exactly `step` bytes.
                    piece = payload[start:start + step].ljust(step, b'\0')
                    digest = _python_hash_1mb(piece)
                    _chunks[digest] = piece
                    digests.extend(digest)
                writer.write(digests)
            elif cmd.startswith("LOAD_IN"):
                requested = reader.read(int(cmd.split()[1]))
                owners = ["node0"] * (len(requested) // 16)
                writer.write(json.dumps(owners).encode() + b"\r\n")
            elif cmd.startswith("CHUNK_OUT"):
                size = int(cmd.split()[1])
                key = reader.read(16)
                stored = _chunks.get(key, bytes(size))
                writer.write(stored[:size])
            writer.flush()
| 30 | |
| 31 | # regressed in 55d3a5def "preallocate all realized buffers" |
| 32 | class TestTinyFS(unittest.TestCase): |