1
|
|
|
# -*- coding: utf-8 -*- |
2
|
|
|
"""Built-in Tasks. |
3
|
|
|
|
4
|
|
|
The built-in tasks are always available in all app instances. |
5
|
|
|
""" |
6
|
|
|
from __future__ import absolute_import, unicode_literals |
7
|
|
|
|
8
|
|
|
from celery._state import connect_on_app_finalize |
9
|
|
|
from celery.utils.log import get_logger |
10
|
|
|
|
11
|
|
|
# This module exports no public names.
__all__ = ()

# Module-level logger, named after this module.
logger = get_logger(__name__)
13
|
|
|
|
14
|
|
|
|
15
|
|
|
@connect_on_app_finalize
def add_backend_cleanup_task(app):
    """Register the ``celery.backend_cleanup`` task on ``app``.

    The task is used to clean up expired results.  If the configured
    backend requires periodic cleanup, this task is also automatically
    configured to run every day at 4am (requires :program:`celery beat`
    to be running).
    """
    def backend_cleanup():
        # The backend is looked up on every call, not at registration time.
        result_backend = app.backend
        result_backend.cleanup()

    register = app.task(
        name='celery.backend_cleanup', shared=False, lazy=False,
    )
    return register(backend_cleanup)
27
|
|
|
|
28
|
|
|
|
29
|
|
|
@connect_on_app_finalize
def add_accumulate_task(app):
    """Register ``celery.accumulate`` on ``app``.

    Task used by Task.replace when replacing task with group.  It returns
    its positional arguments, or only the one selected by the optional
    ``index`` keyword argument.
    """
    @app.task(bind=True, name='celery.accumulate', shared=False, lazy=False)
    def accumulate(self, *args, **kwargs):
        position = kwargs.get('index')
        if position is None:
            # No index requested: hand back every positional argument.
            return args
        return args[position]
    return accumulate
37
|
|
|
|
38
|
|
|
|
39
|
|
|
@connect_on_app_finalize
def add_unlock_chord_task(app):
    """Task used by result backends without native chord support.

    Joins the chord by creating a task chain that polls the header
    for completion.
    """
    from celery.canvas import maybe_signature
    from celery.exceptions import ChordError
    from celery.result import allow_join_result, result_from_tuple

    # NOTE: ``Result`` is not referenced in the task body; it stays in the
    # signature so existing callers passing it keep working.
    @app.task(name='celery.chord_unlock', max_retries=None, shared=False,
              default_retry_delay=1, ignore_result=True, lazy=False, bind=True)
    def unlock_chord(self, group_id, callback, interval=None,
                     max_retries=None, result=None,
                     Result=app.AsyncResult, GroupResult=app.GroupResult,
                     result_from_tuple=result_from_tuple, **kwargs):
        # Fall back to the task's own retry delay when no polling interval
        # was supplied.
        if interval is None:
            interval = self.default_retry_delay

        # check if the task group is ready, and if so apply the callback.
        # The callback is converted exactly once here; the resulting
        # signature is reused for both dispatch and error reporting below.
        callback = maybe_signature(callback, app=app)
        deps = GroupResult(
            group_id,
            [result_from_tuple(r, app=app) for r in result],
            app=app,
        )
        # Use the native join when the group result reports support for it.
        j = deps.join_native if deps.supports_native_join else deps.join

        try:
            ready = deps.ready()
        except Exception as exc:
            # Checking readiness failed: retry, passing the error along.
            raise self.retry(
                exc=exc, countdown=interval, max_retries=max_retries,
            )
        else:
            if not ready:
                # Header not finished yet: poll again after ``interval``.
                raise self.retry(countdown=interval, max_retries=max_retries)

        try:
            with allow_join_result():
                ret = j(timeout=3.0, propagate=True)
        except Exception as exc:  # pylint: disable=broad-except
            # Any error raised while joining is reported on the callback
            # as a ChordError instead of being re-raised from this task.
            try:
                culprit = next(deps._failed_join_report())
                reason = 'Dependency {0.id} raised {1!r}'.format(culprit, exc)
            except StopIteration:
                # No failed dependency was reported: describe the error only.
                reason = repr(exc)
            logger.exception('Chord %r raised: %r', group_id, exc)
            app.backend.chord_error_from_stack(callback, ChordError(reason))
        else:
            try:
                callback.delay(ret)
            except Exception as exc:  # pylint: disable=broad-except
                logger.exception('Chord %r raised: %r', group_id, exc)
                app.backend.chord_error_from_stack(
                    callback,
                    exc=ChordError('Callback error: {0!r}'.format(exc)),
                )
    return unlock_chord
100
|
|
|
|
101
|
|
|
|
102
|
|
|
@connect_on_app_finalize
def add_map_task(app):
    """Register ``celery.map`` on ``app``.

    The task calls the given task once per item of the iterable and
    returns the list of return values.
    """
    from celery.canvas import signature

    @app.task(name='celery.map', shared=False, lazy=False)
    def xmap(task, it):
        # Call the signature's ``type`` directly with each item.
        target = signature(task, app=app).type
        results = []
        for item in it:
            results.append(target(item))
        return results
    return xmap
111
|
|
|
|
112
|
|
|
|
113
|
|
|
@connect_on_app_finalize
def add_starmap_task(app):
    """Register ``celery.starmap`` on ``app``.

    The task calls the given task once per item of the iterable, unpacking
    each item as positional arguments, and returns the list of return
    values.
    """
    from celery.canvas import signature

    @app.task(name='celery.starmap', shared=False, lazy=False)
    def xstarmap(task, it):
        # Call the signature's ``type`` directly with each unpacked item.
        target = signature(task, app=app).type
        return [target(*call_args) for call_args in it]
    return xstarmap
122
|
|
|
|
123
|
|
|
|
124
|
|
|
@connect_on_app_finalize
def add_chunk_task(app):
    """Register ``celery.chunks`` on ``app``.

    The task simply delegates to ``celery.canvas.chunks.apply_chunks``.
    """
    from celery.canvas import chunks as _chunks

    @app.task(name='celery.chunks', shared=False, lazy=False)
    def chunks(task, it, n):
        chunked = _chunks.apply_chunks(task, it, n)
        return chunked
    return chunks
132
|
|
|
|
133
|
|
|
|
134
|
|
|
@connect_on_app_finalize
def add_group_task(app):
    """No longer used, but here for backwards compatibility."""
    from celery.canvas import maybe_signature
    from celery.result import result_from_tuple

    @app.task(name='celery.group', bind=True, shared=False, lazy=False)
    def group(self, tasks, result, group_id, partial_args, add_to_parent=True):
        # Use the app the task is bound to rather than the enclosing one.
        app = self.app
        result = result_from_tuple(result, app)
        # any partial args are added to all tasks in the group
        taskit = (maybe_signature(task, app=app).clone(partial_args)
                  for task in tasks)
        # All members are sent through one producer; a plain loop is used
        # because only the side effect of apply_async matters here.
        with app.producer_or_acquire() as producer:
            for stask in taskit:
                stask.apply_async(group_id=group_id, producer=producer,
                                  add_to_parent=False)
        # Record the group result on the currently executing task, if any.
        parent = app.current_worker_task
        if add_to_parent and parent:
            parent.add_trail(result)
        return result
    return group
155
|
|
|
|
156
|
|
|
|
157
|
|
|
@connect_on_app_finalize
def add_chain_task(app):
    """No longer used, but here for backwards compatibility."""
    def chain(*args, **kwargs):
        # Placeholder only: calling it always fails.
        raise NotImplementedError('chain is not a real task')

    as_task = app.task(name='celery.chain', shared=False, lazy=False)
    return as_task(chain)
164
|
|
|
|
165
|
|
|
|
166
|
|
|
@connect_on_app_finalize
def add_chord_task(app):
    """No longer used, but here for backwards compatibility."""
    from celery import group, chord as _chord
    from celery.canvas import maybe_signature

    @app.task(name='celery.chord', bind=True, ignore_result=False,
              shared=False, lazy=False)
    def chord(self, header, body, partial_args=(), interval=None,
              countdown=1, max_retries=None, eager=False, **kwargs):
        app = self.app
        # - convert back to group if serialized
        if isinstance(header, group):
            members = header.tasks
        else:
            members = header
        signatures = [maybe_signature(member, app=app) for member in members]
        header = group(signatures, app=self.app)
        body = maybe_signature(body, app=app)
        runner = _chord(header, body)
        return runner.run(
            header, body, partial_args, app, interval,
            countdown, max_retries, **kwargs
        )
    return chord
187
|
|
|
|