unlock_chord()   D
last analyzed

Complexity

Conditions 12

Size

Total Lines 48

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 12
dl 0
loc 48
rs 4.8
c 0
b 0
f 0

How to fix   Complexity   

Complexity

Complex classes like unlock_chord() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# -*- coding: utf-8 -*-
2
"""Built-in Tasks.
3
4
The built-in tasks are always available in all app instances.
5
"""
6
from __future__ import absolute_import, unicode_literals
7
8
from celery._state import connect_on_app_finalize
9
from celery.utils.log import get_logger
10
11
__all__ = ()
12
logger = get_logger(__name__)
13
14
15
@connect_on_app_finalize
16
def add_backend_cleanup_task(app):
17
    """Task used to clean up expired results.
18
19
    If the configured backend requires periodic cleanup this task is also
20
    automatically configured to run every day at 4am (requires
21
    :program:`celery beat` to be running).
22
    """
23
    @app.task(name='celery.backend_cleanup', shared=False, lazy=False)
24
    def backend_cleanup():
25
        app.backend.cleanup()
26
    return backend_cleanup
27
28
29
@connect_on_app_finalize
30
def add_accumulate_task(app):
31
    """Task used by Task.replace when replacing task with group."""
32
    @app.task(bind=True, name='celery.accumulate', shared=False, lazy=False)
33
    def accumulate(self, *args, **kwargs):
34
        index = kwargs.get('index')
35
        return args[index] if index is not None else args
36
    return accumulate
37
38
39
@connect_on_app_finalize
40
def add_unlock_chord_task(app):
41
    """Task used by result backends without native chord support.
42
43
    Will joins chord by creating a task chain polling the header
44
    for completion.
45
    """
46
    from celery.canvas import maybe_signature
47
    from celery.exceptions import ChordError
48
    from celery.result import allow_join_result, result_from_tuple
49
50
    @app.task(name='celery.chord_unlock', max_retries=None, shared=False,
51
              default_retry_delay=1, ignore_result=True, lazy=False, bind=True)
52
    def unlock_chord(self, group_id, callback, interval=None,
53
                     max_retries=None, result=None,
54
                     Result=app.AsyncResult, GroupResult=app.GroupResult,
55
                     result_from_tuple=result_from_tuple, **kwargs):
56
        if interval is None:
57
            interval = self.default_retry_delay
58
59
        # check if the task group is ready, and if so apply the callback.
60
        callback = maybe_signature(callback, app)
61
        deps = GroupResult(
62
            group_id,
63
            [result_from_tuple(r, app=app) for r in result],
64
            app=app,
65
        )
66
        j = deps.join_native if deps.supports_native_join else deps.join
67
68
        try:
69
            ready = deps.ready()
70
        except Exception as exc:
71
            raise self.retry(
72
                exc=exc, countdown=interval, max_retries=max_retries,
73
            )
74
        else:
75
            if not ready:
76
                raise self.retry(countdown=interval, max_retries=max_retries)
77
78
        callback = maybe_signature(callback, app=app)
79
        try:
80
            with allow_join_result():
81
                ret = j(timeout=3.0, propagate=True)
82
        except Exception as exc:  # pylint: disable=broad-except
83
            try:
84
                culprit = next(deps._failed_join_report())
85
                reason = 'Dependency {0.id} raised {1!r}'.format(culprit, exc)
86
            except StopIteration:
87
                reason = repr(exc)
88
            logger.exception('Chord %r raised: %r', group_id, exc)
89
            app.backend.chord_error_from_stack(callback, ChordError(reason))
90
        else:
91
            try:
92
                callback.delay(ret)
93
            except Exception as exc:  # pylint: disable=broad-except
94
                logger.exception('Chord %r raised: %r', group_id, exc)
95
                app.backend.chord_error_from_stack(
96
                    callback,
97
                    exc=ChordError('Callback error: {0!r}'.format(exc)),
98
                )
99
    return unlock_chord
100
101
102
@connect_on_app_finalize
103
def add_map_task(app):
104
    from celery.canvas import signature
105
106
    @app.task(name='celery.map', shared=False, lazy=False)
107
    def xmap(task, it):
108
        task = signature(task, app=app).type
109
        return [task(item) for item in it]
110
    return xmap
111
112
113
@connect_on_app_finalize
114
def add_starmap_task(app):
115
    from celery.canvas import signature
116
117
    @app.task(name='celery.starmap', shared=False, lazy=False)
118
    def xstarmap(task, it):
119
        task = signature(task, app=app).type
120
        return [task(*item) for item in it]
121
    return xstarmap
122
123
124
@connect_on_app_finalize
125
def add_chunk_task(app):
126
    from celery.canvas import chunks as _chunks
127
128
    @app.task(name='celery.chunks', shared=False, lazy=False)
129
    def chunks(task, it, n):
130
        return _chunks.apply_chunks(task, it, n)
131
    return chunks
132
133
134
@connect_on_app_finalize
135
def add_group_task(app):
136
    """No longer used, but here for backwards compatibility."""
137
    from celery.canvas import maybe_signature
138
    from celery.result import result_from_tuple
139
140
    @app.task(name='celery.group', bind=True, shared=False, lazy=False)
141
    def group(self, tasks, result, group_id, partial_args, add_to_parent=True):
142
        app = self.app
143
        result = result_from_tuple(result, app)
144
        # any partial args are added to all tasks in the group
145
        taskit = (maybe_signature(task, app=app).clone(partial_args)
146
                  for i, task in enumerate(tasks))
147
        with app.producer_or_acquire() as producer:
148
            [stask.apply_async(group_id=group_id, producer=producer,
149
                               add_to_parent=False) for stask in taskit]
150
        parent = app.current_worker_task
151
        if add_to_parent and parent:
152
            parent.add_trail(result)
153
        return result
154
    return group
155
156
157
@connect_on_app_finalize
158
def add_chain_task(app):
159
    """No longer used, but here for backwards compatibility."""
160
    @app.task(name='celery.chain', shared=False, lazy=False)
161
    def chain(*args, **kwargs):
162
        raise NotImplementedError('chain is not a real task')
163
    return chain
164
165
166
@connect_on_app_finalize
167
def add_chord_task(app):
168
    """No longer used, but here for backwards compatibility."""
169
    from celery import group, chord as _chord
170
    from celery.canvas import maybe_signature
171
172
    @app.task(name='celery.chord', bind=True, ignore_result=False,
173
              shared=False, lazy=False)
174
    def chord(self, header, body, partial_args=(), interval=None,
175
              countdown=1, max_retries=None, eager=False, **kwargs):
176
        app = self.app
177
        # - convert back to group if serialized
178
        tasks = header.tasks if isinstance(header, group) else header
179
        header = group([
180
            maybe_signature(s, app=app) for s in tasks
181
        ], app=self.app)
182
        body = maybe_signature(body, app=app)
183
        ch = _chord(header, body)
184
        return ch.run(header, body, partial_args, app, interval,
185
                      countdown, max_retries, **kwargs)
186
    return chord
187