MCPcopy
hub / github.com/amoffat/sh / test_multiple_pipes

Method test_multiple_pipes

tests/sh_test.py:461–515  ·  view source on GitHub ↗
(self)

Source from the content-addressed store, hash-verified

459 self.assertEqual(out, '["one two\'s three"]')
460
461 def test_multiple_pipes(self):
462 import time
463
464 py = create_tmp_test(
465 """
466import sys
467import os
468import time
469
470for l in "andrew":
471 sys.stdout.write(l)
472 time.sleep(.2)
473"""
474 )
475
476 inc_py = create_tmp_test(
477 """
478import sys
479while True:
480 letter = sys.stdin.read(1)
481 if not letter:
482 break
483 sys.stdout.write(chr(ord(letter)+1))
484"""
485 )
486
487 def inc(*args, **kwargs):
488 return python("-u", inc_py.name, *args, **kwargs)
489
490 class Derp:
491 def __init__(self):
492 self.times = []
493 self.stdout = []
494 self.last_received = None
495
496 def agg(self, line):
497 self.stdout.append(line.strip())
498 now = time.time()
499 if self.last_received:
500 self.times.append(now - self.last_received)
501 self.last_received = now
502
503 derp = Derp()
504
505 p = inc(
506 _in=inc(
507 _in=inc(_in=python("-u", py.name, _piped=True), _piped=True),
508 _piped=True,
509 ),
510 _out=derp.agg,
511 )
512
513 p.wait()
514 self.assertEqual("".join(derp.stdout), "dqguhz")
515 self.assertTrue(all([t > 0.15 for t in derp.times]))
516
517 def test_manual_stdin_string(self):
518 py = create_stdin_transform_test()

Callers

nothing calls this directly

Calls 3

create_tmp_testFunction · 0.85
DerpClass · 0.85
waitMethod · 0.45

Tested by

no test coverage detected