hub / github.com/Lightning-AI/LitServe / test_batch_unbatch_stream

Function test_batch_unbatch_stream

tests/unit/test_litapi.py:113–131
()

Source from the content-addressed store, hash-verified

def test_batch_unbatch_stream():
    api = TestStreamAPI(max_batch_size=4)
    api.request_timeout = 30
    api.pre_setup(spec=None)
    inputs = [1, 2, 3, 4]
    output = api.batch(inputs)
    output = api.predict(output)
    output = api.unbatch(output)
    output = api.encode_response(output)
    first_resp = [o["output"] for o in next(output)]
    expected_outputs = [[0, 0, 0, 0], [1, 2, 3, 4], [2, 4, 6, 8], [3, 6, 9, 12]]
    assert first_resp == expected_outputs[0], "First response should be 0s"
    count = 1
    for out, expected_output in zip(output, expected_outputs[1:]):
        resp = [o["output"] for o in out]
        assert resp == expected_output
        count += 1

    assert count == 4, "Should have 4 responses"
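
The test relies on `TestStreamAPI`, whose definition is not shown on this page. The sketch below is a hypothetical stand-in, not the LitServe class: `SketchStreamAPI` and `run_pipeline` are names invented here, and the method bodies are assumptions chosen only so that the batch → predict → unbatch → encode_response chain yields the four responses the test expects.

```python
from typing import Dict, Iterator, List


class SketchStreamAPI:
    """Hypothetical stand-in for TestStreamAPI; does not use LitServe."""

    def batch(self, inputs: List[int]) -> List[int]:
        # Assumption: batching just collects the requests into one list.
        return list(inputs)

    def predict(self, batch: List[int]) -> Iterator[List[int]]:
        # Stream four steps; step i scales every input in the batch by i.
        for i in range(4):
            yield [x * i for x in batch]

    def unbatch(self, outputs: Iterator[List[int]]) -> Iterator[List[int]]:
        # Pass each streamed step through unchanged, one list per step.
        for step in outputs:
            yield step

    def encode_response(
        self, outputs: Iterator[List[int]]
    ) -> Iterator[List[Dict[str, int]]]:
        # Wrap each per-request value in a dict keyed by "output".
        for step in outputs:
            yield [{"output": value} for value in step]


def run_pipeline(inputs: List[int]) -> List[List[int]]:
    """Drive the four stages in the same order as the test."""
    api = SketchStreamAPI()
    stream = api.encode_response(api.unbatch(api.predict(api.batch(inputs))))
    # Strip the "output" wrapper so each step is a plain list of values.
    return [[item["output"] for item in step] for step in stream]


if __name__ == "__main__":
    for step in run_pipeline([1, 2, 3, 4]):
        print(step)
```

Because every stage after `batch` is a generator, nothing is computed until the caller pulls from the final stream, which is why the test can read the first response with `next(output)` and then iterate over the remaining three.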

Callers

nothing calls this directly

Calls 6

predict · Method · 0.95
encode_response · Method · 0.95
TestStreamAPI · Class · 0.85
pre_setup · Method · 0.45
batch · Method · 0.45
unbatch · Method · 0.45

Tested by

no test coverage detected
