| 412 | datetime.datetime.now()) |
| 413 | |
def execute(self, task, timestamp=None):
    """
    Decide what to do with ``task`` at ``timestamp`` and act on it.

    Exactly one of the following happens:

    * the task is not ready to run -> it is handed to ``add_schedule``;
    * the task is revoked -> ``SIGNAL_REVOKED`` is emitted;
    * the task has expired -> ``SIGNAL_EXPIRED`` is emitted;
    * otherwise -> ``SIGNAL_EXECUTING`` is emitted and the task is run
      through ``_execute``.

    A revoked or expired task that carries a ``chord_config`` is also
    reported to ``_check_chord`` with ``None`` as its result.

    :param task: the task instance to process.
    :param timestamp: reference time for the ready / revoked / expired
        checks; when ``None`` the value of ``_get_timestamp()`` is used.
    :return: whatever ``_execute`` returns when the task is run,
        otherwise ``None``.
    """
    when = self._get_timestamp() if timestamp is None else timestamp

    # Guard clause: a task that is not due yet only gets (re)scheduled.
    if not self.ready_to_run(task, when):
        self.add_schedule(task)
        return None

    if self.is_revoked(task, when, False):
        logger.warning('Task %s was revoked, not executing', task)
        self._emit(S.SIGNAL_REVOKED, task)
    elif task.expires_resolved and task.expires_resolved < when:
        logger.info('Task %s expired, not executing.', task)
        self._emit(S.SIGNAL_EXPIRED, task)
    else:
        logger.info('Executing %s', task)
        self._emit(S.SIGNAL_EXECUTING, task)
        return self._execute(task, when)

    # Only skipped (revoked or expired) tasks reach this point. Contribute
    # a placeholder result so the chord callback can still fire once the
    # remaining members are done.
    if task.chord_config is not None:
        self._check_chord(task, None)
    return None
| 436 | |
| 437 | def _execute(self, task, timestamp): |
| 438 | if self._pre_execute: |