(self, timeout_sec=5)
| 122 | self.stream_in.flush() |
| 123 | |
| 124 | def recv(self, timeout_sec=5) -> dict: |
| 125 | while True: |
| 126 | msg = self.stream_reader.get_next(timeout_sec) |
| 127 | if msg.get("method", None) == "window/logMessage": |
| 128 | self.logs.append(msg["params"]["message"]) |
| 129 | continue |
| 130 | return msg |
| 131 | |
| 132 | def add_file(self, path: str | pathlib.Path, contents: str) -> typing.Any: |
| 133 | if isinstance(path, str): |
no test coverage detected