Total Complexity | 162
Total Lines | 726
Duplicated Lines | 0 %
Complex classes like pyspider.scheduler.Scheduler often do a lot of different things. To break such a class down, we need to identify a cohesive component within the class. A common approach to finding such a component is to look for fields and methods that share the same prefixes or suffixes.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring, as sketched below. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
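In this Scheduler, one such component stands out: _cnt, _last_dump_cnt, _dump_cnt, _try_dump_cnt and _print_counter_log all revolve around counter bookkeeping. A minimal Extract Class sketch built on that observation (SchedulerCounters is a hypothetical name, not part of pyspider; the counter managers and file paths are reused exactly as they appear in the listing below, with '5m_time' omitted for brevity):

import os
import time

from pyspider.libs import counter


class SchedulerCounters(object):
    '''Hypothetical extract-class target for Scheduler's counter fields.'''

    DUMP_INTERVAL = 60  # seconds, mirrors Scheduler._try_dump_cnt

    def __init__(self, data_path):
        self.data_path = data_path
        self._last_dump = 0
        self._cnt = {
            '5m': counter.CounterManager(
                lambda: counter.TimebaseAverageWindowCounter(30, 10)),
            '1h': counter.CounterManager(
                lambda: counter.TimebaseAverageWindowCounter(60, 60)),
            '1d': counter.CounterManager(
                lambda: counter.TimebaseAverageWindowCounter(10 * 60, 24 * 6)),
            'all': counter.CounterManager(lambda: counter.TotalCounter()),
        }

    def _path(self, window):
        return os.path.join(self.data_path, 'scheduler.%s' % window)

    def load(self):
        for window in ('1h', '1d', 'all'):
            self._cnt[window].load(self._path(window))

    def dump(self):
        for window in ('1h', '1d', 'all'):
            self._cnt[window].dump(self._path(window))

    def try_dump(self):
        # persist at most once per DUMP_INTERVAL seconds
        now = time.time()
        if now - self._last_dump > self.DUMP_INTERVAL:
            self._last_dump = now
            self.dump()

Scheduler would then hold a single counters field and delegate to it, which also shrinks its __init__.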
#!/usr/bin/env python

# imports (elided in this report) reconstructed from usage below
import itertools
import json
import logging
import os
import time
from collections import deque

from six import iteritems, itervalues
from six.moves import queue as Queue

from pyspider.libs import counter, utils
from .task_queue import TaskQueue

logger = logging.getLogger('scheduler')


class Scheduler(object):
    UPDATE_PROJECT_INTERVAL = 5 * 60

    default_schedule = {
        'priority': 0,
        'retries': 3,
        'exetime': 0,
        'age': -1,
        'itag': None,
    }

    LOOP_LIMIT = 1000
    LOOP_INTERVAL = 0.1
    ACTIVE_TASKS = 100
    INQUEUE_LIMIT = 0
    EXCEPTION_LIMIT = 3
    DELETE_TIME = 24 * 60 * 60
    DEFAULT_RETRY_DELAY = {
        0: 30,            # first retry after 30 s
        1: 1 * 60 * 60,   # then 1 h
        2: 6 * 60 * 60,   # 6 h
        3: 12 * 60 * 60,  # 12 h
        '': 24 * 60 * 60  # default for any later retry: 24 h
    }

    def __init__(self, taskdb, projectdb, newtask_queue, status_queue,
                 out_queue, data_path='./data', resultdb=None):
        self.taskdb = taskdb
        self.projectdb = projectdb
        self.resultdb = resultdb
        self.newtask_queue = newtask_queue
        self.status_queue = status_queue
        self.out_queue = out_queue
        self.data_path = data_path

        self._send_buffer = deque()
        self._quit = False
        self._exceptions = 0
        self.projects = dict()
        self._force_update_project = False
        self._last_update_project = 0
        self.task_queue = dict()
        self._last_tick = int(time.time())

        self._cnt = {
            "5m_time": counter.CounterManager(
                lambda: counter.TimebaseAverageEventCounter(30, 10)),
            "5m": counter.CounterManager(
                lambda: counter.TimebaseAverageWindowCounter(30, 10)),
            "1h": counter.CounterManager(
                lambda: counter.TimebaseAverageWindowCounter(60, 60)),
            "1d": counter.CounterManager(
                lambda: counter.TimebaseAverageWindowCounter(10 * 60, 24 * 6)),
            "all": counter.CounterManager(
                lambda: counter.TotalCounter()),
        }
        self._cnt['1h'].load(os.path.join(self.data_path, 'scheduler.1h'))
        self._cnt['1d'].load(os.path.join(self.data_path, 'scheduler.1d'))
        self._cnt['all'].load(os.path.join(self.data_path, 'scheduler.all'))
        self._last_dump_cnt = 0

    def _update_projects(self):
        '''Check project update'''
        now = time.time()
        if (
            not self._force_update_project
            and self._last_update_project + self.UPDATE_PROJECT_INTERVAL > now
        ):
            return
        for project in self.projectdb.check_update(self._last_update_project):
            self._update_project(project)
            logger.debug("project: %s updated.", project['name'])
        self._force_update_project = False
        self._last_update_project = now

    def _update_project(self, project):
        '''update one project'''
        if project['name'] not in self.projects:
            self.projects[project['name']] = {}
        self.projects[project['name']].update(project)
        self.projects[project['name']]['md5sum'] = utils.md5string(project['script'])
        if not self.projects[project['name']].get('active_tasks', None):
            self.projects[project['name']]['active_tasks'] = deque(maxlen=self.ACTIVE_TASKS)

        # load the task queue when the project is running; delete it when the
        # project is stopped
        if project['status'] in ('RUNNING', 'DEBUG'):
            if project['name'] not in self.task_queue:
                self._load_tasks(project['name'])
            self.task_queue[project['name']].rate = project['rate']
            self.task_queue[project['name']].burst = project['burst']

            # update project runtime info from the processor by sending an
            # _on_get_info request; the result is in status_page.track.save
            self.on_select_task({
                'taskid': '_on_get_info',
                'project': project['name'],
                'url': 'data:,_on_get_info',
                'status': self.taskdb.SUCCESS,
                'fetch': {
                    'save': ['min_tick', 'retry_delay'],
                },
                'process': {
                    'callback': '_on_get_info',
                },
            })
        else:
            if project['name'] in self.task_queue:
                self.task_queue[project['name']].rate = 0
                self.task_queue[project['name']].burst = 0
                del self.task_queue[project['name']]

        if project['name'] not in self._cnt['all']:
            self._update_project_cnt(project['name'])

    scheduler_task_fields = ['taskid', 'project', 'schedule', ]

    def _load_tasks(self, project):
        '''load tasks from database'''
        self.task_queue[project] = TaskQueue(rate=0, burst=0)
        for task in self.taskdb.load_tasks(
                self.taskdb.ACTIVE, project, self.scheduler_task_fields
        ):
            taskid = task['taskid']
            _schedule = task.get('schedule', self.default_schedule)
            priority = _schedule.get('priority', self.default_schedule['priority'])
            exetime = _schedule.get('exetime', self.default_schedule['exetime'])
            self.task_queue[project].put(taskid, priority, exetime)
        logger.debug('project: %s loaded %d tasks.', project, len(self.task_queue[project]))

        if self.projects[project]['status'] in ('RUNNING', 'DEBUG'):
            self.task_queue[project].rate = self.projects[project]['rate']
            self.task_queue[project].burst = self.projects[project]['burst']
        else:
            self.task_queue[project].rate = 0
            self.task_queue[project].burst = 0

        if project not in self._cnt['all']:
            self._update_project_cnt(project)
        self._cnt['all'].value((project, 'pending'), len(self.task_queue[project]))

    def _update_project_cnt(self, project):
        status_count = self.taskdb.status_count(project)
        self._cnt['all'].value(
            (project, 'success'),
            status_count.get(self.taskdb.SUCCESS, 0)
        )
        self._cnt['all'].value(
            (project, 'failed'),
            status_count.get(self.taskdb.FAILED, 0) + status_count.get(self.taskdb.BAD, 0)
        )
        self._cnt['all'].value(
            (project, 'pending'),
            status_count.get(self.taskdb.ACTIVE, 0)
        )

    def task_verify(self, task):
        '''
        return False if any of 'taskid', 'project', 'url' is missing from the
        task dict, or if the project is not in task_queue
        '''
        for each in ('taskid', 'project', 'url', ):
            if each not in task or not task[each]:
                logger.error('%s not in task: %.200r', each, task)
                return False
        if task['project'] not in self.task_queue:
            logger.error('unknown project: %s', task['project'])
            return False
        return True

    def insert_task(self, task):
        '''insert task into database'''
        return self.taskdb.insert(task['project'], task['taskid'], task)

    def update_task(self, task):
        '''update task in database'''
        return self.taskdb.update(task['project'], task['taskid'], task)

    def put_task(self, task):
        '''put task to task queue'''
        _schedule = task.get('schedule', self.default_schedule)
        self.task_queue[task['project']].put(
            task['taskid'],
            priority=_schedule.get('priority', self.default_schedule['priority']),
            exetime=_schedule.get('exetime', self.default_schedule['exetime'])
        )

    def send_task(self, task, force=True):
        '''
        dispatch task to fetcher

        the out queue may have a size limit to prevent blocking, so a
        send_buffer is used as an overflow area
        '''
        try:
            self.out_queue.put_nowait(task)
        except Queue.Full:
            if force:
                self._send_buffer.appendleft(task)
            else:
                raise

    def _check_task_done(self):
        '''Check status queue'''
        cnt = 0
        try:
            while True:
                task = self.status_queue.get_nowait()
                # check _on_get_info result here
                if task.get('taskid') == '_on_get_info' and 'project' in task and 'track' in task:
                    self.projects[task['project']].update(task['track'].get('save') or {})
                    logger.info(
                        '%s on_get_info %r', task['project'], task['track'].get('save', {})
                    )
                    continue
                elif not self.task_verify(task):
                    continue
                self.on_task_status(task)
                cnt += 1
        except Queue.Empty:
            pass
        return cnt

    merge_task_fields = ['taskid', 'project', 'url', 'status', 'schedule', 'lastcrawltime']

    def _check_request(self):
        '''Check new task queue'''
        tasks = {}
        while len(tasks) < self.LOOP_LIMIT:
            try:
                task = self.newtask_queue.get_nowait()
            except Queue.Empty:
                break

            if isinstance(task, list):
                _tasks = task
            else:
                _tasks = (task, )

            for task in _tasks:
                if not self.task_verify(task):
                    continue

                if task['taskid'] in self.task_queue[task['project']]:
                    if not task.get('schedule', {}).get('force_update', False):
                        logger.debug('ignore newtask %(project)s:%(taskid)s %(url)s', task)
                        continue

                if task['taskid'] in tasks:
                    if not task.get('schedule', {}).get('force_update', False):
                        continue

                tasks[task['taskid']] = task

        for task in itervalues(tasks):
            if self.INQUEUE_LIMIT and len(self.task_queue[task['project']]) >= self.INQUEUE_LIMIT:
                logger.debug('overflow task %(project)s:%(taskid)s %(url)s', task)
                continue

            oldtask = self.taskdb.get_task(task['project'], task['taskid'],
                                           fields=self.merge_task_fields)
            if oldtask:
                task = self.on_old_request(task, oldtask)
            else:
                task = self.on_new_request(task)

        return len(tasks)

    def _check_cronjob(self):
        """Check projects' cronjob tick; return True when a new tick is sent"""
        now = time.time()
        self._last_tick = int(self._last_tick)
        if now - self._last_tick < 1:
            return False
        self._last_tick += 1
        for project in itervalues(self.projects):
            if project['status'] not in ('DEBUG', 'RUNNING'):
                continue
            if project.get('min_tick', 0) == 0:
                continue
            if self._last_tick % int(project['min_tick']) != 0:
                continue
            self.on_select_task({
                'taskid': '_on_cronjob',
                'project': project['name'],
                'url': 'data:,_on_cronjob',
                'status': self.taskdb.SUCCESS,
                'fetch': {
                    'save': {
                        'tick': self._last_tick,
                    },
                },
                'process': {
                    'callback': '_on_cronjob',
                },
            })
        return True

    request_task_fields = [
        'taskid',
        'project',
        'url',
        'status',
        'schedule',
        'fetch',
        'process',
        'track',
        'lastcrawltime'
    ]

    def _check_select(self):
        '''Select tasks to fetch & process'''
        while self._send_buffer:
            _task = self._send_buffer.pop()
            try:
                # use force=False here to prevent the task from being pushed
                # straight back into send_buffer; catch Queue.Full instead
                self.send_task(_task, False)
            except Queue.Full:
                self._send_buffer.append(_task)
                break

        if self.out_queue.full():
            return {}

        taskids = []
        cnt = 0
        cnt_dict = dict()
        limit = self.LOOP_LIMIT
        for project, task_queue in iteritems(self.task_queue):
            if cnt >= limit:
                break

            # task queue
            self.task_queue[project].check_update()
            project_cnt = 0

            # send_buffer was checked above: when it is not empty, out_queue may
            # be blocked and no tasks are sent. Each project is capped at
            # limit / 10 per pass so one busy project cannot starve the others.
            while cnt < limit and project_cnt < limit / 10:
                taskid = task_queue.get()
                if not taskid:
                    break

                taskids.append((project, taskid))
                project_cnt += 1
                cnt += 1
            cnt_dict[project] = project_cnt

        for project, taskid in taskids:
            task = self.taskdb.get_task(project, taskid, fields=self.request_task_fields)
            if not task:
                continue
            task = self.on_select_task(task)

        return cnt_dict

    def _print_counter_log(self):
        # print the top 5 active counters
        keywords = ('pending', 'success', 'retry', 'failed')
        total_cnt = {}
        project_actives = []
        project_fails = []
        for key in keywords:
            total_cnt[key] = 0
        for project, subcounter in iteritems(self._cnt['5m']):
            actives = 0
            for key in keywords:
                cnt = subcounter.get(key, None)
                if cnt:
                    cnt = cnt.sum
                    total_cnt[key] += cnt
                    actives += cnt

            project_actives.append((actives, project))

            fails = subcounter.get('failed', None)
            if fails:
                project_fails.append((fails.sum, project))

        top_2_fails = sorted(project_fails, reverse=True)[:2]
        # exclude the failing projects already selected above (compare project
        # names, not (sum, project) tuples)
        fail_projects = set(project for _, project in top_2_fails)
        top_3_actives = sorted([x for x in project_actives if x[1] not in fail_projects],
                               reverse=True)[:5 - len(top_2_fails)]

        log_str = ("in 5m: new:%(pending)d,success:%(success)d,"
                   "retry:%(retry)d,failed:%(failed)d" % total_cnt)
        for _, project in itertools.chain(top_3_actives, top_2_fails):
            subcounter = self._cnt['5m'][project].to_dict(get_value='sum')
            log_str += " %s:%d,%d,%d,%d" % (project,
                                            subcounter.get('pending', 0),
                                            subcounter.get('success', 0),
                                            subcounter.get('retry', 0),
                                            subcounter.get('failed', 0))
        logger.info(log_str)

    def _dump_cnt(self):
        '''Dump counters to file'''
        self._cnt['1h'].dump(os.path.join(self.data_path, 'scheduler.1h'))
        self._cnt['1d'].dump(os.path.join(self.data_path, 'scheduler.1d'))
        self._cnt['all'].dump(os.path.join(self.data_path, 'scheduler.all'))

    def _try_dump_cnt(self):
        '''Dump counters every 60 seconds'''
        now = time.time()
        if now - self._last_dump_cnt > 60:
            self._last_dump_cnt = now
            self._dump_cnt()
            self._print_counter_log()

    def _check_delete(self):
        '''Check project delete'''
        now = time.time()
        for project in list(itervalues(self.projects)):
            if project['status'] != 'STOP':
                continue
            if now - project['updatetime'] < self.DELETE_TIME:
                continue
            if 'delete' not in self.projectdb.split_group(project['group']):
                continue

            logger.warning("deleting project: %s!", project['name'])
            if project['name'] in self.task_queue:
                self.task_queue[project['name']].rate = 0
                self.task_queue[project['name']].burst = 0
                del self.task_queue[project['name']]
            del self.projects[project['name']]
            self.taskdb.drop(project['name'])
            self.projectdb.drop(project['name'])
            if self.resultdb:
                self.resultdb.drop(project['name'])

    def __len__(self):
        return sum(len(x) for x in itervalues(self.task_queue))

    def quit(self):
        '''Set quit signal'''
        self._quit = True

    def run_once(self):
        '''consume the queues and feed tasks to the fetcher, once'''

        self._update_projects()
        self._check_task_done()
        self._check_request()
        while self._check_cronjob():
            pass
        self._check_select()
        self._check_delete()
        self._try_dump_cnt()

    def run(self):
        '''Start scheduler loop'''
        logger.info("loading projects")

        while not self._quit:
            try:
                time.sleep(self.LOOP_INTERVAL)
                self.run_once()
                self._exceptions = 0
            except KeyboardInterrupt:
                break
            except Exception as e:
                logger.exception(e)
                self._exceptions += 1
                if self._exceptions > self.EXCEPTION_LIMIT:
                    break
                continue

        logger.info("scheduler exiting...")
        self._dump_cnt()

    def trigger_on_start(self, project):
        '''trigger an on_start callback of project'''
        self.newtask_queue.put({
            "project": project,
            "taskid": "on_start",
            "url": "data:,on_start",
            "process": {
                "callback": "on_start",
            },
        })

    def xmlrpc_run(self, port=23333, bind='127.0.0.1', logRequests=False):
        '''Start xmlrpc interface'''
        try:
            from six.moves.xmlrpc_server import SimpleXMLRPCServer
        except ImportError:
            from SimpleXMLRPCServer import SimpleXMLRPCServer

        server = SimpleXMLRPCServer((bind, port), allow_none=True, logRequests=logRequests)
        server.register_introspection_functions()
        server.register_multicall_functions()

        server.register_function(self.quit, '_quit')
        server.register_function(self.__len__, 'size')

        def dump_counter(_time, _type):
            try:
                return self._cnt[_time].to_dict(_type)
            except Exception:
                logger.exception('')
        server.register_function(dump_counter, 'counter')

        def new_task(task):
            if self.task_verify(task):
                self.newtask_queue.put(task)
                return True
            return False
        server.register_function(new_task, 'newtask')

        def send_task(task):
            '''dispatch task to fetcher'''
            self.send_task(task)
            return True
        server.register_function(send_task, 'send_task')

        def update_project():
            self._force_update_project = True
        server.register_function(update_project, 'update_project')

        def get_active_tasks(project=None, limit=100):
            allowed_keys = set((
                'taskid',
                'project',
                'status',
                'url',
                'lastcrawltime',
                'updatetime',
                'track',
            ))
            track_allowed_keys = set((
                'ok',
                'time',
                'follows',
                'status_code',
            ))

            iters = [iter(x['active_tasks']) for k, x in iteritems(self.projects)
                     if x and (k == project if project else True)]
            tasks = [next(x, None) for x in iters]
            result = []

            while len(result) < limit and tasks and not all(x is None for x in tasks):
                updatetime, task = t = max(t for t in tasks if t)
                i = tasks.index(t)
                tasks[i] = next(iters[i], None)
                for key in list(task):
                    if key == 'track':
                        for k in list(task[key].get('fetch', [])):
                            if k not in track_allowed_keys:
                                del task[key]['fetch'][k]
                        for k in list(task[key].get('process', [])):
                            if k not in track_allowed_keys:
                                del task[key]['process'][k]
                    if key in allowed_keys:
                        continue
                    del task[key]
                result.append(t)
            # fix for "<type 'exceptions.TypeError'>:dictionary key must be string"
            # have no idea why
            return json.loads(json.dumps(result))
        server.register_function(get_active_tasks, 'get_active_tasks')

        server.timeout = 0.5
        while not self._quit:
            server.handle_request()
        server.server_close()

    def on_new_request(self, task):
        '''Called when a new request arrives'''
        task['status'] = self.taskdb.ACTIVE
        self.insert_task(task)
        self.put_task(task)

        project = task['project']
        self._cnt['5m'].event((project, 'pending'), +1)
        self._cnt['1h'].event((project, 'pending'), +1)
        self._cnt['1d'].event((project, 'pending'), +1)
        self._cnt['all'].event((project, 'pending'), +1)
        logger.info('new task %(project)s:%(taskid)s %(url)s', task)
        return task

    def on_old_request(self, task, old_task):
        '''Called when a request for an already-crawled task arrives'''
        now = time.time()

        _schedule = task.get('schedule', self.default_schedule)
        old_schedule = old_task.get('schedule', {})

        restart = False
        schedule_age = _schedule.get('age', self.default_schedule['age'])
        if _schedule.get('itag') and _schedule['itag'] != old_schedule.get('itag'):
            restart = True
        elif schedule_age >= 0 and schedule_age + (old_task.get('lastcrawltime', 0) or 0) < now:
            restart = True
        elif _schedule.get('force_update'):
            restart = True

        if not restart:
            logger.debug('ignore newtask %(project)s:%(taskid)s %(url)s', task)
            return

        task['status'] = self.taskdb.ACTIVE
        self.update_task(task)
        self.put_task(task)

        project = task['project']
        if old_task['status'] != self.taskdb.ACTIVE:
            self._cnt['5m'].event((project, 'pending'), +1)
            self._cnt['1h'].event((project, 'pending'), +1)
            self._cnt['1d'].event((project, 'pending'), +1)
        if old_task['status'] == self.taskdb.SUCCESS:
            self._cnt['all'].event((project, 'success'), -1).event((project, 'pending'), +1)
        elif old_task['status'] == self.taskdb.FAILED:
            self._cnt['all'].event((project, 'failed'), -1).event((project, 'pending'), +1)
        logger.info('restart task %(project)s:%(taskid)s %(url)s', task)
        return task

    def on_task_status(self, task):
        '''Called when a status pack arrives'''
        try:
            procesok = task['track']['process']['ok']
            if not self.task_queue[task['project']].done(task['taskid']):
                logger.error('not processing pack: %(project)s:%(taskid)s %(url)s', task)
                return None
        except KeyError as e:
            logger.error("Bad status pack: %s", e)
            return None

        if procesok:
            ret = self.on_task_done(task)
        else:
            ret = self.on_task_failed(task)

        if task['track']['fetch'].get('time'):
            self._cnt['5m_time'].event((task['project'], 'fetch_time'),
                                       task['track']['fetch']['time'])
        if task['track']['process'].get('time'):
            self._cnt['5m_time'].event((task['project'], 'process_time'),
                                       task['track']['process'].get('time'))
        self.projects[task['project']]['active_tasks'].appendleft((time.time(), task))
        return ret

    def on_task_done(self, task):
        '''Called when a task is done and successful; called by `on_task_status`'''
        task['status'] = self.taskdb.SUCCESS
        task['lastcrawltime'] = time.time()

        if 'schedule' in task:
            if task['schedule'].get('auto_recrawl') and 'age' in task['schedule']:
                task['status'] = self.taskdb.ACTIVE
                next_exetime = task['schedule'].get('age')
                task['schedule']['exetime'] = time.time() + next_exetime
                self.put_task(task)
            else:
                del task['schedule']
        self.update_task(task)

        project = task['project']
        self._cnt['5m'].event((project, 'success'), +1)
        self._cnt['1h'].event((project, 'success'), +1)
        self._cnt['1d'].event((project, 'success'), +1)
        self._cnt['all'].event((project, 'success'), +1).event((project, 'pending'), -1)
        logger.info('task done %(project)s:%(taskid)s %(url)s', task)
        return task

    def on_task_failed(self, task):
        '''Called when a task has failed; called by `on_task_status`'''

        if 'schedule' not in task:
            old_task = self.taskdb.get_task(task['project'], task['taskid'], fields=['schedule'])
            if old_task is None:
                logger.error('unknown status pack: %s', task)
                return
            task['schedule'] = old_task.get('schedule', {})

        retries = task['schedule'].get('retries', self.default_schedule['retries'])
        retried = task['schedule'].get('retried', 0)

        project_info = self.projects.get(task['project'], {})
        retry_delay = project_info.get('retry_delay', None) or self.DEFAULT_RETRY_DELAY
        next_exetime = retry_delay.get(retried, retry_delay.get('', self.DEFAULT_RETRY_DELAY['']))

        if task['schedule'].get('auto_recrawl') and 'age' in task['schedule']:
            next_exetime = min(next_exetime, task['schedule'].get('age'))
        else:
            if retried >= retries:
                next_exetime = -1
            elif 'age' in task['schedule'] and next_exetime > task['schedule'].get('age'):
                next_exetime = task['schedule'].get('age')

        if next_exetime < 0:
            task['status'] = self.taskdb.FAILED
            task['lastcrawltime'] = time.time()
            self.update_task(task)

            project = task['project']
            self._cnt['5m'].event((project, 'failed'), +1)
            self._cnt['1h'].event((project, 'failed'), +1)
            self._cnt['1d'].event((project, 'failed'), +1)
            self._cnt['all'].event((project, 'failed'), +1).event((project, 'pending'), -1)
            logger.info('task failed %(project)s:%(taskid)s %(url)s', task)
            return task
        else:
            task['schedule']['retried'] = retried + 1
            task['schedule']['exetime'] = time.time() + next_exetime
            task['lastcrawltime'] = time.time()
            self.update_task(task)
            self.put_task(task)

            project = task['project']
            self._cnt['5m'].event((project, 'retry'), +1)
            self._cnt['1h'].event((project, 'retry'), +1)
            self._cnt['1d'].event((project, 'retry'), +1)
            # self._cnt['all'].event((project, 'retry'), +1)
            logger.info('task retry %d/%d %%(project)s:%%(taskid)s %%(url)s' % (
                retried, retries), task)
            return task

    def on_select_task(self, task):
        '''Called when a task is selected to fetch & process'''
        # inject information about the project
        logger.info('select %(project)s:%(taskid)s %(url)s', task)

        project_info = self.projects.get(task['project'])
        assert project_info, 'no such project'
        task['group'] = project_info.get('group')
        task['project_md5sum'] = project_info.get('md5sum')
        task['project_updatetime'] = project_info.get('updatetime', 0)
        project_info['active_tasks'].appendleft((time.time(), task))
        self.send_task(task)
        return task

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.
You can also find more detailed suggestions in the “Code” section of your repository.
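The counter updates above are the nearest candidates in this file: on_new_request, on_task_done and on_task_failed each repeat the same self._cnt[...].event(...) line for the '5m', '1h' and '1d' windows. A sketch of extracting that into a single operation on Scheduler (_count_event is a hypothetical helper, not pyspider API):

    def _count_event(self, project, key, windows=('5m', '1h', '1d')):
        # extracted from the repeated counter updates in on_new_request,
        # on_task_done and on_task_failed
        for window in windows:
            self._cnt[window].event((project, key), +1)

on_new_request, for instance, would collapse its four bookkeeping lines into self._count_event(project, 'pending', windows=('5m', '1h', '1d', 'all')); the paired .event(..., -1) calls on the 'all' window would stay at their call sites.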