MCPcopy Index your code
hub / github.com/ipython/ipython / test_script_streams_continuously

Function test_script_streams_continuously

tests/test_magic.py:1579–1607  ·  view source on GitHub ↗
(capsys)

Source from the content-addressed store, hash-verified

1577
1578@pytest.mark.asyncio
1579async def test_script_streams_continuously(capsys):
1580 ip = get_ipython()
1581 # Windows is slow to start up a thread on CI
1582 is_windows = os.name == "nt"
1583 step = 3 if is_windows else 1
1584 code = dedent(
1585 f"""\
1586 import time
1587 for _ in range(3):
1588 time.sleep({step})
1589 print(".", flush=True, end="")
1590 """
1591 )
1592
1593 def print_numbers():
1594 for i in range(3):
1595 sleep(step)
1596 print(i, flush=True, end="")
1597
1598 thread = Thread(target=print_numbers)
1599 thread.start()
1600 sleep(step / 2)
1601 ip.run_cell_magic("script", sys.executable, code)
1602 thread.join()
1603
1604 captured = capsys.readouterr()
1605 # If the streaming was line-wise or broken
1606 # we would get `012...`
1607 assert captured.out == "0.1.2."
1608
1609
1610def test_script_streams_multibyte_unicode(capsys):

Callers

nothing calls this directly

Calls 4

get_ipythonFunction · 0.90
dedentFunction · 0.85
run_cell_magicMethod · 0.80
startMethod · 0.45

Tested by

no test coverage detected

Used in the wild real call sites across dependent graphs

searching dependent graphs…