(self, header_result, body, countdown=1,
**kwargs)
| 798 | pass |
| 799 | |
def fallback_chord_unlock(self, header_result, body, countdown=1,
                          **kwargs):
    """Schedule the ``celery.chord_unlock`` task for a chord.

    The header results are serialised with ``as_tuple()`` and passed to
    the unlock task under the ``result`` keyword, together with any
    extra ``kwargs`` supplied by the caller.

    Arguments:
        header_result: Iterable of results exposing ``as_tuple()``;
            must also have an ``id`` attribute.
        body: Chord callback; its ``options``, ``name`` and (when
            available) ``type`` attributes are read.
        countdown: Forwarded as ``countdown`` to ``apply_async``.
        **kwargs: Extra keyword arguments forwarded to the unlock task.
    """
    kwargs['result'] = [res.as_tuple() for res in header_result]

    # ``getattr`` with a default only swallows AttributeError, so a
    # NotRegistered raised while resolving ``body.type`` is handled here.
    try:
        task_type = getattr(body, 'type', None)
    except NotRegistered:
        task_type = None

    # Explicit options on the body win over attributes of its task type.
    type_queue = getattr(task_type, 'queue', None)
    queue = body.options.get('queue', type_queue)
    if queue is None:
        # fallback to default routing if queue name was not
        # explicitly passed to body callback
        route = self.app.amqp.router.route(kwargs, body.name)
        queue = route['queue'].name

    type_priority = getattr(task_type, 'priority', 0)
    priority = body.options.get('priority', type_priority)

    unlock_task = self.app.tasks['celery.chord_unlock']
    unlock_task.apply_async(
        (header_result.id, body),
        kwargs,
        countdown=countdown,
        queue=queue,
        priority=priority,
    )
| 822 | |
def ensure_chords_allowed(self):
    """Do nothing: this implementation performs no check.

    NOTE(review): presumably a hook that other backends override to
    reject chords they cannot support -- confirm against subclasses.
    """
no test coverage detected