Celery   F
last analyzed

Complexity

Total Complexity 163

Size/Duplication

Total Lines 1122
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
dl 0
loc 1122
rs 1.112
c 0
b 0
f 0
wmc 163

83 Methods

Rating   Name   Duplication   Size   Complexity  
A __reduce__() 0 2 2
A annotations() 0 3 1
A events() 0 4 1
A ResultSet() 0 3 1
A producer_pool() 0 3 1
A bugreport() 0 3 1
A loader() 0 4 1
A signature() 0 4 1
A _canvas() 0 4 1
A _add_periodic_task() 0 2 1
A log() 0 4 1
A __exit__() 0 2 1
A __enter__() 0 2 1
A uses_utc_timezone() 0 3 1
A current_task() 0 4 1
A __repr__() 0 2 1
A amqp() 0 4 1
A _rgetattr() 0 2 1
A backend() 0 4 1
A control() 0 4 1
A Task() 0 4 1
A gen_task_name() 0 2 1
A cons() 0 2 1
A __autoset() 0 4 2
A _autodiscover_tasks_from_names() 0 4 2
A config_from_cmdline() 0 3 1
A set_current() 0 3 1
A set_default() 0 3 1
A _autodiscover_tasks() 0 4 2
A on_init() 0 2 1
A now() 0 4 1
A prepare_config() 0 3 1
A connection_for_write() 0 7 1
A connection_or_acquire() 0 11 1
A __reduce_v1__() 0 7 1
A connection_for_read() 0 7 1
A GroupResult() 0 8 1
A either() 0 8 1
A close() 0 13 1
A start() 0 8 1
A _sig_to_periodic_task_entry() 0 11 2
A __reduce_keys__() 0 16 2
A Worker() 0 8 1
A _connection() 0 24 1
A config_from_object() 0 23 4
A worker_main() 0 8 1
D task() 0 67 13
A _acquire_connection() 0 5 2
A create_task_cls() 0 5 1
A oid() 0 8 1
A select_queues() 0 7 1
A _after_fork() 0 7 2
B finalize() 0 21 7
A Beat() 0 8 1
B _load_config() 0 33 6
A add_periodic_task() 0 9 2
A current_worker_task() 0 8 1
A setup_security() 0 28 1
A add_defaults() 0 23 4
A run() 0 13 3
B connection() 0 41 1
A subclass_with_self() 0 34 4
A _finalize_pending_conf() 0 9 1
A register_task() 0 16 2
A __reduce_args__() 0 6 2
B autodiscover_tasks() 0 44 2
B _create_task_cls() 0 16 6
C _task_from_fun() 0 52 11
F send_task() 0 70 14
A producer_or_acquire() 0 12 1
B inner_create_task_cls() 0 21 7
A _get_default_loader() 0 6 1
A config_from_envvar() 0 17 3
A WorkController() 0 8 1
A conf() 0 6 1
A tasks() 0 9 1
A pool() 0 13 2
A _ensure_after_fork() 0 5 3
A AsyncResult() 0 8 1
A timezone() 0 14 3
A _autodiscover_tasks_from_fixups() 0 6 4
A _get_backend() 0 5 1
C __init__() 0 74 6

How to fix   Complexity   

Complex Class

Complex classes like Celery often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to finding such a component is to look for fields/methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# -*- coding: utf-8 -*-
2
"""Actual App instance implementation."""
3
from __future__ import absolute_import, unicode_literals
4
5
import os
6
import threading
7
import warnings
8
from collections import defaultdict, deque
9
from datetime import datetime
10
from operator import attrgetter
11
12
from kombu import pools
13
from kombu.clocks import LamportClock
14
from kombu.common import oid_from
15
from kombu.utils.compat import register_after_fork
16
from kombu.utils.objects import cached_property
17
from kombu.utils.uuid import uuid
18
from vine import starpromise
19
from vine.utils import wraps
20
21
from celery import platforms, signals
22
from celery._state import (_announce_app_finalized, _deregister_app,
23
                           _register_app, _set_current_app, _task_stack,
24
                           connect_on_app_finalize, get_current_app,
25
                           get_current_worker_task, set_default_app)
26
from celery.exceptions import AlwaysEagerIgnored, ImproperlyConfigured
27
from celery.five import (UserDict, bytes_if_py2, python_2_unicode_compatible,
28
                         values)
29
from celery.loaders import get_loader_cls
30
from celery.local import PromiseProxy, maybe_evaluate
31
from celery.utils import abstract
32
from celery.utils.collections import AttributeDictMixin
33
from celery.utils.dispatch import Signal
34
from celery.utils.functional import first, head_from_fun, maybe_list
35
from celery.utils.imports import gen_task_name, instantiate, symbol_by_name
36
from celery.utils.log import get_logger
37
from celery.utils.objects import FallbackContext, mro_lookup
38
from celery.utils.time import (get_exponential_backoff_interval, timezone,
39
                               to_utc)
40
41
# Load all builtin tasks
42
from . import builtins  # noqa
43
from . import backends
44
from .annotations import prepare as prepare_annotations
45
from .defaults import find_deprecated_settings
46
from .registry import TaskRegistry
47
from .utils import (AppPickler, Settings, _new_key_to_old, _old_key_to_new,
48
                    _unpickle_app, _unpickle_app_v2, appstr, bugreport,
49
                    detect_settings)
50
51
__all__ = ('Celery',)
52
53
logger = get_logger(__name__)
54
55
BUILTIN_FIXUPS = {
56
    'celery.fixups.django:fixup',
57
}
58
USING_EXECV = os.environ.get('FORKED_BY_MULTIPROCESSING')
59
60
ERR_ENVVAR_NOT_SET = """
61
The environment variable {0!r} is not set,
62
and as such the configuration could not be loaded.
63
64
Please set this variable and make sure it points to
65
a valid configuration module.
66
67
Example:
68
    {0}="proj.celeryconfig"
69
"""
70
71
72
def app_has_custom(app, attr):
    """Return true if the class of `app` customizes `attr`.

    The lookup is delegated to :func:`mro_lookup` on ``app.__class__``,
    stopping at :class:`Celery` and :class:`object`, and passing this
    module's name as ``monkey_patched``.

    Note:
        This is used for optimizations in cases where we know
        how the default behavior works, but need to account
        for someone using inheritance to override a method/property.
    """
    app_cls = app.__class__
    return mro_lookup(
        app_cls, attr,
        stop={Celery, object},
        monkey_patched=[__name__],
    )
82
83
84
def _unpickle_appattr(reverse_name, args):
    """Unpickle an app attribute call.

    Resolves `reverse_name` on the current app through its ``_rgetattr``
    helper and calls the resulting object with `args` unpacked.
    """
    current = get_current_app()
    target = current._rgetattr(reverse_name)
    return target(*args)
89
90
91
def _after_fork_cleanup_app(app):
    """Call ``app._after_fork()``, logging any exception it raises."""
    # Registered through multiprocessing.register_after_fork, which is
    # why this has to be a module-level function.
    try:
        app._after_fork()
    except Exception as exc:  # pylint: disable=broad-except
        # Best effort: report the failure (with traceback) and carry on.
        logger.info('after forker raised exception: %r', exc, exc_info=1)
98
99
100
class PendingConfiguration(UserDict, AttributeDictMixin):
    """Configuration placeholder used before the app is configured.

    `app.conf` will be of this type before being explicitly configured,
    meaning the app can keep any configuration set directly
    on `app.conf` before the `app.config_from_object` call.

    Writes (``__setitem__``, ``update``, ``setdefault``, ``clear``) and
    membership tests go to the pending ``_data`` mapping.  Reading
    ``data`` (used by ``len()`` and ``repr()`` here) invokes `callback`
    once and caches what it returns.
    """

    callback = None
    _data = None

    def __init__(self, conf, callback):
        # Assign through object.__setattr__ so the attribute-setting
        # hooks of the base classes are not involved.
        for attr_name, attr_value in (('_data', conf),
                                      ('callback', callback)):
            object.__setattr__(self, attr_name, attr_value)

    def __setitem__(self, key, value):
        self._data[key] = value

    def clear(self):
        self._data.clear()

    def update(self, *args, **kwargs):
        self._data.update(*args, **kwargs)

    def setdefault(self, *args, **kwargs):
        pending = self._data
        return pending.setdefault(*args, **kwargs)

    def __contains__(self, key):
        # XXX will not show finalized configuration
        # setdefault will cause `key in d` to happen,
        # so for setdefault to be lazy, so does contains.
        pending = self._data
        return key in pending

    def __len__(self):
        finalized = self.data
        return len(finalized)

    def __repr__(self):
        finalized = self.data
        return repr(finalized)

    @cached_property
    def data(self):
        # Evaluated on first access only; the result is then cached.
        finalize = self.callback
        return finalize()
142
143
144
@python_2_unicode_compatible
145
class Celery(object):
146
    """Celery application.
147
148
    Arguments:
149
        main (str): Name of the main module if running as `__main__`.
150
            This is used as the prefix for auto-generated task names.
151
152
    Keyword Arguments:
153
        broker (str): URL of the default broker used.
154
        backend (Union[str, type]): The result store backend class,
155
            or the name of the backend class to use.
156
157
            Default is the value of the :setting:`result_backend` setting.
158
        autofinalize (bool): If set to False a :exc:`RuntimeError`
159
            will be raised if the task registry or tasks are used before
160
            the app is finalized.
161
        set_as_current (bool):  Make this the global current app.
162
        include (List[str]): List of modules every worker should import.
163
164
        amqp (Union[str, type]): AMQP object or class name.
165
        events (Union[str, type]): Events object or class name.
166
        log (Union[str, type]): Log object or class name.
167
        control (Union[str, type]): Control object or class name.
168
        tasks (Union[str, type]): A task registry, or the name of
169
            a registry class.
170
        fixups (List[str]): List of fix-up plug-ins (e.g., see
171
            :mod:`celery.fixups.django`).
172
        config_source (Union[str, type]): Take configuration from a class,
173
            or object.  Attributes may include any settings described in
174
            the documentation.
175
    """
176
177
    #: This is deprecated, use :meth:`reduce_keys` instead
178
    Pickler = AppPickler
179
180
    SYSTEM = platforms.SYSTEM
181
    IS_macOS, IS_WINDOWS = platforms.IS_macOS, platforms.IS_WINDOWS
182
183
    #: Name of the `__main__` module.  Required for standalone scripts.
184
    #:
185
    #: If set this will be used instead of `__main__` when automatically
186
    #: generating task names.
187
    main = None
188
189
    #: Custom options for command-line programs.
190
    #: See :ref:`extending-commandoptions`
191
    user_options = None
192
193
    #: Custom bootsteps to extend and modify the worker.
194
    #: See :ref:`extending-bootsteps`.
195
    steps = None
196
197
    builtin_fixups = BUILTIN_FIXUPS
198
199
    amqp_cls = 'celery.app.amqp:AMQP'
200
    backend_cls = None
201
    events_cls = 'celery.app.events:Events'
202
    loader_cls = None
203
    log_cls = 'celery.app.log:Logging'
204
    control_cls = 'celery.app.control:Control'
205
    task_cls = 'celery.app.task:Task'
206
    registry_cls = 'celery.app.registry:TaskRegistry'
207
208
    _fixups = None
209
    _pool = None
210
    _conf = None
211
    _after_fork_registered = False
212
213
    #: Signal sent when app is loading configuration.
214
    on_configure = None
215
216
    #: Signal sent after app has prepared the configuration.
217
    on_after_configure = None
218
219
    #: Signal sent after app has been finalized.
220
    on_after_finalize = None
221
222
    #: Signal sent by every new process after fork.
223
    on_after_fork = None
224
225
    def __init__(self, main=None, loader=None, backend=None,
                 amqp=None, events=None, log=None, control=None,
                 set_as_current=True, tasks=None, broker=None, include=None,
                 changes=None, config_source=None, fixups=None, task_cls=None,
                 autofinalize=True, namespace=None, strict_typing=True,
                 **kwargs):
        """Set up app state; see the class docstring for the arguments."""
        self.clock = LamportClock()
        self.main = main

        # Explicit arguments win over the class-level defaults.
        self.amqp_cls = amqp or self.amqp_cls
        self.events_cls = events or self.events_cls
        self.loader_cls = loader or self._get_default_loader()
        self.log_cls = log or self.log_cls
        self.control_cls = control or self.control_cls
        self.task_cls = task_cls or self.task_cls
        self.set_as_current = set_as_current
        self.registry_cls = symbol_by_name(self.registry_cls)
        self.user_options = defaultdict(set)
        self.steps = defaultdict(set)
        self.autofinalize = autofinalize
        self.namespace = namespace
        self.strict_typing = strict_typing

        # Configuration is loaded lazily; remember where it comes from.
        self.configured = False
        self._config_source = config_source
        self._pending_defaults = deque()
        self._pending_periodic_tasks = deque()

        # Finalization state.
        self.finalized = False
        self._finalize_mutex = threading.Lock()
        self._pending = deque()
        if isinstance(tasks, TaskRegistry):
            self._tasks = tasks
        else:
            # Anything else (or nothing) is wrapped in a fresh registry.
            self._tasks = self.registry_cls(tasks or {})

        # If the class defines a custom __reduce_args__ we need to use
        # the old way of pickling apps: pickling a list of
        # args instead of the new way that pickles a dict of keywords.
        self._using_v1_reduce = app_has_custom(self, '__reduce_args__')

        # these options are moved to the config to
        # simplify pickling of the app object.
        self._preconf = changes or {}
        self._preconf_set_by_auto = set()
        for conf_key, conf_value in (('broker_url', broker),
                                     ('result_backend', backend),
                                     ('include', include)):
            self.__autoset(conf_key, conf_value)
        pending_conf = PendingConfiguration(
            self._preconf, self._finalize_pending_conf)
        self._conf = Settings(
            pending_conf,
            prefix=self.namespace,
            keys=(_old_key_to_new, _new_key_to_old),
        )

        # - Apply fix-ups.
        if fixups is None:
            self.fixups = set(self.builtin_fixups)
        else:
            self.fixups = fixups
        # ...store fixup instances in _fixups to keep weakrefs alive.
        self._fixups = [symbol_by_name(fixup)(self) for fixup in self.fixups]

        if self.set_as_current:
            self.set_current()

        # Signals
        if self.on_configure is None:
            # used to be a method pre 4.0
            self.on_configure = Signal(name='app.on_configure')
        self.on_after_configure = Signal(
            name='app.on_after_configure',
            providing_args={'source'},
        )
        self.on_after_finalize = Signal(name='app.on_after_finalize')
        self.on_after_fork = Signal(name='app.on_after_fork')

        self.on_init()
        _register_app(self)
299
300
    def _get_default_loader(self):
301
        # the --loader command-line argument sets the environment variable.
302
        return (
303
            os.environ.get('CELERY_LOADER') or
304
            self.loader_cls or
305
            'celery.loaders.app:AppLoader'
306
        )
307
308
    def on_init(self):
309
        """Optional callback called at init."""
310
311
    def __autoset(self, key, value):
312
        if value:
313
            self._preconf[key] = value
314
            self._preconf_set_by_auto.add(key)
315
316
    def set_current(self):
        """Make this app the current app for the calling thread."""
        _set_current_app(self)
319
320
    def set_default(self):
        """Make this app the default app for every thread."""
        set_default_app(self)
323
324
    def _ensure_after_fork(self):
325
        if not self._after_fork_registered:
326
            self._after_fork_registered = True
327
            if register_after_fork is not None:
328
                register_after_fork(self, _after_fork_cleanup_app)
329
330
    def close(self):
        """Clean up after the application.

        Drops the reference held in ``_pool`` and deregisters the app.

        Only necessary for dynamically created apps, and you should
        probably use the :keyword:`with` statement instead.

        Example:
            >>> with Celery(set_as_current=False) as app:
            ...     with app.connection_for_write() as conn:
            ...         pass
        """
        self._pool = None
        _deregister_app(self)
343
344
    def start(self, argv=None):
        """Run :program:`celery` using `argv`.

        Uses :data:`sys.argv` if `argv` is not specified.
        """
        command = instantiate('celery.bin.celery:CeleryCommand', app=self)
        return command.execute_from_commandline(argv)
352
353
    def worker_main(self, argv=None):
        """Run :program:`celery worker` using `argv`.

        Uses :data:`sys.argv` if `argv` is not specified.
        """
        command = instantiate('celery.bin.worker:worker', app=self)
        return command.execute_from_commandline(argv)
361
362
    def task(self, *args, **opts):
        """Decorator to create a task class out of any callable.

        Examples:
            .. code-block:: python

                @app.task
                def refresh_feed(url):
                    store_feed(feedparser.parse(url))

            with setting extra options:

            .. code-block:: python

                @app.task(exchange='feeds')
                def refresh_feed(url):
                    return store_feed(feedparser.parse(url))

        Note:
            App Binding: For custom apps the task decorator will return
            a proxy object, so that the act of creating the task is not
            performed until the task is used or the task registry is accessed.

            If you're depending on binding to be deferred, then you must
            not access any attributes on the returned object until the
            application is fully set up (finalized).

        Raises:
            TypeError: If a single positional argument is not callable,
                or if more than one positional argument is given.
        """
        if USING_EXECV and opts.get('lazy', True):
            # When using execv the task in the original module will point to a
            # different app, so doing things like 'add.request' will point to
            # a different task instance.  This makes sure it will always use
            # the task instance from the current app.
            # Really need a better solution for this :(
            from . import shared_task
            # Override ``lazy`` inside the mapping: passing ``lazy=False``
            # next to ``**opts`` raises TypeError (duplicate keyword) when
            # the caller supplied ``lazy=True`` explicitly.
            return shared_task(*args, **dict(opts, lazy=False))

        def inner_create_task_cls(shared=True, filter=None, lazy=True, **opts):
            _filt = filter  # stupid 2to3

            def _create_task_cls(fun):
                if shared:
                    # Create the task on every app when it is finalized.
                    def cons(app):
                        return app._task_from_fun(fun, **opts)
                    cons.__name__ = fun.__name__
                    connect_on_app_finalize(cons)
                if not lazy or self.finalized:
                    ret = self._task_from_fun(fun, **opts)
                else:
                    # return a proxy object that evaluates on first use
                    ret = PromiseProxy(self._task_from_fun, (fun,), opts,
                                       __doc__=fun.__doc__)
                    self._pending.append(ret)
                if _filt:
                    return _filt(ret)
                return ret

            return _create_task_cls

        if len(args) == 1:
            # Used as a bare decorator: ``@app.task``.
            if callable(args[0]):
                return inner_create_task_cls(**opts)(*args)
            raise TypeError('argument 1 to @task() must be a callable')
        if args:
            raise TypeError(
                '@task() takes exactly 1 argument ({0} given)'.format(
                    len(args) + len(opts)))
        # Used with options: ``@app.task(...)``.
        return inner_create_task_cls(**opts)
429
430
    def _task_from_fun(self, fun, name=None, base=None, bind=False, **options):
        """Create and register a task from `fun`, or return the existing one.

        Arguments:
            fun (Callable): Function that becomes the task's ``run``.
            name (str): Task name; generated from the function's name and
                module when omitted.
            base (type): Base class of the task; defaults to ``self.Task``.
            bind (bool): If true `fun` is used as a regular method of the
                task class, otherwise it is wrapped in ``staticmethod``.
            **options: Extra attributes for the generated task class.  The
                keys ``autoretry_for``, ``retry_kwargs``, ``retry_backoff``,
                ``retry_backoff_max`` and ``retry_jitter`` also configure
                the automatic-retry wrapper.

        Returns:
            The task instance registered under the resolved name.

        Raises:
            RuntimeError: If the app is neither finalized nor allowed to
                auto-finalize.
        """
        if not self.finalized and not self.autofinalize:
            raise RuntimeError('Contract breach: app not finalized')
        name = name or self.gen_task_name(fun.__name__, fun.__module__)
        base = base or self.Task

        if name not in self._tasks:
            run = fun if bind else staticmethod(fun)
            task = type(fun.__name__, (base,), dict({
                'app': self,
                'name': name,
                'run': run,
                '_decorated': True,
                '__doc__': fun.__doc__,
                '__module__': fun.__module__,
                '__header__': staticmethod(head_from_fun(fun, bound=bind)),
                '__wrapped__': run}, **options))()
            # for some reason __qualname__ cannot be set in type()
            # so we have to set it here.
            try:
                task.__qualname__ = fun.__qualname__
            except AttributeError:
                pass
            self._tasks[task.name] = task
            task.bind(self)  # connects task to this app

            autoretry_for = tuple(options.get('autoretry_for', ()))
            retry_kwargs = options.get('retry_kwargs', {})
            retry_backoff = int(options.get('retry_backoff', False))
            retry_backoff_max = int(options.get('retry_backoff_max', 600))
            retry_jitter = options.get('retry_jitter', True)

            if autoretry_for and not hasattr(task, '_orig_run'):

                @wraps(task.run)
                def run(*args, **kwargs):
                    try:
                        return task._orig_run(*args, **kwargs)
                    except autoretry_for as exc:
                        # Work on a per-call copy so the dict supplied by
                        # the caller is never mutated with ``countdown``.
                        call_retry_kwargs = dict(retry_kwargs)
                        if retry_backoff:
                            call_retry_kwargs['countdown'] = \
                                get_exponential_backoff_interval(
                                    factor=retry_backoff,
                                    retries=task.request.retries,
                                    maximum=retry_backoff_max,
                                    full_jitter=retry_jitter)
                        raise task.retry(exc=exc, **call_retry_kwargs)

                # Keep the undecorated run around for the wrapper to call.
                task._orig_run, task.run = task.run, run
        else:
            task = self._tasks[name]
        return task
482
483
    def register_task(self, task):
        """Utility for registering a task-based class.

        A task without a name gets one generated from its class name and
        module.  The task is then stored in the registry, pointed at this
        app and bound to it.

        Note:
            This is here for compatibility with old Celery 1.0
            style task classes, you should not need to use this for
            new projects.

        Returns:
            The task that was passed in.
        """
        if not task.name:
            cls = type(task)
            task.name = self.gen_task_name(cls.__name__, cls.__module__)
        registry = self.tasks
        registry[task.name] = task
        task._app = self
        task.bind(self)
        return task
499
500
    def gen_task_name(self, name, module):
        """Generate a task name by calling the module-level helper."""
        return gen_task_name(self, name, module)
502
503
    def finalize(self, auto=False):
        """Finalize the app.

        This loads built-in tasks, evaluates pending task decorators,
        reads configuration, etc.  Calling it again on a finalized app
        is a no-op.

        Raises:
            RuntimeError: If `auto` is true but the app was created with
                ``autofinalize`` disabled.
        """
        with self._finalize_mutex:
            if self.finalized:
                return
            if auto and not self.autofinalize:
                raise RuntimeError('Contract breach: app not finalized')
            self.finalized = True
            _announce_app_finalized(self)

            # Evaluate the task proxies created before finalization.
            queue = self._pending
            while queue:
                maybe_evaluate(queue.popleft())

            for registered in values(self._tasks):
                registered.bind(self)

            self.on_after_finalize.send(sender=self)
524
525
    def add_defaults(self, fun):
526
        """Add default configuration from dict ``d``.
527
528
        If the argument is a callable function then it will be regarded
529
        as a promise, and it won't be loaded until the configuration is
530
        actually needed.
531
532
        This method can be compared to:
533
534
        .. code-block:: pycon
535
536
            >>> celery.conf.update(d)
537
538
        with a difference that 1) no copy will be made and 2) the dict will
539
        not be transferred when the worker spawns child processes, so
540
        it's important that the same configuration happens at import time
541
        when pickle restores the object on the other side.
542
        """
543
        if not callable(fun):
544
            d, fun = fun, lambda: d
545
        if self.configured:
546
            return self._conf.add_defaults(fun())
547
        self._pending_defaults.append(fun)
548
549
    def config_from_object(self, obj,
550
                           silent=False, force=False, namespace=None):
551
        """Read configuration from object.
552
553
        Object is either an actual object or the name of a module to import.
554
555
        Example:
556
            >>> celery.config_from_object('myapp.celeryconfig')
557
558
            >>> from myapp import celeryconfig
559
            >>> celery.config_from_object(celeryconfig)
560
561
        Arguments:
562
            silent (bool): If true then import errors will be ignored.
563
            force (bool): Force reading configuration immediately.
564
                By default the configuration will be read only when required.
565
        """
566
        self._config_source = obj
567
        self.namespace = namespace or self.namespace
568
        if force or self.configured:
569
            self._conf = None
570
            if self.loader.config_from_object(obj, silent=silent):
571
                return self.conf
572
573
    def config_from_envvar(self, variable_name, silent=False, force=False):
574
        """Read configuration from environment variable.
575
576
        The value of the environment variable must be the name
577
        of a module to import.
578
579
        Example:
580
            >>> os.environ['CELERY_CONFIG_MODULE'] = 'myapp.celeryconfig'
581
            >>> celery.config_from_envvar('CELERY_CONFIG_MODULE')
582
        """
583
        module_name = os.environ.get(variable_name)
584
        if not module_name:
585
            if silent:
586
                return False
587
            raise ImproperlyConfigured(
588
                ERR_ENVVAR_NOT_SET.strip().format(variable_name))
589
        return self.config_from_object(module_name, silent=silent, force=force)
590
591
    def config_from_cmdline(self, argv, namespace='celery'):
        """Update the configuration with settings the loader parses from `argv`."""
        parsed = self.loader.cmdline_config_parser(argv, namespace)
        self._conf.update(parsed)
595
596
    def setup_security(self, allowed_serializers=None, key=None, cert=None,
                       store=None, digest='sha1', serializer='json'):
        """Setup the message-signing serializer.

        This will affect all application instances (a global operation).

        Disables untrusted serializers and if configured to use the ``auth``
        serializer will register the ``auth`` serializer with the provided
        settings into the Kombu serializer registry.

        Arguments:
            allowed_serializers (Set[str]): List of serializer names, or
                content_types that should be exempt from being disabled.
            key (str): Name of private key file to use.
                Defaults to the :setting:`security_key` setting.
            cert (str): Name of certificate file to use.
                Defaults to the :setting:`security_certificate` setting.
            store (str): Directory containing certificates.
                Defaults to the :setting:`security_cert_store` setting.
            digest (str): Digest algorithm used when signing messages.
                Default is ``sha1``.
            serializer (str): Serializer used to encode messages after
                they've been signed.  See :setting:`task_serializer` for
                the serializers supported.  Default is ``json``.
        """
        # Imported here rather than at module level, as in the original.
        from celery.security import setup_security as _setup_security
        return _setup_security(
            allowed_serializers, key, cert, store, digest, serializer,
            app=self,
        )
624
625
    def autodiscover_tasks(self, packages=None,
626
                           related_name='tasks', force=False):
627
        """Auto-discover task modules.
628
629
        Searches a list of packages for a "tasks.py" module (or use
630
        related_name argument).
631
632
        If the name is empty, this will be delegated to fix-ups (e.g., Django).
633
634
        For example if you have a directory layout like this:
635
636
        .. code-block:: text
637
638
            foo/__init__.py
639
               tasks.py
640
               models.py
641
642
            bar/__init__.py
643
                tasks.py
644
                models.py
645
646
            baz/__init__.py
647
                models.py
648
649
        Then calling ``app.autodiscover_tasks(['foo', 'bar', 'baz'])`` will
650
        result in the modules ``foo.tasks`` and ``bar.tasks`` being imported.
651
652
        Arguments:
653
            packages (List[str]): List of packages to search.
654
                This argument may also be a callable, in which case the
655
                value returned is used (for lazy evaluation).
656
            related_name (str): The name of the module to find.  Defaults
657
                to "tasks": meaning "look for 'module.tasks' for every
658
                module in ``packages``."
659
            force (bool): By default this call is lazy so that the actual
660
                auto-discovery won't happen until an application imports
661
                the default modules.  Forcing will cause the auto-discovery
662
                to happen immediately.
663
        """
664
        if force:
665
            return self._autodiscover_tasks(packages, related_name)
666
        signals.import_modules.connect(starpromise(
667
            self._autodiscover_tasks, packages, related_name,
668
        ), weak=False, sender=self)
669
670
    def _autodiscover_tasks(self, packages, related_name, **kwargs):
671
        if packages:
672
            return self._autodiscover_tasks_from_names(packages, related_name)
673
        return self._autodiscover_tasks_from_fixups(related_name)
674
675
    def _autodiscover_tasks_from_names(self, packages, related_name):
        """Ask the loader to discover `related_name` in `packages`."""
        discover = self.loader.autodiscover_tasks
        # packages argument can be lazy
        names = packages() if callable(packages) else packages
        return discover(names, related_name)
680
681
    def _autodiscover_tasks_from_fixups(self, related_name):
682
        return self._autodiscover_tasks_from_names([
683
            pkg for fixup in self._fixups
684
            for pkg in fixup.autodiscover_tasks()
685
            if hasattr(fixup, 'autodiscover_tasks')
686
        ], related_name=related_name)
687
688
    def send_task(self, name, args=None, kwargs=None, countdown=None,
                  eta=None, task_id=None, producer=None, connection=None,
                  router=None, result_cls=None, expires=None,
                  publisher=None, link=None, link_error=None,
                  add_to_parent=True, group_id=None, retries=0, chord=None,
                  reply_to=None, time_limit=None, soft_time_limit=None,
                  root_id=None, parent_id=None, route_name=None,
                  shadow=None, chain=None, task_type=None, **options):
        """Publish a task message addressed by task name.

        Supports the same arguments as :meth:`@-Task.apply_async`.

        Arguments:
            name (str): Name of task to call (e.g., `"tasks.add"`).
            result_cls (AsyncResult): Specify custom result class.

        Returns:
            The result instance created for the task id, built with
            ``result_cls`` when given and ``self.AsyncResult`` otherwise.
        """
        amqp = self.amqp
        if not task_id:
            task_id = uuid()
        if not producer:
            producer = publisher  # XXX compat
        if not router:
            router = amqp.router
        conf = self.conf
        if conf.task_always_eager:  # pragma: no cover
            warnings.warn(AlwaysEagerIgnored(
                'task_always_eager has no effect on send_task',
            ), stacklevel=2)

        # ``ignore_result`` is removed from the options before routing.
        ignored_result = options.pop('ignore_result', False)
        options = router.route(
            options, route_name or name, args, kwargs, task_type)

        # Fill in missing root/parent ids from the task the worker is
        # currently executing, when there is one.
        if not (root_id and parent_id):
            worker_task = self.current_worker_task
            if worker_task:
                if not root_id:
                    root_id = (worker_task.request.root_id or
                               worker_task.request.id)
                if not parent_id:
                    parent_id = worker_task.request.id

        message = amqp.create_task_message(
            task_id, name, args, kwargs, countdown, eta, group_id,
            expires, retries, chord,
            maybe_list(link), maybe_list(link_error),
            reply_to or self.oid, time_limit, soft_time_limit,
            self.conf.task_send_sent_event,
            root_id, parent_id, shadow, chain,
            argsrepr=options.get('argsrepr'),
            kwargsrepr=options.get('kwargsrepr'),
        )

        # An explicit connection takes precedence over any given producer.
        if connection:
            producer = amqp.Producer(connection, auto_declare=False)

        with self.producer_or_acquire(producer) as P, \
                P.connection._reraise_as_library_errors():
            if not ignored_result:
                self.backend.on_task_call(P, task_id)
            amqp.send_task_message(P, name, message, **options)

        result_factory = result_cls or self.AsyncResult
        result = result_factory(task_id)
        # The flag is set as an attribute instead of being passed to the
        # constructor: a custom result class may still use the old
        # constructor signature.
        result.ignored = ignored_result

        if add_to_parent:
            worker_task = self.current_worker_task
            if worker_task:
                worker_task.add_trail(result)
        return result
758
759
    def connection_for_read(self, url=None, **kwargs):
        """Return a broker connection intended for consuming.

        Uses the ``broker_read_url`` configuration value when ``url``
        is not given.

        See Also:
            :meth:`connection` for supported arguments.
        """
        read_url = url or self.conf.broker_read_url
        return self._connection(read_url, **kwargs)
766
767
    def connection_for_write(self, url=None, **kwargs):
        """Return a broker connection intended for producing.

        Uses the ``broker_write_url`` configuration value when ``url``
        is not given.

        See Also:
            :meth:`connection` for supported arguments.
        """
        write_url = url or self.conf.broker_write_url
        return self._connection(write_url, **kwargs)
774
775
    def connection(self, hostname=None, userid=None, password=None,
                   virtual_host=None, port=None, ssl=None,
                   connect_timeout=None, transport=None,
                   transport_options=None, heartbeat=None,
                   login_method=None, failover_strategy=None, **kwargs):
        """Establish a connection to the message broker.

        Prefer :meth:`connection_for_read` and
        :meth:`connection_for_write`, which convey the intended use of
        the connection.  This method delegates to
        :meth:`connection_for_write`.

        Arguments:
            hostname (str): URL, Hostname/IP-address of the broker.
                If a URL is used, then the other argument below will
                be taken from the URL instead.  Falls back to the
                ``broker_write_url`` configuration value.
            userid (str): Username to authenticate as.
            password (str): Password to authenticate with
            virtual_host (str): Virtual host to use (domain).
            port (int): Port to connect to.
            ssl (bool, Dict): Defaults to the :setting:`broker_use_ssl`
                setting.
            transport (str): defaults to the :setting:`broker_transport`
                setting.
            transport_options (Dict): Dictionary of transport specific options.
            heartbeat (int): AMQP Heartbeat in seconds (``pyamqp`` only).
            login_method (str): Custom login method to use (AMQP only).
            failover_strategy (str, Callable): Custom failover strategy.
            **kwargs: Additional arguments to :class:`kombu.Connection`.

        Returns:
            kombu.Connection: the lazy connection instance.
        """
        target = hostname or self.conf.broker_write_url
        # Explicit parameters are layered on top of the extra keywords and
        # forwarded by name.
        forwarded = dict(
            kwargs,
            userid=userid,
            password=password,
            virtual_host=virtual_host,
            port=port,
            ssl=ssl,
            connect_timeout=connect_timeout,
            transport=transport,
            transport_options=transport_options,
            heartbeat=heartbeat,
            login_method=login_method,
            failover_strategy=failover_strategy,
        )
        return self.connection_for_write(target, **forwarded)
817
818
    def _connection(self, url, userid=None, password=None,
819
                    virtual_host=None, port=None, ssl=None,
820
                    connect_timeout=None, transport=None,
821
                    transport_options=None, heartbeat=None,
822
                    login_method=None, failover_strategy=None, **kwargs):
823
        conf = self.conf
824
        return self.amqp.Connection(
825
            url,
826
            userid or conf.broker_user,
827
            password or conf.broker_password,
828
            virtual_host or conf.broker_vhost,
829
            port or conf.broker_port,
830
            transport=transport or conf.broker_transport,
831
            ssl=self.either('broker_use_ssl', ssl),
832
            heartbeat=heartbeat or self.conf.broker_heartbeat,
833
            login_method=login_method or conf.broker_login_method,
834
            failover_strategy=(
835
                failover_strategy or conf.broker_failover_strategy
836
            ),
837
            transport_options=dict(
838
                conf.broker_transport_options, **transport_options or {}
839
            ),
840
            connect_timeout=self.either(
841
                'broker_connection_timeout', connect_timeout
842
            ),
843
        )
844
    # Alias: ``broker_connection`` is the same method as ``connection``.
    broker_connection = connection
845
846
    def _acquire_connection(self, pool=True):
847
        """Helper for :meth:`connection_or_acquire`."""
848
        if pool:
849
            return self.pool.acquire(block=True)
850
        return self.connection_for_write()
851
852
    def connection_or_acquire(self, connection=None, pool=True, *_, **__):
        """Context providing a connection, acquiring one if needed.

        For use within a :keyword:`with` statement to get a connection
        from the pool if one is not already provided.

        Arguments:
            connection (kombu.Connection): If not provided, a connection
                will be acquired from the connection pool.
            pool (bool): Forwarded to :meth:`_acquire_connection` as the
                ``pool`` keyword.
        """
        acquire = self._acquire_connection
        return FallbackContext(connection, acquire, pool=pool)
    default_connection = connection_or_acquire  # XXX compat
864
865
    def producer_or_acquire(self, producer=None):
        """Context providing a producer, acquiring one if needed.

        For use within a :keyword:`with` statement to get a producer
        from the pool if one is not already provided.

        Arguments:
            producer (kombu.Producer): If not provided, a producer
                will be acquired from the producer pool.
        """
        acquire_from_pool = self.producer_pool.acquire
        return FallbackContext(producer, acquire_from_pool, block=True)
    default_producer = producer_or_acquire  # XXX compat
879
880
    def prepare_config(self, c):
        """Prepare configuration before it is merged with the defaults.

        Passes ``c`` through :func:`find_deprecated_settings` and returns
        the result.
        """
        prepared = find_deprecated_settings(c)
        return prepared
883
884
    def now(self):
        """Return the current date and time as a datetime.

        The UTC time is converted to the app's :attr:`timezone`.
        """
        return to_utc(datetime.utcnow()).astimezone(self.timezone)
888
889
    def select_queues(self, queues=None):
        """Select a subset of queues.

        Delegates to ``self.amqp.queues.select``.

        Arguments:
            queues (Sequence[str]): a list of queue names to keep.
        """
        available = self.amqp.queues
        return available.select(queues)
896
897
    def either(self, default_key, *defaults):
        """Get key from configuration or use default values.

        The first usable value among ``*defaults`` wins (as selected by
        :func:`first` with a ``None`` predicate); otherwise the value of
        the ``default_key`` configuration key is used.  The configuration
        lookup is wrapped in a :func:`starpromise`.
        """
        explicit = first(None, defaults)
        from_conf = starpromise(self.conf.get, default_key)
        return first(None, [explicit, from_conf])
906
907
    def bugreport(self):
        """Return information useful in bug reports.

        Delegates to the module-level :func:`bugreport` helper.
        """
        report = bugreport(self)
        return report
910
911
    def _get_backend(self):
        """Instantiate the result backend for this app.

        The backend is resolved with ``backends.by_url`` from
        ``self.backend_cls``, or the ``result_backend`` setting when that
        is unset, and is created with this app and the resolved URL.
        """
        resolved = backends.by_url(
            self.backend_cls or self.conf.result_backend,
            self.loader)
        backend_cls, backend_url = resolved
        return backend_cls(app=self, url=backend_url)
916
917
    def _finalize_pending_conf(self):
918
        """Get config value by key and finalize loading the configuration.
919
920
        Note:
921
            This is used by PendingConfiguration:
922
                as soon as you access a key the configuration is read.
923
        """
924
        conf = self._conf = self._load_config()
925
        return conf
926
927
    def _load_config(self):
        """Build the final configuration and return it.

        Fires ``on_configure``, reads the configuration source (if any),
        detects the settings, installs them as ``self._conf``, applies
        pending lazy defaults and periodic tasks, then sends
        ``on_after_configure``.
        """
        on_configure = self.on_configure
        if not isinstance(on_configure, Signal):
            # used to be a method pre 4.0
            on_configure()
        else:
            on_configure.send(sender=self)

        if self._config_source:
            self.loader.config_from_object(self._config_source)
        self.configured = True

        settings = detect_settings(
            self.prepare_config(self.loader.conf), self._preconf,
            ignore_keys=self._preconf_set_by_auto, prefix=self.namespace,
        )
        if self._conf is None:
            self._conf = settings
        else:
            # Swap in place: someone may hold a reference to app.conf,
            # have changed it, accessed a key, and then go on changing
            # the reference rather than the finalized value.
            self._conf.swap_with(settings)

        # Apply the lazily registered default dicts, oldest first.
        lazy_defaults = self._pending_defaults
        while lazy_defaults:
            factory = lazy_defaults.popleft()
            self._conf.add_defaults(maybe_evaluate(factory()))

        # Register periodic tasks that were added before configuration.
        lazy_periodic = self._pending_periodic_tasks
        while lazy_periodic:
            key_and_entry = lazy_periodic.popleft()
            self._add_periodic_task(*key_and_entry)

        self.on_after_configure.send(sender=self, source=self._conf)
        return self._conf
960
961
    def _after_fork(self):
962
        self._pool = None
963
        try:
964
            self.__dict__['amqp']._producer_pool = None
965
        except (AttributeError, KeyError):
966
            pass
967
        self.on_after_fork.send(sender=self)
968
969
    def signature(self, *args, **kwargs):
        """Return a new :class:`~celery.Signature` bound to this app.

        Any ``app`` keyword given by the caller is replaced by this app.
        """
        bound_kwargs = dict(kwargs, app=self)
        return self._canvas.signature(*args, **bound_kwargs)
973
974
    def add_periodic_task(self, schedule, sig,
                          args=(), kwargs=(), name=None, **opts):
        """Register a periodic task and return its schedule key.

        The entry is added to the beat schedule right away when the app
        is already configured; otherwise it is queued in
        ``self._pending_periodic_tasks``.
        """
        key, entry = self._sig_to_periodic_task_entry(
            schedule, sig, args, kwargs, name, **opts)
        if not self.configured:
            self._pending_periodic_tasks.append((key, entry))
        else:
            self._add_periodic_task(key, entry)
        return key
983
984
    def _sig_to_periodic_task_entry(self, schedule, sig,
                                    args=(), kwargs=None, name=None, **opts):
        """Convert a signature into a ``(key, entry)`` beat schedule pair.

        Arguments:
            schedule: Value stored under the entry's ``'schedule'`` key.
            sig: A signature (cloned with ``args``/``kwargs``) or another
                object with a ``name`` attribute, from which a new
                signature is created.
            args (Tuple): Positional arguments for the signature.
            kwargs (Dict): Keyword arguments for the signature.
            name (str): Schedule key; defaults to ``repr()`` of the
                resulting signature.
            **opts: Extra options merged over the signature's options.

        Returns:
            Tuple[str, Dict]: the schedule key and the entry dict.
        """
        # ``None`` (rather than a shared ``{}``) is the default so that no
        # mutable object is shared between calls; any empty value is
        # normalized to a fresh dict.
        kwargs = kwargs if kwargs else {}
        sig = (sig.clone(args, kwargs)
               if isinstance(sig, abstract.CallableSignature)
               else self.signature(sig.name, args, kwargs))
        return name or repr(sig), {
            'schedule': schedule,
            'task': sig.name,
            'args': sig.args,
            'kwargs': sig.kwargs,
            'options': dict(sig.options, **opts),
        }
996
997
    def _add_periodic_task(self, key, entry):
998
        self._conf.beat_schedule[key] = entry
999
1000
    def create_task_cls(self):
        """Create a base task class bound to this app.

        The class is derived from ``self.task_cls``, is named ``Task``,
        stores the app in ``_app``, keeps its own ``__reduce__`` and is
        marked abstract.
        """
        base = self.task_cls
        return self.subclass_with_self(
            base, name='Task', attribute='_app',
            keep_reduce=True, abstract=True,
        )
1006
1007
    def subclass_with_self(self, Class, name=None, attribute='app',
                           reverse=None, keep_reduce=False, **kw):
        """Create a subclass of an app-compatible class bound to this app.

        App-compatible means that the class has a class attribute that
        provides the default app it should use, for example:
        ``class Foo: app = None``.

        Arguments:
            Class (type): The app-compatible class to subclass.
            name (str): Custom name for the target class.
            attribute (str): Name of the attribute holding the app,
                Default is 'app'.
            reverse (str): Reverse path to this object used for pickling
                purposes. For example, to get ``app.AsyncResult``,
                use ``"AsyncResult"``.
            keep_reduce (bool): If enabled a custom ``__reduce__``
                implementation won't be provided.
            **kw: Additional class attributes for the new class.
        """
        base = symbol_by_name(Class)
        pickle_path = reverse or base.__name__

        def __reduce__(instance):
            return _unpickle_appattr, (pickle_path, instance.__reduce_args__())

        namespace = dict(
            {attribute: self},
            __module__=base.__module__,
            __doc__=base.__doc__,
            **kw)
        if not keep_reduce:
            namespace['__reduce__'] = __reduce__

        subclass_name = bytes_if_py2(name or base.__name__)
        return type(subclass_name, (base,), namespace)
1041
1042
    def _rgetattr(self, path):
1043
        return attrgetter(path)(self)
1044
1045
    def __enter__(self):
1046
        return self
1047
1048
    def __exit__(self, *exc_info):
1049
        self.close()
1050
1051
    def __repr__(self):
        """Return ``<ClassName appstr>`` for this app."""
        cls_name = type(self).__name__
        description = appstr(self)
        return '<{0} {1}>'.format(cls_name, description)
1053
1054
    def __reduce__(self):
1055
        if self._using_v1_reduce:
1056
            return self.__reduce_v1__()
1057
        return (_unpickle_app_v2, (self.__class__, self.__reduce_keys__()))
1058
1059
    def __reduce_v1__(self):
        """Return the v1 (positional) pickle reduction for this app."""
        # Only the configuration changes are pickled, so the default
        # configuration doesn't have to be passed between processes.
        unpickle_args = (self.__class__, self.Pickler) + self.__reduce_args__()
        return (_unpickle_app, unpickle_args)
1067
1068
    def __reduce_keys__(self):
1069
        """Keyword arguments used to reconstruct the object when unpickling."""
1070
        return {
1071
            'main': self.main,
1072
            'changes':
1073
                self._conf.changes if self.configured else self._preconf,
1074
            'loader': self.loader_cls,
1075
            'backend': self.backend_cls,
1076
            'amqp': self.amqp_cls,
1077
            'events': self.events_cls,
1078
            'log': self.log_cls,
1079
            'control': self.control_cls,
1080
            'fixups': self.fixups,
1081
            'config_source': self._config_source,
1082
            'task_cls': self.task_cls,
1083
            'namespace': self.namespace,
1084
        }
1085
1086
    def __reduce_args__(self):
1087
        """Deprecated method, please use :meth:`__reduce_keys__` instead."""
1088
        return (self.main, self._conf.changes if self.configured else {},
1089
                self.loader_cls, self.backend_cls, self.amqp_cls,
1090
                self.events_cls, self.log_cls, self.control_cls,
1091
                False, self._config_source)
1092
1093
    @cached_property
    def Worker(self):
        """Worker application class bound to this app.

        See Also:
            :class:`~@Worker`.
        """
        worker_cls = self.subclass_with_self('celery.apps.worker:Worker')
        return worker_cls
1101
1102
    @cached_property
    def WorkController(self, **kwargs):
        """Embeddable worker class bound to this app.

        See Also:
            :class:`~@WorkController`.
        """
        controller_cls = self.subclass_with_self(
            'celery.worker:WorkController')
        return controller_cls
1110
1111
    @cached_property
    def Beat(self, **kwargs):
        """:program:`celery beat` scheduler application class for this app.

        See Also:
            :class:`~@Beat`.
        """
        beat_cls = self.subclass_with_self('celery.apps.beat:Beat')
        return beat_cls
1119
1120
    @cached_property
    def Task(self):
        """Base task class for this app (see :meth:`create_task_cls`)."""
        task_base = self.create_task_cls()
        return task_base
1124
1125
    @cached_property
    def annotations(self):
        """Annotations prepared from the ``task_annotations`` setting."""
        configured = self.conf.task_annotations
        return prepare_annotations(configured)
1128
1129
    @cached_property
    def AsyncResult(self):
        """Result class bound to this app.

        See Also:
            :class:`celery.result.AsyncResult`.
        """
        result_cls = self.subclass_with_self('celery.result:AsyncResult')
        return result_cls
1137
1138
    @cached_property
    def ResultSet(self):
        """Result set class bound to this app.

        See Also:
            ``celery.result:ResultSet``.
        """
        result_set_cls = self.subclass_with_self('celery.result:ResultSet')
        return result_set_cls
1141
1142
    @cached_property
    def GroupResult(self):
        """Group result class bound to this app.

        See Also:
            :class:`celery.result.GroupResult`.
        """
        group_result_cls = self.subclass_with_self(
            'celery.result:GroupResult')
        return group_result_cls
1150
1151
    @property
    def pool(self):
        """Broker connection pool: :class:`~@pool`.

        The pool is created on first access, using the
        ``broker_pool_limit`` setting as the limit.

        Note:
            This attribute is not related to the workers concurrency pool.
        """
        if self._pool is not None:
            return self._pool
        self._ensure_after_fork()
        pool_limit = self.conf.broker_pool_limit
        pools.set_limit(pool_limit)
        self._pool = pools.connections[self.connection_for_write()]
        return self._pool
1164
1165
    @property
    def current_task(self):
        """Instance of task being executed, or :const:`None`.

        This is the top of the task stack.
        """
        top_of_stack = _task_stack.top
        return top_of_stack
1169
1170
    @property
    def current_worker_task(self):
        """The task currently being executed by a worker or :const:`None`.

        Unlike :data:`current_task`, this isn't affected by tasks
        calling other tasks directly, or eagerly.
        """
        worker_task = get_current_worker_task()
        return worker_task
1178
1179
    @cached_property
    def oid(self):
        """Universally unique identifier for this app."""
        # since 4.0: the thread id (thread.get_ident()) is left out when
        # generating the id.  The RPC backend now dedicates a single
        # thread to receive results, which would not work if each
        # thread had a separate id.
        app_oid = oid_from(self, threads=False)
        return app_oid
1187
1188
    @cached_property
    def amqp(self):
        """AMQP related functionality: :class:`~@amqp`.

        Instantiated from ``self.amqp_cls`` with this app.
        """
        amqp_cls = self.amqp_cls
        return instantiate(amqp_cls, app=self)
1192
1193
    @cached_property
    def backend(self):
        """Current backend instance (created by :meth:`_get_backend`)."""
        current_backend = self._get_backend()
        return current_backend
1197
1198
    @property
    def conf(self):
        """Current configuration; loaded on first access if not set yet."""
        current = self._conf
        if current is None:
            current = self._conf = self._load_config()
        return current

    @conf.setter
    def conf(self, d):  # noqa
        self._conf = d
1208
1209
    @cached_property
    def control(self):
        """Remote control: :class:`~@control`.

        Instantiated from ``self.control_cls`` with this app.
        """
        control_cls = self.control_cls
        return instantiate(control_cls, app=self)
1213
1214
    @cached_property
    def events(self):
        """Consuming and sending events: :class:`~@events`.

        Instantiated from ``self.events_cls`` with this app.
        """
        events_cls = self.events_cls
        return instantiate(events_cls, app=self)
1218
1219
    @cached_property
    def loader(self):
        """Current loader instance.

        Created from the class resolved from ``self.loader_cls``.
        """
        loader_factory = get_loader_cls(self.loader_cls)
        return loader_factory(app=self)
1223
1224
    @cached_property
    def log(self):
        """Logging: :class:`~@log`.

        Instantiated from ``self.log_cls`` with this app.
        """
        log_cls = self.log_cls
        return instantiate(log_cls, app=self)
1228
1229
    @cached_property
    def _canvas(self):
        """The :mod:`celery.canvas` module, imported on first access."""
        from celery import canvas as canvas_module
        return canvas_module
1233
1234
    @cached_property
    def tasks(self):
        """Task registry.

        Warning:
            Accessing this attribute will also auto-finalize the app.
        """
        # Finalize first (``auto=True``), then return the registry.
        self.finalize(auto=True)
        registry = self._tasks
        return registry
1243
1244
    @property
    def producer_pool(self):
        """Producer pool, as provided by ``self.amqp.producer_pool``."""
        amqp = self.amqp
        return amqp.producer_pool
1247
1248
    def uses_utc_timezone(self):
        """Check if the application uses the UTC timezone.

        Compares the app's :attr:`timezone` with ``timezone.utc``.
        """
        app_timezone = self.timezone
        return app_timezone == timezone.utc
1251
1252
    @cached_property
    def timezone(self):
        """Current timezone for this app.

        This is a cached property taking the time zone from the
        :setting:`timezone` setting.  When that setting is empty, UTC is
        used if ``enable_utc`` is set, and the local timezone otherwise.
        """
        conf = self.conf
        configured_zone = conf.timezone
        if configured_zone:
            return timezone.get_timezone(configured_zone)
        return timezone.utc if conf.enable_utc else timezone.local
1266
1267
1268
# Compatibility alias: ``App`` refers to the same class as ``Celery``.
App = Celery  # noqa: E305 XXX compat
1269