Read any data available in the channel.
(
self,
backoff: bool = True,
backoff_max: float = 3.0,
delay_factor: Optional[float] = None,
)
| 1454 | return prompt |
| 1455 | |
def clear_buffer(
    self,
    backoff: bool = True,
    backoff_max: float = 3.0,
    delay_factor: Optional[float] = None,
) -> str:
    """Drain the channel and return whatever text was waiting in it.

    The channel is polled at most ten times. Each poll sleeps first, then
    reads; polling stops at the first read that yields no data.

    :param backoff: When True, the pause is doubled after every read that
        returned data, capped at ``backoff_max``.
    :param backoff_max: Upper bound, in seconds, for the pause between reads
        while backoff is active.
    :param delay_factor: Multiplier applied to the 0.1 second base pause;
        ``self.global_delay_factor`` is used when this is None.
    :return: Everything read from the channel, after each chunk has been
        passed through ``strip_ansi_escape_codes``.
    """
    factor = self.global_delay_factor if delay_factor is None else delay_factor
    pause = 0.1 * factor

    collected = ""
    for _attempt in range(10):
        time.sleep(pause)
        chunk = self.strip_ansi_escape_codes(self.read_channel())
        collected += chunk
        if not chunk:
            # Nothing left to read: the buffer is clear.
            break

        log.debug("Clear buffer detects data in the channel")
        if not backoff:
            continue

        # Data is still arriving, so wait twice as long before the next read,
        # but never longer than the configured ceiling.
        pause *= 2
        if pause >= backoff_max:
            pause = backoff_max
    return collected
| 1482 | |
| 1483 | def command_echo_read(self, cmd: str, read_timeout: float) -> str: |
| 1484 | # Make sure you read until you detect the command echo (avoid getting out of sync) |