Completed
Pull Request — master (#60)
created by Paolo at 06:42

common.tasks (Rating: A)

Complexity
Total Complexity 24

Size/Duplication
Total Lines 258
Duplicated Lines 0 %

Importance
Changes 0

Metric  Value
wmc     24
eloc    114
dl      0
loc     258
rs      10
c       0
b       0
f       0

2 Functions

Rating   Name   Duplication   Size   Complexity  
A clearsessions() 0 16 1
B redis_lock() 0 52 6

7 Methods

Rating   Name   Duplication   Size   Complexity  
A BatchFailurelMixin.on_failure() 0 28 1
A ExclusiveTask.__init__() 0 6 2
A BaseTask.__init__() 0 8 3
A BaseTask.on_failure() 0 2 1
A BatchUpdateMixin.batch_update() 0 23 5
A ExclusiveTask.delay() 0 29 3
A BaseTask.debug_task() 0 4 2
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Tue Jan 15 16:42:24 2019

@author: Paolo Cozzi <[email protected]>
"""

import redis

from contextlib import contextmanager
from celery.five import monotonic
from celery.utils.log import get_task_logger

from django.conf import settings
from django.utils import timezone
from django.core import management

from image.celery import app as celery_app
from submissions.helpers import send_message
from validation.helpers import construct_validation_message
from common.constants import NEED_REVISION, ERROR

# Lock expires in 10 minutes
LOCK_EXPIRE = 60 * 10

# Get an instance of a logger
logger = get_task_logger(__name__)


class BaseTask(celery_app.Task):
    """Base class for celery tasks. Defines logging for on_failure and
    the debug_task helper."""

    name = None
    description = None
    action = None

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        if self.name is None:
            self.name = str(self.__class__)

        if self.action is None:
            self.action = str(self.__class__)

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        logger.error('{0!r} failed: {1!r}'.format(task_id, exc))

    def debug_task(self):
        # this doesn't throw an error when debugging a task called with run()
        if self.request_stack:
            logger.debug('Request: {0!r}'.format(self.request))

# https://stackoverflow.com/a/51429597
@celery_app.task(bind=True, base=BaseTask)
def clearsessions(self):
    """Clean up expired sessions using the Django management command."""

    logger.info("Clearing sessions with celery...")

    # debugging instance
    self.debug_task()

    # calling management command
    management.call_command("clearsessions", verbosity=1)

    # debug
    logger.info("Sessions cleaned!")

    return "Sessions cleaned with success"

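# --- Usage note (editor's sketch, not part of the module): one way this task
# could be run periodically with Celery beat. The schedule key, task name and
# timing below are illustrative assumptions, not taken from project settings.
#
#   from celery.schedules import crontab
#
#   celery_app.conf.beat_schedule = {
#       'clearsessions-nightly': {
#           'task': 'common.tasks.clearsessions',   # assumed registered name
#           'schedule': crontab(hour=2, minute=0),  # every night at 02:00
#       },
#   }
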
class BatchFailurelMixin():
    """Common mixin for batch task failure. You need to set up ``batch_type``
    (update/delete) and ``model_type`` (animal/sample)
    """

    batch_type = None
    model_type = None
    submission_cls = None

    # Override the default on_failure method.
    # This is not a failed validation for a wrong value: an error in this
    # task means an error in the code
    def on_failure(self, exc, task_id, args, kwargs, einfo):
        logger.error('{0!r} failed: {1!r}'.format(task_id, exc))

        submission_id = args[0]

        logger.error(
            ("%s called with %s" % (self.name, args))
        )

        # get submission object
        submission_obj = self.submission_cls.objects.get(pk=submission_id)

        # mark submission with ERROR
        submission_obj.status = ERROR
        submission_obj.message = (
            "Error in %s batch %s: %s" % (
                self.model_type, self.batch_type, str(exc)))
        submission_obj.save()

        send_message(submission_obj)

        # send a mail to the user with the stacktrace (einfo)
        submission_obj.owner.email_user(
            "Error in %s batch %s for submission: %s" % (
                self.model_type, self.batch_type, submission_obj.id),
            ("Something went wrong in %s batch %s. Please report "
             "this to the InjectTool team\n\n %s" % (
                self.model_type, self.batch_type, str(einfo))),
        )

        # TODO: submit mail to admin

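# --- Usage note (editor's sketch, not part of the module): a hypothetical
# task class showing how the mixin attributes are meant to be filled in;
# ``Submission`` and the class name are illustrative, not real project names.
#
#   class BatchUpdateAnimals(BatchFailurelMixin, BaseTask):
#       name = "batch update animals"
#       batch_type = "update"
#       model_type = "animal"
#       submission_cls = Submission   # assumed submission model
#
#       def run(self, submission_id, animal_ids, attribute):
#           ...   # on_failure() above fires if this raises
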
class BatchUpdateMixin:
    """Mixin to do batch update of fields to fix validation"""

    item_cls = None
    submission_cls = None

    def batch_update(self, submission_id, ids, attribute):
        for id_, value in ids.items():
            if value == '' or value == 'None':
                value = None

            item_object = self.item_cls.objects.get(pk=id_)

            if getattr(item_object, attribute) != value:
                setattr(item_object, attribute, value)
                item_object.save()

                # update name object
                item_object.name.last_changed = timezone.now()
                item_object.name.save()

        # Update submission
        submission_obj = self.submission_cls.objects.get(pk=submission_id)
        submission_obj.status = NEED_REVISION
        submission_obj.message = "Data updated, try to rerun validation"
        submission_obj.save()

        send_message(
            submission_obj, construct_validation_message(submission_obj)
        )

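# --- Usage note (editor's sketch, not part of the module): ``ids`` maps each
# item's primary key to the new value for ``attribute``; empty strings and the
# literal string 'None' are stored as None. Values below are illustrative.
#
#   ids = {1: "Sus scrofa", 2: "", 3: "None"}          # pk -> new value
#   self.batch_update(submission_id, ids, "species")   # inside a task's run()
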
@contextmanager
def redis_lock(lock_id, blocking=False, expire=True):
    """
    Acquire a lock identified by a lock name. Several processes can use the
    same lock name, and only one of them gets exclusive access at a time.

    Args:
        lock_id (str): the name of the lock to take
        blocking (bool): if True, wait until the lock is acquired; if False,
            yield False immediately
        expire (bool): if True, the lock will expire after the LOCK_EXPIRE
            timeout; if False, it will persist until it is released

    Yields:
        bool: True if the lock was acquired, False otherwise
    """

    # read connection parameters from settings
    REDIS_CLIENT = redis.StrictRedis(
        host=settings.REDIS_HOST,
        port=settings.REDIS_PORT,
        db=settings.REDIS_DB)

    # this will be the redis lock
    lock = None

    # timeout for the lock (if expire condition)
    timeout_at = monotonic() + LOCK_EXPIRE - 3

    if expire:
        lock = REDIS_CLIENT.lock(lock_id, timeout=LOCK_EXPIRE)

    else:
        lock = REDIS_CLIENT.lock(lock_id, timeout=None)

    status = lock.acquire(blocking=blocking)

    try:
        logger.debug("lock %s acquired is: %s" % (lock_id, status))
        yield status

    finally:
        # don't release the lock if we didn't acquire it
        if status and ((monotonic() < timeout_at and expire) or not expire):
            logger.debug("Releasing lock %s" % lock_id)
            # don't release the lock if we exceeded the timeout,
            # to lessen the chance of releasing an expired lock
            # owned by someone else;
            # if there is no timeout and the lock is taken, release it
            lock.release()

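# --- Usage note (editor's sketch, not part of the module): the context
# manager always yields, so callers must check the acquisition status before
# doing the exclusive work. The lock name below is illustrative only.
#
#   with redis_lock("import-submission", blocking=False) as acquired:
#       if acquired:
#           pass  # exclusive work here; the lock is released on exit
#       else:
#           logger.warning("Another process holds the lock, skipping")
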
class ExclusiveTask(BaseTask):
    """A class to execute an exclusive task (run only one instance of this
    task at a time: other calls either return an "already running" message
    without calling the task, or wait until the running task of this type
    has completed)

    Args:
        blocking (bool): set the task as blocking (wait until no other tasks
            of this type are running, default False)
        lock_expire (bool): define whether the lock expires or not after a
            certain time (default False)
    """

    lock_id = None
    blocking = False
    lock_expire = False

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        if self.lock_id is None:
            # use the task name as the lock id
            self.lock_id = self.name

    def delay(self, *args, **kwargs):
        """Star argument version of :meth:`apply_async`.

        Does not support the extra options enabled by :meth:`apply_async`.

        Arguments:
            *args (Any): Positional arguments passed on to the task.
            **kwargs (Any): Keyword arguments passed on to the task.

        Returns:
            celery.result.AsyncResult: Future promise, or a warning message
            (str) if another task of this type is already running.
        """

        # if blocking is set, wait until we get the lock
        with redis_lock(
                self.lock_id,
                blocking=self.blocking,
                expire=self.lock_expire) as acquired:

            if acquired:
                # do stuff and return something
                return self.apply_async(args, kwargs)

            else:
                # warn user and return a default message
                message = "%s already running!" % (self.name)
                logger.warning(message)

                return message
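# --- Usage note (editor's sketch, not part of the module): a hypothetical
# subclass; the class name, lock id and registration call are illustrative
# assumptions, not taken from the project.
#
#   class ImportSubmission(ExclusiveTask):
#       name = "import submission"
#       description = "Import data for a submission"
#       lock_id = "ImportSubmission"   # one lock shared by every call
#       blocking = False               # return at once if already running
#
#       def run(self, submission_id):
#           ...
#
#   # Celery 4 class-based tasks are registered with the app, for example:
#   # celery_app.register_task(ImportSubmission())
#
#   # Calling .delay() on the registered task then either schedules it or
#   # returns the "import submission already running!" warning string.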