Total Complexity | 41 |
Total Lines | 156 |
Duplicated Lines | 12.18 % |
Changes | 1 | ||
Bugs | 0 | Features | 0 |
Duplicate code is one of the most pungent code smells. A common rule of thumb is to restructure code once it is duplicated in three or more places.
Common duplication problems and their corresponding solutions are:
Complex classes like TaskQueue often do many different things. To break such a class down, we need to identify a cohesive component within it. A common approach to finding such a component is to look for fields/methods that share the same prefixes or suffixes.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
1 | #!/usr/bin/env python |
||
class TaskQueue(object):
    '''
    Task queue for the scheduler.

    Holds three queues, all guarded by a single re-entrant lock
    (``self.mutex``):

    * ``priority_queue`` -- tasks that are ready to be handed out by get();
    * ``time_queue`` -- delayed tasks whose ``exetime`` is still in the future;
    * ``processing`` -- tasks handed out by get() that have not yet been
      marked done(); their ``exetime`` is the processing deadline.

    get() is gated by ``self.bucket`` (a ``Bucket`` built from ``rate`` and
    ``burst``): it returns None when ``bucket.get()`` is below 1.
    '''
    # Seconds a task may stay in ``processing`` after get() before
    # _check_processing() moves it back into the priority queue.
    processing_timeout = 10 * 60

    def __init__(self, rate=0, burst=0):
        self.mutex = threading.RLock()
        self.priority_queue = PriorityTaskQueue()
        self.time_queue = PriorityTaskQueue()
        self.processing = PriorityTaskQueue()
        self.bucket = Bucket(rate=rate, burst=burst)

    @property
    def rate(self):
        '''Rate of the underlying bucket (delegates to ``self.bucket.rate``).'''
        return self.bucket.rate

    @rate.setter
    def rate(self, value):
        self.bucket.rate = value

    @property
    def burst(self):
        '''Burst of the underlying bucket (delegates to ``self.bucket.burst``).'''
        return self.bucket.burst

    @burst.setter
    def burst(self, value):
        self.bucket.burst = value

    def check_update(self):
        '''
        Move due tasks back into the priority queue.

        Delayed tasks whose execute time has arrived, and processing tasks
        whose processing timeout has expired, are both re-queued.
        '''
        self._check_time_queue()
        self._check_processing()

    def _check_time_queue(self):
        '''Move every delayed task whose ``exetime`` has passed to the priority queue.'''
        now = time.time()
        # ``with`` guarantees the lock is released even if a queue call raises.
        with self.mutex:
            while (self.time_queue.qsize() and self.time_queue.top
                   and self.time_queue.top.exetime < now):
                task = self.time_queue.get_nowait()  # type: InQueueTask
                # exetime 0 is what put() assigns to tasks that are due now.
                task.exetime = 0
                self.priority_queue.put(task)

    def _check_processing(self):
        '''Re-queue processing tasks whose processing deadline has passed.'''
        now = time.time()
        with self.mutex:
            while (self.processing.qsize() and self.processing.top
                   and self.processing.top.exetime < now):
                task = self.processing.get_nowait()
                # Entries whose taskid is None are dropped here, not retried;
                # __contains__ and is_processing() also ignore them.
                if task.taskid is None:
                    continue
                task.exetime = 0
                self.priority_queue.put(task)
                logger.info("processing: retry %s", task.taskid)

    def put(self, taskid, priority=0, exetime=0):
        """
        Put a task into the task queue.

        * If ``taskid`` is already in the priority queue or the time queue,
          the entry in that same queue is updated with the new values.
        * If it is currently being processed (with a non-None taskid), the
          call is ignored.
        * Otherwise it goes to the time queue when ``exetime`` is in the
          future, or to the priority queue (with ``exetime`` reset to 0).

        When a heap is used and tasks with the same priority and exetime=0
        are put into the queue, the queue is not a strict FIFO queue but
        behaves more like a FILO stack. Under a continuous heavy flow, where
        selection is slower than insertion, the priority queue accumulates
        tasks, and the tasks that entered earliest may not be processed until
        the flow subsides.

        Therefore a global, atomically increasing value is stored in
        ``task.sequence`` to record enqueue order. When exetime and priority
        compare equal, ``task.sequence`` is compared so the whole queue stays
        ordered.
        """
        now = time.time()

        task = InQueueTask(taskid, priority, exetime)

        with self.mutex:
            if taskid in self.priority_queue:
                self.priority_queue.put(task)
            elif taskid in self.time_queue:
                self.time_queue.put(task)
            elif taskid in self.processing and self.processing[taskid].taskid:
                # Force-updating a task that is being processed is not
                # allowed: too many problems may happen.
                pass
            elif exetime and exetime > now:
                self.time_queue.put(task)
            else:
                task.exetime = 0
                self.priority_queue.put(task)

    def get(self):
        '''
        Get a task from the queue when the bucket allows it.

        Returns the taskid of the highest-priority ready task and moves the
        task into ``processing`` with a deadline of now + processing_timeout.
        Returns None if the bucket is exhausted or no task is ready.
        '''
        if self.bucket.get() < 1:
            return None
        now = time.time()
        with self.mutex:
            try:
                task = self.priority_queue.get_nowait()
                self.bucket.desc()
            except Queue.Empty:
                return None
            task.exetime = now + self.processing_timeout
            self.processing.put(task)
        return task.taskid

    def done(self, taskid):
        '''
        Mark a task done by removing it from ``processing``.

        Returns True if the task was removed, False if it was not there.
        '''
        # Check and delete under one lock so a concurrent removal cannot
        # make ``del`` fail or make us report a removal that did not happen.
        with self.mutex:
            if taskid in self.processing:
                del self.processing[taskid]
                return True
        return False

    def delete(self, taskid):
        '''
        Remove a task from whichever queue holds it.

        Returns False if the task is not in the queue (see __contains__),
        True otherwise.
        '''
        # The whole check-then-delete runs under the lock (an RLock, so the
        # nested done() call can re-acquire it) to avoid a KeyError race.
        with self.mutex:
            if taskid not in self:
                return False
            if taskid in self.priority_queue:
                del self.priority_queue[taskid]
            elif taskid in self.time_queue:
                del self.time_queue[taskid]
            elif taskid in self.processing:
                self.done(taskid)
            return True

    def size(self):
        '''Total number of entries across all three queues.'''
        return self.priority_queue.qsize() + self.time_queue.qsize() + self.processing.qsize()

    def is_processing(self, taskid):
        '''
        Return True if taskid is in processing (with a non-None taskid).
        '''
        return bool(taskid in self.processing and self.processing[taskid].taskid)

    def __len__(self):
        return self.size()

    def __contains__(self, taskid):
        '''True if taskid is queued, delayed, or being processed.'''
        if taskid in self.priority_queue or taskid in self.time_queue:
            return True
        # Processing entries whose taskid is falsy are treated as absent.
        if taskid in self.processing and self.processing[taskid].taskid:
            return True
        return False
286 | return False |
||
287 | |||
303 |