Completed: Push to master ( 2c16e2...2c16e2 ) by Paolo, 13s (queued 11s)

common.tasks.BaseTask.__init__()    A

Complexity
    Conditions: 3

Size
    Total Lines: 8
    Code Lines: 6

Duplication
    Lines: 0
    Ratio: 0 %

Importance
    Changes: 0

Metric   Value
eloc     6
dl       0
loc      8
rs       10
c        0
b        0
f        0
cc       3
nop      3
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Tue Jan 15 16:42:24 2019

@author: Paolo Cozzi <[email protected]>
"""

import redis
import traceback

from contextlib import contextmanager
from celery.five import monotonic
from celery.utils.log import get_task_logger

from django.conf import settings
from django.core import management

from image.celery import app as celery_app

from .helpers import send_mail_to_admins

# Lock expires in 10 minutes
LOCK_EXPIRE = 60 * 10

# Get an instance of a logger
logger = get_task_logger(__name__)


class BaseTask(celery_app.Task):
    """Base class for celery tasks. Defines logging in on_failure and
    provides a debug_task helper."""

    name = None
    description = None
    action = None

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        if self.name is None:
            self.name = str(self.__class__)

        if self.action is None:
            self.action = str(self.__class__)

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        logger.error('{0!r} failed: {1!r}'.format(task_id, exc))

    def debug_task(self):
        # this doesn't raise an error when debugging a task called with run()
        if self.request_stack:
            logger.debug('Request: {0!r}'.format(self.request))


class NotifyAdminTaskMixin():
    """A mixin to send error messages to admins when a task fails"""

    action = None

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        """Override the default on_failure method"""

        # call base class
        super().on_failure(exc, task_id, args, kwargs, einfo)

        # get exception info
        einfo = traceback.format_exc()

        subject = "Error in %s" % (self.action)
        body = str(einfo)

        send_mail_to_admins(subject, body)


# https://stackoverflow.com/a/51429597
@celery_app.task(bind=True, base=BaseTask)
def clearsessions(self):
    """Clean up expired sessions using the Django management command."""

    logger.info("Clearing sessions with celery...")

    # debugging instance
    self.debug_task()

    # calling management command
    management.call_command("clearsessions", verbosity=1)

    # debug
    logger.info("Sessions cleaned!")

    return "Sessions cleaned successfully"


@contextmanager
def redis_lock(lock_id, blocking=False, expire=True):
    """
    Get a lock relying on a lock name. Several processes can use the same
    lock name, and exclusive access is granted to only one of them at a
    time.

    Args:
        lock_id (str): the name of the lock to take
        blocking (bool): if True, wait until the lock is available; if
            False, return False immediately when the lock is already taken
        expire (bool): if True, the lock will expire after the LOCK_EXPIRE
            timeout; if False, it will persist until the lock is released

    Yields:
        bool: True if the lock was acquired, False otherwise
    """

    # read parameters from settings
    REDIS_CLIENT = redis.StrictRedis(
        host=settings.REDIS_HOST,
        port=settings.REDIS_PORT,
        db=settings.REDIS_DB)

    # this will be the redis lock
    lock = None

    # timeout for the lock (when it is set to expire)
    timeout_at = monotonic() + LOCK_EXPIRE - 3

    if expire:
        lock = REDIS_CLIENT.lock(lock_id, timeout=LOCK_EXPIRE)

    else:
        lock = REDIS_CLIENT.lock(lock_id, timeout=None)

    status = lock.acquire(blocking=blocking)

    try:
        logger.debug("lock %s acquired is: %s" % (lock_id, status))
        yield status

    finally:
        # don't release the lock if we didn't acquire it
        if status and ((monotonic() < timeout_at and expire) or not expire):
            logger.debug("Releasing lock %s" % lock_id)
            # don't release the lock if we exceeded the timeout, to lessen
            # the chance of releasing an expired lock owned by someone else;
            # if there is no timeout and the lock is taken, release it
            lock.release()


class ExclusiveTask(BaseTask):
    """A class to execute an exclusive task: only one instance of this task
    runs at a time. Other calls will either return an "already running"
    message without calling the task, or wait until the running task of
    this type has completed.

    Args:
        blocking (bool): set the task as blocking (wait until no other
            tasks of this type are running; default False)
        lock_expire (bool): define whether the lock expires after a certain
            time (default False)
    """

    lock_id = None
    blocking = False
    lock_expire = False

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        if self.lock_id is None:
            # use the task name as the lock id
            self.lock_id = self.name

    def delay(self, *args, **kwargs):
        """Star argument version of :meth:`apply_async`.

        Does not support the extra options enabled by :meth:`apply_async`.

        Arguments:
            *args (Any): Positional arguments passed on to the task.
            **kwargs (Any): Keyword arguments passed on to the task.

        Returns:
            celery.result.AsyncResult: Future promise, or a warning
                message (str) if the lock could not be acquired.
        """

        # try to acquire the lock (waiting for it if self.blocking is True)
        with redis_lock(
                self.lock_id,
                blocking=self.blocking,
                expire=self.lock_expire) as acquired:

            if acquired:
                # lock acquired: submit the task and return its result
                return self.apply_async(args, kwargs)

            else:
                # warn user and return a default message
                message = "%s already running!" % (self.name)
                logger.warning(message)

                return message
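
For illustration, redis_lock can also be used on its own as a context manager. The sketch below is hypothetical: the lock name is invented, and it assumes the Django settings for REDIS_HOST, REDIS_PORT and REDIS_DB are configured and a redis server is reachable.

# Hypothetical direct use of the redis_lock context manager; the lock
# name "import-samples" is invented for this example.
from common.tasks import logger, redis_lock

with redis_lock("import-samples", blocking=False, expire=True) as acquired:
    if acquired:
        logger.info("lock taken, doing the exclusive work...")
    else:
        logger.warning("another worker holds the lock, skipping")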
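
As a sketch of how the classes above are meant to be combined, here is a minimal, hypothetical exclusive task. The AnnotateSamples class, its attribute values and its run() body are invented for the example; only ExclusiveTask, NotifyAdminTaskMixin and the celery app come from the code above, and celery_app.register_task is the standard Celery API for registering class-based tasks.

# Hypothetical subclass combining the mixin and the exclusive base task.
from image.celery import app as celery_app

from common.tasks import ExclusiveTask, NotifyAdminTaskMixin


class AnnotateSamples(NotifyAdminTaskMixin, ExclusiveTask):
    """Only one annotation job can run at a time (invented example)"""

    name = "annotate samples"
    description = "Annotate the submitted samples"
    action = "annotation"

    # wait for a running instance instead of returning a warning message
    blocking = True

    # let the redis lock expire after LOCK_EXPIRE seconds
    lock_expire = True

    def run(self):
        # the real work would happen here
        return "success"


# register the class-based task against the celery app
annotate_samples = AnnotateSamples()
celery_app.register_task(annotate_samples)

# delay() goes through redis_lock: with blocking=False it would return the
# "... already running!" warning if another instance held the lock,
# otherwise the task's AsyncResult
result = annotate_samples.delay()

On failure, the mixin's on_failure runs first (mailing the traceback to the admins) and then falls through to BaseTask.on_failure, which logs the error.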