MCPcopy Index your code
hub / github.com/docker/docker-py / test_early_stream_response

Method test_early_stream_response

tests/unit/api_test.py:435–465  ·  view source on GitHub ↗
(self)

Source from the content-addressed store, hash-verified

433 docker.constants.IS_WINDOWS_PLATFORM, reason='Unix only'
434 )
435 def test_early_stream_response(self):
436 self.request_handler = self.early_response_sending_handler
437 lines = []
438 for i in range(0, 50):
439 line = str(i).encode()
440 lines += [f'{len(line):x}'.encode(), line]
441 lines.append(b'0')
442 lines.append(b'')
443
444 self.response = (
445 b'HTTP/1.1 200 OK\r\n'
446 b'Transfer-Encoding: chunked\r\n'
447 b'\r\n'
448 ) + b'\r\n'.join(lines)
449
450 with APIClient(
451 base_url=f"http+unix://{self.socket_file}",
452 version=DEFAULT_DOCKER_API_VERSION) as client:
453 for i in range(5):
454 try:
455 stream = client.build(
456 path=self.build_context,
457 )
458 break
459 except requests.ConnectionError as e:
460 if i == 4:
461 raise e
462
463 assert list(stream) == [
464 str(i).encode() for i in range(50)
465 ]
466
467
468class TCPSocketStreamTest(unittest.TestCase):

Callers

nothing calls this directly

Calls 3

APIClientClass · 0.90
joinMethod · 0.80
buildMethod · 0.45

Tested by

no test coverage detected