(remote, local_session)
| 168 | @pytest.mark.skipif(not env.has_rpc(), reason="need rpc") |
| 169 | def test_rpc_echo(): |
| 170 | def check(remote, local_session): |
| 171 | fecho = remote.get_function("testing.echo") |
| 172 | assert fecho(1, 2, 3) == 1 |
| 173 | assert fecho(100, 2, 3) == 100 |
| 174 | assert fecho("xyz") == "xyz" |
| 175 | assert bytes(fecho(bytearray(b"123"))) == b"123" |
| 176 | |
| 177 | with pytest.raises(RuntimeError): |
| 178 | raise_err = remote.get_function("testing.test_raise_error") |
| 179 | raise_err("RuntimeError", "msg") |
| 180 | |
| 181 | remote.cpu().sync() |
| 182 | # tests around system lib are not threadsafe by design |
| 183 | # and do not work well with multithread pytest |
| 184 | # skip local session as they are being tested elsewhere |
| 185 | if not local_session: |
| 186 | with pytest.raises(AttributeError): |
| 187 | f3 = remote.system_lib()["notexist"] |
| 188 | |
| 189 | temp = rpc.server._server_env([]) |
| 190 | server = rpc.Server() |
no test coverage detected
searching dependent graphs…