hub / github.com/celery/celery / fallback_chord_unlock

Method fallback_chord_unlock

celery/backends/base.py:800–821
(self, header_result, body, countdown=1, **kwargs)

Source from the content-addressed store, hash-verified

        pass

    def fallback_chord_unlock(self, header_result, body, countdown=1,
                              **kwargs):
        kwargs['result'] = [r.as_tuple() for r in header_result]
        try:
            body_type = getattr(body, 'type', None)
        except NotRegistered:
            body_type = None

        queue = body.options.get('queue', getattr(body_type, 'queue', None))

        if queue is None:
            # fallback to default routing if queue name was not
            # explicitly passed to body callback
            queue = self.app.amqp.router.route(kwargs, body.name)['queue'].name

        priority = body.options.get('priority', getattr(body_type, 'priority', 0))
        self.app.tasks['celery.chord_unlock'].apply_async(
            (header_result.id, body,), kwargs,
            countdown=countdown,
            queue=queue,
            priority=priority,
        )

    def ensure_chords_allowed(self):
        pass
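
The lookup order the method uses for `queue` and `priority` can be sketched in isolation. This is a minimal illustration and not Celery code: `resolve_queue_and_priority` is a hypothetical helper, `default_queue` is an assumed stand-in for the name returned by `self.app.amqp.router.route(...)`, and `SimpleNamespace` stands in for a registered task class.

```python
from types import SimpleNamespace


def resolve_queue_and_priority(options, body_type, default_queue):
    """Hypothetical helper mirroring the lookup order in fallback_chord_unlock."""
    # 1. An explicit option on the body signature wins.
    # 2. Otherwise use the attribute on the task class, if there is one.
    queue = options.get('queue', getattr(body_type, 'queue', None))
    if queue is None:
        # 3. Nothing explicit: use the router's default (stubbed as a string here).
        queue = default_queue
    # Priority follows the same order but defaults to 0 instead of routing.
    priority = options.get('priority', getattr(body_type, 'priority', 0))
    return queue, priority


# Stand-in for a registered task class that declares its own queue and priority.
task_type = SimpleNamespace(queue='reports', priority=5)

explicit = resolve_queue_and_priority(
    {'queue': 'urgent', 'priority': 9}, task_type, 'celery')
from_type = resolve_queue_and_priority({}, task_type, 'celery')
# body_type is None when the body's task is not registered.
unregistered = resolve_queue_and_priority({}, None, 'celery')
```

Because `getattr(None, 'queue', None)` simply returns the default, an unregistered body (where `body_type` is `None`) falls through to the routed queue and a priority of 0 without any special-casing.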

Callers 1

apply_chord · Method · 0.95

Calls 4

route · Method · 0.80
as_tuple · Method · 0.45
get · Method · 0.45
apply_async · Method · 0.45

Tested by

no test coverage detected