(self, task, timestamp)
| 435 | return self._execute(task, timestamp) |
| 436 | |
| 437 | def _execute(self, task, timestamp): |
| 438 | if self._pre_execute: |
| 439 | try: |
| 440 | self._run_pre_execute(task) |
| 441 | except CancelExecution: |
| 442 | self._emit(S.SIGNAL_CANCELED, task) |
| 443 | if task.chord_config is not None: |
| 444 | self._check_chord(task, None) |
| 445 | return |
| 446 | |
| 447 | start = time.monotonic() |
| 448 | exception = None |
| 449 | retry_eta = None |
| 450 | task_value = None |
| 451 | |
| 452 | # Set deadline for cooperative timeout. |
| 453 | if task.timeout: |
| 454 | task._deadline = start + task.timeout |
| 455 | |
| 456 | try: |
| 457 | self._tasks_in_flight.add(task) |
| 458 | try: |
| 459 | with self._timeout_context(task) as check_timeout: |
| 460 | task_value = task.execute() |
| 461 | finally: |
| 462 | self._tasks_in_flight.remove(task) |
| 463 | duration = time.monotonic() - start |
| 464 | except TaskTimeout as exc: |
| 465 | logger.warning('Task %s timed out after %ss.', task.id, |
| 466 | task.timeout) |
| 467 | exception = exc |
| 468 | self._emit(S.SIGNAL_TIMEOUT, task) |
| 469 | except RateLimitExceeded as exc: |
| 470 | delay = task.retry_delay or exc.delay |
| 471 | if exc.retry or task.retries: |
| 472 | logger.info('Task %s rate-limited on "%s", retrying in %s', |
| 473 | task.id, exc.key, delay) |
| 474 | retry_eta = normalize_time(None, delay, self.utc) |
| 475 | if exc.retry: |
| 476 | task.retries += 1 |
| 477 | else: |
| 478 | logger.info('Task %s rate-limited on "%s"', task.id, exc.key) |
| 479 | exception = exc |
| 480 | self._emit(S.SIGNAL_RATE_LIMITED, task) |
| 481 | except TaskLockedException as exc: |
| 482 | logger.warning('Task %s not run, %s.', task.id, exc) |
| 483 | exception = exc |
| 484 | self._emit(S.SIGNAL_LOCKED, task) |
| 485 | except RetryTask as exc: |
| 486 | logger.info('Task %s raised RetryTask, retrying.', task.id) |
| 487 | task.retries += 1 |
| 488 | if exc.eta or exc.delay is not None: |
| 489 | retry_eta = normalize_time(exc.eta, exc.delay, self.utc) |
| 490 | exception = exc |
| 491 | except CancelExecution as exc: |
| 492 | if exc.retry or (exc.retry is None and task.retries): |
| 493 | task.retries = max(task.retries, 1) |
| 494 | msg = '(task will be retried)' |
no test coverage detected