(self, file_server, site)
| 58 | client.stop() |
| 59 | |
def testStreamFile(self, file_server, site):
    """Check the ``streamFile`` request on the success path and three error paths.

    Cases covered, in order:

    1. Streaming ``content.json`` from offset 0 returns a response with
       ``stream_bytes`` and writes the file body into the supplied buffer.
    2. Requesting a file that does not exist reports ``File read error``.
    3. Requesting an offset far beyond the file size reports
       ``File read error``.
    4. Requesting a path that climbs out of the site directory reports
       ``File read exception``.

    The connection and the client server are released in ``finally`` blocks,
    so a failing assertion or a raising request cannot leave them open for
    the tests that run afterwards.

    Args:
        file_server: Fixture for the server under test; the test resets its
            ``ip_incoming`` table and registers ``site`` in its ``sites``.
        site: Fixture for the site whose files are requested.
    """
    file_server.ip_incoming = {}  # Reset flood protection
    # NOTE(review): presumably the file_server fixture listens on port 1544,
    # which is why the client binds to 1545 -- confirm against the fixture.
    client = ConnectionServer(file_server.ip, 1545)
    try:
        connection = client.getConnection(file_server.ip, 1544)
        try:
            file_server.sites[site.address] = site

            # Valid file: the payload is streamed into the buffer, not returned
            # inside the response dict.
            buff = io.BytesIO()
            response = connection.request(
                "streamFile",
                {"site": site.address, "inner_path": "content.json", "location": 0},
                buff,
            )
            assert "stream_bytes" in response
            assert b"sign" in buff.getvalue()

            # Invalid file
            buff = io.BytesIO()
            response = connection.request(
                "streamFile",
                {"site": site.address, "inner_path": "invalid.file", "location": 0},
                buff,
            )
            assert "File read error" in response["error"]

            # Location over size
            buff = io.BytesIO()
            response = connection.request(
                "streamFile",
                {"site": site.address, "inner_path": "content.json", "location": 1024 * 1024},
                buff,
            )
            assert "File read error" in response["error"]

            # Stream from parent dir
            buff = io.BytesIO()
            response = connection.request(
                "streamFile",
                {"site": site.address, "inner_path": "../users.json", "location": 0},
                buff,
            )
            assert "File read exception" in response["error"]
        finally:
            # Always close the connection, even when an assertion above fails.
            connection.close()
    finally:
        # Always stop the client server so it does not outlive this test.
        client.stop()
| 90 | |
| 91 | def testPex(self, file_server, site, site_temp): |
| 92 | file_server.sites[site.address] = site |
nothing calls this directly
no test coverage detected