Completed
Branch devel (51448d)
by Paolo
06:33
created

common.tasks.cleanupregistration()   A

Complexity

Conditions 1

Size

Total Lines 16
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 7
dl 0
loc 16
rs 10
c 0
b 0
f 0
cc 1
nop 1
1
#!/usr/bin/env python3
2
# -*- coding: utf-8 -*-
3
"""
4
Created on Tue Jan 15 16:42:24 2019
5
6
@author: Paolo Cozzi <[email protected]>
7
"""
8
9
import redis
10
import traceback
11
12
from contextlib import contextmanager
13
from celery.five import monotonic
14
from celery.utils.log import get_task_logger
15
16
from django.conf import settings
17
from django.core import management
18
19
from image.celery import app as celery_app
20
21
from .helpers import send_mail_to_admins
22
23
# Lock expires in 10 minutes
24
LOCK_EXPIRE = 60 * 10  # seconds; passed as the lock timeout in redis_lock()
25
26
# Get an instance of a logger
27
logger = get_task_logger(__name__)
28
29
30
class BaseTask(celery_app.Task):
    """Common base class for the celery tasks of this module.

    It fills ``name`` and ``action`` with the string form of the class when
    they are left unset, logs failures through ``on_failure`` and provides
    ``debug_task`` to log the current request.
    """

    # ``name`` and ``action`` fall back to str(class) in __init__ when they
    # are still None; ``description`` is only declared here
    name = None
    description = None
    action = None

    def __init__(self, *args, **kwargs):
        """Initialise the task, then apply the name/action fallbacks."""
        super().__init__(*args, **kwargs)

        for attribute in ("name", "action"):
            if getattr(self, attribute) is None:
                setattr(self, attribute, str(self.__class__))

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        """Log the failed task id together with the exception at error level.

        ``args``, ``kwargs`` and ``einfo`` are accepted but not used here.
        """
        message = '{0!r} failed: {1!r}'.format(task_id, exc)
        logger.error(message)

    def debug_task(self):
        """Log the current request at debug level when a request exists."""
        # with an empty request stack (e.g. the task was called with run())
        # there is nothing to log, so leave quietly instead of failing
        if not self.request_stack:
            return

        logger.debug('Request: {0!r}'.format(self.request))
53
54
55
class NotifyAdminTaskMixin():
    """Mixin which mails the admins whenever the task fails."""

    # interpolated into the subject of the mail sent on failure
    action = None

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        """Run the parent ``on_failure`` and then mail a traceback to admins.

        The mail body comes from ``traceback.format_exc()``; the ``einfo``
        argument is only forwarded to the parent implementation.
        """
        # let the next class in the MRO deal with the failure first
        super().on_failure(exc, task_id, args, kwargs, einfo)

        # format the exception which is currently being handled
        failure_trace = traceback.format_exc()

        send_mail_to_admins(
            "Error in %s" % (self.action),
            str(failure_trace))
73
74
75
# https://stackoverflow.com/a/51429597
76
@celery_app.task(bind=True, base=BaseTask)
def clearsessions(self):
    """Run the Django ``clearsessions`` management command as a celery task.

    Returns:
        str: a fixed success message
    """
    logger.info("Clearing session with celery...")

    # log the incoming request, if any
    self.debug_task()

    # the management command does the real work
    management.call_command("clearsessions", verbosity=1)

    logger.info("Sessions cleaned!")

    outcome = "Sessions cleaned with success"
    return outcome
92
93
94
@celery_app.task(bind=True, base=BaseTask)
def cleanupregistration(self):
    """Run the ``cleanupregistration`` management command as a celery task.

    Returns:
        str: a fixed success message
    """
    logger.info("Cleaning up expired registration keys with celery...")

    # log the incoming request, if any
    self.debug_task()

    # the management command does the real work
    management.call_command("cleanupregistration", verbosity=1)

    logger.info("Registrations cleaned!")

    outcome = "Registrations cleaned with success"
    return outcome
110
111
112
@contextmanager
def redis_lock(lock_id, blocking=False, expire=True):
    """
    Context manager which tries to take a named redis lock. Several
    processes can use the same lock name; only one of them holds the lock
    at a time.

    Args:
        lock_id (str): the name of the lock to take
        blocking (bool): if True, wait until the lock is acquired; if False,
            give up immediately when the lock is already taken
        expire (bool): if True, the lock is created with the LOCK_EXPIRE
            timeout; if False, it is created without a timeout and persists
            until it is released

    Yields:
        bool: True if the lock was acquired, False otherwise
    """

    # read parameters from settings
    client = redis.StrictRedis(
        host=settings.REDIS_HOST,
        port=settings.REDIS_PORT,
        db=settings.REDIS_DB)

    # an expiring lock gets the LOCK_EXPIRE timeout, a persistent one none
    lock = client.lock(lock_id, timeout=LOCK_EXPIRE if expire else None)

    status = lock.acquire(blocking=blocking)

    # Start the local expiry clock only after acquire() has returned: with
    # blocking=True the time spent waiting for the lock must not count
    # against our own deadline, otherwise a lock we still hold could be left
    # unreleased. The 3 seconds are a safety margin on LOCK_EXPIRE.
    timeout_at = monotonic() + LOCK_EXPIRE - 3

    try:
        logger.debug("lock %s acquired is: %s" % (lock_id, status))
        yield status

    finally:
        # Never release a lock we did not acquire. A lock without timeout is
        # always released; an expiring one only while our local deadline has
        # not passed, to lessen the chance of releasing an expired lock that
        # is owned by someone else in the meantime.
        if status and (not expire or monotonic() < timeout_at):
            logger.debug("Releasing lock %s" % lock_id)
            lock.release()
164
165
166
class exclusive_task(object):
    """A class decorator to execute an exclusive task by decorating
    celery.tasks.Task.run (run this task once, others
    task calls will return already running message without calling task or
    will wait until other tasks of this type are completed)

    Args:
        task_name (str): task name used for debug
        lock_id (str): the task lock id
        blocking (bool): set task as blocking (wait until no other tasks
            are running. def. False)
        block_expire (bool): define if lock will expire or not after a
            certain time (def. False)
    """

    def __init__(self, task_name, lock_id, blocking=False, block_expire=False):
        """
        Store the decorator arguments. If there are decorator arguments,
        the function to be decorated is not passed to the constructor!
        """
        logger.debug("Setting up ExclusiveTaskDecorator")

        self.task_name = task_name
        self.lock_id = lock_id
        self.blocking = blocking
        self.block_expire = block_expire

    def __call__(self, f):
        """
        Wrap ``f`` so that it runs only while holding the redis lock.

        If there are decorator arguments, __call__() is only called
        once, as part of the decoration process! You can only give
        it a single argument, which is the function object.

        Args:
            f (callable): the function to decorate

        Returns:
            callable: a wrapper returning the result of ``f`` when the lock
            was acquired, or an "already running" message otherwise
        """
        # local import: keeps this block self-contained
        import functools

        logger.debug("Decorating function")

        # functools.wraps keeps the name and docstring of the decorated
        # function on the wrapper
        @functools.wraps(f)
        def wrapped_f(*args, **kwargs):
            with redis_lock(
                    self.lock_id,
                    self.blocking,
                    self.block_expire) as acquired:
                if acquired:
                    logger.debug("lock %s acquired" % self.lock_id)

                    # do stuff and return something
                    result = f(*args, **kwargs)

                    # logged while still inside the lock context
                    logger.debug("lock %s released" % self.lock_id)

                else:
                    # warn user and return a default message
                    result = "%s already running!" % (self.task_name)
                    logger.warning(result)

            return result

        return wrapped_f
223