Completed
Push — master ( 663472...7037a7 )
by Roy
28s
created

PriorityTaskQueue.top()   A

Complexity

Conditions 4

Size

Total Lines 7

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 4
dl 0
loc 7
rs 9.2
c 0
b 0
f 0
1
#!/usr/bin/env python
2
# -*- encoding: utf-8 -*-
3
# vim: set et sw=4 ts=4 sts=4 ff=unix fenc=utf8:
4
# Author: Binux<[email protected]>
5
#         http://binux.me
6
# Created on 2014-02-07 13:12:10
7
8
import heapq
9
import logging
10
import threading
11
import time
12
13
# Pick a mapping mixin that exists on the running interpreter and expose it
# under the single name ``DictMixin``.
try:
    # Python 2
    from UserDict import DictMixin
except ImportError:
    try:
        # Python 3.3+: the container ABCs live in ``collections.abc``; the
        # aliases in ``collections`` were removed in Python 3.10.
        from collections.abc import Mapping as DictMixin
    except ImportError:
        # Interpreters without ``collections.abc``
        from collections import Mapping as DictMixin
17
from .token_bucket import Bucket
18
from six.moves import queue as Queue
19
20
# Logger used by this module, registered under the name 'scheduler'.
logger = logging.getLogger('scheduler')


# Keep the builtin ``cmp`` when the interpreter has one; otherwise define a
# replacement with the same three-way result.
try:
    cmp
except NameError:
    def cmp(x, y):
        """Return a negative, zero or positive int as x is <, == or > y."""
        return (x > y) - (x < y)
26
27
28
class AtomInt(object):
    """Class-level counter that hands out increasing integers under a lock."""
    __value__ = 0
    __mutex__ = threading.RLock()

    @classmethod
    def get_value(cls):
        """Increment the counter by one and return the new value."""
        with cls.__mutex__:
            cls.__value__ += 1
            return cls.__value__
39
40
41
class InQueueTask(DictMixin):
    """
    A task entry stored in a queue.

    The four slots can be read and written either as attributes or with
    mapping syntax (``task['priority']``).
    """
    __slots__ = ('taskid', 'priority', 'exetime', 'sequence')

    def __getitem__(self, key):
        return getattr(self, key)

    def __setitem__(self, key, value):
        setattr(self, key, value)

    def __iter__(self):
        return iter(self.__slots__)

    def __len__(self):
        return len(self.__slots__)

    def keys(self):
        return self.__slots__

    def __init__(self, taskid, priority=0, exetime=0):
        self.taskid = taskid
        self.priority = priority
        self.exetime = exetime
        # enqueue order, used as the final tie-breaker when comparing
        self.sequence = AtomInt.get_value()

    def __cmp__(self, other):
        """
        Three-way ordering used by the heap.

        When both tasks have exetime == 0 the higher priority sorts first;
        otherwise the smaller exetime sorts first. Ties fall back to the
        in-queue sequence number.
        """
        both_immediate = self.exetime == 0 and other.exetime == 0
        if both_immediate:
            order = -cmp(self.priority, other.priority)
        else:
            order = cmp(self.exetime, other.exetime)

        if order != 0:
            return order
        # same priority or exetime: fall back to the in-queue sequence number
        return cmp(self.sequence, other.sequence)

    def __lt__(self, other):
        return self.__cmp__(other) < 0
67
68
69
class PriorityTaskQueue(Queue.Queue):
    '''
    Priority queue of tasks indexed by taskid.

    Items live in a heap (``self.queue``) and are also indexed by taskid in
    ``self.queue_dict``. Putting an item whose taskid is already queued
    merges it into the existing entry instead of adding a second one.

    Deletion is lazy: ``del queue[taskid]`` drops the index entry and sets
    the heap entry's ``taskid`` to None; such dead entries are discarded
    when they reach the top of the heap.
    '''

    def _init(self, maxsize):
        # heap of items, ordered by the items' own ``<``
        self.queue = []
        # taskid -> live item
        self.queue_dict = dict()

    def _qsize(self, len=len):
        # count live items only; the heap may still hold deleted entries
        return len(self.queue_dict)

    def _put(self, item, heappush=heapq.heappush):
        '''
        Insert item, or merge it into the queued item with the same taskid.

        A merge keeps the higher priority and the smaller exetime.
        '''
        if item.taskid in self.queue_dict:
            task = self.queue_dict[item.taskid]
            before = (task.priority, task.exetime)
            task.priority = max(item.priority, task.priority)
            task.exetime = min(item.exetime, task.exetime)
            # Re-heapify whenever the merged entry's ordering fields changed.
            # Relying on ``item < task`` alone misses the case where the new
            # item raises the priority but carries a later exetime, which
            # would leave the heap order stale.
            if (task.priority, task.exetime) != before:
                self._resort()
        else:
            heappush(self.queue, item)
            self.queue_dict[item.taskid] = item

    def _get(self, heappop=heapq.heappop):
        '''Pop and return the first live item, or None if the heap runs out.'''
        while self.queue:
            item = heappop(self.queue)
            if item.taskid is None:
                # lazily deleted entry, skip it
                continue
            self.queue_dict.pop(item.taskid, None)
            return item
        return None

    @property
    def top(self):
        '''The first live item without removing it, or None if there is none.'''
        # discard deleted entries sitting at the head of the heap
        while self.queue and self.queue[0].taskid is None:
            heapq.heappop(self.queue)
        if self.queue:
            return self.queue[0]
        return None

    def _resort(self):
        heapq.heapify(self.queue)

    def __contains__(self, taskid):
        return taskid in self.queue_dict

    def __getitem__(self, taskid):
        return self.queue_dict[taskid]

    def __setitem__(self, taskid, item):
        assert item.taskid == taskid
        self.put(item)

    def __delitem__(self, taskid):
        # raises KeyError when taskid is not queued; the heap entry is only
        # marked dead here and removed later by _get() / top
        self.queue_dict.pop(taskid).taskid = None
129
130
131
class TaskQueue(object):
    '''
    Task queue for the scheduler.

    Holds three PriorityTaskQueue instances:

    - priority_queue: tasks ready to be handed out
    - time_queue: delayed tasks waiting for their execute time
    - processing: tasks that were handed out and are not yet marked done

    All changes to the queues are made while holding ``self.mutex``.
    '''
    # seconds a task may stay in `processing` before it is queued again
    processing_timeout = 10 * 60

    def __init__(self, rate=0, burst=0):
        self.mutex = threading.RLock()
        self.priority_queue = PriorityTaskQueue()
        self.time_queue = PriorityTaskQueue()
        self.processing = PriorityTaskQueue()
        self.bucket = Bucket(rate=rate, burst=burst)

    @property
    def rate(self):
        return self.bucket.rate

    @rate.setter
    def rate(self, value):
        self.bucket.rate = value

    @property
    def burst(self):
        return self.bucket.burst

    @burst.setter
    def burst(self, value):
        self.bucket.burst = value

    def check_update(self):
        '''
        Check time queue and processing queue

        put tasks to priority queue when execute time arrived or process timeout
        '''
        self._check_time_queue()
        self._check_processing()

    def _check_time_queue(self):
        '''Move delayed tasks whose exetime has passed into the priority queue.'''
        now = time.time()
        # `with` releases the lock even if a queue operation raises
        with self.mutex:
            while self.time_queue.qsize() and self.time_queue.top and self.time_queue.top.exetime < now:
                task = self.time_queue.get_nowait()  # type: InQueueTask
                task.exetime = 0
                self.priority_queue.put(task)

    def _check_processing(self):
        '''Put tasks whose processing deadline has passed back into the priority queue.'''
        now = time.time()
        with self.mutex:
            while self.processing.qsize() and self.processing.top and self.processing.top.exetime < now:
                task = self.processing.get_nowait()
                if task.taskid is None:
                    continue
                task.exetime = 0
                self.priority_queue.put(task)
                logger.info("processing: retry %s", task.taskid)

    def put(self, taskid, priority=0, exetime=0):
        """
        Put a task into the task queue.

        If the taskid is already in the priority queue or the time queue the
        new values are merged into the queued task. A taskid that is
        currently processing is left untouched. Otherwise the task goes to
        the time queue when exetime lies in the future, and to the priority
        queue (with exetime reset to 0) when it does not.

        Note on ordering: a plain heap does not keep tasks with equal
        priority and exetime=0 in FIFO order, so under a sustained flow of
        requests the earliest tasks could wait until the flow drops. Each
        task therefore gets an increasing ``sequence`` number when it is
        created, and the comparison falls back to it when priority and
        exetime do not decide the order.
        """
        now = time.time()

        task = InQueueTask(taskid, priority, exetime)

        with self.mutex:
            if taskid in self.priority_queue:
                self.priority_queue.put(task)
            elif taskid in self.time_queue:
                self.time_queue.put(task)
            elif taskid in self.processing and self.processing[taskid].taskid:
                # force update a processing task is not allowed as there are so many
                # problems may happen
                pass
            else:
                if exetime and exetime > now:
                    self.time_queue.put(task)
                else:
                    task.exetime = 0
                    self.priority_queue.put(task)

    def get(self):
        '''
        Get a task from queue when bucket available

        Returns the taskid, or None when the bucket reports less than 1 or
        the priority queue is empty. The returned task is moved to the
        processing queue with a deadline of now + processing_timeout.
        '''
        if self.bucket.get() < 1:
            return None
        now = time.time()
        with self.mutex:
            try:
                task = self.priority_queue.get_nowait()
            except Queue.Empty:
                return None
            self.bucket.desc()
            task.exetime = now + self.processing_timeout
            self.processing.put(task)
            return task.taskid

    def done(self, taskid):
        '''
        Mark task done

        Returns True when the task was in processing and has been removed,
        False otherwise.
        '''
        # check and delete under the same lock so the result reflects what
        # actually happened
        with self.mutex:
            if taskid in self.processing:
                del self.processing[taskid]
                return True
            return False

    def delete(self, taskid):
        '''
        Remove a task from whichever queue holds it.

        Returns False when the taskid is not in the task queue, True otherwise.
        '''
        # the whole lookup-and-delete runs under the lock (an RLock, so the
        # nested done() call may take it again)
        with self.mutex:
            if taskid not in self:
                return False
            if taskid in self.priority_queue:
                del self.priority_queue[taskid]
            elif taskid in self.time_queue:
                del self.time_queue[taskid]
            elif taskid in self.processing:
                self.done(taskid)
            return True

    def size(self):
        '''Total number of tasks in the priority, time and processing queues.'''
        return self.priority_queue.qsize() + self.time_queue.qsize() + self.processing.qsize()

    def is_processing(self, taskid):
        '''
        return a true value if taskid is in processing
        '''
        return taskid in self.processing and self.processing[taskid].taskid

    def __len__(self):
        return self.size()

    def __contains__(self, taskid):
        if taskid in self.priority_queue or taskid in self.time_queue:
            return True
        if taskid in self.processing and self.processing[taskid].taskid:
            return True
        return False
287
288
289
if __name__ == '__main__':
    # Small self-test, executed only when the module is run as a script.
    task_queue = TaskQueue()
    task_queue.processing_timeout = 0.1

    # 'a3' is delayed by 0.1s; 'a1' and 'a2' are ready immediately
    for put_args in (('a3', 3, time.time() + 0.1), ('a1', 1), ('a2', 2)):
        task_queue.put(*put_args)

    # higher priority first among the ready tasks
    assert task_queue.get() == 'a2'

    # after the delay 'a3' moves to the priority queue and outranks 'a1'
    time.sleep(0.1)
    task_queue._check_time_queue()
    for expected in ('a3', 'a1'):
        assert task_queue.get() == expected

    # 'a2' has exceeded processing_timeout by now and is queued again
    task_queue._check_processing()
    assert task_queue.get() == 'a2'
    # NOTE(review): get() moves tasks into `processing`, and size() counts
    # `processing` too; confirm this final assertion holds as intended.
    assert len(task_queue) == 0
303