MCPcopy Index your code
hub / github.com/HelloZeroNet/ZeroNet / testStreamFile

Method testStreamFile

src/Test/TestFileRequest.py:60–89  ·  view source on GitHub ↗
(self, file_server, site)

Source from the content-addressed store, hash-verified

58 client.stop()
59
60 def testStreamFile(self, file_server, site):
61 file_server.ip_incoming = {} # Reset flood protection
62 client = ConnectionServer(file_server.ip, 1545)
63 connection = client.getConnection(file_server.ip, 1544)
64 file_server.sites[site.address] = site
65
66 buff = io.BytesIO()
67 response = connection.request("streamFile", {"site": site.address, "inner_path": "content.json", "location": 0}, buff)
68 assert "stream_bytes" in response
69 assert b"sign" in buff.getvalue()
70
71 # Invalid file
72 buff = io.BytesIO()
73 response = connection.request("streamFile", {"site": site.address, "inner_path": "invalid.file", "location": 0}, buff)
74 assert "File read error" in response["error"]
75
76 # Location over size
77 buff = io.BytesIO()
78 response = connection.request(
79 "streamFile", {"site": site.address, "inner_path": "content.json", "location": 1024 * 1024}, buff
80 )
81 assert "File read error" in response["error"]
82
83 # Stream from parent dir
84 buff = io.BytesIO()
85 response = connection.request("streamFile", {"site": site.address, "inner_path": "../users.json", "location": 0}, buff)
86 assert "File read exception" in response["error"]
87
88 connection.close()
89 client.stop()
90
91 def testPex(self, file_server, site, site_temp):
92 file_server.sites[site.address] = site

Callers

nothing calls this directly

Calls 5

getConnectionMethod · 0.95
stopMethod · 0.95
ConnectionServerClass · 0.90
requestMethod · 0.45
closeMethod · 0.45

Tested by

no test coverage detected