MCPcopy
hub / github.com/sanic-org/sanic / test_streaming_new_api

Function test_streaming_new_api

tests/test_request_stream.py:510–547  ·  view source on GitHub ↗
(app)

Source from the content-addressed store, hash-verified

508
509
def test_streaming_new_api(app):
    """Exercise request-body access on streaming and non-streaming routes.

    Three POST routes are registered on ``app`` and each is hit once via
    ``app.test_client``:

    * ``/non-stream`` -- the body is already ``b"x"`` when the handler runs,
      and awaiting ``receive_body()`` leaves it unchanged.
    * ``/1`` (``stream=True``) -- the body is empty until ``receive_body()``
      is awaited; the handler replies with the upper-cased body.
    * ``/2`` (``stream=True``) -- the handler iterates ``request.stream`` and
      replies with a JSON list of the decoded chunks.
    """

    @app.post("/non-stream")
    async def handler1(request):
        assert request.body == b"x"
        # Body was already read, so this call is expected to be a no-op.
        await request.receive_body()
        assert request.body == b"x"
        return text("OK")

    @app.post("/1", stream=True)
    async def handler2(request):
        assert request.stream
        assert not request.body
        await request.receive_body()
        body_text = request.body.decode()
        return text(body_text.upper())

    @app.post("/2", stream=True)
    async def handler(request):
        chunks = []
        async for chunk in request.stream:
            # Every chunk must be real payload: never b"" and never None.
            assert chunk
            assert isinstance(chunk, bytes)
            chunks.append(chunk.decode("ASCII"))
        return json(chunks)

    client = app.test_client

    # Non-streaming route.
    _, resp = client.post("/non-stream", data="x")
    assert resp.status == 200

    # Streaming route that reads the whole body in one go.
    req, resp = client.post("/1", data="TEST data")
    assert req.body == b"TEST data"
    assert resp.status == 200
    assert resp.text == "TEST DATA"

    # Streaming route that consumes the body chunk by chunk.
    _, resp = client.post("/2", data=data)
    assert resp.status == 200
    received = resp.json
    assert isinstance(received, list)
    assert "".join(received) == data
548
549
550def test_streaming_echo():

Callers

nothing calls this directly

Calls 2

postMethod · 0.45
joinMethod · 0.45

Tested by

no test coverage detected

Used in the wild — real call sites across dependent graphs

searching dependent graphs…