1
|
|
|
# -*- coding: utf-8 -*- |
2
|
|
|
"""Actual App instance implementation.""" |
3
|
|
|
from __future__ import absolute_import, unicode_literals |
4
|
|
|
|
5
|
|
|
import os |
6
|
|
|
import threading |
7
|
|
|
import warnings |
8
|
|
|
from collections import defaultdict, deque |
9
|
|
|
from datetime import datetime |
10
|
|
|
from operator import attrgetter |
11
|
|
|
|
12
|
|
|
from kombu import pools |
13
|
|
|
from kombu.clocks import LamportClock |
14
|
|
|
from kombu.common import oid_from |
15
|
|
|
from kombu.utils.compat import register_after_fork |
16
|
|
|
from kombu.utils.objects import cached_property |
17
|
|
|
from kombu.utils.uuid import uuid |
18
|
|
|
from vine import starpromise |
19
|
|
|
from vine.utils import wraps |
20
|
|
|
|
21
|
|
|
from celery import platforms, signals |
22
|
|
|
from celery._state import (_announce_app_finalized, _deregister_app, |
23
|
|
|
_register_app, _set_current_app, _task_stack, |
24
|
|
|
connect_on_app_finalize, get_current_app, |
25
|
|
|
get_current_worker_task, set_default_app) |
26
|
|
|
from celery.exceptions import AlwaysEagerIgnored, ImproperlyConfigured |
27
|
|
|
from celery.five import (UserDict, bytes_if_py2, python_2_unicode_compatible, |
28
|
|
|
values) |
29
|
|
|
from celery.loaders import get_loader_cls |
30
|
|
|
from celery.local import PromiseProxy, maybe_evaluate |
31
|
|
|
from celery.utils import abstract |
32
|
|
|
from celery.utils.collections import AttributeDictMixin |
33
|
|
|
from celery.utils.dispatch import Signal |
34
|
|
|
from celery.utils.functional import first, head_from_fun, maybe_list |
35
|
|
|
from celery.utils.imports import gen_task_name, instantiate, symbol_by_name |
36
|
|
|
from celery.utils.log import get_logger |
37
|
|
|
from celery.utils.objects import FallbackContext, mro_lookup |
38
|
|
|
from celery.utils.time import (get_exponential_backoff_interval, timezone, |
39
|
|
|
to_utc) |
40
|
|
|
|
41
|
|
|
# Load all builtin tasks |
42
|
|
|
from . import builtins # noqa |
43
|
|
|
from . import backends |
44
|
|
|
from .annotations import prepare as prepare_annotations |
45
|
|
|
from .defaults import find_deprecated_settings |
46
|
|
|
from .registry import TaskRegistry |
47
|
|
|
from .utils import (AppPickler, Settings, _new_key_to_old, _old_key_to_new, |
48
|
|
|
_unpickle_app, _unpickle_app_v2, appstr, bugreport, |
49
|
|
|
detect_settings) |
50
|
|
|
|
51
|
|
|
# Names exported by ``from <this module> import *``.
__all__ = ('Celery',)

# Module-level logger, named after this module.
logger = get_logger(__name__)

# Fix-up symbol paths used by ``Celery.__init__`` when the ``fixups``
# argument is left as None.
BUILTIN_FIXUPS = {
    'celery.fixups.django:fixup',
}
# Value of the FORKED_BY_MULTIPROCESSING environment variable (None when
# unset); ``Celery.task`` checks it to decide whether to go through
# ``shared_task``.
USING_EXECV = os.environ.get('FORKED_BY_MULTIPROCESSING')
59
|
|
|
|
60
|
|
|
ERR_ENVVAR_NOT_SET = """ |
61
|
|
|
The environment variable {0!r} is not set, |
62
|
|
|
and as such the configuration could not be loaded. |
63
|
|
|
|
64
|
|
|
Please set this variable and make sure it points to |
65
|
|
|
a valid configuration module. |
66
|
|
|
|
67
|
|
|
Example: |
68
|
|
|
{0}="proj.celeryconfig" |
69
|
|
|
""" |
70
|
|
|
|
71
|
|
|
|
72
|
|
|
def app_has_custom(app, attr):
    """Report whether the app's class customizes the attribute `attr`.

    The answer comes from ``mro_lookup`` on ``app.__class__``, told to stop
    at :class:`Celery` and :class:`object` and to treat this module as
    monkey patched.

    Note:
        Callers use this as an optimization: the default behavior is
        known, but a subclass overriding a method/property must still
        be respected.
    """
    stop_at = {Celery, object}
    return mro_lookup(
        app.__class__, attr,
        stop=stop_at,
        monkey_patched=[__name__],
    )
82
|
|
|
|
83
|
|
|
|
84
|
|
|
def _unpickle_appattr(reverse_name, args):
    """Unpickle helper: call an attribute of the current app.

    ``reverse_name`` is resolved on the current app with ``_rgetattr`` and
    the resulting callable is invoked with the positional ``args``.
    """
    current = get_current_app()
    target = current._rgetattr(reverse_name)
    return target(*args)
89
|
|
|
|
90
|
|
|
|
91
|
|
|
def _after_fork_cleanup_app(app):
    """Call ``app._after_fork()`` and log, rather than raise, any failure."""
    # Kept at module level because it is the callback handed to
    # ``register_after_fork`` (multiprocessing).
    try:
        app._after_fork()
    except Exception as exc:  # pylint: disable=broad-except
        logger.info('after forker raised exception: %r', exc, exc_info=1)
98
|
|
|
|
99
|
|
|
|
100
|
|
|
class PendingConfiguration(UserDict, AttributeDictMixin):
    """Placeholder used for `app.conf` until the app is configured.

    Values written directly on `app.conf` before `app.config_from_object`
    is called are stored in the pre-configuration mapping (``_data``)
    without loading anything.

    Accessing any key finalizes the configuration: :attr:`data` invokes
    ``callback`` and `app.conf` is replaced with a concrete settings object.
    """

    callback = None
    _data = None

    def __init__(self, conf, callback):
        # Assign through ``object.__setattr__`` so the two slots are set
        # as plain instance attributes.
        for slot, value in (('_data', conf), ('callback', callback)):
            object.__setattr__(self, slot, value)

    def __setitem__(self, key, value):
        """Store ``key`` in the pre-configuration mapping."""
        pending = self._data
        pending[key] = value

    def clear(self):
        """Empty the pre-configuration mapping."""
        pending = self._data
        pending.clear()

    def update(self, *args, **kwargs):
        """Update the pre-configuration mapping."""
        pending = self._data
        pending.update(*args, **kwargs)

    def setdefault(self, *args, **kwargs):
        """Apply ``setdefault`` to the pre-configuration mapping."""
        pending = self._data
        return pending.setdefault(*args, **kwargs)

    def __contains__(self, key):
        # XXX does not reflect the finalized configuration.
        # ``setdefault`` triggers a ``key in d`` test, so membership has to
        # be lazy for ``setdefault`` to stay lazy as well.
        pending = self._data
        return key in pending

    def __len__(self):
        finalized = self.data
        return len(finalized)

    def __repr__(self):
        finalized = self.data
        return repr(finalized)

    @cached_property
    def data(self):
        """Result of ``callback()``, cached by ``cached_property``."""
        finalize = self.callback
        return finalize()
142
|
|
|
|
143
|
|
|
|
144
|
|
|
@python_2_unicode_compatible |
145
|
|
|
class Celery(object): |
146
|
|
|
"""Celery application. |
147
|
|
|
|
148
|
|
|
Arguments: |
149
|
|
|
main (str): Name of the main module if running as `__main__`. |
150
|
|
|
This is used as the prefix for auto-generated task names. |
151
|
|
|
|
152
|
|
|
Keyword Arguments: |
153
|
|
|
broker (str): URL of the default broker used. |
154
|
|
|
backend (Union[str, type]): The result store backend class, |
155
|
|
|
or the name of the backend class to use. |
156
|
|
|
|
157
|
|
|
Default is the value of the :setting:`result_backend` setting. |
158
|
|
|
autofinalize (bool): If set to False a :exc:`RuntimeError` |
159
|
|
|
will be raised if the task registry or tasks are used before |
160
|
|
|
the app is finalized. |
161
|
|
|
set_as_current (bool): Make this the global current app. |
162
|
|
|
include (List[str]): List of modules every worker should import. |
163
|
|
|
|
164
|
|
|
amqp (Union[str, type]): AMQP object or class name. |
165
|
|
|
events (Union[str, type]): Events object or class name. |
166
|
|
|
log (Union[str, type]): Log object or class name. |
167
|
|
|
control (Union[str, type]): Control object or class name. |
168
|
|
|
tasks (Union[str, type]): A task registry, or the name of |
169
|
|
|
a registry class. |
170
|
|
|
fixups (List[str]): List of fix-up plug-ins (e.g., see |
171
|
|
|
:mod:`celery.fixups.django`). |
172
|
|
|
config_source (Union[str, type]): Take configuration from a class, |
173
|
|
|
or object. Attributes may include any settings described in |
174
|
|
|
the documentation. |
175
|
|
|
""" |
176
|
|
|
|
177
|
|
|
#: This is deprecated, use :meth:`reduce_keys` instead |
178
|
|
|
Pickler = AppPickler |
179
|
|
|
|
180
|
|
|
SYSTEM = platforms.SYSTEM |
181
|
|
|
IS_macOS, IS_WINDOWS = platforms.IS_macOS, platforms.IS_WINDOWS |
182
|
|
|
|
183
|
|
|
#: Name of the `__main__` module. Required for standalone scripts. |
184
|
|
|
#: |
185
|
|
|
#: If set this will be used instead of `__main__` when automatically |
186
|
|
|
#: generating task names. |
187
|
|
|
main = None |
188
|
|
|
|
189
|
|
|
#: Custom options for command-line programs. |
190
|
|
|
#: See :ref:`extending-commandoptions` |
191
|
|
|
user_options = None |
192
|
|
|
|
193
|
|
|
#: Custom bootsteps to extend and modify the worker. |
194
|
|
|
#: See :ref:`extending-bootsteps`. |
195
|
|
|
steps = None |
196
|
|
|
|
197
|
|
|
builtin_fixups = BUILTIN_FIXUPS |
198
|
|
|
|
199
|
|
|
amqp_cls = 'celery.app.amqp:AMQP' |
200
|
|
|
backend_cls = None |
201
|
|
|
events_cls = 'celery.app.events:Events' |
202
|
|
|
loader_cls = None |
203
|
|
|
log_cls = 'celery.app.log:Logging' |
204
|
|
|
control_cls = 'celery.app.control:Control' |
205
|
|
|
task_cls = 'celery.app.task:Task' |
206
|
|
|
registry_cls = 'celery.app.registry:TaskRegistry' |
207
|
|
|
|
208
|
|
|
_fixups = None |
209
|
|
|
_pool = None |
210
|
|
|
_conf = None |
211
|
|
|
_after_fork_registered = False |
212
|
|
|
|
213
|
|
|
#: Signal sent when app is loading configuration. |
214
|
|
|
on_configure = None |
215
|
|
|
|
216
|
|
|
#: Signal sent after app has prepared the configuration. |
217
|
|
|
on_after_configure = None |
218
|
|
|
|
219
|
|
|
#: Signal sent after app has been finalized. |
220
|
|
|
on_after_finalize = None |
221
|
|
|
|
222
|
|
|
#: Signal sent by every new process after fork. |
223
|
|
|
on_after_fork = None |
224
|
|
|
|
225
|
|
|
def __init__(self, main=None, loader=None, backend=None,
             amqp=None, events=None, log=None, control=None,
             set_as_current=True, tasks=None, broker=None, include=None,
             changes=None, config_source=None, fixups=None, task_cls=None,
             autofinalize=True, namespace=None, strict_typing=True,
             **kwargs):
    """Initialize app state; see the class docstring for the arguments."""
    self.clock = LamportClock()
    self.main = main
    # Explicit arguments take precedence over the class-level defaults.
    self.amqp_cls = amqp or self.amqp_cls
    self.events_cls = events or self.events_cls
    self.loader_cls = loader or self._get_default_loader()
    self.log_cls = log or self.log_cls
    self.control_cls = control or self.control_cls
    self.task_cls = task_cls or self.task_cls
    self.set_as_current = set_as_current
    # registry_cls is resolved eagerly with symbol_by_name because it is
    # used below to build the task registry.
    self.registry_cls = symbol_by_name(self.registry_cls)
    self.user_options = defaultdict(set)
    self.steps = defaultdict(set)
    self.autofinalize = autofinalize
    self.namespace = namespace
    self.strict_typing = strict_typing

    # Configuration is not loaded yet; these hold what is needed to
    # load it later.
    self.configured = False
    self._config_source = config_source
    self._pending_defaults = deque()
    self._pending_periodic_tasks = deque()

    # Finalization state: ``_pending`` collects the lazy task proxies
    # created by ``task()`` until ``finalize()`` evaluates them.
    self.finalized = False
    self._finalize_mutex = threading.Lock()
    self._pending = deque()
    self._tasks = tasks
    # Anything that is not already a TaskRegistry (including None) is
    # wrapped in a new registry instance.
    if not isinstance(self._tasks, TaskRegistry):
        self._tasks = self.registry_cls(self._tasks or {})

    # If the class defines a custom __reduce_args__ we need to use
    # the old way of pickling apps: pickling a list of
    # args instead of the new way that pickles a dict of keywords.
    self._using_v1_reduce = app_has_custom(self, '__reduce_args__')

    # these options are moved to the config to
    # simplify pickling of the app object.
    # NOTE(review): when ``changes`` is given, the same dict object is
    # used as ``_preconf`` and is written to by ``__autoset`` below.
    self._preconf = changes or {}
    self._preconf_set_by_auto = set()
    self.__autoset('broker_url', broker)
    self.__autoset('result_backend', backend)
    self.__autoset('include', include)
    # ``conf`` starts as a Settings object wrapping a PendingConfiguration
    # whose callback is ``_finalize_pending_conf``.
    self._conf = Settings(
        PendingConfiguration(
            self._preconf, self._finalize_pending_conf),
        prefix=self.namespace,
        keys=(_old_key_to_new, _new_key_to_old),
    )

    # - Apply fix-ups.
    self.fixups = set(self.builtin_fixups) if fixups is None else fixups
    # ...store fixup instances in _fixups to keep weakrefs alive.
    self._fixups = [symbol_by_name(fixup)(self) for fixup in self.fixups]

    if self.set_as_current:
        self.set_current()

    # Signals
    if self.on_configure is None:
        # used to be a method pre 4.0
        self.on_configure = Signal(name='app.on_configure')
    self.on_after_configure = Signal(
        name='app.on_after_configure',
        providing_args={'source'},
    )
    self.on_after_finalize = Signal(name='app.on_after_finalize')
    self.on_after_fork = Signal(name='app.on_after_fork')

    # Run the ``on_init`` hook, then register the app via ``_register_app``.
    self.on_init()
    _register_app(self)
299
|
|
|
|
300
|
|
|
def _get_default_loader(self):
    """Return the loader to use when none was given explicitly.

    The first truthy candidate wins: the ``CELERY_LOADER`` environment
    variable (set by the --loader command-line argument), then
    ``self.loader_cls``, then ``'celery.loaders.app:AppLoader'``.
    """
    from_env = os.environ.get('CELERY_LOADER')
    if from_env:
        return from_env
    configured = self.loader_cls
    if configured:
        return configured
    return 'celery.loaders.app:AppLoader'
307
|
|
|
|
308
|
|
|
def on_init(self):
    """Hook called from ``__init__``; the default does nothing."""
    return None
310
|
|
|
|
311
|
|
|
def __autoset(self, key, value):
    """Store a truthy ``value`` in the pre-configuration under ``key``.

    The key is also recorded in ``_preconf_set_by_auto``.  Falsy values
    are ignored.
    """
    if not value:
        return
    self._preconf[key] = value
    self._preconf_set_by_auto.add(key)
315
|
|
|
|
316
|
|
|
def set_current(self):
    """Install this app as the current app for the calling thread."""
    _set_current_app(self)
319
|
|
|
|
320
|
|
|
def set_default(self):
    """Install this app as the default app used by all threads."""
    set_default_app(self)
323
|
|
|
|
324
|
|
|
def _ensure_after_fork(self):
    """Register the after-fork cleanup callback for this app, once."""
    if self._after_fork_registered:
        return
    self._after_fork_registered = True
    # The flag is set even when ``register_after_fork`` is None, in which
    # case nothing is registered.
    if register_after_fork is None:
        return
    register_after_fork(self, _after_fork_cleanup_app)
329
|
|
|
|
330
|
|
|
def close(self):
    """Clean up after the application.

    Drops the reference to the connection pool and deregisters the app.
    This is only needed for apps created dynamically, and the
    :keyword:`with` statement is usually the better choice.

    Example:
        >>> with Celery(set_as_current=False) as app:
        ...     with app.connection_for_write() as conn:
        ...         pass
    """
    self._pool = None
    _deregister_app(self)
343
|
|
|
|
344
|
|
|
def start(self, argv=None):
    """Run the :program:`celery` command with `argv`.

    When `argv` is not specified, :data:`sys.argv` is used.
    """
    command = instantiate('celery.bin.celery:CeleryCommand', app=self)
    return command.execute_from_commandline(argv)
352
|
|
|
|
353
|
|
|
def worker_main(self, argv=None):
    """Run the :program:`celery worker` command with `argv`.

    When `argv` is not specified, :data:`sys.argv` is used.
    """
    command = instantiate('celery.bin.worker:worker', app=self)
    return command.execute_from_commandline(argv)
361
|
|
|
|
362
|
|
|
def task(self, *args, **opts):
    """Decorator to create a task class out of any callable.

    Usable both bare (``@app.task``) and with options
    (``@app.task(...)``).  The options ``shared``, ``filter`` and ``lazy``
    are consumed here; everything else is forwarded to
    ``_task_from_fun``.

    Examples:
        .. code-block:: python

            @app.task
            def refresh_feed(url):
                store_feed(feedparser.parse(url))

        with setting extra options:

        .. code-block:: python

            @app.task(exchange='feeds')
            def refresh_feed(url):
                return store_feed(feedparser.parse(url))

    Note:
        App Binding: For custom apps the task decorator will return
        a proxy object, so that the act of creating the task is not
        performed until the task is used or the task registry is accessed.

        If you're depending on binding to be deferred, then you must
        not access any attributes on the returned object until the
        application is fully set up (finalized).

    Raises:
        TypeError: if the single positional argument is not callable,
            or if more than one positional argument is given.
    """
    if USING_EXECV and opts.get('lazy', True):
        # When using execv the task in the original module will point to a
        # different app, so doing things like 'add.request' will point to
        # a different task instance.  This makes sure it will always use
        # the task instance from the current app.
        # Really need a better solution for this :(
        from . import shared_task
        # ``lazy`` is forced off.  It is merged into the options rather
        # than passed next to ``**opts``: an explicit ``lazy=True`` from
        # the caller would otherwise make this call fail with "multiple
        # values for keyword argument 'lazy'".
        return shared_task(*args, **dict(opts, lazy=False))

    def inner_create_task_cls(shared=True, filter=None, lazy=True, **opts):
        _filt = filter  # private alias; ``filter`` shadows the builtin

        def _create_task_cls(fun):
            if shared:
                # Also create the task on every app that gets finalized.
                def cons(app):
                    return app._task_from_fun(fun, **opts)
                cons.__name__ = fun.__name__
                connect_on_app_finalize(cons)
            if not lazy or self.finalized:
                ret = self._task_from_fun(fun, **opts)
            else:
                # return a proxy object that evaluates on first use
                ret = PromiseProxy(self._task_from_fun, (fun,), opts,
                                   __doc__=fun.__doc__)
                self._pending.append(ret)
            if _filt:
                return _filt(ret)
            return ret

        return _create_task_cls

    if len(args) == 1:
        # Bare ``@app.task`` usage: the decorated callable is args[0].
        if callable(args[0]):
            return inner_create_task_cls(**opts)(*args)
        raise TypeError('argument 1 to @task() must be a callable')
    if args:
        raise TypeError(
            '@task() takes exactly 1 argument ({0} given)'.format(
                len(args) + len(opts)))
    # ``@app.task(...)`` usage: return the actual decorator.
    return inner_create_task_cls(**opts)
429
|
|
|
|
430
|
|
|
def _task_from_fun(self, fun, name=None, base=None, bind=False, **options):
    """Create, register and return the task instance wrapping ``fun``.

    If a task called ``name`` is already in the registry, that task is
    returned and nothing else happens.

    Arguments:
        fun (Callable): The function to run as the task body.
        name (str): Task name; generated with :meth:`gen_task_name`
            from the function's name and module when omitted.
        base (type): Base class of the new task class.
            Defaults to ``self.Task``.
        bind (bool): If true ``fun`` is used as ``run`` as-is, otherwise
            it is wrapped in :func:`staticmethod`.
        **options: Extra attributes for the new task class.  The
            ``autoretry_for``, ``retry_kwargs``, ``retry_backoff``,
            ``retry_backoff_max`` and ``retry_jitter`` options are
            additionally read here to set up automatic retries.

    Raises:
        RuntimeError: if the app is not finalized and ``autofinalize``
            is disabled.
    """
    if not self.finalized and not self.autofinalize:
        raise RuntimeError('Contract breach: app not finalized')
    name = name or self.gen_task_name(fun.__name__, fun.__module__)
    base = base or self.Task

    if name not in self._tasks:
        run = fun if bind else staticmethod(fun)
        task = type(fun.__name__, (base,), dict({
            'app': self,
            'name': name,
            'run': run,
            '_decorated': True,
            '__doc__': fun.__doc__,
            '__module__': fun.__module__,
            '__header__': staticmethod(head_from_fun(fun, bound=bind)),
            '__wrapped__': run}, **options))()
        # for some reason __qualname__ cannot be set in type()
        # so we have to set it here.
        try:
            task.__qualname__ = fun.__qualname__
        except AttributeError:
            # ``fun`` has no __qualname__ to copy.
            pass
        self._tasks[task.name] = task
        task.bind(self)  # connects task to this app

        autoretry_for = tuple(options.get('autoretry_for', ()))
        retry_kwargs = options.get('retry_kwargs', {})
        retry_backoff = int(options.get('retry_backoff', False))
        retry_backoff_max = int(options.get('retry_backoff_max', 600))
        retry_jitter = options.get('retry_jitter', True)

        # Wrap ``run`` only once per task (``_orig_run`` marks a task
        # that has already been wrapped).
        if autoretry_for and not hasattr(task, '_orig_run'):

            @wraps(task.run)
            def run(*args, **kwargs):
                try:
                    return task._orig_run(*args, **kwargs)
                except autoretry_for as exc:
                    # Use a per-call copy: ``retry_kwargs`` is the dict the
                    # caller passed to the decorator, and writing the
                    # computed countdown into it would leak the value into
                    # later retries and into any other task given the
                    # same dict.
                    call_kwargs = dict(retry_kwargs)
                    if retry_backoff:
                        call_kwargs['countdown'] = \
                            get_exponential_backoff_interval(
                                factor=retry_backoff,
                                retries=task.request.retries,
                                maximum=retry_backoff_max,
                                full_jitter=retry_jitter)
                    raise task.retry(exc=exc, **call_kwargs)

            task._orig_run, task.run = task.run, run
    else:
        task = self._tasks[name]
    return task
482
|
|
|
|
483
|
|
|
def register_task(self, task):
    """Register a class-based task instance with this app.

    A task without a name gets one generated from its class name and
    module.  The task is then stored in the registry, pointed at this
    app and bound to it.

    Note:
        Exists for compatibility with old Celery 1.0 style task
        classes; new projects should not need it.

    Returns:
        The same ``task`` object that was passed in.
    """
    if not task.name:
        kind = type(task)
        task.name = self.gen_task_name(kind.__name__, kind.__module__)
    registry = self.tasks
    registry[task.name] = task
    task._app = self
    task.bind(self)
    return task
499
|
|
|
|
500
|
|
|
def gen_task_name(self, name, module):
    """Return the task name for ``name`` defined in ``module``.

    Delegates to the module-level ``gen_task_name`` helper, passing this
    app as its first argument.
    """
    return gen_task_name(self, name, module)
502
|
|
|
|
503
|
|
|
def finalize(self, auto=False):
    """Finalize the app.

    The first call marks the app finalized, announces it, evaluates
    the pending task decorators, binds every registered task to the app
    and sends the ``on_after_finalize`` signal.  Later calls do nothing.

    Raises:
        RuntimeError: if ``auto`` is true while ``autofinalize`` is
            disabled and the app is not finalized yet.
    """
    with self._finalize_mutex:
        if self.finalized:
            return
        if auto and not self.autofinalize:
            raise RuntimeError('Contract breach: app not finalized')
        self.finalized = True
        _announce_app_finalized(self)

        # Drain the queue of lazy task proxies created by ``task()``.
        queue = self._pending
        while queue:
            maybe_evaluate(queue.popleft())

        for registered in values(self._tasks):
            registered.bind(self)

        self.on_after_finalize.send(sender=self)
524
|
|
|
|
525
|
|
|
def add_defaults(self, fun):
    """Add default configuration from a dict, or from a callable.

    A callable argument is treated as a promise: it is not called until
    the configuration is actually needed.  A non-callable argument is
    wrapped in a function returning it.

    If the app is already configured the defaults are added right away,
    otherwise the promise is queued in ``_pending_defaults``.

    This method can be compared to:

    .. code-block:: pycon

        >>> celery.conf.update(d)

    with a difference that 1) no copy will be made and 2) the dict will
    not be transferred when the worker spawns child processes, so
    it's important that the same configuration happens at import time
    when pickle restores the object on the other side.
    """
    if callable(fun):
        promise = fun
    else:
        defaults = fun

        def promise():
            return defaults

    if not self.configured:
        self._pending_defaults.append(promise)
        return None
    return self._conf.add_defaults(promise())
548
|
|
|
|
549
|
|
|
def config_from_object(self, obj,
                       silent=False, force=False, namespace=None):
    """Read configuration from object.

    ``obj`` can be the configuration object itself or the name of a
    module to import.  The object is only remembered as the
    configuration source unless ``force`` is set or the app is already
    configured, in which case it is loaded immediately.

    Example:
        >>> celery.config_from_object('myapp.celeryconfig')

        >>> from myapp import celeryconfig
        >>> celery.config_from_object(celeryconfig)

    Arguments:
        silent (bool): If true then import errors will be ignored.
        force (bool): Force reading configuration immediately.
            By default the configuration will be read only when required.
        namespace (str): Replaces ``self.namespace`` when truthy.

    Returns:
        ``self.conf`` when the configuration was loaded immediately and
        the loader reported success, otherwise None.
    """
    self._config_source = obj
    self.namespace = namespace or self.namespace
    if not (force or self.configured):
        return None
    # Discard the current settings before loading from ``obj``.
    self._conf = None
    loaded = self.loader.config_from_object(obj, silent=silent)
    if loaded:
        return self.conf
    return None
572
|
|
|
|
573
|
|
|
def config_from_envvar(self, variable_name, silent=False, force=False):
    """Read configuration from environment variable.

    The variable must hold the name of a module to import; that name is
    passed on to :meth:`config_from_object`.

    Example:
        >>> os.environ['CELERY_CONFIG_MODULE'] = 'myapp.celeryconfig'
        >>> celery.config_from_envvar('CELERY_CONFIG_MODULE')

    Returns:
        The result of :meth:`config_from_object`, or False when the
        variable is unset or empty and ``silent`` is true.

    Raises:
        ImproperlyConfigured: if the variable is unset or empty and
            ``silent`` is false.
    """
    module_name = os.environ.get(variable_name)
    if module_name:
        return self.config_from_object(
            module_name, silent=silent, force=force)
    if silent:
        return False
    raise ImproperlyConfigured(
        ERR_ENVVAR_NOT_SET.strip().format(variable_name))
590
|
|
|
|
591
|
|
|
def config_from_cmdline(self, argv, namespace='celery'):
    """Update the configuration from command-line arguments.

    ``argv`` and ``namespace`` are handed to the loader's
    ``cmdline_config_parser`` and the result is merged into ``_conf``.
    """
    parsed = self.loader.cmdline_config_parser(argv, namespace)
    self._conf.update(parsed)
595
|
|
|
|
596
|
|
|
def setup_security(self, allowed_serializers=None, key=None, cert=None,
                   store=None, digest='sha1', serializer='json'):
    """Setup the message-signing serializer.

    This is a global operation: it affects all application instances.

    Untrusted serializers are disabled, and when the ``auth`` serializer
    is configured it is registered with the provided settings in the
    Kombu serializer registry.

    Arguments:
        allowed_serializers (Set[str]): List of serializer names, or
            content_types that should be exempt from being disabled.
        key (str): Name of private key file to use.
            Defaults to the :setting:`security_key` setting.
        cert (str): Name of certificate file to use.
            Defaults to the :setting:`security_certificate` setting.
        store (str): Directory containing certificates.
            Defaults to the :setting:`security_cert_store` setting.
        digest (str): Digest algorithm used when signing messages.
            Default is ``sha1``.
        serializer (str): Serializer used to encode messages after
            they've been signed.  See :setting:`task_serializer` for
            the serializers supported.  Default is ``json``.
    """
    # Imported here rather than at module level.
    from celery.security import setup_security as _setup

    positional = (allowed_serializers, key, cert, store, digest, serializer)
    return _setup(*positional, app=self)
624
|
|
|
|
625
|
|
|
def autodiscover_tasks(self, packages=None,
                       related_name='tasks', force=False):
    """Auto-discover task modules.

    Searches a list of packages for a "tasks.py" module (or the module
    named by the ``related_name`` argument).

    If no packages are given, discovery is delegated to the fix-ups
    (e.g., Django).

    For example if you have a directory layout like this:

    .. code-block:: text

        foo/__init__.py
           tasks.py
           models.py

        bar/__init__.py
            tasks.py
            models.py

        baz/__init__.py
            models.py

    Then calling ``app.autodiscover_tasks(['foo', 'bar', 'baz'])`` will
    result in the modules ``foo.tasks`` and ``bar.tasks`` being imported.

    Arguments:
        packages (List[str]): List of packages to search.
            This argument may also be a callable, in which case the
            value returned is used (for lazy evaluation).
        related_name (str): The name of the module to find.  Defaults
            to "tasks": meaning "look for 'module.tasks' for every
            module in ``packages``."
        force (bool): By default this call is lazy so that the actual
            auto-discovery won't happen until an application imports
            the default modules.  Forcing will cause the auto-discovery
            to happen immediately.
    """
    if force:
        return self._autodiscover_tasks(packages, related_name)
    # Lazy path: run the discovery when the ``import_modules`` signal is
    # sent for this app.
    deferred = starpromise(self._autodiscover_tasks, packages, related_name)
    signals.import_modules.connect(deferred, weak=False, sender=self)
669
|
|
|
|
670
|
|
|
def _autodiscover_tasks(self, packages, related_name, **kwargs):
    """Discover tasks from ``packages``, or from the fix-ups if empty.

    Extra keyword arguments are accepted and ignored.
    """
    if not packages:
        return self._autodiscover_tasks_from_fixups(related_name)
    return self._autodiscover_tasks_from_names(packages, related_name)
674
|
|
|
|
675
|
|
|
def _autodiscover_tasks_from_names(self, packages, related_name):
    """Ask the loader to autodiscover ``related_name`` in ``packages``."""
    # ``packages`` may be lazy: a callable is called to get the list.
    resolved = packages() if callable(packages) else packages
    return self.loader.autodiscover_tasks(resolved, related_name)
680
|
|
|
|
681
|
|
|
def _autodiscover_tasks_from_fixups(self, related_name):
    """Autodiscover tasks in the packages reported by the fix-ups.

    Every entry of ``self._fixups`` that has an ``autodiscover_tasks``
    method contributes the packages that method returns; entries
    without it are skipped.  The combined list is handed to
    :meth:`_autodiscover_tasks_from_names`.
    """
    # The ``hasattr`` guard must come before the inner ``for``:
    # comprehension clauses run left to right, so with the guard last the
    # method would be called (and raise AttributeError) before the check.
    packages = [
        pkg for fixup in self._fixups
        if hasattr(fixup, 'autodiscover_tasks')
        for pkg in fixup.autodiscover_tasks()
    ]
    return self._autodiscover_tasks_from_names(
        packages, related_name=related_name)
687
|
|
|
|
688
|
|
|
def send_task(self, name, args=None, kwargs=None, countdown=None,
              eta=None, task_id=None, producer=None, connection=None,
              router=None, result_cls=None, expires=None,
              publisher=None, link=None, link_error=None,
              add_to_parent=True, group_id=None, retries=0, chord=None,
              reply_to=None, time_limit=None, soft_time_limit=None,
              root_id=None, parent_id=None, route_name=None,
              shadow=None, chain=None, task_type=None, **options):
    """Send task by name.

    Supports the same arguments as :meth:`@-Task.apply_async`.

    Arguments:
        name (str): Name of task to call (e.g., `"tasks.add"`).
        result_cls (AsyncResult): Specify custom result class.

    Returns:
        An instance of ``result_cls`` (``self.AsyncResult`` when not
        given) created for the task id, with ``ignored`` set from the
        ``ignore_result`` option.
    """
    parent = have_parent = None
    amqp = self.amqp
    # A task id is generated when the caller did not supply one.
    task_id = task_id or uuid()
    producer = producer or publisher  # XXX compat
    router = router or amqp.router
    conf = self.conf
    if conf.task_always_eager:  # pragma: no cover
        warnings.warn(AlwaysEagerIgnored(
            'task_always_eager has no effect on send_task',
        ), stacklevel=2)

    # ``ignore_result`` is removed from the options before routing; it is
    # used below to skip the backend hook and to flag the result.
    ignored_result = options.pop('ignore_result', False)
    options = router.route(
        options, route_name or name, args, kwargs, task_type)

    # Fill in missing root/parent ids from the currently executing
    # worker task, if there is one.
    if not root_id or not parent_id:
        parent = self.current_worker_task
        if parent:
            if not root_id:
                root_id = parent.request.root_id or parent.request.id
            if not parent_id:
                parent_id = parent.request.id

    message = amqp.create_task_message(
        task_id, name, args, kwargs, countdown, eta, group_id,
        expires, retries, chord,
        maybe_list(link), maybe_list(link_error),
        reply_to or self.oid, time_limit, soft_time_limit,
        self.conf.task_send_sent_event,
        root_id, parent_id, shadow, chain,
        argsrepr=options.get('argsrepr'),
        kwargsrepr=options.get('kwargsrepr'),
    )

    # An explicit connection takes precedence over any producer given.
    if connection:
        producer = amqp.Producer(connection, auto_declare=False)

    with self.producer_or_acquire(producer) as P:
        with P.connection._reraise_as_library_errors():
            if not ignored_result:
                self.backend.on_task_call(P, task_id)
            amqp.send_task_message(P, name, message, **options)
    result = (result_cls or self.AsyncResult)(task_id)
    # We avoid using the constructor since a custom result class
    # can be used, in which case the constructor may still use
    # the old signature.
    result.ignored = ignored_result

    if add_to_parent:
        # NOTE(review): ``have_parent`` is still None here even when
        # ``parent`` was looked up above, so ``current_worker_task`` may
        # be read a second time.
        if not have_parent:
            parent, have_parent = self.current_worker_task, True
        if parent:
            parent.add_trail(result)
    return result
758
|
|
|
|
759
|
|
|
def connection_for_read(self, url=None, **kwargs):
    """Establish connection used for consuming.

    Uses ``url`` when given, otherwise ``self.conf.broker_read_url``.

    See Also:
        :meth:`connection` for supported arguments.
    """
    target = url or self.conf.broker_read_url
    return self._connection(target, **kwargs)
766
|
|
|
|
767
|
|
|
def connection_for_write(self, url=None, **kwargs):
    """Create a broker connection intended for producing.

    Arguments:
        url (str): Broker URL.  When falsy, the ``broker_write_url``
            configuration value is used instead.
        **kwargs: Passed through unchanged to :meth:`_connection`.

    See Also:
        :meth:`connection` for supported arguments.
    """
    write_url = url or self.conf.broker_write_url
    return self._connection(write_url, **kwargs)
774
|
|
|
|
775
|
|
|
def connection(self, hostname=None, userid=None, password=None,
               virtual_host=None, port=None, ssl=None,
               connect_timeout=None, transport=None,
               transport_options=None, heartbeat=None,
               login_method=None, failover_strategy=None, **kwargs):
    """Establish a connection to the message broker.

    Please use :meth:`connection_for_read` and
    :meth:`connection_for_write` instead, to convey the intent
    of use for this connection.

    This method simply delegates to :meth:`connection_for_write`.

    Arguments:
        hostname (str): URL, Hostname/IP-address of the broker.
            If a URL is used, then the other argument below will
            be taken from the URL instead.  When falsy, the
            ``broker_write_url`` configuration value is used.
        userid (str): Username to authenticate as.
        password (str): Password to authenticate with
        virtual_host (str): Virtual host to use (domain).
        port (int): Port to connect to.
        ssl (bool, Dict): Defaults to the :setting:`broker_use_ssl`
            setting.
        connect_timeout (float): Connection timeout, forwarded as-is.
        transport (str): defaults to the :setting:`broker_transport`
            setting.
        transport_options (Dict): Dictionary of transport specific options.
        heartbeat (int): AMQP Heartbeat in seconds (``pyamqp`` only).
        login_method (str): Custom login method to use (AMQP only).
        failover_strategy (str, Callable): Custom failover strategy.
        **kwargs: Extra keyword arguments, forwarded to
            :meth:`connection_for_write`.

    Returns:
        kombu.Connection: the lazy connection instance.
    """
    broker_url = hostname or self.conf.broker_write_url
    return self.connection_for_write(
        broker_url,
        userid=userid,
        password=password,
        virtual_host=virtual_host,
        port=port,
        ssl=ssl,
        connect_timeout=connect_timeout,
        transport=transport,
        transport_options=transport_options,
        heartbeat=heartbeat,
        login_method=login_method,
        failover_strategy=failover_strategy,
        **kwargs
    )
817
|
|
|
|
818
|
|
|
def _connection(self, url, userid=None, password=None,
                virtual_host=None, port=None, ssl=None,
                connect_timeout=None, transport=None,
                transport_options=None, heartbeat=None,
                login_method=None, failover_strategy=None, **kwargs):
    """Build an ``amqp.Connection``, filling gaps from the configuration.

    Each falsy argument is replaced by the matching ``broker_*`` setting.
    ``ssl`` and ``connect_timeout`` are resolved through :meth:`either`
    instead of a plain ``or``.  ``transport_options`` is layered on top of
    a copy of ``broker_transport_options``.

    Note:
        ``**kwargs`` is accepted but not forwarded to the connection class.
    """
    settings = self.conf
    # Copy the configured options so the per-call overrides never
    # modify the configuration's own dictionary.
    merged_transport_options = dict(
        settings.broker_transport_options, **(transport_options or {})
    )
    return self.amqp.Connection(
        url,
        userid or settings.broker_user,
        password or settings.broker_password,
        virtual_host or settings.broker_vhost,
        port or settings.broker_port,
        transport=transport or settings.broker_transport,
        ssl=self.either('broker_use_ssl', ssl),
        heartbeat=heartbeat or settings.broker_heartbeat,
        login_method=login_method or settings.broker_login_method,
        failover_strategy=(
            failover_strategy or settings.broker_failover_strategy
        ),
        transport_options=merged_transport_options,
        connect_timeout=self.either(
            'broker_connection_timeout', connect_timeout
        ),
    )
844
|
|
|
# Alias: ``broker_connection`` is bound to the same function as ``connection``.
broker_connection = connection
845
|
|
|
|
846
|
|
|
def _acquire_connection(self, pool=True):
    """Helper for :meth:`connection_or_acquire`.

    Arguments:
        pool (bool): When true, acquire a connection from ``self.pool``
            with ``block=True``; otherwise create one through
            :meth:`connection_for_write`.
    """
    if not pool:
        return self.connection_for_write()
    return self.pool.acquire(block=True)
851
|
|
|
|
852
|
|
|
def connection_or_acquire(self, connection=None, pool=True, *_, **__):
    """Context used to acquire a connection from the pool.

    For use within a :keyword:`with` statement to get a connection
    from the pool if one is not already provided.

    Arguments:
        connection (kombu.Connection): If not provided, a connection
            will be acquired from the connection pool.
        pool (bool): Forwarded to :meth:`_acquire_connection` as the
            ``pool`` keyword.

    Any further positional or keyword arguments are accepted and ignored.
    """
    acquire = self._acquire_connection
    return FallbackContext(connection, acquire, pool=pool)


default_connection = connection_or_acquire  # XXX compat
864
|
|
|
|
865
|
|
|
def producer_or_acquire(self, producer=None):
    """Context used to acquire a producer from the pool.

    For use within a :keyword:`with` statement to get a producer
    from the pool if one is not already provided.

    Arguments:
        producer (kombu.Producer): If not provided, a producer
            will be acquired from the producer pool.
    """
    acquire = self.producer_pool.acquire
    return FallbackContext(producer, acquire, block=True)


default_producer = producer_or_acquire  # XXX compat
879
|
|
|
|
880
|
|
|
def prepare_config(self, c):
    """Prepare configuration before it is merged with the defaults.

    Arguments:
        c: The configuration, handed to ``find_deprecated_settings``.

    Returns:
        Whatever ``find_deprecated_settings`` returns for ``c``.
    """
    prepared = find_deprecated_settings(c)
    return prepared
883
|
|
|
|
884
|
|
|
def now(self):
    """Return the current time and date as a datetime.

    The UTC "now" is passed through ``to_utc`` and then converted to
    this app's :attr:`timezone`.
    """
    utc_now = to_utc(datetime.utcnow())
    return utc_now.astimezone(self.timezone)
888
|
|
|
|
889
|
|
|
def select_queues(self, queues=None):
    """Select subset of queues.

    Delegates to ``self.amqp.queues.select``.

    Arguments:
        queues (Sequence[str]): a list of queue names to keep.
    """
    available = self.amqp.queues
    return available.select(queues)
896
|
|
|
|
897
|
|
|
def either(self, default_key, *defaults):
    """Get key from configuration or use default values.

    The candidates are tried in order: first the pick made by
    ``first(None, defaults)``, then the configuration lookup
    ``self.conf.get(default_key)`` (passed as a ``starpromise``).

    Arguments:
        default_key (str): Configuration key used as the fallback.
        *defaults: Explicit values that take precedence over the
            configuration.
    """
    explicit = first(None, defaults)
    configured = starpromise(self.conf.get, default_key)
    return first(None, [explicit, configured])
906
|
|
|
|
907
|
|
|
def bugreport(self):
    """Return information useful in bug reports.

    Delegates to the module-level ``bugreport`` helper, passing this app.
    """
    report = bugreport(self)
    return report
910
|
|
|
|
911
|
|
|
def _get_backend(self):
    """Instantiate the result backend for this app.

    The backend is looked up with ``backends.by_url`` from
    ``self.backend_cls`` or, when that is falsy, the ``result_backend``
    configuration value.
    """
    backend_spec = self.backend_cls or self.conf.result_backend
    backend_cls, url = backends.by_url(backend_spec, self.loader)
    return backend_cls(app=self, url=url)
916
|
|
|
|
917
|
|
|
def _finalize_pending_conf(self):
    """Finalize loading the configuration and return it.

    Calls :meth:`_load_config`, stores the result in ``self._conf`` and
    returns it.

    Note:
        This is used by PendingConfiguration:
            as soon as you access a key the configuration is read.
    """
    loaded = self._load_config()
    self._conf = loaded
    return loaded
926
|
|
|
|
927
|
|
|
def _load_config(self):
    """Load and finalize this app's configuration.

    Fires ``on_configure``, reads the configured source (if any) through
    the loader, builds the settings with ``detect_settings``, applies the
    queued default initializers and periodic tasks, and finally sends
    ``on_after_configure``.

    Returns:
        The settings object stored in ``self._conf``.
    """
    if not isinstance(self.on_configure, Signal):
        # used to be a method pre 4.0
        self.on_configure()
    else:
        self.on_configure.send(sender=self)

    if self._config_source:
        self.loader.config_from_object(self._config_source)
    self.configured = True

    new_settings = detect_settings(
        self.prepare_config(self.loader.conf), self._preconf,
        ignore_keys=self._preconf_set_by_auto, prefix=self.namespace,
    )
    if self._conf is None:
        self._conf = new_settings
    else:
        # replace in place, as someone may have referenced app.conf,
        # done some changes, accessed a key, and then try to make more
        # changes to the reference and not the finalized value.
        self._conf.swap_with(new_settings)

    # load lazy config dict initializers.
    lazy_defaults = self._pending_defaults
    while lazy_defaults:
        self._conf.add_defaults(maybe_evaluate(lazy_defaults.popleft()()))

    # load lazy periodic tasks
    lazy_periodic = self._pending_periodic_tasks
    while lazy_periodic:
        self._add_periodic_task(*lazy_periodic.popleft())

    self.on_after_configure.send(sender=self, source=self._conf)
    return self._conf
960
|
|
|
|
961
|
|
|
def _after_fork(self):
    """Reset pooled resources and send the ``on_after_fork`` signal.

    Clears ``self._pool`` and, when an ``amqp`` entry is already present
    in the instance ``__dict__``, clears its ``_producer_pool`` too.
    """
    self._pool = None
    try:
        # Go through __dict__ on purpose: a plain attribute access could
        # create the amqp object just to reset it.
        cached_amqp = self.__dict__['amqp']
        cached_amqp._producer_pool = None
    except (AttributeError, KeyError):
        pass
    self.on_after_fork.send(sender=self)
968
|
|
|
|
969
|
|
|
def signature(self, *args, **kwargs):
    """Return a new :class:`~celery.Signature` bound to this app.

    Any ``app`` keyword supplied by the caller is replaced by this app.
    """
    kwargs.update(app=self)
    factory = self._canvas.signature
    return factory(*args, **kwargs)
973
|
|
|
|
974
|
|
|
def add_periodic_task(self, schedule, sig,
                      args=(), kwargs=(), name=None, **opts):
    """Register a periodic task entry and return its key.

    The entry is built by :meth:`_sig_to_periodic_task_entry`.  If the app
    is already configured it is added immediately; otherwise it is queued
    in ``self._pending_periodic_tasks``.

    Arguments:
        schedule: Schedule for the entry.
        sig: Signature (or task) to run.
        args: Positional arguments for the task.
        kwargs: Keyword arguments for the task.
        name (str): Optional key for the entry.
        **opts: Extra options forwarded to the entry builder.

    Returns:
        The key under which the entry is registered.
    """
    key, entry = self._sig_to_periodic_task_entry(
        schedule, sig, args, kwargs, name, **opts)
    if not self.configured:
        self._pending_periodic_tasks.append((key, entry))
    else:
        self._add_periodic_task(key, entry)
    return key
983
|
|
|
|
984
|
|
|
def _sig_to_periodic_task_entry(self, schedule, sig,
                                args=(), kwargs=None, name=None, **opts):
    """Convert a signature (or task) into a periodic task entry.

    Arguments:
        schedule: Schedule stored under the entry's ``'schedule'`` key.
        sig: A ``CallableSignature``, which is cloned with ``args`` and
            ``kwargs``; anything else must have a ``name`` attribute and
            is turned into a signature through :meth:`signature`.
        args: Positional arguments for the signature.
        kwargs: Keyword arguments for the signature.  Any falsy value
            (``None``, ``()``, ``{}``) is treated as "no kwargs".
        name (str): Key for the entry; defaults to ``repr`` of the
            resulting signature.
        **opts: Extra options, merged over the signature's own options.

    Returns:
        tuple: ``(key, entry)`` where ``entry`` is a dict with the keys
        ``schedule``, ``task``, ``args``, ``kwargs`` and ``options``.
    """
    # Use a fresh dict per call instead of a shared mutable default.
    kwargs = {} if not kwargs else kwargs
    sig = (sig.clone(args, kwargs)
           if isinstance(sig, abstract.CallableSignature)
           else self.signature(sig.name, args, kwargs))
    return name or repr(sig), {
        'schedule': schedule,
        'task': sig.name,
        'args': sig.args,
        'kwargs': sig.kwargs,
        'options': dict(sig.options, **opts),
    }
996
|
|
|
|
997
|
|
|
def _add_periodic_task(self, key, entry):
    """Store ``entry`` under ``key`` in the configuration's beat schedule."""
    beat_schedule = self._conf.beat_schedule
    beat_schedule[key] = entry
999
|
|
|
|
1000
|
|
|
def create_task_cls(self):
    """Create a base task class bound to this app.

    Subclasses ``self.task_cls`` through :meth:`subclass_with_self`, with
    the class named ``'Task'`` and the app stored in the ``_app``
    attribute.
    """
    base = self.task_cls
    return self.subclass_with_self(
        base,
        name='Task',
        attribute='_app',
        keep_reduce=True,
        abstract=True,
    )
1006
|
|
|
|
1007
|
|
|
def subclass_with_self(self, Class, name=None, attribute='app',
                       reverse=None, keep_reduce=False, **kw):
    """Subclass an app-compatible class.

    App-compatible means that the class has a class attribute that
    provides the default app it should use, for example:
    ``class Foo: app = None``.

    Arguments:
        Class (type): The app-compatible class to subclass.  It is
            resolved through ``symbol_by_name`` first.
        name (str): Custom name for the target class.
        attribute (str): Name of the attribute holding the app,
            Default is 'app'.
        reverse (str): Reverse path to this object used for pickling
            purposes. For example, to get ``app.AsyncResult``,
            use ``"AsyncResult"``.
        keep_reduce (bool): If enabled a custom ``__reduce__``
            implementation won't be provided.
        **kw: Additional attributes set on the new class.

    Returns:
        type: A new subclass of ``Class``.
    """
    Class = symbol_by_name(Class)
    reverse = reverse or Class.__name__

    def __reduce__(self):
        return _unpickle_appattr, (reverse, self.__reduce_args__())

    namespace = dict(
        {attribute: self},
        __module__=Class.__module__,
        __doc__=Class.__doc__,
        **kw)
    if not keep_reduce:
        namespace['__reduce__'] = __reduce__

    subclass_name = bytes_if_py2(name or Class.__name__)
    return type(subclass_name, (Class,), namespace)
1041
|
|
|
|
1042
|
|
|
def _rgetattr(self, path):
    """Resolve the attribute ``path`` on this app using ``attrgetter``."""
    resolve = attrgetter(path)
    return resolve(self)
1044
|
|
|
|
1045
|
|
|
def __enter__(self):
    """Enter the ``with`` block; the app itself is the context value."""
    return self
1047
|
|
|
|
1048
|
|
|
def __exit__(self, *exc_info):
    """Leave the ``with`` block by calling :meth:`close`.

    The exception details are ignored and ``None`` is returned.
    """
    self.close()
1050
|
|
|
|
1051
|
|
|
def __repr__(self):
    """Return ``<ClassName appstr>`` for this app."""
    cls_name = type(self).__name__
    return '<{0} {1}>'.format(cls_name, appstr(self))
1053
|
|
|
|
1054
|
|
|
def __reduce__(self):
    """Pickle support.

    Uses :meth:`__reduce_v1__` when ``self._using_v1_reduce`` is true,
    otherwise rebuilds the app from :meth:`__reduce_keys__`.
    """
    if not self._using_v1_reduce:
        return (_unpickle_app_v2, (self.__class__, self.__reduce_keys__()))
    return self.__reduce_v1__()
1058
|
|
|
|
1059
|
|
|
def __reduce_v1__(self):
    """Old-style pickle reduction built from :meth:`__reduce_args__`."""
    # Reduce only pickles the configuration changes,
    # so the default configuration doesn't have to be passed
    # between processes.
    ctor_args = (self.__class__, self.Pickler) + self.__reduce_args__()
    return (_unpickle_app, ctor_args)
1067
|
|
|
|
1068
|
|
|
def __reduce_keys__(self):
    """Keyword arguments used to reconstruct the object when unpickling.

    ``changes`` holds the configuration changes when the app is
    configured, and the pre-configuration values otherwise.
    """
    if self.configured:
        changes = self._conf.changes
    else:
        changes = self._preconf
    return dict(
        main=self.main,
        changes=changes,
        loader=self.loader_cls,
        backend=self.backend_cls,
        amqp=self.amqp_cls,
        events=self.events_cls,
        log=self.log_cls,
        control=self.control_cls,
        fixups=self.fixups,
        config_source=self._config_source,
        task_cls=self.task_cls,
        namespace=self.namespace,
    )
1085
|
|
|
|
1086
|
|
|
def __reduce_args__(self):
    """Deprecated method, please use :meth:`__reduce_keys__` instead.

    Returns the positional arguments used by the old-style reduction; the
    configuration changes are an empty dict until the app is configured.
    """
    changes = self._conf.changes if self.configured else {}
    return (
        self.main,
        changes,
        self.loader_cls,
        self.backend_cls,
        self.amqp_cls,
        self.events_cls,
        self.log_cls,
        self.control_cls,
        False,
        self._config_source,
    )
1092
|
|
|
|
1093
|
|
|
@cached_property
def Worker(self):
    """Worker application.

    A subclass of ``celery.apps.worker:Worker`` created with
    :meth:`subclass_with_self`.

    See Also:
        :class:`~@Worker`.
    """
    worker_path = 'celery.apps.worker:Worker'
    return self.subclass_with_self(worker_path)
1101
|
|
|
|
1102
|
|
|
@cached_property
def WorkController(self, **kwargs):
    """Embeddable worker.

    A subclass of ``celery.worker:WorkController`` created with
    :meth:`subclass_with_self`.  ``**kwargs`` is not used.

    See Also:
        :class:`~@WorkController`.
    """
    controller_path = 'celery.worker:WorkController'
    return self.subclass_with_self(controller_path)
1110
|
|
|
|
1111
|
|
|
@cached_property
def Beat(self, **kwargs):
    """:program:`celery beat` scheduler application.

    A subclass of ``celery.apps.beat:Beat`` created with
    :meth:`subclass_with_self`.  ``**kwargs`` is not used.

    See Also:
        :class:`~@Beat`.
    """
    beat_path = 'celery.apps.beat:Beat'
    return self.subclass_with_self(beat_path)
1119
|
|
|
|
1120
|
|
|
@cached_property
def Task(self):
    """Base task class for this app.

    Built by :meth:`create_task_cls`.
    """
    task_base = self.create_task_cls()
    return task_base
1124
|
|
|
|
1125
|
|
|
@cached_property
def annotations(self):
    """Task annotations prepared from the ``task_annotations`` setting."""
    raw_annotations = self.conf.task_annotations
    return prepare_annotations(raw_annotations)
1128
|
|
|
|
1129
|
|
|
@cached_property
def AsyncResult(self):
    """Create new result instance.

    A subclass of ``celery.result:AsyncResult`` created with
    :meth:`subclass_with_self`.

    See Also:
        :class:`celery.result.AsyncResult`.
    """
    result_path = 'celery.result:AsyncResult'
    return self.subclass_with_self(result_path)
1137
|
|
|
|
1138
|
|
|
@cached_property
def ResultSet(self):
    """Result set class for this app.

    A subclass of ``celery.result:ResultSet`` created with
    :meth:`subclass_with_self`.
    """
    result_set_path = 'celery.result:ResultSet'
    return self.subclass_with_self(result_set_path)
1141
|
|
|
|
1142
|
|
|
@cached_property
def GroupResult(self):
    """Create new group result instance.

    A subclass of ``celery.result:GroupResult`` created with
    :meth:`subclass_with_self`.

    See Also:
        :class:`celery.result.GroupResult`.
    """
    group_result_path = 'celery.result:GroupResult'
    return self.subclass_with_self(group_result_path)
1150
|
|
|
|
1151
|
|
|
@property
def pool(self):
    """Broker connection pool: :class:`~@pool`.

    Created on first access: the pool limit is set from the
    ``broker_pool_limit`` setting and the pool is looked up in
    ``pools.connections`` for a write connection.

    Note:
        This attribute is not related to the workers concurrency pool.
    """
    if self._pool is not None:
        return self._pool
    self._ensure_after_fork()
    pools.set_limit(self.conf.broker_pool_limit)
    self._pool = pools.connections[self.connection_for_write()]
    return self._pool
1164
|
|
|
|
1165
|
|
|
@property
def current_task(self):
    """Instance of task being executed, or :const:`None`.

    Read from the top of ``_task_stack``.
    """
    top_of_stack = _task_stack.top
    return top_of_stack
1169
|
|
|
|
1170
|
|
|
@property
def current_worker_task(self):
    """The task currently being executed by a worker or :const:`None`.

    Differs from :data:`current_task` in that it's not affected
    by tasks calling other tasks directly, or eagerly.
    """
    worker_task = get_current_worker_task()
    return worker_task
1178
|
|
|
|
1179
|
|
|
@cached_property
def oid(self):
    """Universally unique identifier for this app."""
    # since 4.0: thread.get_ident() is not included when
    # generating the process id.  This is due to how the RPC
    # backend now dedicates a single thread to receive results,
    # which would not work if each thread has a separate id.
    app_oid = oid_from(self, threads=False)
    return app_oid
1187
|
|
|
|
1188
|
|
|
@cached_property |
1189
|
|
|
def amqp(self): |
1190
|
|
|
"""AMQP related functionality: :class:`~@amqp`.""" |
1191
|
|
|
return instantiate(self.amqp_cls, app=self) |
1192
|
|
|
|
1193
|
|
|
@cached_property
def backend(self):
    """Current backend instance.

    Created by :meth:`_get_backend`.
    """
    instance = self._get_backend()
    return instance
1197
|
|
|
|
1198
|
|
|
@property
def conf(self):
    """Current configuration.

    Loaded through :meth:`_load_config` the first time it is needed,
    that is while ``self._conf`` is still ``None``.
    """
    current = self._conf
    if current is None:
        current = self._conf = self._load_config()
    return current


@conf.setter
def conf(self, d):  # noqa
    # Replace the stored configuration object as-is.
    self._conf = d
1208
|
|
|
|
1209
|
|
|
@cached_property
def control(self):
    """Remote control: :class:`~@control`.

    Instantiated from ``self.control_cls`` with this app.
    """
    control_cls = self.control_cls
    return instantiate(control_cls, app=self)
1213
|
|
|
|
1214
|
|
|
@cached_property
def events(self):
    """Consuming and sending events: :class:`~@events`.

    Instantiated from ``self.events_cls`` with this app.
    """
    events_cls = self.events_cls
    return instantiate(events_cls, app=self)
1218
|
|
|
|
1219
|
|
|
@cached_property
def loader(self):
    """Current loader instance.

    The class is resolved from ``self.loader_cls`` by ``get_loader_cls``
    and instantiated with this app.
    """
    loader_cls = get_loader_cls(self.loader_cls)
    return loader_cls(app=self)
1223
|
|
|
|
1224
|
|
|
@cached_property
def log(self):
    """Logging: :class:`~@log`.

    Instantiated from ``self.log_cls`` with this app.
    """
    log_cls = self.log_cls
    return instantiate(log_cls, app=self)
1228
|
|
|
|
1229
|
|
|
@cached_property
def _canvas(self):
    """The ``celery.canvas`` module, imported on first access."""
    from celery import canvas as canvas_module
    return canvas_module
1233
|
|
|
|
1234
|
|
|
@cached_property
def tasks(self):
    """Task registry.

    Warning:
        Accessing this attribute will also auto-finalize the app.
    """
    # Finalize first, then hand out the registry.
    self.finalize(auto=True)
    registry = self._tasks
    return registry
1243
|
|
|
|
1244
|
|
|
@property
def producer_pool(self):
    """The producer pool, taken from ``self.amqp.producer_pool``."""
    amqp = self.amqp
    return amqp.producer_pool
1247
|
|
|
|
1248
|
|
|
def uses_utc_timezone(self):
    """Check if the application uses the UTC timezone.

    Returns:
        bool: Whether :attr:`timezone` compares equal to ``timezone.utc``.
    """
    app_timezone = self.timezone
    return app_timezone == timezone.utc
1251
|
|
|
|
1252
|
|
|
@cached_property
def timezone(self):
    """Current timezone for this app.

    This is a cached property taking the time zone from the
    :setting:`timezone` setting.  When that setting is falsy, UTC is used
    if ``enable_utc`` is set and the local timezone otherwise.
    """
    conf = self.conf
    configured_zone = conf.timezone
    if configured_zone:
        return timezone.get_timezone(configured_zone)
    return timezone.utc if conf.enable_utc else timezone.local
1266
|
|
|
|
1267
|
|
|
|
1268
|
|
|
# Compatibility alias: ``App`` is the same class object as ``Celery``.
App = Celery  # noqa: E305 XXX compat
1269
|
|
|
|