MCPcopy Index your code
hub / github.com/docker/docker-py / ExecDemuxTest

Class ExecDemuxTest

tests/integration/api_exec_test.py:231–313  ·  view source on GitHub ↗

Source from the content-addressed store, hash-verified

229
230
231class ExecDemuxTest(BaseAPIIntegrationTest):
232 cmd = 'sh -c "{}"'.format(' ; '.join([
233 # Write something on stdout
234 'echo hello out',
235 # Busybox's sleep does not handle sub-second times.
236 # This loops takes ~0.3 second to execute on my machine.
237 'sleep 0.5',
238 # Write something on stderr
239 'echo hello err >&2'])
240 )
241
242 def setUp(self):
243 super().setUp()
244 self.container = self.client.create_container(
245 TEST_IMG, 'cat', detach=True, stdin_open=True
246 )
247 self.client.start(self.container)
248 self.tmp_containers.append(self.container)
249
250 def test_exec_command_no_stream_no_demux(self):
251 # tty=False, stream=False, demux=False
252 res = self.client.exec_create(self.container, self.cmd)
253 exec_log = self.client.exec_start(res)
254 assert b'hello out\n' in exec_log
255 assert b'hello err\n' in exec_log
256
257 def test_exec_command_stream_no_demux(self):
258 # tty=False, stream=True, demux=False
259 res = self.client.exec_create(self.container, self.cmd)
260 exec_log = list(self.client.exec_start(res, stream=True))
261 assert len(exec_log) == 2
262 assert b'hello out\n' in exec_log
263 assert b'hello err\n' in exec_log
264
265 def test_exec_command_no_stream_demux(self):
266 # tty=False, stream=False, demux=True
267 res = self.client.exec_create(self.container, self.cmd)
268 exec_log = self.client.exec_start(res, demux=True)
269 assert exec_log == (b'hello out\n', b'hello err\n')
270
271 def test_exec_command_stream_demux(self):
272 # tty=False, stream=True, demux=True
273 res = self.client.exec_create(self.container, self.cmd)
274 exec_log = list(self.client.exec_start(res, demux=True, stream=True))
275 assert len(exec_log) == 2
276 assert (b'hello out\n', None) in exec_log
277 assert (None, b'hello err\n') in exec_log
278
279 def test_exec_command_tty_no_stream_no_demux(self):
280 # tty=True, stream=False, demux=False
281 res = self.client.exec_create(self.container, self.cmd, tty=True)
282 exec_log = self.client.exec_start(res)
283 assert exec_log == b'hello out\r\nhello err\r\n'
284
285 def test_exec_command_tty_stream_no_demux(self):
286 # tty=True, stream=True, demux=False
287 res = self.client.exec_create(self.container, self.cmd, tty=True)
288 exec_log = list(self.client.exec_start(res, stream=True))

Callers

nothing calls this directly

Calls 1

joinMethod · 0.80

Tested by

no test coverage detected