Completed
Pull Request — master (#794)
by
unknown
33s
created

InQueueTask.__repr__()   A

Complexity

Conditions 2

Size

Total Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 2
dl 0
loc 2
rs 10
c 0
b 0
f 0
1
#!/usr/bin/env python
2
# -*- encoding: utf-8 -*-
3
# vim: set et sw=4 ts=4 sts=4 ff=unix fenc=utf8:
4
# Author: Binux<[email protected]>
5
#         http://binux.me
6
# Created on 2014-02-07 13:12:10
7
8
import heapq
9
import logging
10
import threading
11
import time
12
13
try:
    from UserDict import DictMixin  # Python 2
except ImportError:
    # Python 3: the bare ``collections.Mapping`` alias was deprecated in 3.3
    # and removed in 3.10 -- import from collections.abc instead.
    from collections.abc import Mapping as DictMixin
from .token_bucket import Bucket
from six.moves import queue as Queue
19
20
logger = logging.getLogger('scheduler')

try:
    # Python 2 provides the builtin three-way comparison.
    cmp
except NameError:
    # Python 3 removed cmp(); recreate the classic -1/0/1 comparison
    # used by InQueueTask.__cmp__ below.
    cmp = lambda x, y: (x > y) - (x < y)
26
27
28
class InQueueTask(DictMixin):
    """A queued task record exposing a read/write mapping interface
    over its three slots: taskid, priority and exetime."""

    __slots__ = ('taskid', 'priority', 'exetime')

    def __init__(self, taskid, priority=0, exetime=0):
        self.taskid = taskid
        self.priority = priority
        self.exetime = exetime

    # --- mapping protocol: keys are exactly the slot names ---

    def __getitem__(self, name):
        return getattr(self, name)

    def __setitem__(self, name, value):
        setattr(self, name, value)

    def __iter__(self):
        return iter(self.__slots__)

    def __len__(self):
        return len(self.__slots__)

    def keys(self):
        return self.__slots__

    # --- ordering: heap root is the most urgent task ---

    def __cmp__(self, other):
        # Higher priority sorts first, hence the reversed arguments.
        by_priority = cmp(other.priority, self.priority)
        if by_priority:
            return by_priority
        # Equal priority: earlier execution time wins, keeping
        # same-priority tasks in time order.
        return cmp(self.exetime, other.exetime)

    def __lt__(self, other):
        return self.__cmp__(other) < 0

    def __repr__(self):
        return repr({k: self[k] for k in self.__slots__})
56
57
58
class PriorityTaskQueue(Queue.Queue):
    '''
    Priority queue of tasks.

    Items sharing a taskid are merged: a later put with a better
    priority/exetime upgrades the queued item in place instead of
    adding a duplicate entry.
    '''

    def _init(self, maxsize):
        # Heap of tasks plus a taskid -> task index for O(1) lookup.
        self.queue = []
        self.queue_dict = dict()

    def _qsize(self, len=len):
        # Count live tasks only: the heap may still hold tombstoned
        # entries left behind by __delitem__.
        return len(self.queue_dict)

    def _put(self, item, heappush=heapq.heappush):
        existing = self.queue_dict.get(item.taskid)
        if existing is None:
            heappush(self.queue, item)
            self.queue_dict[item.taskid] = item
            return
        # Merge: upgrade the queued task only when the incoming item
        # sorts ahead of it, then restore the heap invariant.
        if item < existing:
            existing.priority = max(item.priority, existing.priority)
            existing.exetime = min(item.exetime, existing.exetime)
            self._resort()

    def _get(self, heappop=heapq.heappop):
        # Pop until a live task is found; entries whose taskid was
        # cleared by __delitem__ are silently discarded.
        while self.queue:
            candidate = heappop(self.queue)
            if candidate.taskid is not None:
                self.queue_dict.pop(candidate.taskid, None)
                return candidate
        return None

    @property
    def top(self):
        '''Peek at the most urgent live task without removing it.'''
        # Shed tombstoned entries sitting at the heap root first.
        while self.queue and self.queue[0].taskid is None:
            heapq.heappop(self.queue)
        return self.queue[0] if self.queue else None

    def _resort(self):
        heapq.heapify(self.queue)

    def __contains__(self, taskid):
        return taskid in self.queue_dict

    def __getitem__(self, taskid):
        return self.queue_dict[taskid]

    def __setitem__(self, taskid, item):
        assert item.taskid == taskid
        self.put(item)

    def __delitem__(self, taskid):
        # Lazy delete: unlink from the index and tombstone the heap
        # entry by clearing its taskid; _get/top skip such entries.
        self.queue_dict.pop(taskid).taskid = None
118
119
120
class TaskQueue(object):
    '''
    Task queue for the scheduler: a priority queue for ready tasks,
    a time queue for delayed tasks, and a processing queue tracking
    tasks that were handed out but not yet acknowledged via done().
    '''
    # Seconds before a dispatched-but-unacknowledged task is retried.
    processing_timeout = 10 * 60

    def __init__(self, rate=0, burst=0):
        # RLock: delete() holds the lock while calling done(), which
        # re-acquires it, so the lock must be reentrant.
        self.mutex = threading.RLock()
        self.priority_queue = PriorityTaskQueue()
        self.time_queue = PriorityTaskQueue()
        self.processing = PriorityTaskQueue()
        self.bucket = Bucket(rate=rate, burst=burst)

    @property
    def rate(self):
        return self.bucket.rate

    @rate.setter
    def rate(self, value):
        self.bucket.rate = value

    @property
    def burst(self):
        return self.bucket.burst

    @burst.setter
    def burst(self, value):
        self.bucket.burst = value

    def check_update(self):
        '''
        Check time queue and processing queue.

        Put tasks into the priority queue when their execute time has
        arrived or their processing has timed out.
        '''
        self._check_time_queue()
        self._check_processing()

    def _check_time_queue(self):
        now = time.time()
        # `with` guarantees the lock is released even if a queue
        # operation raises; the old acquire()/release() pair leaked
        # the lock on any exception in between.
        with self.mutex:
            while self.time_queue.qsize() and self.time_queue.top and self.time_queue.top.exetime < now:
                task = self.time_queue.get_nowait()
                task.exetime = 0
                self.priority_queue.put(task)

    def _check_processing(self):
        now = time.time()
        with self.mutex:
            while self.processing.qsize() and self.processing.top and self.processing.top.exetime < now:
                task = self.processing.get_nowait()
                # Tombstoned entries (removed via done()) are skipped.
                if task.taskid is None:
                    continue
                task.exetime = 0
                self.priority_queue.put(task)
                logger.info("processing: retry %s", task.taskid)

    def put(self, taskid, priority=0, exetime=0):
        """
        Put a task into the task queue.

        With a plain heap, tasks sharing the same priority and
        exetime=0 come out in LIFO rather than FIFO order.  Under a
        continuous heavy flow, where select is slower than request and
        the priority queue accumulates, the earliest-queued tasks
        would then starve until the flow subsides.

        To keep equal-priority tasks in time order as much as
        possible, exetime defaults to the current time so it acts as
        the FIFO tiebreaker.
        """
        now = time.time()
        task = InQueueTask(taskid, priority, exetime if exetime > 0 else now)

        with self.mutex:
            if taskid in self.priority_queue:
                self.priority_queue.put(task)
            elif taskid in self.time_queue:
                self.time_queue.put(task)
            elif taskid in self.processing and self.processing[taskid].taskid:
                # Force-updating a task that is being processed is not
                # allowed, as too many problems may happen.
                pass
            else:
                if exetime and exetime > now:
                    self.time_queue.put(task)
                else:
                    self.priority_queue.put(task)

    def get(self):
        '''Get a task from queue when bucket available'''
        if self.bucket.get() < 1:
            return None
        now = time.time()
        with self.mutex:
            try:
                task = self.priority_queue.get_nowait()
                self.bucket.desc()
            except Queue.Empty:
                return None
            # Park the task in `processing`; exetime doubles as the
            # retry deadline checked by _check_processing().
            task.exetime = now + self.processing_timeout
            self.processing.put(task)
            return task.taskid

    def done(self, taskid):
        '''Mark task done'''
        if taskid in self.processing:
            with self.mutex:
                # Re-check under the lock: another thread may have
                # retried/removed the task since the unlocked test.
                if taskid in self.processing:
                    del self.processing[taskid]
            return True
        return False

    def delete(self, taskid):
        '''Remove a task from whichever queue currently holds it.

        Returns True when the task was found, False otherwise.
        '''
        # Hold the lock across the membership checks so the task
        # cannot migrate between queues mid-decision.
        with self.mutex:
            if taskid not in self:
                return False
            if taskid in self.priority_queue:
                del self.priority_queue[taskid]
            elif taskid in self.time_queue:
                del self.time_queue[taskid]
            elif taskid in self.processing:
                self.done(taskid)
            return True

    def size(self):
        '''Total number of live tasks across all three queues.'''
        return self.priority_queue.qsize() + self.time_queue.qsize() + self.processing.qsize()

    def is_processing(self, taskid):
        '''
        return True if taskid is in processing
        '''
        # The second test filters out tombstoned (deleted) entries.
        return taskid in self.processing and self.processing[taskid].taskid

    def __len__(self):
        return self.size()

    def __contains__(self, taskid):
        if taskid in self.priority_queue or taskid in self.time_queue:
            return True
        if taskid in self.processing and self.processing[taskid].taskid:
            return True
        return False
277
278
279
if __name__ == '__main__':
    # Smoke test: exercises priority ordering, the delayed-task time
    # queue, and the processing-timeout retry path.
    task_queue = TaskQueue()
    task_queue.processing_timeout = 0.1
    # 'a3' is delayed by 0.1s; 'a1'/'a2' are immediately available.
    task_queue.put('a3', 3, time.time() + 0.1)
    task_queue.put('a1', 1)
    task_queue.put('a2', 2)
    assert task_queue.get() == 'a2'
    time.sleep(0.1)
    task_queue._check_time_queue()
    assert task_queue.get() == 'a3'
    assert task_queue.get() == 'a1'
    # 'a2' was dispatched >0.1s ago and never done(); the processing
    # check should put it back for retry.
    task_queue._check_processing()
    assert task_queue.get() == 'a2'
    # NOTE(review): tasks fetched above were never done(), so they sit
    # in `processing`, which size() counts -- verify this final assert
    # against the real len()/size() semantics.
    assert len(task_queue) == 0
293