Canvas (Workflows)¶
Canvas primitives for composing task workflows.
Signature¶
Signature ¶
Bases: dict
Task Signature.
Class that wraps the arguments and execution options for a single task invocation.
Used as the parts in a :class:group and other constructs,
or to pass tasks around as callbacks while being compatible
with serializers with a strict type subset.
Signatures can also be created from tasks:
- Using the .signature() method, which has the same signature as Task.apply_async:
    >>> add.signature(args=(1,), kwargs={'kw': 2}, options={})
- or the .s() shortcut, which works for star arguments:
    >>> add.s(1, kw=2)
- the .s() shortcut does not allow you to specify execution options, but there's a chaining .set method that returns the signature:
    >>> add.s(2, 2).set(countdown=10).set(expires=30).delay()
Note:
You should use :func:~celery.signature to create new signatures.
The Signature class is the type returned by that function and
should be used for isinstance checks for signatures.
See Also:
:ref:guide-canvas for the complete guide.
Arguments:
task (Union[Type[celery.app.task.Task], str]): Either a task
class/instance, or the name of a task.
args (Tuple): Positional arguments to apply.
kwargs (Dict): Keyword arguments to apply.
options (Dict): Additional options to :meth:Task.apply_async.
Note:
If the first argument is a :class:dict, the other
arguments will be ignored and the values in the dict will be used
instead::
>>> s = signature('tasks.add', args=(2, 2))
>>> signature(s)
{'task': 'tasks.add', 'args': (2, 2), 'kwargs': {}, 'options': {}}
__init__ ¶
__init__(
task=None,
args=None,
kwargs=None,
options=None,
type=None,
subtask_type=None,
immutable=False,
app=None,
**ex,
)
apply_async ¶
Apply this task asynchronously.
Arguments:
args (Tuple): Partial args to be prepended to the existing args.
kwargs (Dict): Partial kwargs to be merged with existing kwargs.
options (Dict): Partial options to be merged with existing options.
Returns:
AsyncResult: promise of future evaluation.
See also:
:meth:Task.apply_async and the :ref:guide-calling guide.
apply ¶
Call task locally.
Same as :meth:apply_async but executes the task inline instead
of sending a task message.
set ¶
Set arbitrary execution options (same as .options.update(…)).
Returns:
Signature: This is a chaining method call
(i.e., it will return self).
clone ¶
Create a copy of this signature.
Arguments:
args (Tuple): Partial args to be prepended to the existing args.
kwargs (Dict): Partial kwargs to be merged with existing kwargs.
options (Dict): Partial options to be merged with existing options.
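The merge rules described for clone (and apply_async) can be illustrated with a small toy model. This is a sketch only, not Celery's implementation: merge_partial is a hypothetical helper, the signature is modelled as a plain dict, and it assumes that newly supplied kwargs and options win on key conflicts.

```python
def merge_partial(sig, args=(), kwargs=None, options=None):
    """Return a copy of ``sig`` with partial values merged in (toy model)."""
    kwargs = kwargs or {}
    options = options or {}
    return {
        "task": sig["task"],
        # Partial positional args go in front of the stored ones.
        "args": tuple(args) + tuple(sig["args"]),
        # Assumption: newly supplied keys override stored keys.
        "kwargs": {**sig["kwargs"], **kwargs},
        "options": {**sig["options"], **options},
    }


base = {
    "task": "tasks.add",
    "args": (2,),
    "kwargs": {"kw": 1},
    "options": {"countdown": 10},
}
merged = merge_partial(base, args=(1,), kwargs={"kw": 2}, options={"expires": 30})
print(merged["args"])  # (1, 2)
```

The original dict is left untouched, which mirrors the idea that cloning produces a copy rather than modifying the signature in place.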
link ¶
Add callback task to be applied if this task succeeds.
Returns:
Signature: the argument passed, for chaining
or use with :func:~functools.reduce.
link_error ¶
Add callback task to be applied on error in task execution.
Returns:
Signature: the argument passed, for chaining
or use with :func:~functools.reduce.
freeze ¶
Finalize the signature by adding a concrete task id.
The task won't be called. Don't call the signature more than once after freezing it, because each call would send a task message with the same task id.
The arguments are used to override the signature's headers during freezing.
Arguments:
_id (str): Task id to use if it didn't already have one. New UUID is generated if not provided.
group_id (str): Group id to use if it didn't already have one.
chord (Signature): Chord body when freezing a chord header.
root_id (str): Root id to use.
parent_id (str): Parent id to use.
group_index (int): Group index to use.
Returns:
AsyncResult: promise of future evaluation.
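The id rule for freezing ("use _id only if the signature has no id yet, otherwise generate one") can be sketched as follows. This is a toy model under stated assumptions: freeze_id is a hypothetical helper, the signature is a plain dict, and the id is assumed to live under options["task_id"].

```python
import uuid


def freeze_id(sig, _id=None):
    """Assign a task id to ``sig`` once and return it (toy model)."""
    options = sig.setdefault("options", {})
    # An id already stored on the signature takes priority over ``_id``;
    # a fresh UUID is generated only when neither is available.
    task_id = options.get("task_id") or _id or str(uuid.uuid4())
    options["task_id"] = task_id
    return task_id


sig = {"task": "tasks.add", "args": (2, 2)}
first = freeze_id(sig, "abc")
second = freeze_id(sig, "xyz")  # ignored: the signature already has an id
generated = freeze_id({"task": "tasks.add"})
```

Because the id sticks after the first freeze, sending a frozen signature more than once would reuse the same id, which is the situation the warning above describes.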
chain¶
_chain ¶
Bases: Signature
unchain_tasks ¶
Return a list of tasks in the chain.
The task list is cloned from the chain's tasks. All of the chain's callbacks are added to the last task in the (cloned) chain. Every task is linked to the same error callback as the chain itself, so that the correct error callback is called if any of the (cloned) tasks fails.
aapply_async async ¶
Async version of :meth:apply_async.
This method shares preparation logic with :meth:apply_async and
delegates to :meth:arun for async task execution.
Arguments and return value are the same as :meth:apply_async.
run ¶
run(
args=None,
kwargs=None,
group_id=None,
chord=None,
task_id=None,
link=None,
link_error=None,
publisher=None,
producer=None,
root_id=None,
parent_id=None,
app=None,
group_index=None,
**options,
)
Executes the chain.
Responsible for executing the chain in the correct order. For a chain that contains a single task, the task is executed directly and the result of that task is returned.
arun async ¶
arun(
args=None,
kwargs=None,
group_id=None,
chord=None,
task_id=None,
link=None,
link_error=None,
publisher=None,
producer=None,
root_id=None,
parent_id=None,
app=None,
group_index=None,
**options,
)
Async version of :meth:run.
Executes the chain asynchronously.
prepare_steps ¶
prepare_steps(
args,
kwargs,
tasks,
root_id=None,
parent_id=None,
link_error=None,
app=None,
last_task_id=None,
group_id=None,
chord_body=None,
clone=True,
from_dict=Signature.from_dict,
group_index=None,
)
Prepare the chain for execution.
To execute a chain, we first need to unpack it correctly. During the unpacking, we might encounter other chains, groups, or chords which we need to unpack as well.
For example:
chain(signature1, chain(signature2, signature3)) --> Upgrades to chain(signature1, signature2, signature3)
chain(group(signature1, signature2), signature3) --> Upgrades to chord([signature1, signature2], signature3)
The responsibility of this method is to ensure that the chain is correctly unpacked, and then the correct callbacks are set up along the way.
Arguments:
args (Tuple): Partial args to be prepended to the existing args.
kwargs (Dict): Partial kwargs to be merged with existing kwargs.
tasks (List[Signature]): The tasks of the chain.
root_id (str): The id of the root task.
parent_id (str): The id of the parent task.
link_error (Union[List[Signature], Signature]): The error callback. It will be set for all tasks in the chain.
app (Celery): The Celery app instance.
last_task_id (str): The id of the last task in the chain.
group_id (str): The id of the group that the chain is a part of.
chord_body (Signature): The body of the chord, used to synchronize with the chain's last task and the chord's body when used together.
clone (bool): Whether to clone the chain's tasks before modifying them.
from_dict (Callable): A function that takes a dict and returns a Signature.
Returns:
Tuple[List[Signature], List[AsyncResult]]: The frozen tasks of the chain, and the async results.
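The two upgrades described above (inlining nested chains, and turning a group followed by another step into a chord) can be sketched with a toy model. This is not how prepare_steps is implemented: flatten_chain and upgrade_groups are hypothetical helpers, tasks are plain strings, and containers are tuples tagged "chain", "group", or "chord".

```python
def flatten_chain(steps):
    """Inline nested chains so the result is a flat list of steps."""
    flat = []
    for step in steps:
        if isinstance(step, tuple) and step[0] == "chain":
            flat.extend(flatten_chain(step[1]))
        else:
            flat.append(step)
    return flat


def upgrade_groups(steps):
    """Turn every group that is followed by another step into a chord."""
    out = []
    i = 0
    while i < len(steps):
        step = steps[i]
        is_group = isinstance(step, tuple) and step[0] == "group"
        if is_group and i + 1 < len(steps):
            # The next step becomes the chord body and is consumed here.
            out.append(("chord", step[1], steps[i + 1]))
            i += 2
        else:
            out.append(step)
            i += 1
    return out


flat = flatten_chain(["s1", ("chain", ["s2", "s3"])])
upgraded = upgrade_groups([("group", ["s1", "s2"]), "s3"])
print(flat)      # ['s1', 's2', 's3']
print(upgraded)  # [('chord', ['s1', 's2'], 's3')]
```

In this model a group at the very end of the chain stays a group, since there is no following step to act as the chord body.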
aapply async ¶
Async version of :meth:apply.
Arguments and return value are the same as :meth:apply.
group¶
group ¶
Bases: Signature
Creates a group of tasks to be executed in parallel.
A group is lazy so you must call it to take action and evaluate the group.
Note:
If only one argument is passed, and that argument is an iterable
then that'll be used as the list of tasks instead: this
allows us to use group with generator expressions.
Example:
    >>> lazy_group = group([add.s(2, 2), add.s(4, 4)])
    >>> promise = lazy_group()  # <-- evaluate: returns lazy result.
    >>> promise.get()  # <-- will wait for the task to return
    [4, 8]
Arguments:
*tasks (List[Signature]): A list of signatures that this group will call. If there's only one argument, and that argument is an iterable, then that'll define the list of signatures instead.
**options (Any): Execution options applied to all tasks in the group.
Returns:
~celery.group: signature that when called will then call all of the
tasks in the group (and return a :class:GroupResult instance
that can be used to inspect the state of the group).
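The "single iterable argument" rule from the note above can be sketched in plain Python. This is a toy model, not Celery's code: normalize_tasks is a hypothetical helper and strings stand in for signatures (a lone string or dict is treated as one task, not as a list of tasks).

```python
from collections.abc import Iterable


def normalize_tasks(*tasks):
    """Return the task list, unpacking a sole iterable argument (toy model)."""
    if len(tasks) == 1:
        only = tasks[0]
        # Strings and dicts stand in for single signatures here.
        if isinstance(only, Iterable) and not isinstance(only, (str, dict)):
            return list(only)
    return list(tasks)


from_varargs = normalize_tasks("add(2, 2)", "add(4, 4)")
from_list = normalize_tasks(["add(2, 2)", "add(4, 4)"])
from_generator = normalize_tasks(f"add({i}, {i})" for i in range(3))
```

The last call shows why the rule exists: a generator expression can be passed as the only argument without wrapping it in a list first.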
chord¶
_chord ¶
Bases: Signature
Barrier synchronization primitive.
A chord consists of a header and a body.
The header is a group of tasks that must complete before the callback is called. A chord is essentially a callback for a group of tasks.
The body is applied with the return values of all the header tasks as a list.
Example:
The chord:
.. code-block:: pycon
>>> res = chord([add.s(2, 2), add.s(4, 4)])(sum_task.s())
is effectively :math:`\Sigma ((2 + 2) + (4 + 4))`:
.. code-block:: pycon
>>> res.get()
12
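The barrier behaviour can be modelled locally with a thread pool. This is a sketch of the idea only and uses no Celery APIs: run_chord is a hypothetical helper, the header is a list of (function, args) pairs, and the body is an ordinary callable that receives the list of header results.

```python
from concurrent.futures import ThreadPoolExecutor


def add(x, y):
    return x + y


def run_chord(header, body):
    """Run header callables in parallel, then call body with their results."""
    with ThreadPoolExecutor() as pool:
        futures = [pool.submit(fn, *args) for fn, args in header]
        # result() blocks, so the body only runs once every header task is done.
        results = [future.result() for future in futures]
    return body(results)


total = run_chord([(add, (2, 2)), (add, (4, 4))], sum)
print(total)  # 12
```

The results are collected in header order, so the body sees [4, 8] regardless of which header task finishes first.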
from_dict classmethod ¶
Create a chord signature from a dictionary that represents a chord.
Example:
    >>> chord_dict = {
    ...     "task": "celery.chord",
    ...     "args": [],
    ...     "kwargs": {
    ...         "kwargs": {},
    ...         "header": [
    ...             {
    ...                 "task": "add",
    ...                 "args": [1, 2],
    ...                 "kwargs": {},
    ...                 "options": {},
    ...                 "subtask_type": None,
    ...                 "immutable": False
    ...             },
    ...             {
    ...                 "task": "add",
    ...                 "args": [3, 4],
    ...                 "kwargs": {},
    ...                 "options": {},
    ...                 "subtask_type": None,
    ...                 "immutable": False
    ...             }
    ...         ],
    ...         "body": {
    ...             "task": "xsum",
    ...             "args": [],
    ...             "kwargs": {},
    ...             "options": {},
    ...             "subtask_type": None,
    ...             "immutable": False
    ...         }
    ...     },
    ...     "options": {},
    ...     "subtask_type": "chord",
    ...     "immutable": False
    ... }
    >>> chord_sig = chord.from_dict(chord_dict)
Iterates over the given tasks in the dictionary and converts them to signatures. The chord header needs to be defined in d['kwargs']['header'] as a sequence of tasks. The chord body needs to be defined in d['kwargs']['body'] as a single task.
The tasks themselves can be dictionaries or signatures (or both).
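The "dictionaries or signatures (or both)" rule can be illustrated with a toy conversion. This is a sketch under stated assumptions rather than Celery's implementation: Sig is a hypothetical stand-in for a signature class, and as_sig and chord_parts are hypothetical helpers.

```python
class Sig(dict):
    """Hypothetical stand-in for a signature: a dict subclass."""


def as_sig(task):
    # Leave existing Sig objects alone; wrap plain dicts.
    return task if isinstance(task, Sig) else Sig(task)


def chord_parts(d):
    """Return (header, body) from a chord-shaped dict, converted to Sig."""
    kwargs = d["kwargs"]
    header = [as_sig(task) for task in kwargs["header"]]
    body = as_sig(kwargs["body"])
    return header, body


already_sig = Sig(task="add", args=[3, 4])
chord_dict = {
    "task": "celery.chord",
    "kwargs": {
        # A mixed header: one plain dict, one object that is already a Sig.
        "header": [{"task": "add", "args": [1, 2]}, already_sig],
        "body": {"task": "xsum", "args": []},
    },
}
header, body = chord_parts(chord_dict)
```

Entries that are already signatures pass through unchanged, while plain dicts are wrapped, so a mixed header ends up uniformly typed.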
aapply_async async ¶
aapply_async(
args=None,
kwargs=None,
task_id=None,
producer=None,
publisher=None,
connection=None,
router=None,
result_cls=None,
**options,
)
Async version of :meth:apply_async.
This method shares preparation logic with :meth:apply_async and
delegates to :meth:arun for async chord execution.
Arguments and return value are the same as :meth:apply_async.
aapply async ¶
Async version of :meth:apply.
Arguments and return value are the same as :meth:apply.
__length_hint__ ¶
Return the number of tasks in this chord's header (recursively).
run ¶
run(
header,
body,
partial_args,
app=None,
interval=None,
countdown=1,
max_retries=None,
eager=False,
task_id=None,
kwargs=None,
**options,
)
Execute the chord.
Executing the chord means executing the header and sending the result to the body. In case of an empty header, the body is executed immediately.
Arguments:
header (group): The header to execute.
body (Signature): The body to execute.
partial_args (tuple): Arguments to pass to the header.
app (Celery): The Celery app instance.
interval (float): The interval between retries.
countdown (int): The countdown between retries.
max_retries (int): The maximum number of retries.
task_id (str): The task id to use for the body.
kwargs (dict): Keyword arguments to pass to the header.
options (dict): Options to pass to the header.
Returns:
AsyncResult: The result of the body (with the result of the header in the parent of the body).
arun async ¶
arun(
header,
body,
partial_args,
app=None,
interval=None,
countdown=1,
max_retries=None,
eager=False,
task_id=None,
kwargs=None,
**options,
)
Async version of :meth:run.
Execute the chord asynchronously.
link_error ¶
Links an error callback to the chord body, and potentially the header as well.
Note:
The task_allow_error_cb_on_chord_header setting controls whether
error callbacks are allowed on the header. If this setting is
False (the current default), then the error callback will only be
applied to the body.
set_immutable ¶
Sets the immutable flag on the chord header only.
Note: Does not affect the chord body.
Arguments:
immutable (bool): The new mutability value for the chord header.
chunks¶
chunks ¶
Bases: Signature
Partition of tasks into chunks of size n.
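The partitioning idea can be sketched with the standard library. This is an illustration of splitting an iterable of argument tuples into lists of at most n items, not the class's actual implementation; chunked is a hypothetical helper name.

```python
from itertools import islice


def chunked(items, n):
    """Yield successive lists of at most ``n`` items from ``items``."""
    iterator = iter(items)
    while True:
        chunk = list(islice(iterator, n))
        if not chunk:
            return
        yield chunk


# Ten argument tuples split into chunks of four: sizes 4, 4 and 2.
parts = list(chunked(zip(range(10), range(10)), 4))
print([len(part) for part in parts])  # [4, 4, 2]
```

Each chunk could then be handled as one unit of work, with the last chunk holding whatever is left over.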
aapply_async async ¶
Async version of :meth:apply_async.
This method shares preparation logic with :meth:apply_async and
delegates to the group's async apply method for the actual I/O.
Arguments and return value are the same as :meth:apply_async.
Helpers¶
signature ¶
Create new signature.
- if the first argument is a signature already then it's cloned.
- if the first argument is a dict, then a Signature version is returned.
Returns: Signature: The resulting signature.
maybe_signature ¶
Ensure obj is a signature, or None.
Arguments:
d (Optional[Union[abstract.CallableSignature, Mapping]]): Signature or dict-serialized signature.
app (celery.Celery): App to bind signature to.
clone (bool): If d is already a signature, the signature will be cloned when this flag is enabled.
Returns:
Optional[abstract.CallableSignature]