1
|
|
|
#!/usr/bin/env python |
2
|
|
|
# -*- encoding: utf-8 -*- |
3
|
|
|
# vim: set et sw=4 ts=4 sts=4 ff=unix fenc=utf8: |
4
|
|
|
# Author: Binux<[email protected]> |
5
|
|
|
# http://binux.me |
6
|
|
|
# Created on 2014-02-07 13:12:10 |
7
|
|
|
|
8
|
|
|
import time |
9
|
|
|
import heapq |
10
|
|
|
import logging |
11
|
|
|
import threading |
12
|
|
|
# Pick a mapping mixin that works on both Python 2 and Python 3.
try:
    # Python 2
    from UserDict import DictMixin
except ImportError:
    try:
        # Python 3.3+: the Mapping ABC lives in collections.abc; the alias
        # that used to exist in `collections` was removed in Python 3.10.
        from collections.abc import Mapping as DictMixin
    except ImportError:
        # Very old Python 3 releases without collections.abc
        from collections import Mapping as DictMixin
16
|
|
|
from .token_bucket import Bucket |
17
|
|
|
from six.moves import queue as Queue |
18
|
|
|
|
19
|
|
|
# Module-wide logger, registered under the name 'scheduler'.
logger = logging.getLogger('scheduler')

# Make sure a three-way comparison function named `cmp` exists: keep the
# builtin when the interpreter has one, otherwise define an equivalent that
# returns -1, 0 or 1.
try:
    cmp
except NameError:
    def cmp(x, y):
        return (x > y) - (x < y)
25
|
|
|
|
26
|
|
|
|
27
|
|
|
class InQueueTask(DictMixin):
    '''
    A task entry as stored in the queues of this module.

    The three fields ``taskid``, ``priority`` and ``exetime`` are available
    both as attributes and, through the mapping interface, as keys.

    Ordering (this is what ``heapq`` relies on): when both tasks have
    ``exetime == 0`` the task with the higher ``priority`` sorts first;
    otherwise the task with the smaller ``exetime`` sorts first.
    '''
    __slots__ = ('taskid', 'priority', 'exetime')

    def __init__(self, taskid, priority=0, exetime=0):
        self.taskid = taskid
        self.priority = priority
        self.exetime = exetime

    def __getitem__(self, key):
        '''
        Return the attribute named `key`.

        Raises KeyError (not AttributeError/TypeError) when there is no such
        attribute, because the inherited mapping helpers (``get``, ``in``,
        ...) only treat KeyError as "missing key".
        '''
        try:
            return getattr(self, key)
        except (AttributeError, TypeError):
            raise KeyError(key)

    def __setitem__(self, key, value):
        '''Set the attribute named `key` to `value`.'''
        setattr(self, key, value)

    def __iter__(self):
        '''Iterate over the field names.'''
        return iter(self.__slots__)

    def __len__(self):
        '''Number of fields.'''
        return len(self.__slots__)

    def keys(self):
        '''Return the field names as a tuple.'''
        return self.__slots__

    def __cmp__(self, other):
        '''
        Three-way comparison: negative if `self` sorts before `other`,
        zero if they sort equal, positive otherwise.
        '''
        if self.exetime == 0 and other.exetime == 0:
            # no execute time on either side: the HIGHER priority goes first,
            # so the operands are swapped before comparing
            left, right = other.priority, self.priority
        else:
            left, right = self.exetime, other.exetime
        # spelled out instead of calling cmp() so the class does not depend
        # on a builtin that only exists on Python 2
        return (left > right) - (left < right)

    def __lt__(self, other):
        '''Rich comparison used by heapq on Python 3.'''
        return self.__cmp__(other) < 0
48
|
|
|
|
49
|
|
|
|
50
|
|
|
class PriorityTaskQueue(Queue.Queue):
    '''
    Heap-ordered queue of tasks, indexed by taskid.

    Putting an item whose taskid is already queued does not add a second
    entry: the queued entry is updated in place instead (its priority can
    only be raised, its exetime can only be lowered).
    '''

    def _init(self, maxsize):
        # index of live entries by taskid, plus the heap itself
        self.queue_dict = dict()
        self.queue = []

    def _qsize(self, len=len):
        # entries removed through __delitem__ stay in the heap until they
        # are popped, so the size is taken from the index
        return len(self.queue_dict)

    def _put(self, item, heappush=heapq.heappush):
        try:
            queued = self.queue_dict[item.taskid]
        except KeyError:
            # first time this taskid is seen: plain insert
            heappush(self.queue, item)
            self.queue_dict[item.taskid] = item
            return

        # merge into the entry that is already queued
        raised = item.priority > queued.priority
        if raised:
            queued.priority = item.priority
        earlier = item.exetime < queued.exetime
        if earlier:
            queued.exetime = item.exetime
        if raised or earlier:
            # the entry was modified in place, restore the heap order
            self._resort()

    def _get(self, heappop=heapq.heappop):
        heap = self.queue
        while heap:
            candidate = heappop(heap)
            # taskid None marks an entry that was deleted while queued
            if candidate.taskid is not None:
                self.queue_dict.pop(candidate.taskid, None)
                return candidate
        return None

    @property
    def top(self):
        '''The entry that would be returned next, or None (not removed).'''
        heap = self.queue
        while heap:
            head = heap[0]
            if head.taskid is not None:
                return head
            # throw away entries that were deleted while queued
            heapq.heappop(heap)
        return None

    def _resort(self):
        heapq.heapify(self.queue)

    def __contains__(self, taskid):
        return taskid in self.queue_dict

    def __getitem__(self, taskid):
        return self.queue_dict[taskid]

    def __setitem__(self, taskid, item):
        assert item.taskid == taskid
        self.put(item)

    def __delitem__(self, taskid):
        # the entry is not taken out of the heap here; clearing its taskid
        # makes _get and top skip it later
        removed = self.queue_dict.pop(taskid)
        removed.taskid = None
113
|
|
|
|
114
|
|
|
|
115
|
|
|
class TaskQueue(object):
    '''
    task queue for scheduler, have a priority queue and a time queue for delayed tasks

    Three internal queues are kept:

    * ``priority_queue`` - tasks that can be handed out by get()
    * ``time_queue`` - tasks whose exetime is still in the future
    * ``processing`` - tasks handed out by get() and not yet marked done()

    Handing out tasks is additionally gated by ``bucket`` (a Bucket built
    from `rate` and `burst`).
    '''
    # seconds after which a task returned by get() and not marked done() is
    # put back to the priority queue by _check_processing()
    processing_timeout = 10 * 60

    def __init__(self, rate=0, burst=0):
        self.mutex = threading.RLock()
        self.priority_queue = PriorityTaskQueue()
        self.time_queue = PriorityTaskQueue()
        self.processing = PriorityTaskQueue()
        self.bucket = Bucket(rate=rate, burst=burst)

    @property
    def rate(self):
        '''The `rate` of the underlying bucket.'''
        return self.bucket.rate

    @rate.setter
    def rate(self, value):
        self.bucket.rate = value

    @property
    def burst(self):
        '''The `burst` of the underlying bucket.'''
        # must read from the bucket: going through self.burst here would
        # call this property again and never terminate
        return self.bucket.burst

    @burst.setter
    def burst(self, value):
        self.bucket.burst = value

    def check_update(self):
        '''
        Check time queue and processing queue

        put tasks to priority queue when execute time arrived or process timeout
        '''
        self._check_time_queue()
        self._check_processing()

    def _check_time_queue(self):
        '''Move delayed tasks whose exetime has passed to the priority queue.'''
        now = time.time()
        # `with` releases the lock even when one of the queue calls raises
        with self.mutex:
            while self.time_queue.qsize() and self.time_queue.top and self.time_queue.top.exetime < now:
                task = self.time_queue.get_nowait()
                # exetime 0 means "no execute time": order by priority only
                task.exetime = 0
                self.priority_queue.put(task)

    def _check_processing(self):
        '''Put tasks that stayed in processing past their timeout back to the priority queue.'''
        now = time.time()
        with self.mutex:
            while self.processing.qsize() and self.processing.top and self.processing.top.exetime < now:
                task = self.processing.get_nowait()
                if task.taskid is None:
                    continue
                task.exetime = 0
                self.priority_queue.put(task)
                logger.info("processing: retry %s", task.taskid)

    def put(self, taskid, priority=0, exetime=0):
        '''
        Put a task into task queue

        If `taskid` is already waiting in the priority queue or the time
        queue, the waiting entry is updated. If it is currently being
        processed the call is ignored. Otherwise the task goes to the time
        queue when `exetime` is in the future, else to the priority queue.
        '''
        now = time.time()
        task = InQueueTask(taskid, priority, exetime)
        with self.mutex:
            if taskid in self.priority_queue:
                self.priority_queue.put(task)
            elif taskid in self.time_queue:
                self.time_queue.put(task)
            elif taskid in self.processing and self.processing[taskid].taskid:
                # force update a processing task is not allowed as there are so many
                # problems may happen
                pass
            elif exetime and exetime > now:
                self.time_queue.put(task)
            else:
                # The execute time has already passed. Clear it, otherwise
                # the task would be ordered by exetime instead of priority
                # and sort after every task that has exetime == 0.
                task.exetime = 0
                self.priority_queue.put(task)

    def get(self):
        '''
        Get a task from queue when bucket available

        Returns the taskid, or None when the bucket reports less than 1 or
        the priority queue is empty. The returned task is tracked in
        `processing` with a deadline of now + processing_timeout.
        '''
        if self.bucket.get() < 1:
            return None
        now = time.time()
        with self.mutex:
            try:
                task = self.priority_queue.get_nowait()
            except Queue.Empty:
                return None
            # only consume from the bucket once a task was really taken
            self.bucket.desc()
            task.exetime = now + self.processing_timeout
            self.processing.put(task)
        return task.taskid

    def done(self, taskid):
        '''
        Mark task done

        Returns True when `taskid` was found in processing, False otherwise.
        '''
        if taskid in self.processing:
            with self.mutex:
                # checked again under the lock before deleting
                if taskid in self.processing:
                    del self.processing[taskid]
            return True
        return False

    def size(self):
        '''Total number of tasks in the priority, time and processing queues.'''
        return self.priority_queue.qsize() + self.time_queue.qsize() + self.processing.qsize()

    def __len__(self):
        return self.size()

    def __contains__(self, taskid):
        if taskid in self.priority_queue or taskid in self.time_queue:
            return True
        if taskid in self.processing and self.processing[taskid].taskid:
            return True
        return False
234
|
|
|
|
235
|
|
|
|
236
|
|
|
if __name__ == '__main__':
    # Small self-test of TaskQueue, run only when the module is executed
    # directly.
    task_queue = TaskQueue()
    task_queue.processing_timeout = 0.1

    # 'a3' is delayed by 0.1s, so it must not be handed out yet even though
    # it has the highest priority
    task_queue.put('a3', 3, time.time() + 0.1)
    task_queue.put('a1', 1)
    task_queue.put('a2', 2)
    assert task_queue.get() == 'a2'

    # once the delay has passed, 'a3' moves to the priority queue and wins
    time.sleep(0.1)
    task_queue._check_time_queue()
    assert task_queue.get() == 'a3'
    assert task_queue.get() == 'a1'

    # 'a2' was never marked done and its processing timeout has expired by
    # now, so it is offered again
    task_queue._check_processing()
    assert task_queue.get() == 'a2'

    # size() also counts tasks that are still in processing, so all three
    # have to be marked done before the queue can be empty
    for taskid in ('a1', 'a2', 'a3'):
        assert task_queue.done(taskid)
    assert len(task_queue) == 0
250
|
|
|
|