Completed
Push — master ( 2c16e2...28a4a1 )
by Paolo
17s queued 14s

common.tasks.exclusive_task.__call__()   A

Complexity: Conditions 3
Size: Total Lines 29 · Code Lines 15
Duplication: Lines 0 · Ratio 0%
Importance: Changes 0
Metric Value
eloc 15
dl 0
loc 29
rs 9.65
c 0
b 0
f 0
cc 3
nop 2
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Tue Jan 15 16:42:24 2019

@author: Paolo Cozzi <[email protected]>
"""

import redis
import traceback

from contextlib import contextmanager
from celery.five import monotonic
from celery.utils.log import get_task_logger

from django.conf import settings
from django.core import management

from image.celery import app as celery_app

from .helpers import send_mail_to_admins

# Lock expires in 10 minutes
LOCK_EXPIRE = 60 * 10

# Get an instance of a logger
logger = get_task_logger(__name__)

class BaseTask(celery_app.Task):
    """Base class for celery tasks. Defines logging for on_failure and debug_task."""

    name = None
    description = None
    action = None

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        if self.name is None:
            self.name = str(self.__class__)

        if self.action is None:
            self.action = str(self.__class__)

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        logger.error('{0!r} failed: {1!r}'.format(task_id, exc))

    def debug_task(self):
        # this doesn't throw an error when debugging a task called with run()
        if self.request_stack:
            logger.debug('Request: {0!r}'.format(self.request))

class NotifyAdminTaskMixin():
    """A mixin to send error messages to admins"""

    action = None

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        """Override the default on_failure method"""

        # call base class
        super().on_failure(exc, task_id, args, kwargs, einfo)

        # get a formatted traceback for the current exception
        einfo = traceback.format_exc()

        subject = "Error in %s" % (self.action)
        body = str(einfo)

        send_mail_to_admins(subject, body)

# https://stackoverflow.com/a/51429597
@celery_app.task(bind=True, base=BaseTask)
def clearsessions(self):
    """Clean up expired sessions using the Django management command."""

    logger.info("Clearing sessions with celery...")

    # debugging instance
    self.debug_task()

    # calling management command
    management.call_command("clearsessions", verbosity=1)

    # debug
    logger.info("Sessions cleaned!")

    return "Sessions cleaned with success"

@contextmanager
def redis_lock(lock_id, blocking=False, expire=True):
    """
    This context manager acquires a lock relying on a lock name. You can
    register multiple processes under the same lock name and give exclusive
    access to only one of them.

    Args:
        lock_id (str): the name of the lock to take
        blocking (bool): if True, wait until the lock is available; if
            False, return immediately with False
        expire (bool): if True, the lock will expire after the LOCK_EXPIRE
            timeout; if False, it will persist until the lock is released

    Yields:
        bool: True if the lock was acquired, False otherwise
    """

    # read parameters from settings
    REDIS_CLIENT = redis.StrictRedis(
        host=settings.REDIS_HOST,
        port=settings.REDIS_PORT,
        db=settings.REDIS_DB)

    # this will be the redis lock
    lock = None

    # timeout for the lock (if expire condition)
    timeout_at = monotonic() + LOCK_EXPIRE - 3

    if expire:
        lock = REDIS_CLIENT.lock(lock_id, timeout=LOCK_EXPIRE)

    else:
        lock = REDIS_CLIENT.lock(lock_id, timeout=None)

    status = lock.acquire(blocking=blocking)

    try:
        logger.debug("lock %s acquired is: %s" % (lock_id, status))
        yield status

    finally:
        # don't release the lock if we didn't acquire it
        if status and ((monotonic() < timeout_at and expire) or not expire):
            logger.debug("Releasing lock %s" % lock_id)
            # don't release the lock if we exceeded the timeout, to lessen
            # the chance of releasing an expired lock owned by someone
            # else; if there is no timeout and the lock is taken, release it
            lock.release()

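# A minimal usage sketch (not part of the original module): redis_lock
# yields a flag rather than raising, so callers are expected to test it and
# skip the critical section when the lock is already held elsewhere. The
# lock name "example-lock" and do_exclusive_work() below are illustrative
# assumptions only.
#
#   with redis_lock("example-lock", blocking=False, expire=True) as acquired:
#       if acquired:
#           do_exclusive_work()
#       else:
#           logger.warning("lock example-lock is busy, skipping")
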
class exclusive_task(object):
    """A class decorator to execute an exclusive task by decorating
    celery.tasks.Task.run (run this task once; other calls of this task
    will return an "already running" message without calling the task, or
    will wait until other tasks of this type are completed)

    Args:
        task_name (str): task name used for debug
        lock_id (str): the task lock id
        blocking (bool): set task as blocking (wait until no other tasks
            are running. def. False)
        block_expire (bool): define if the lock will expire or not after a
            certain time (def. False)
    """

    def __init__(self, task_name, lock_id, blocking=False, block_expire=False):

        """
        If there are decorator arguments, the function
        to be decorated is not passed to the constructor!
        """
        logger.debug("Setting up ExclusiveTaskDecorator")

        self.task_name = task_name
        self.lock_id = lock_id
        self.blocking = blocking
        self.block_expire = block_expire

    def __call__(self, f):
        """
        If there are decorator arguments, __call__() is only called
        once, as part of the decoration process! You can only give
        it a single argument, which is the function object.
        """
        logger.debug("Decorating function")

        def wrapped_f(*args, **kwargs):
            with redis_lock(
                    self.lock_id,
                    self.blocking,
                    self.block_expire) as acquired:
                if acquired:
                    logger.debug("lock %s acquired" % self.lock_id)

                    # do stuff and return something
                    result = f(*args, **kwargs)

                    logger.debug("lock %s released" % self.lock_id)

                else:
                    # warn user and return a default message
                    result = "%s already running!" % (self.task_name)
                    logger.warning(result)

            return result

        return wrapped_f
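
A minimal sketch of how the decorator is meant to be applied, not part of the file above: it assumes a hypothetical ImportTask (class name, task name and lock id are illustrative, and registration with the Celery app is omitted). exclusive_task wraps the task's run() method, so a second call submitted while the lock is held gets the "already running" message back instead of executing.

class ImportTask(NotifyAdminTaskMixin, BaseTask):
    """A hypothetical task, used only to illustrate the decorator."""

    name = "import_data"
    description = "Import data into the database"

    @exclusive_task(
        task_name="ImportTask", lock_id="ImportTask", blocking=False)
    def run(self):
        # exclusive work happens here; while the lock is held, any other
        # call returns "ImportTask already running!" instead
        return "Import data completed"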