Poll a batch job until it completes, fails, or times out. Args: client: An initialized OpenAI client. batch_id: The batch ID to poll. poll_interval: Seconds between status checks. max_wait_time: Maximum seconds to wait before giving up. Returns: Fina
(
client: OpenAI,
batch_id: str,
poll_interval: int = DEFAULT_POLL_INTERVAL,
max_wait_time: int = DEFAULT_MAX_WAIT_TIME,
)
| 188 | |
| 189 | |
| 190 | def poll_batch_until_complete( |
| 191 | client: OpenAI, |
| 192 | batch_id: str, |
| 193 | poll_interval: int = DEFAULT_POLL_INTERVAL, |
| 194 | max_wait_time: int = DEFAULT_MAX_WAIT_TIME, |
| 195 | ) -> BatchJobInfo: |
| 196 | """Poll a batch job until it completes, fails, or times out. |
| 197 | |
| 198 | Args: |
| 199 | client: An initialized OpenAI client. |
| 200 | batch_id: The batch ID to poll. |
| 201 | poll_interval: Seconds between status checks. |
| 202 | max_wait_time: Maximum seconds to wait before giving up. |
| 203 | |
| 204 | Returns: |
| 205 | Final BatchJobInfo when the batch reaches a terminal state. |
| 206 | |
| 207 | Raises: |
| 208 | TimeoutError: If max_wait_time is exceeded. |
| 209 | RuntimeError: If the batch fails or is cancelled. |
| 210 | """ |
| 211 | terminal_states = {"completed", "failed", "expired", "cancelled"} |
| 212 | start_time = time.time() |
| 213 | |
| 214 | logger.info( |
| 215 | f"Polling batch {batch_id} every {poll_interval}s " |
| 216 | f"(max wait: {max_wait_time}s)..." |
| 217 | ) |
| 218 | |
| 219 | while True: |
| 220 | elapsed = time.time() - start_time |
| 221 | if elapsed > max_wait_time: |
| 222 | raise TimeoutError( |
| 223 | f"Batch {batch_id} did not complete within " |
| 224 | f"{max_wait_time}s (last status check at {elapsed:.0f}s)" |
| 225 | ) |
| 226 | |
| 227 | info = get_batch_status(client, batch_id) |
| 228 | |
| 229 | logger.info( |
| 230 | f"Batch {batch_id}: {info.status} " |
| 231 | f"({info.completed_requests}/{info.total_requests} done, " |
| 232 | f"{info.failed_requests} failed)" |
| 233 | ) |
| 234 | |
| 235 | if info.status in terminal_states: |
| 236 | if info.status == "failed": |
| 237 | raise RuntimeError( |
| 238 | f"Batch {batch_id} failed. " |
| 239 | f"Error file: {info.error_file_id}" |
| 240 | ) |
| 241 | if info.status in {"expired", "cancelled"}: |
| 242 | raise RuntimeError( |
| 243 | f"Batch {batch_id} was {info.status}." |
| 244 | ) |
| 245 | return info |
| 246 | |
| 247 | time.sleep(poll_interval) |
no test coverage detected