hub / github.com/danielgtaylor/python-betterproto / test_infinite_messages

Function test_infinite_messages

tests/test_streams.py:414–434
(compile_jar, tmp_path)

Source from the content-addressed store, hash-verified

def test_infinite_messages(compile_jar, tmp_path):
    num_messages = 5

    # Write delimited messages to file
    with open(tmp_path / "py_infinite_messages.out", "wb") as stream:
        for x in range(num_messages):
            oneof_example.dump(stream, betterproto.SIZE_DELIMITED)

    # Have Java read and return the messages
    run_jar("infinite_messages", tmp_path)

    # Read and check the returned messages
    messages = []
    with open(tmp_path / "java_infinite_messages.out", "rb") as stream:
        while True:
            try:
                messages.append(oneof.Test().load(stream, betterproto.SIZE_DELIMITED))
            except EOFError:
                break

    assert len(messages) == num_messages
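
The test above relies on size-delimited framing: each serialized message is preceded by its byte length encoded as a base-128 varint, so several messages can share one stream and a reader can consume them until the stream runs out. The sketch below illustrates that framing on raw byte payloads. It is not betterproto's implementation; the helper names (`write_varint`, `read_varint`, `write_delimited`, `read_delimited`, `read_all_delimited`) are hypothetical and exist only for this example, and raising `EOFError` at end of stream is assumed here to mirror the loop in the test.

```python
import io


def write_varint(stream, value):
    """Write a non-negative integer as a base-128 varint."""
    if value < 0:
        raise ValueError("this sketch only handles non-negative values")
    while True:
        byte = value & 0x7F
        value >>= 7
        if value:
            stream.write(bytes([byte | 0x80]))  # high bit set: more bytes follow
        else:
            stream.write(bytes([byte]))
            return


def read_varint(stream):
    """Read a varint; raise EOFError if the stream ends first."""
    result = 0
    shift = 0
    while True:
        chunk = stream.read(1)
        if not chunk:
            raise EOFError("stream ended while reading a length prefix")
        byte = chunk[0]
        result |= (byte & 0x7F) << shift
        if not byte & 0x80:
            return result
        shift += 7


def write_delimited(stream, payload):
    """Write one payload preceded by its varint-encoded length."""
    write_varint(stream, len(payload))
    stream.write(payload)


def read_delimited(stream):
    """Read one length-prefixed payload; raise EOFError on a short read."""
    size = read_varint(stream)
    payload = stream.read(size)
    if len(payload) != size:
        raise EOFError("stream ended inside a message body")
    return payload


def read_all_delimited(stream):
    """Collect payloads until EOFError, the same loop shape as the test."""
    messages = []
    while True:
        try:
            messages.append(read_delimited(stream))
        except EOFError:
            break
    return messages


# Usage: frame five payloads into one buffer, then read them all back.
buffer = io.BytesIO()
for _ in range(5):
    write_delimited(buffer, b"example")
buffer.seek(0)
assert len(read_all_delimited(buffer)) == 5
```

Like the test, this loop treats any `EOFError` as the end of input, so a truncated final message is dropped silently; counting the messages afterwards, as the test's final assertion does, is what catches that case.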

Callers

nothing calls this directly

Calls 3

run_jar · Function · 0.85
dump · Method · 0.80
load · Method · 0.80

Tested by

no test coverage detected