MCPcopy Index your code
hub / github.com/docker/docker-py / test_stream_helper_decoding

Method test_stream_helper_decoding

tests/unit/api_test.py:328–362  ·  view source on GitHub ↗
(self)

Source from the content-addressed store, hash-verified

326 self.client.create_host_config(security_opt='wrong')
327
328 def test_stream_helper_decoding(self):
329 status_code, content = fake_api.fake_responses[f"{url_prefix}events"]()
330 content_str = json.dumps(content)
331 content_str = content_str.encode('utf-8')
332 body = io.BytesIO(content_str)
333
334 # mock a stream interface
335 raw_resp = urllib3.HTTPResponse(body=body)
336 raw_resp._fp.chunked = True
337 raw_resp._fp.chunk_left = len(body.getvalue()) - 1
338
339 # pass `decode=False` to the helper
340 raw_resp._fp.seek(0)
341 resp = response(status_code=status_code, content=content, raw=raw_resp)
342 result = next(self.client._stream_helper(resp))
343 assert result == content_str
344
345 # pass `decode=True` to the helper
346 raw_resp._fp.seek(0)
347 resp = response(status_code=status_code, content=content, raw=raw_resp)
348 result = next(self.client._stream_helper(resp, decode=True))
349 assert result == content
350
351 # non-chunked response, pass `decode=False` to the helper
352 raw_resp._fp.chunked = False
353 raw_resp._fp.seek(0)
354 resp = response(status_code=status_code, content=content, raw=raw_resp)
355 result = next(self.client._stream_helper(resp))
356 assert result == content_str.decode('utf-8')
357
358 # non-chunked response, pass `decode=True` to the helper
359 raw_resp._fp.seek(0)
360 resp = response(status_code=status_code, content=content, raw=raw_resp)
361 result = next(self.client._stream_helper(resp, decode=True))
362 assert result == content
363
364
365class UnixSocketStreamTest(unittest.TestCase):

Callers

nothing calls this directly

Calls 2

responseFunction · 0.85
_stream_helperMethod · 0.80

Tested by

no test coverage detected