Completed
Push — master ( 39eece...c8d455 )
by Roy
01:11
created

TaskQueue.burst()   A

Complexity

Conditions 1

Size

Total Lines 3

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 1
dl 0
loc 3
rs 10
1
#!/usr/bin/env python
2
# -*- encoding: utf-8 -*-
3
# vim: set et sw=4 ts=4 sts=4 ff=unix fenc=utf8:
4
# Author: Binux<[email protected]>
5
#         http://binux.me
6
# Created on 2014-02-07 13:12:10
7
8
import time
9
import heapq
10
import logging
11
import threading
12
try:
13
    from UserDict import DictMixin
14
except ImportError:
15
    from collections import Mapping as DictMixin
16
from .token_bucket import Bucket
17
from six.moves import queue as Queue
18
19
logger = logging.getLogger('scheduler')
20
21
try:
22
    cmp
23
except NameError:
24
    cmp = lambda x, y: (x > y) - (x < y)
25
26
27
class InQueueTask(DictMixin):
28
    __slots__ = ('taskid', 'priority', 'exetime')
29
    __getitem__ = lambda *x: getattr(*x)
30
    __setitem__ = lambda *x: setattr(*x)
31
    __iter__ = lambda self: iter(self.__slots__)
32
    __len__ = lambda self: len(self.__slots__)
33
    keys = lambda self: self.__slots__
34
35
    def __init__(self, taskid, priority=0, exetime=0):
36
        self.taskid = taskid
37
        self.priority = priority
38
        self.exetime = exetime
39
40
    def __cmp__(self, other):
41
        if self.exetime == 0 and other.exetime == 0:
42
            return -cmp(self.priority, other.priority)
43
        else:
44
            return cmp(self.exetime, other.exetime)
45
46
    def __lt__(self, other):
47
        return self.__cmp__(other) < 0
48
49
50
class PriorityTaskQueue(Queue.Queue):
51
52
    '''
53
    TaskQueue
54
55
    Same taskid items will been merged
56
    '''
57
58
    def _init(self, maxsize):
59
        self.queue = []
60
        self.queue_dict = dict()
61
62
    def _qsize(self, len=len):
63
        return len(self.queue_dict)
64
65
    def _put(self, item, heappush=heapq.heappush):
66
        if item.taskid in self.queue_dict:
67
            task = self.queue_dict[item.taskid]
68
            changed = False
69
            if item.priority > task.priority:
70
                task.priority = item.priority
71
                changed = True
72
            if item.exetime < task.exetime:
73
                task.exetime = item.exetime
74
                changed = True
75
            if changed:
76
                self._resort()
77
        else:
78
            heappush(self.queue, item)
79
            self.queue_dict[item.taskid] = item
80
81
    def _get(self, heappop=heapq.heappop):
82
        while self.queue:
83
            item = heappop(self.queue)
84
            if item.taskid is None:
85
                continue
86
            self.queue_dict.pop(item.taskid, None)
87
            return item
88
        return None
89
90
    @property
91
    def top(self):
92
        while self.queue and self.queue[0].taskid is None:
93
            heapq.heappop(self.queue)
94
        if self.queue:
95
            return self.queue[0]
96
        return None
97
98
    def _resort(self):
99
        heapq.heapify(self.queue)
100
101
    def __contains__(self, taskid):
102
        return taskid in self.queue_dict
103
104
    def __getitem__(self, taskid):
105
        return self.queue_dict[taskid]
106
107
    def __setitem__(self, taskid, item):
108
        assert item.taskid == taskid
109
        self.put(item)
110
111
    def __delitem__(self, taskid):
112
        self.queue_dict.pop(taskid).taskid = None
113
114
115
class TaskQueue(object):
116
117
    '''
118
    task queue for scheduler, have a priority queue and a time queue for delayed tasks
119
    '''
120
    processing_timeout = 10 * 60
121
122
    def __init__(self, rate=0, burst=0):
123
        self.mutex = threading.RLock()
124
        self.priority_queue = PriorityTaskQueue()
125
        self.time_queue = PriorityTaskQueue()
126
        self.processing = PriorityTaskQueue()
127
        self.bucket = Bucket(rate=rate, burst=burst)
128
129
    @property
130
    def rate(self):
131
        return self.bucket.rate
132
133
    @rate.setter
134
    def rate(self, value):
135
        self.bucket.rate = value
136
137
    @property
138
    def burst(self):
139
        return self.burst.burst
140
141
    @burst.setter
142
    def burst(self, value):
143
        self.bucket.burst = value
144
145
    def check_update(self):
146
        '''
147
        Check time queue and processing queue
148
149
        put tasks to priority queue when execute time arrived or process timeout
150
        '''
151
        self._check_time_queue()
152
        self._check_processing()
153
154 View Code Duplication
    def _check_time_queue(self):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
155
        now = time.time()
156
        self.mutex.acquire()
157
        while self.time_queue.qsize() and self.time_queue.top and self.time_queue.top.exetime < now:
158
            task = self.time_queue.get_nowait()
159
            task.exetime = 0
160
            self.priority_queue.put(task)
161
        self.mutex.release()
162
163 View Code Duplication
    def _check_processing(self):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
164
        now = time.time()
165
        self.mutex.acquire()
166
        while self.processing.qsize() and self.processing.top and self.processing.top.exetime < now:
167
            task = self.processing.get_nowait()
168
            if task.taskid is None:
169
                continue
170
            task.exetime = 0
171
            self.priority_queue.put(task)
172
            logger.info("processing: retry %s", task.taskid)
173
        self.mutex.release()
174
175
    def put(self, taskid, priority=0, exetime=0):
176
        '''Put a task into task queue'''
177
        now = time.time()
178
        task = InQueueTask(taskid, priority, exetime)
179
        self.mutex.acquire()
180
        if taskid in self.priority_queue:
181
            self.priority_queue.put(task)
182
        elif taskid in self.time_queue:
183
            self.time_queue.put(task)
184
        elif taskid in self.processing and self.processing[taskid].taskid:
185
            # force update a processing task is not allowed as there are so many
186
            # problems may happen
187
            pass
188
        else:
189
            if exetime and exetime > now:
190
                self.time_queue.put(task)
191
            else:
192
                self.priority_queue.put(task)
193
        self.mutex.release()
194
195
    def get(self):
196
        '''Get a task from queue when bucket available'''
197
        if self.bucket.get() < 1:
198
            return None
199
        now = time.time()
200
        self.mutex.acquire()
201
        try:
202
            task = self.priority_queue.get_nowait()
203
            self.bucket.desc()
204
        except Queue.Empty:
205
            self.mutex.release()
206
            return None
207
        task.exetime = now + self.processing_timeout
208
        self.processing.put(task)
209
        self.mutex.release()
210
        return task.taskid
211
212
    def done(self, taskid):
213
        '''Mark task done'''
214
        if taskid in self.processing:
215
            self.mutex.acquire()
216
            if taskid in self.processing:
217
                del self.processing[taskid]
218
            self.mutex.release()
219
            return True
220
        return False
221
222
    def size(self):
223
        return self.priority_queue.qsize() + self.time_queue.qsize() + self.processing.qsize()
224
225
    def __len__(self):
226
        return self.size()
227
228
    def __contains__(self, taskid):
229
        if taskid in self.priority_queue or taskid in self.time_queue:
230
            return True
231
        if taskid in self.processing and self.processing[taskid].taskid:
232
            return True
233
        return False
234
235
236
if __name__ == '__main__':
237
    task_queue = TaskQueue()
238
    task_queue.processing_timeout = 0.1
239
    task_queue.put('a3', 3, time.time() + 0.1)
240
    task_queue.put('a1', 1)
241
    task_queue.put('a2', 2)
242
    assert task_queue.get() == 'a2'
243
    time.sleep(0.1)
244
    task_queue._check_time_queue()
245
    assert task_queue.get() == 'a3'
246
    assert task_queue.get() == 'a1'
247
    task_queue._check_processing()
248
    assert task_queue.get() == 'a2'
249
    assert len(task_queue) == 0
250