hub / github.com/Lightning-AI/LitServe / TestStreamAPI

Class TestStreamAPI

tests/unit/test_litapi.py:49–64

Source from the content-addressed store, hash-verified

class TestStreamAPI(ls.LitAPI):
    def setup(self, device) -> None:
        self.model = None

    def decode_request(self, request):
        return request["input"]

    def predict(self, x):
        # x is a list of integers
        for i in range(4):
            yield np.asarray(x) * i

    def encode_response(self, output_stream):
        for output in output_stream:
            output = list(output)
            yield [{"output": o} for o in output]
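
To make the data flow through TestStreamAPI concrete, here is a minimal standalone sketch of the same three hooks. It is not LitServe code: StreamSketch and run_stream are hypothetical names introduced for illustration, the element-wise multiply is written with plain lists as a stand-in for np.asarray(x) * i, and chaining the hooks directly is only an assumption about how a caller might drive them, not a description of the server's own loop.

```python
from typing import Iterator


class StreamSketch:
    """Hypothetical stand-in for TestStreamAPI (no litserve or numpy needed)."""

    def decode_request(self, request: dict) -> list:
        # Pull the payload out of the request body.
        return request["input"]

    def predict(self, x: list) -> Iterator[list]:
        # Plain-list stand-in for `np.asarray(x) * i`: one chunk per i in 0..3.
        for i in range(4):
            yield [v * i for v in x]

    def encode_response(self, output_stream) -> Iterator[list]:
        # Wrap every element of each streamed chunk in its own dict.
        for output in output_stream:
            yield [{"output": o} for o in output]


def run_stream(api: StreamSketch, request: dict) -> list:
    """Hypothetical driver: chains the hooks and collects the streamed chunks."""
    x = api.decode_request(request)
    return list(api.encode_response(api.predict(x)))


chunks = run_stream(StreamSketch(), {"input": [1, 2, 3]})
for chunk in chunks:
    print(chunk)
```

Because predict and encode_response are both generators, no chunk is computed until the consumer asks for it; run_stream collects them into a list only so the result is easy to inspect. With the original class the values would be NumPy integers rather than Python ints.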

Callers 1

Calls

no outgoing calls

Tested by 1
