| 143 | |
| 144 | |
| 145 | class ClientPlayback: |
| 146 | playback_task: asyncio.Task | None = None |
| 147 | inflight: http.HTTPFlow | None |
| 148 | queue: asyncio.Queue |
| 149 | options: Options |
| 150 | replay_tasks: set[asyncio.Task] |
| 151 | |
| 152 | def __init__(self): |
| 153 | self.queue = asyncio.Queue() |
| 154 | self.inflight = None |
| 155 | self.task = None |
| 156 | self.replay_tasks = set() |
| 157 | |
| 158 | def running(self): |
| 159 | self.options = ctx.options |
| 160 | self.playback_task = asyncio_utils.create_task( |
| 161 | self.playback(), |
| 162 | name="client playback", |
| 163 | keep_ref=False, |
| 164 | ) |
| 165 | |
| 166 | async def done(self): |
| 167 | if self.playback_task: |
| 168 | self.playback_task.cancel() |
| 169 | try: |
| 170 | await self.playback_task |
| 171 | except asyncio.CancelledError: |
| 172 | pass |
| 173 | |
| 174 | async def playback(self): |
| 175 | while True: |
| 176 | self.inflight = await self.queue.get() |
| 177 | try: |
| 178 | assert self.inflight |
| 179 | h = ReplayHandler(self.inflight, self.options) |
| 180 | if ctx.options.client_replay_concurrency == -1: |
| 181 | t = asyncio_utils.create_task( |
| 182 | h.replay(), |
| 183 | name="client playback awaiting response", |
| 184 | keep_ref=False, |
| 185 | ) |
| 186 | # keep a reference so this is not garbage collected |
| 187 | self.replay_tasks.add(t) |
| 188 | t.add_done_callback(self.replay_tasks.remove) |
| 189 | else: |
| 190 | await h.replay() |
| 191 | except Exception: |
| 192 | logger.exception(f"Client replay has crashed!") |
| 193 | self.queue.task_done() |
| 194 | self.inflight = None |
| 195 | |
| 196 | def check(self, f: flow.Flow) -> str | None: |
| 197 | if f.live or f == self.inflight: |
| 198 | return "Can't replay live flow." |
| 199 | if f.intercepted: |
| 200 | return "Can't replay intercepted flow." |
| 201 | if isinstance(f, http.HTTPFlow): |
| 202 | if not f.request: |
no outgoing calls
searching dependent graphs…