Create task message in protocol 1 format.
TaskMessage1(
name, # type: str
id=None, # type: str
args=(), # type: Sequence
kwargs=None, # type: Mapping
callbacks=None, # type: Sequence[Signature]
errbacks=None, # type: Sequence[Signature]
chain=None, # type: Sequence[Signature]
**options # type: Any
)
| 43 | |
| 44 | |
| 45 | def TaskMessage1( |
| 46 | name, # type: str |
| 47 | id=None, # type: str |
| 48 | args=(), # type: Sequence |
| 49 | kwargs=None, # type: Mapping |
| 50 | callbacks=None, # type: Sequence[Signature] |
| 51 | errbacks=None, # type: Sequence[Signature] |
| 52 | chain=None, # type: Sequence[Signature] |
| 53 | **options # type: Any |
| 54 | ): |
| 55 | # type: (...) -> Any |
| 56 | """Create task message in protocol 1 format.""" |
| 57 | kwargs = {} if not kwargs else kwargs |
| 58 | from kombu.serialization import dumps |
| 59 | |
| 60 | from celery import uuid |
| 61 | id = id or uuid() |
| 62 | message = Mock(name=f'TaskMessage-{id}') |
| 63 | message.headers = {} |
| 64 | message.payload = { |
| 65 | 'task': name, |
| 66 | 'id': id, |
| 67 | 'args': args, |
| 68 | 'kwargs': kwargs, |
| 69 | 'callbacks': callbacks, |
| 70 | 'errbacks': errbacks, |
| 71 | } |
| 72 | message.payload.update(options) |
| 73 | message.content_type, message.content_encoding, message.body = dumps( |
| 74 | message.payload, |
| 75 | ) |
| 76 | return message |
| 77 | |
| 78 | |
| 79 | def task_message_from_sig(app, sig, utc=True, TaskMessage=TaskMessage): |
nothing calls this directly
no test coverage detected
searching dependent graphs…