| 41 | assert env["JUST"] == "ME" |
| 42 | |
| 43 | class run: |
def calls_expected_paramiko_bits(self, remote):
    # The remote mock already makes the generic safety checks (e.g. "were
    # get_transport and open_session called"); registering CMD as the
    # expected command additionally makes sure exec_command gets run with
    # the argument we hand to run().
    remote.expect(cmd=CMD)
    runner = _runner()
    runner.run(CMD)
| 50 | |
def writes_remote_streams_to_local_streams(self, remote):
    # Bytes the mocked remote emits as output should end up, as text, in
    # the local stream supplied via out_stream.
    remote.expect(out=b"hello yes this is dog")
    sink = StringIO()
    runner = _runner()
    runner.run(CMD, out_stream=sink)
    captured = sink.getvalue()
    assert captured == "hello yes this is dog"
| 56 | |
def return_value_is_Result_subclass_exposing_cxn_used(self, remote):
    cxn = _Connection("host")
    runner = Remote(context=cxn)
    outcome = runner.run(CMD)
    assert isinstance(outcome, Result)
    # Light sanity check of attributes coming from the Result superclass
    assert outcome.ok is True
    assert outcome.exited == 0
    # The attribute our own subclass adds: the connection that was used
    assert outcome.connection is cxn
| 66 | |
def channel_is_closed_normally(self, remote):
    channel = remote.expect()
    # Nothing in this test closes the channel explicitly; i.e. it is
    # Remote.stop() that closes it automatically during run().
    runner = _runner()
    runner.run(CMD)
    channel.close.assert_called_once_with()
| 72 | |
def channel_is_closed_on_body_exceptions(self, remote):
    channel = remote.expect()

    # I.e. Remote.stop() is called within a try/finally.
    # Technically this only exercises invoke.Runner, but meh.
    class Oops(Exception):
        pass

    class _OopsRemote(Remote):
        # Blow up partway through run() by failing in wait().
        def wait(self):
            raise Oops()

    runner = _OopsRemote(context=_Connection("host"))
    raised = False
    try:
        runner.run(CMD)
    except Oops:
        raised = True
        # The channel must have been closed despite the error.
        channel.close.assert_called_once_with()
    assert raised, "Runner failed to raise exception!"
| 92 | |
def stop_calls_super_correctly(self, remote):
    # RE: #2241
    # Swap Runner.stop for a Mock so we can observe that it gets invoked
    # (with no arguments) when a run() completes.
    #
    # The replacement is made on the Runner class itself, so it MUST be
    # undone afterwards; otherwise every later test in the same process
    # would silently run with a mocked-out Runner.stop.
    original_stop = Runner.stop
    stop_spy = Mock()
    Runner.stop = stop_spy
    try:
        _runner().run(CMD)
        stop_spy.assert_called_once_with()
    finally:
        # Restore the real implementation even if run() or the assertion
        # above raised.
        Runner.stop = original_stop
| 98 | |
| 99 | def channel_close_skipped_when_channel_not_even_made(self): |
| 100 | # I.e. if obtaining self.channel doesn't even happen (i.e. if |
no outgoing calls
no test coverage detected