MCPcopy
hub / github.com/HelloZeroNet/ZeroNet / testStreaming

Method testStreaming

src/Test/TestMsgpack.py:41–62  ·  view source on GitHub ↗
(self)

Source from the content-addressed store, hash-verified

39 assert messages[0] == messages[1]
40
41 def testStreaming(self):
42 bin_data = os.urandom(20)
43 f = Msgpack.FilePart("%s/users.json" % config.data_dir, "rb")
44 f.read_bytes = 30
45
46 data = {"cmd": "response", "body": f, "bin": bin_data}
47
48 out_buff = io.BytesIO()
49 Msgpack.stream(data, out_buff.write)
50 out_buff.seek(0)
51
52 data_packb = {
53 "cmd": "response",
54 "body": open("%s/users.json" % config.data_dir, "rb").read(30),
55 "bin": bin_data
56 }
57
58 out_buff.seek(0)
59 data_unpacked = Msgpack.unpack(out_buff.read())
60 assert data_unpacked == data_packb
61 assert data_unpacked["cmd"] == "response"
62 assert type(data_unpacked["body"]) == bytes
63
64 def testBackwardCompatibility(self):
65 packed = {}

Callers

nothing calls this directly

Calls 3

seekMethod · 0.45
readMethod · 0.45
unpackMethod · 0.45

Tested by

no test coverage detected