Completed
Push — master ( e6dbce...addc19 )
by Roy
02:40
created

TaskQueue.delete()   B

Complexity

Conditions 5

Size

Total Lines 14

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 5
c 0
b 0
f 0
dl 0
loc 14
rs 8.5454
1
#!/usr/bin/env python
2
# -*- encoding: utf-8 -*-
3
# vim: set et sw=4 ts=4 sts=4 ff=unix fenc=utf8:
4
# Author: Binux<[email protected]>
5
#         http://binux.me
6
# Created on 2014-02-07 13:12:10
7
8
import time
9
import heapq
10
import logging
11
import threading
12
try:
13
    from UserDict import DictMixin
14
except ImportError:
15
    from collections import Mapping as DictMixin
16
from .token_bucket import Bucket
17
from six.moves import queue as Queue
18
19
logger = logging.getLogger('scheduler')
20
21
try:
22
    cmp
23
except NameError:
24
    cmp = lambda x, y: (x > y) - (x < y)
25
26
27
class InQueueTask(DictMixin):
28
    __slots__ = ('taskid', 'priority', 'exetime')
29
    __getitem__ = lambda *x: getattr(*x)
30
    __setitem__ = lambda *x: setattr(*x)
31
    __iter__ = lambda self: iter(self.__slots__)
32
    __len__ = lambda self: len(self.__slots__)
33
    keys = lambda self: self.__slots__
34
35
    def __init__(self, taskid, priority=0, exetime=0):
36
        self.taskid = taskid
37
        self.priority = priority
38
        self.exetime = exetime
39
40
    def __cmp__(self, other):
41
        if self.exetime == 0 and other.exetime == 0:
42
            return -cmp(self.priority, other.priority)
43
        else:
44
            return cmp(self.exetime, other.exetime)
45
46
    def __lt__(self, other):
47
        return self.__cmp__(other) < 0
48
49
50
class PriorityTaskQueue(Queue.Queue):
51
52
    '''
53
    TaskQueue
54
55
    Same taskid items will been merged
56
    '''
57
58
    def _init(self, maxsize):
59
        self.queue = []
60
        self.queue_dict = dict()
61
62
    def _qsize(self, len=len):
63
        return len(self.queue_dict)
64
65
    def _put(self, item, heappush=heapq.heappush):
66
        if item.taskid in self.queue_dict:
67
            task = self.queue_dict[item.taskid]
68
            changed = False
69
            if item.priority > task.priority:
70
                task.priority = item.priority
71
                changed = True
72
            if item.exetime < task.exetime:
73
                task.exetime = item.exetime
74
                changed = True
75
            if changed:
76
                self._resort()
77
        else:
78
            heappush(self.queue, item)
79
            self.queue_dict[item.taskid] = item
80
81
    def _get(self, heappop=heapq.heappop):
82
        while self.queue:
83
            item = heappop(self.queue)
84
            if item.taskid is None:
85
                continue
86
            self.queue_dict.pop(item.taskid, None)
87
            return item
88
        return None
89
90
    @property
91
    def top(self):
92
        while self.queue and self.queue[0].taskid is None:
93
            heapq.heappop(self.queue)
94
        if self.queue:
95
            return self.queue[0]
96
        return None
97
98
    def _resort(self):
99
        heapq.heapify(self.queue)
100
101
    def __contains__(self, taskid):
102
        return taskid in self.queue_dict
103
104
    def __getitem__(self, taskid):
105
        return self.queue_dict[taskid]
106
107
    def __setitem__(self, taskid, item):
108
        assert item.taskid == taskid
109
        self.put(item)
110
111
    def __delitem__(self, taskid):
112
        self.queue_dict.pop(taskid).taskid = None
113
114
115
class TaskQueue(object):
116
117
    '''
118
    task queue for scheduler, have a priority queue and a time queue for delayed tasks
119
    '''
120
    processing_timeout = 10 * 60
121
122
    def __init__(self, rate=0, burst=0):
123
        self.mutex = threading.RLock()
124
        self.priority_queue = PriorityTaskQueue()
125
        self.time_queue = PriorityTaskQueue()
126
        self.processing = PriorityTaskQueue()
127
        self.bucket = Bucket(rate=rate, burst=burst)
128
129
    @property
130
    def rate(self):
131
        return self.bucket.rate
132
133
    @rate.setter
134
    def rate(self, value):
135
        self.bucket.rate = value
136
137
    @property
138
    def burst(self):
139
        return self.bucket.burst
140
141
    @burst.setter
142
    def burst(self, value):
143
        self.bucket.burst = value
144
145
    def check_update(self):
146
        '''
147
        Check time queue and processing queue
148
149
        put tasks to priority queue when execute time arrived or process timeout
150
        '''
151
        self._check_time_queue()
152
        self._check_processing()
153
154 View Code Duplication
    def _check_time_queue(self):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
155
        now = time.time()
156
        self.mutex.acquire()
157
        while self.time_queue.qsize() and self.time_queue.top and self.time_queue.top.exetime < now:
158
            task = self.time_queue.get_nowait()
159
            task.exetime = 0
160
            self.priority_queue.put(task)
161
        self.mutex.release()
162
163 View Code Duplication
    def _check_processing(self):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
164
        now = time.time()
165
        self.mutex.acquire()
166
        while self.processing.qsize() and self.processing.top and self.processing.top.exetime < now:
167
            task = self.processing.get_nowait()
168
            if task.taskid is None:
169
                continue
170
            task.exetime = 0
171
            self.priority_queue.put(task)
172
            logger.info("processing: retry %s", task.taskid)
173
        self.mutex.release()
174
175
    def put(self, taskid, priority=0, exetime=0):
176
        '''Put a task into task queue'''
177
        now = time.time()
178
        task = InQueueTask(taskid, priority, exetime)
179
        self.mutex.acquire()
180
        if taskid in self.priority_queue:
181
            self.priority_queue.put(task)
182
        elif taskid in self.time_queue:
183
            self.time_queue.put(task)
184
        elif taskid in self.processing and self.processing[taskid].taskid:
185
            # force update a processing task is not allowed as there are so many
186
            # problems may happen
187
            pass
188
        else:
189
            if exetime and exetime > now:
190
                self.time_queue.put(task)
191
            else:
192
                self.priority_queue.put(task)
193
        self.mutex.release()
194
195
    def get(self):
196
        '''Get a task from queue when bucket available'''
197
        if self.bucket.get() < 1:
198
            return None
199
        now = time.time()
200
        self.mutex.acquire()
201
        try:
202
            task = self.priority_queue.get_nowait()
203
            self.bucket.desc()
204
        except Queue.Empty:
205
            self.mutex.release()
206
            return None
207
        task.exetime = now + self.processing_timeout
208
        self.processing.put(task)
209
        self.mutex.release()
210
        return task.taskid
211
212
    def done(self, taskid):
213
        '''Mark task done'''
214
        if taskid in self.processing:
215
            self.mutex.acquire()
216
            if taskid in self.processing:
217
                del self.processing[taskid]
218
            self.mutex.release()
219
            return True
220
        return False
221
222
    def delete(self, taskid):
223
        if taskid not in self:
224
            return False
225
        if taskid in self.priority_queue:
226
            self.mutex.acquire()
227
            del self.priority_queue[taskid]
228
            self.mutex.release()
229
        elif taskid in self.time_queue:
230
            self.mutex.acquire()
231
            del self.time_queue[taskid]
232
            self.mutex.release()
233
        elif taskid in self.processing:
234
            self.done(taskid)
235
        return True
236
237
    def size(self):
238
        return self.priority_queue.qsize() + self.time_queue.qsize() + self.processing.qsize()
239
240
    def is_processing(self, taskid):
241
        '''
242
        return True if taskid is in processing
243
        '''
244
        return taskid in self.processing and self.processing[taskid].taskid
245
246
    def __len__(self):
247
        return self.size()
248
249
    def __contains__(self, taskid):
250
        if taskid in self.priority_queue or taskid in self.time_queue:
251
            return True
252
        if taskid in self.processing and self.processing[taskid].taskid:
253
            return True
254
        return False
255
256
257
if __name__ == '__main__':
258
    task_queue = TaskQueue()
259
    task_queue.processing_timeout = 0.1
260
    task_queue.put('a3', 3, time.time() + 0.1)
261
    task_queue.put('a1', 1)
262
    task_queue.put('a2', 2)
263
    assert task_queue.get() == 'a2'
264
    time.sleep(0.1)
265
    task_queue._check_time_queue()
266
    assert task_queue.get() == 'a3'
267
    assert task_queue.get() == 'a1'
268
    task_queue._check_processing()
269
    assert task_queue.get() == 'a2'
270
    assert len(task_queue) == 0
271