1 | #!/usr/bin/env python |
||
2 | # -*- encoding: utf-8 -*- |
||
3 | # vim: set et sw=4 ts=4 sts=4 ff=unix fenc=utf8: |
||
4 | # Author: Binux<[email protected]> |
||
5 | # http://binux.me |
||
6 | # Created on 2014-02-07 13:12:10 |
||
7 | |||
import heapq
import logging
import threading
import time

try:
    from UserDict import DictMixin
except ImportError:
    try:
        # Python 3.3+; the plain ``collections.Mapping`` alias was removed in 3.10.
        from collections.abc import Mapping as DictMixin
    except ImportError:
        from collections import Mapping as DictMixin
from .token_bucket import Bucket
from six.moves import queue as Queue
||
19 | |||
logger = logging.getLogger('scheduler')

try:
    cmp
except NameError:
    def cmp(x, y):
        """Stand-in for the Python 2 builtin ``cmp``.

        Returns -1 when ``x < y``, 1 when ``x > y`` and 0 otherwise.
        """
        greater = x > y
        smaller = x < y
        return greater - smaller
||
26 | |||
27 | |||
class AtomInt(object):
    '''
    Class-level, lock-protected counter.

    Each call to ``get_value`` increments the shared counter and returns the
    new value, so successive calls yield strictly increasing integers
    (starting at 1). InQueueTask uses it as its enqueue sequence number.
    '''
    __value__ = 0
    __mutex__ = threading.RLock()

    @classmethod
    def get_value(cls):
        '''Increment the counter and return the new value.'''
        # ``with`` releases the lock even if the body raises, unlike a bare
        # acquire()/release() pair.
        with cls.__mutex__:
            cls.__value__ = cls.__value__ + 1
            return cls.__value__
||
39 | |||
40 | |||
class InQueueTask(DictMixin):
    '''
    Heap entry for a single queued task.

    The four ``__slots__`` are also readable through the mapping interface
    (``task['priority']``, ``dict(task)``). Ordering, as used by heapq:

    * both entries ready (``exetime == 0``): higher ``priority`` first;
    * otherwise: smaller ``exetime`` first, so a ready entry precedes a
      delayed one;
    * ties fall back to ``sequence``, i.e. enqueue order (FIFO).
    '''
    __slots__ = ('taskid', 'priority', 'exetime', 'sequence')

    def __getitem__(self, key):
        # The mapping protocol requires KeyError for a missing key. getattr
        # raises AttributeError, which would make ``key in task`` and
        # ``task.get(key)`` raise instead of returning False / the default.
        try:
            return getattr(self, key)
        except AttributeError:
            raise KeyError(key)

    def __setitem__(self, key, value):
        setattr(self, key, value)

    def __iter__(self):
        return iter(self.__slots__)

    def __len__(self):
        return len(self.__slots__)

    def keys(self):
        return self.__slots__

    def __init__(self, taskid, priority=0, exetime=0):
        self.taskid = taskid
        self.priority = priority
        self.exetime = exetime
        # Global, strictly increasing enqueue counter used as the final tie-breaker.
        self.sequence = AtomInt.get_value()

    def __cmp__(self, other):
        '''Return a negative, zero or positive number (Python 2 style).'''
        if self.exetime == 0 and other.exetime == 0:
            # Both ready: higher priority sorts first.
            diff = -cmp(self.priority, other.priority)
        else:
            diff = cmp(self.exetime, other.exetime)

        # compare the in-queue sequence number last when two elements have the
        # same priority or exetime, which keeps equal entries in FIFO order
        return diff if diff != 0 else cmp(self.sequence, other.sequence)

    def __lt__(self, other):
        # heapq only needs ``<``; delegate to the shared comparison above.
        return self.__cmp__(other) < 0
||
67 | |||
68 | |||
class PriorityTaskQueue(Queue.Queue):
    '''
    TaskQueue

    A heap-backed Queue that holds at most one live entry per taskid.

    Putting an item whose taskid is already queued merges it into the existing
    entry (priority becomes the max, exetime the min) instead of adding a
    duplicate. ``del queue[taskid]`` is lazy: the entry leaves the index and its
    taskid is set to None; such tombstones are skipped when they reach the top.
    '''

    def _init(self, maxsize):
        # Heap of entries (may contain tombstones) plus a taskid -> live entry index.
        self.queue = []
        self.queue_dict = {}

    def _qsize(self, len=len):
        # Only live entries count; tombstones still sitting in the heap do not.
        return len(self.queue_dict)

    def _put(self, item, heappush=heapq.heappush):
        existing = self.queue_dict.get(item.taskid)
        if existing is None:
            heappush(self.queue, item)
            self.queue_dict[item.taskid] = item
            return

        # Decide before mutating whether the merged entry moves up in the heap.
        needs_resort = item < existing
        existing.priority = max(item.priority, existing.priority)
        existing.exetime = min(item.exetime, existing.exetime)
        if needs_resort:
            self._resort()

    def _get(self, heappop=heapq.heappop):
        heap = self.queue
        while heap:
            candidate = heappop(heap)
            if candidate.taskid is not None:
                self.queue_dict.pop(candidate.taskid, None)
                return candidate
            # tombstone left behind by __delitem__: discard and keep popping
        return None

    @property
    def top(self):
        '''The next live entry without removing it, or None if there is none.'''
        heap = self.queue
        while heap and heap[0].taskid is None:
            heapq.heappop(heap)
        return heap[0] if heap else None

    def _resort(self):
        heapq.heapify(self.queue)

    def __contains__(self, taskid):
        return taskid in self.queue_dict

    def __getitem__(self, taskid):
        return self.queue_dict[taskid]

    def __setitem__(self, taskid, item):
        assert item.taskid == taskid
        self.put(item)

    def __delitem__(self, taskid):
        entry = self.queue_dict.pop(taskid)
        entry.taskid = None
||
129 | |||
130 | |||
class TaskQueue(object):
    '''
    task queue for scheduler, have a priority queue and a time queue for delayed tasks

    Internally three PriorityTaskQueue instances share one re-entrant lock:

    * ``priority_queue`` -- ready tasks (exetime == 0), highest priority first
    * ``time_queue``     -- delayed tasks, earliest exetime first
    * ``processing``     -- tasks handed out by get(); their exetime is the
      moment after which check_update() puts them back into priority_queue

    get() is additionally gated by ``self.bucket``.
    '''
    processing_timeout = 10 * 60  # seconds a handed-out task may stay in processing

    def __init__(self, rate=0, burst=0):
        self.mutex = threading.RLock()
        self.priority_queue = PriorityTaskQueue()
        self.time_queue = PriorityTaskQueue()
        self.processing = PriorityTaskQueue()
        self.bucket = Bucket(rate=rate, burst=burst)

    @property
    def rate(self):
        '''Rate setting, forwarded to ``self.bucket``.'''
        return self.bucket.rate

    @rate.setter
    def rate(self, value):
        self.bucket.rate = value

    @property
    def burst(self):
        '''Burst setting, forwarded to ``self.bucket``.'''
        return self.bucket.burst

    @burst.setter
    def burst(self, value):
        self.bucket.burst = value

    def check_update(self):
        '''
        Check time queue and processing queue

        put tasks to priority queue when execute time arrived or process timeout
        '''
        self._check_time_queue()
        self._check_processing()

    def _check_time_queue(self):
        '''Move delayed tasks whose exetime has passed into the priority queue.'''
        now = time.time()
        with self.mutex:
            while self.time_queue.qsize() and self.time_queue.top and self.time_queue.top.exetime < now:
                task = self.time_queue.get_nowait()  # type: InQueueTask
                task.exetime = 0  # exetime 0 marks a ready task, ordered by priority
                self.priority_queue.put(task)

    def _check_processing(self):
        '''Re-queue processing tasks whose processing_timeout has expired.'''
        now = time.time()
        with self.mutex:
            while self.processing.qsize() and self.processing.top and self.processing.top.exetime < now:
                task = self.processing.get_nowait()
                if task.taskid is None:
                    continue
                task.exetime = 0
                self.priority_queue.put(task)
                logger.info("processing: retry %s", task.taskid)

    def put(self, taskid, priority=0, exetime=0):
        """
        Put a task into task queue

        If taskid is already waiting in the priority or time queue, the new
        values are merged into the existing entry. A task that is currently
        processing is left untouched. Otherwise the task goes to the time queue
        when exetime lies in the future, else to the priority queue.

        When using a heap, if we put tasks (with the same priority and exetime=0)
        into the queue, the queue is not a strict FIFO queue but behaves more like
        a FILO stack. When there is a continuous large inflow, selection can be
        slower than new requests, so the priority queue accumulates in a short
        time. In this scenario, the tasks that entered the priority queue earlier
        will not get processed until the request flow becomes small.

        Thus, we store a global, atomically increasing value into task.sequence,
        which represents the task enqueue order. When the comparison of exetime
        and priority shows no difference, we compare task.sequence to ensure that
        the entire queue is ordered.
        """
        now = time.time()

        task = InQueueTask(taskid, priority, exetime)

        with self.mutex:
            if taskid in self.priority_queue:
                self.priority_queue.put(task)
            elif taskid in self.time_queue:
                self.time_queue.put(task)
            elif taskid in self.processing and self.processing[taskid].taskid:
                # force-updating a processing task is not allowed, as too many
                # problems may happen
                pass
            elif exetime and exetime > now:
                self.time_queue.put(task)
            else:
                task.exetime = 0
                self.priority_queue.put(task)

    def get(self):
        '''Get a task from queue when bucket available

        Returns the taskid of the best ready task and moves it into the
        processing queue (check_update() re-queues it once processing_timeout
        seconds pass without done()), or None when the bucket refuses or no task
        is ready.
        '''
        if self.bucket.get() < 1:
            return None
        now = time.time()
        with self.mutex:
            try:
                task = self.priority_queue.get_nowait()
            except Queue.Empty:
                return None
            self.bucket.desc()
            task.exetime = now + self.processing_timeout
            self.processing.put(task)
        return task.taskid

    def done(self, taskid):
        '''Mark task done

        Removes taskid from the processing queue. Returns True if it was removed,
        False if it was not processing.
        '''
        if taskid not in self.processing:  # cheap unlocked fast path
            return False
        with self.mutex:
            # Re-check under the lock: another thread may have removed it.
            if taskid not in self.processing:
                return False
            del self.processing[taskid]
        return True

    def delete(self, taskid):
        '''Remove taskid from whichever queue holds it; return False if absent.'''
        with self.mutex:
            # Membership test and removal happen under a single lock acquisition,
            # so a concurrent get()/done() cannot remove the task in between
            # (which would make ``del`` raise KeyError).
            if taskid not in self:
                return False
            if taskid in self.priority_queue:
                del self.priority_queue[taskid]
            elif taskid in self.time_queue:
                del self.time_queue[taskid]
            else:
                self.done(taskid)  # RLock: re-entrant acquisition is fine
        return True

    def size(self):
        '''Number of live tasks in all three queues, processing ones included.'''
        return self.priority_queue.qsize() + self.time_queue.qsize() + self.processing.qsize()

    def is_processing(self, taskid):
        '''
        return True if taskid is in processing
        '''
        with self.mutex:
            # Locked so the entry cannot vanish between the ``in`` check and the lookup.
            return taskid in self.processing and bool(self.processing[taskid].taskid)

    def __len__(self):
        return self.size()

    def __contains__(self, taskid):
        with self.mutex:
            if taskid in self.priority_queue or taskid in self.time_queue:
                return True
            return self.is_processing(taskid)
288 | |||
if __name__ == '__main__':
    # Self-test of the scheduling order; run this file directly to execute it.
    # NOTE(review): rate/burst are passed explicitly because get() returns None
    # whenever bucket.get() < 1, and the defaults are rate=0, burst=0 — confirm
    # how token_bucket.Bucket treats those values.
    task_queue = TaskQueue(rate=1000, burst=1000)
    task_queue.processing_timeout = 0.1
    task_queue.put('a3', 3, time.time() + 0.1)  # delayed: goes to time_queue
    task_queue.put('a1', 1)
    task_queue.put('a2', 2)
    assert task_queue.get() == 'a2'  # highest ready priority first
    # A little longer than processing_timeout so the strict ``<`` timeout check
    # is not defeated by coarse clock resolution.
    time.sleep(0.15)
    task_queue._check_time_queue()  # a3's execute time has passed
    assert task_queue.get() == 'a3'
    assert task_queue.get() == 'a1'
    task_queue._check_processing()  # a2 exceeded processing_timeout -> retried
    assert task_queue.get() == 'a2'
    # Handed-out tasks keep counting toward len() until they are marked done.
    assert len(task_queue) == 3
    for finished in ('a1', 'a2', 'a3'):
        assert task_queue.done(finished)
    assert len(task_queue) == 0
||
303 |