hub / github.com/danielgtaylor/python-betterproto / test_multiple_messages

Function test_multiple_messages

tests/test_streams.py:395–411
(compile_jar, tmp_path)

Source from the content-addressed store, hash-verified

def test_multiple_messages(compile_jar, tmp_path):
    # Write delimited messages to file
    with open(tmp_path / "py_multiple_messages.out", "wb") as stream:
        oneof_example.dump(stream, betterproto.SIZE_DELIMITED)
        nested_example.dump(stream, betterproto.SIZE_DELIMITED)

    # Have Java read and return the messages
    run_jar("multiple_messages", tmp_path)

    # Read and check the returned messages
    with open(tmp_path / "java_multiple_messages.out", "rb") as stream:
        returned_oneof = oneof.Test().load(stream, betterproto.SIZE_DELIMITED)
        returned_nested = nested.Test().load(stream, betterproto.SIZE_DELIMITED)
        assert stream.read() == b""

    assert returned_oneof == oneof_example
    assert returned_nested == nested_example
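The test above writes two messages back to back into one file and expects the reader to consume exactly both, leaving nothing behind. That only works if each message carries its own length. The following is a minimal sketch of that kind of framing, assuming `betterproto.SIZE_DELIMITED` corresponds to the usual protobuf convention of prefixing each serialized message with its byte length as a base-128 varint. The helper names (`encode_varint`, `read_varint`, `write_delimited`, `read_delimited`) are hypothetical and are not part of betterproto; the payloads are arbitrary bytes standing in for serialized messages.

```python
import io
from typing import BinaryIO, Iterator, Optional


def encode_varint(value: int) -> bytes:
    """Encode a non-negative integer as a base-128 varint (hypothetical helper)."""
    if value < 0:
        raise ValueError("length must be non-negative")
    out = bytearray()
    while True:
        byte = value & 0x7F
        value >>= 7
        if value:
            out.append(byte | 0x80)  # continuation bit: more bytes follow
        else:
            out.append(byte)
            return bytes(out)


def read_varint(stream: BinaryIO) -> Optional[int]:
    """Read one varint; return None if the stream is already exhausted."""
    result = 0
    shift = 0
    while True:
        raw = stream.read(1)
        if not raw:
            if shift == 0:
                return None  # clean end of stream between messages
            raise EOFError("stream ended inside a varint")
        result |= (raw[0] & 0x7F) << shift
        if not raw[0] & 0x80:
            return result
        shift += 7


def write_delimited(stream: BinaryIO, payload: bytes) -> None:
    """Write the payload length as a varint, then the payload itself."""
    stream.write(encode_varint(len(payload)))
    stream.write(payload)


def read_delimited(stream: BinaryIO) -> Iterator[bytes]:
    """Yield each length-prefixed payload until the stream is exhausted."""
    while True:
        size = read_varint(stream)
        if size is None:
            return
        payload = stream.read(size)
        if len(payload) != size:
            raise EOFError("stream ended inside a message")
        yield payload


# Stand-ins for two serialized messages (assumed bytes, not real test data).
first = b"\x08\x01"
second = b"x" * 300  # long enough to need a two-byte length prefix

buffer = io.BytesIO()
write_delimited(buffer, first)
write_delimited(buffer, second)

buffer.seek(0)
messages = list(read_delimited(buffer))
leftover = buffer.read()  # mirrors the test's `stream.read() == b""` check
```

The final `leftover` check plays the same role as `assert stream.read() == b""` in the test: it confirms the reader consumed every framed message and that no trailing bytes were written.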

Callers

nothing calls this directly

Calls 3

run_jar · Function · 0.85
dump · Method · 0.80
load · Method · 0.80

Tested by

no test coverage detected