(
wormhole_executable,
scripts_env,
mailbox,
transiturl,
tmpdir_factory,
as_subprocess=False,
mode="text",
addslash=False,
override_filename=False,
fake_tor=False,
overwrite=False,
mock_accept=False,
verify=False)
| 414 | |
| 415 | @pytest_twisted.ensureDeferred |
| 416 | async def _do_test( |
| 417 | wormhole_executable, |
| 418 | scripts_env, |
| 419 | mailbox, |
| 420 | transiturl, |
| 421 | tmpdir_factory, |
| 422 | as_subprocess=False, |
| 423 | mode="text", |
| 424 | addslash=False, |
| 425 | override_filename=False, |
| 426 | fake_tor=False, |
| 427 | overwrite=False, |
| 428 | mock_accept=False, |
| 429 | verify=False): |
| 430 | assert mode in ("text", "file", "empty-file", "directory", "slow-text", |
| 431 | "slow-sender-text") |
| 432 | if fake_tor: |
| 433 | assert not as_subprocess |
| 434 | send_cfg = config("send") |
| 435 | recv_cfg = config("receive") |
| 436 | message = "blah blah blah ponies" |
| 437 | |
| 438 | for cfg in [send_cfg, recv_cfg]: |
| 439 | cfg.hide_progress = True |
| 440 | cfg.relay_url = mailbox.url |
| 441 | cfg.transit_helper = "" |
| 442 | cfg.listen = True |
| 443 | cfg.code = "1-abc" |
| 444 | cfg.stdout = io.StringIO() |
| 445 | cfg.stderr = io.StringIO() |
| 446 | cfg.verify = verify |
| 447 | |
| 448 | send_dir = tmpdir_factory.mktemp("sender") |
| 449 | receive_dir = tmpdir_factory.mktemp("receiver") |
| 450 | |
| 451 | if mode in ("text", "slow-text", "slow-sender-text"): |
| 452 | send_cfg.text = message |
| 453 | |
| 454 | elif mode in ("file", "empty-file"): |
| 455 | if mode == "empty-file": |
| 456 | message = "" |
| 457 | send_filename = "testfil\u00EB" # e-with-diaeresis |
| 458 | with open(os.path.join(send_dir, send_filename), "w") as f: |
| 459 | f.write(message) |
| 460 | send_cfg.what = send_filename |
| 461 | receive_filename = send_filename |
| 462 | |
| 463 | recv_cfg.accept_file = False if mock_accept else True |
| 464 | if override_filename: |
| 465 | recv_cfg.output_file = receive_filename = "outfile" |
| 466 | if overwrite: |
| 467 | recv_cfg.output_file = receive_filename |
| 468 | existing_file = os.path.join(receive_dir, receive_filename) |
| 469 | with open(existing_file, 'w') as f: |
| 470 | f.write('pls overwrite me') |
| 471 | |
| 472 | elif mode == "directory": |
| 473 | # $send_dir/ |
no test coverage detected