(self, fd, to_send)
| 122 | self.ratelimit = None |
| 123 | |
def write(self, fd, to_send):
    """Write ``to_send`` to file descriptor ``fd``, throttled by the rate limit.

    When ``self.ratelimit`` is falsy the data is handed to ``os.write``
    unchanged.  Otherwise a quota is maintained on the instance:

    * once ``RATELIMIT_PERIOD`` has passed since ``self.ratelimit_last``,
      the quota grows by ``self.ratelimit`` (never beyond twice that
      value) and the period restarts;
    * if the quota is exhausted, the call sleeps until the current period
      ends, then grants a fresh ``self.ratelimit`` of quota;
    * the payload is cut down to the remaining quota before writing.

    Callers must therefore expect a short write and re-submit the rest.

    Returns the number of bytes ``os.write`` reported as written.
    Raises ``ConnectionBrokenWithHint`` if ``os.write`` raises
    ``BrokenPipeError`` (the original exception context is suppressed).
    """
    limit = self.ratelimit
    if limit:
        now = time.monotonic()
        if now >= self.ratelimit_last + RATELIMIT_PERIOD:
            # A full period elapsed: top up, but bank at most two periods' worth.
            self.ratelimit_quota = min(self.ratelimit_quota + limit, 2 * limit)
            self.ratelimit_last = now
        if self.ratelimit_quota == 0:
            # Nothing left to spend: wait out the rest of this period.
            time.sleep(self.ratelimit_last + RATELIMIT_PERIOD - now)
            self.ratelimit_quota += limit
            self.ratelimit_last = time.monotonic()
        budget = self.ratelimit_quota
        if len(to_send) > budget:
            to_send = to_send[:budget]
    try:
        written = os.write(fd, to_send)
    except BrokenPipeError:
        raise ConnectionBrokenWithHint("Broken Pipe") from None
    if limit:
        # Only the bytes that actually went out are charged against the quota.
        self.ratelimit_quota -= written
    return written
| 146 | |
| 147 | |
| 148 | def api(*, since, **kwargs_decorator): |
no test coverage detected