Total Complexity | 165 |
Total Lines | 740 |
Duplicated Lines | 0 % |
Complex classes like pyspider.scheduler.Scheduler often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to finding such a component is to look for fields and methods that share the same prefixes or suffixes.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
1 | #!/usr/bin/env python |
||
class Scheduler(object):
    '''Task scheduler: consumes the newtask/status queues and feeds selected
    tasks to the fetcher through out_queue (see run_once).'''

    # Interval (seconds) between checks for updated project definitions.
    UPDATE_PROJECT_INTERVAL = 5 * 60

    # Per-task schedule values used when a task carries no 'schedule' dict.
    default_schedule = {
        'priority': 0,
        'retries': 3,
        'exetime': 0,
        'age': -1,
        'itag': None,
    }

    # Max tasks handled per loop iteration.
    LOOP_LIMIT = 1000
    # Sleep (seconds) between scheduler loop iterations.
    LOOP_INTERVAL = 0.1
    # Max recently-active tasks remembered per project (deque maxlen).
    ACTIVE_TASKS = 100
    # Per-project in-queue cap; 0 disables the limit (see on_request).
    INQUEUE_LIMIT = 0
    # Consecutive loop exceptions tolerated in run() before giving up.
    EXCEPTION_LIMIT = 3
    # Seconds a STOPped project must stay untouched before deletion.
    DELETE_TIME = 24 * 60 * 60
    # Retry delay (seconds) keyed by retry count; '' is the fallback.
    DEFAULT_RETRY_DELAY = {
        0: 30,
        1: 1*60*60,
        2: 6*60*60,
        3: 12*60*60,
        '': 24*60*60
    }
||
47 | |||
    def __init__(self, taskdb, projectdb, newtask_queue, status_queue,
                 out_queue, data_path='./data', resultdb=None):
        '''Wire up storage backends, queues, and in-memory state; restore
        persisted counters from *data_path*.'''
        # storage backends
        self.taskdb = taskdb
        self.projectdb = projectdb
        self.resultdb = resultdb
        # queues: new tasks in, status packs in, fetch requests out
        self.newtask_queue = newtask_queue
        self.status_queue = status_queue
        self.out_queue = out_queue
        self.data_path = data_path

        self._send_buffer = deque()  # tasks parked when out_queue is full
        self._quit = False
        self._exceptions = 0  # consecutive loop failures, reset in run()
        self.projects = dict()  # project name -> project info dict
        self._force_update_project = False
        self._last_update_project = 0
        self.task_queue = dict()  # project name -> TaskQueue
        self._last_tick = int(time.time())

        # counters bucketed by time window; '5m_time' receives timing
        # samples (fed from on_task_status), the others count events
        self._cnt = {
            "5m_time": counter.CounterManager(
                lambda: counter.TimebaseAverageEventCounter(30, 10)),
            "5m": counter.CounterManager(
                lambda: counter.TimebaseAverageWindowCounter(30, 10)),
            "1h": counter.CounterManager(
                lambda: counter.TimebaseAverageWindowCounter(60, 60)),
            "1d": counter.CounterManager(
                lambda: counter.TimebaseAverageWindowCounter(10 * 60, 24 * 6)),
            "all": counter.CounterManager(
                lambda: counter.TotalCounter()),
        }
        # restore counters persisted by _dump_cnt in a previous run
        self._cnt['1h'].load(os.path.join(self.data_path, 'scheduler.1h'))
        self._cnt['1d'].load(os.path.join(self.data_path, 'scheduler.1d'))
        self._cnt['all'].load(os.path.join(self.data_path, 'scheduler.all'))
        self._last_dump_cnt = 0
||
83 | |||
84 | def _update_projects(self): |
||
85 | '''Check project update''' |
||
86 | now = time.time() |
||
87 | if ( |
||
88 | not self._force_update_project |
||
89 | and self._last_update_project + self.UPDATE_PROJECT_INTERVAL > now |
||
90 | ): |
||
91 | return |
||
92 | for project in self.projectdb.check_update(self._last_update_project): |
||
93 | self._update_project(project) |
||
94 | logger.debug("project: %s updated.", project['name']) |
||
95 | self._force_update_project = False |
||
96 | self._last_update_project = now |
||
97 | |||
98 | def _update_project(self, project): |
||
99 | '''update one project''' |
||
100 | if project['name'] not in self.projects: |
||
101 | self.projects[project['name']] = {} |
||
102 | self.projects[project['name']].update(project) |
||
103 | self.projects[project['name']]['md5sum'] = utils.md5string(project['script']) |
||
104 | if not self.projects[project['name']].get('active_tasks', None): |
||
105 | self.projects[project['name']]['active_tasks'] = deque(maxlen=self.ACTIVE_TASKS) |
||
106 | |||
107 | # load task queue when project is running and delete task_queue when project is stoped |
||
108 | if project['status'] in ('RUNNING', 'DEBUG'): |
||
109 | if project['name'] not in self.task_queue: |
||
110 | self._load_tasks(project['name']) |
||
111 | self.task_queue[project['name']].rate = project['rate'] |
||
112 | self.task_queue[project['name']].burst = project['burst'] |
||
113 | |||
114 | # update project runtime info from processor by sending a _on_get_info |
||
115 | # request, result is in status_page.track.save |
||
116 | self.on_select_task({ |
||
117 | 'taskid': '_on_get_info', |
||
118 | 'project': project['name'], |
||
119 | 'url': 'data:,_on_get_info', |
||
120 | 'status': self.taskdb.SUCCESS, |
||
121 | 'fetch': { |
||
122 | 'save': ['min_tick', 'retry_delay'], |
||
123 | }, |
||
124 | 'process': { |
||
125 | 'callback': '_on_get_info', |
||
126 | }, |
||
127 | }) |
||
128 | else: |
||
129 | if project['name'] in self.task_queue: |
||
130 | self.task_queue[project['name']].rate = 0 |
||
131 | self.task_queue[project['name']].burst = 0 |
||
132 | del self.task_queue[project['name']] |
||
133 | |||
134 | if project not in self._cnt['all']: |
||
135 | self._update_project_cnt(project['name']) |
||
136 | |||
    # Fields fetched from taskdb when reloading a project's ACTIVE tasks.
    scheduler_task_fields = ['taskid', 'project', 'schedule', ]
||
138 | |||
    def _load_tasks(self, project):
        '''Rebuild *project*'s in-memory TaskQueue from its ACTIVE tasks in taskdb.'''
        self.task_queue[project] = TaskQueue(rate=0, burst=0)
        for task in self.taskdb.load_tasks(
                self.taskdb.ACTIVE, project, self.scheduler_task_fields
        ):
            taskid = task['taskid']
            _schedule = task.get('schedule', self.default_schedule)
            priority = _schedule.get('priority', self.default_schedule['priority'])
            exetime = _schedule.get('exetime', self.default_schedule['exetime'])
            self.task_queue[project].put(taskid, priority, exetime)
        logger.debug('project: %s loaded %d tasks.', project, len(self.task_queue[project]))

        # apply the project's rate limits only while it is running
        if self.projects[project]['status'] in ('RUNNING', 'DEBUG'):
            self.task_queue[project].rate = self.projects[project]['rate']
            self.task_queue[project].burst = self.projects[project]['burst']
        else:
            self.task_queue[project].rate = 0
            self.task_queue[project].burst = 0

        # seed counters on first sight of this project, then record the
        # current pending total
        if project not in self._cnt['all']:
            self._update_project_cnt(project)
        self._cnt['all'].value((project, 'pending'), len(self.task_queue[project]))
||
162 | |||
163 | def _update_project_cnt(self, project): |
||
164 | status_count = self.taskdb.status_count(project) |
||
165 | self._cnt['all'].value( |
||
166 | (project, 'success'), |
||
167 | status_count.get(self.taskdb.SUCCESS, 0) |
||
168 | ) |
||
169 | self._cnt['all'].value( |
||
170 | (project, 'failed'), |
||
171 | status_count.get(self.taskdb.FAILED, 0) + status_count.get(self.taskdb.BAD, 0) |
||
172 | ) |
||
173 | self._cnt['all'].value( |
||
174 | (project, 'pending'), |
||
175 | status_count.get(self.taskdb.ACTIVE, 0) |
||
176 | ) |
||
177 | |||
178 | def task_verify(self, task): |
||
179 | ''' |
||
180 | return False if any of 'taskid', 'project', 'url' is not in task dict |
||
181 | or project in not in task_queue |
||
182 | ''' |
||
183 | for each in ('taskid', 'project', 'url', ): |
||
184 | if each not in task or not task[each]: |
||
185 | logger.error('%s not in task: %.200r', each, task) |
||
186 | return False |
||
187 | if task['project'] not in self.task_queue: |
||
188 | logger.error('unknown project: %s', task['project']) |
||
189 | return False |
||
190 | return True |
||
191 | |||
192 | def insert_task(self, task): |
||
193 | '''insert task into database''' |
||
194 | return self.taskdb.insert(task['project'], task['taskid'], task) |
||
195 | |||
196 | def update_task(self, task): |
||
197 | '''update task in database''' |
||
198 | return self.taskdb.update(task['project'], task['taskid'], task) |
||
199 | |||
200 | def put_task(self, task): |
||
201 | '''put task to task queue''' |
||
202 | _schedule = task.get('schedule', self.default_schedule) |
||
203 | self.task_queue[task['project']].put( |
||
204 | task['taskid'], |
||
205 | priority=_schedule.get('priority', self.default_schedule['priority']), |
||
206 | exetime=_schedule.get('exetime', self.default_schedule['exetime']) |
||
207 | ) |
||
208 | |||
209 | def send_task(self, task, force=True): |
||
210 | ''' |
||
211 | dispatch task to fetcher |
||
212 | |||
213 | out queue may have size limit to prevent block, a send_buffer is used |
||
214 | ''' |
||
215 | try: |
||
216 | self.out_queue.put_nowait(task) |
||
217 | except Queue.Full: |
||
218 | if force: |
||
219 | self._send_buffer.appendleft(task) |
||
220 | else: |
||
221 | raise |
||
222 | |||
    def _check_task_done(self):
        '''Drain the status queue; returns the number of status packs handled.'''
        cnt = 0
        try:
            while True:
                task = self.status_queue.get_nowait()
                # check _on_get_info result here: merge the reported runtime
                # info (track.save) into the project record
                if task.get('taskid') == '_on_get_info' and 'project' in task and 'track' in task:
                    if task['project'] not in self.projects:
                        continue
                    self.projects[task['project']].update(task['track'].get('save') or {})
                    logger.info(
                        '%s on_get_info %r', task['project'], task['track'].get('save', {})
                    )
                    continue
                elif not self.task_verify(task):
                    continue
                self.on_task_status(task)
                cnt += 1
        except Queue.Empty:
            # queue drained
            pass
        return cnt
||
245 | |||
    # Fields fetched from taskdb when merging a new request with an old task.
    merge_task_fields = ['taskid', 'project', 'url', 'status', 'schedule', 'lastcrawltime']
||
247 | |||
    def _check_request(self):
        '''Drain the newtask queue (up to LOOP_LIMIT tasks) and route each
        through on_request; returns the number of tasks handled.'''
        # deduplicate by taskid; a later duplicate only replaces an earlier
        # one when it requests force_update
        tasks = {}
        while len(tasks) < self.LOOP_LIMIT:
            try:
                task = self.newtask_queue.get_nowait()
            except Queue.Empty:
                break

            # queue items may be a single task or a batch (list) of tasks
            if isinstance(task, list):
                _tasks = task
            else:
                _tasks = (task, )

            for task in _tasks:
                if not self.task_verify(task):
                    continue

                # already queued for this project: skip unless forced
                if task['taskid'] in self.task_queue[task['project']]:
                    if not task.get('schedule', {}).get('force_update', False):
                        logger.debug('ignore newtask %(project)s:%(taskid)s %(url)s', task)
                        continue

                # duplicate within this batch: keep the first unless forced
                if task['taskid'] in tasks:
                    if not task.get('schedule', {}).get('force_update', False):
                        continue

                tasks[task['taskid']] = task

        for task in itervalues(tasks):
            self.on_request(task)

        return len(tasks)
||
281 | |||
    def _check_cronjob(self):
        """Check projects cronjob tick, return True when a new tick is sent"""
        now = time.time()
        self._last_tick = int(self._last_tick)
        if now - self._last_tick < 1:
            # less than one second since the last tick: nothing to do yet
            return False
        self._last_tick += 1
        for project in itervalues(self.projects):
            if project['status'] not in ('DEBUG', 'RUNNING'):
                continue
            # min_tick == 0 (or absent) means the project declared no cron interval
            if project.get('min_tick', 0) == 0:
                continue
            if self._last_tick % int(project['min_tick']) != 0:
                continue
            # synthesize an _on_cronjob task carrying the tick value
            self.on_select_task({
                'taskid': '_on_cronjob',
                'project': project['name'],
                'url': 'data:,_on_cronjob',
                'status': self.taskdb.SUCCESS,
                'fetch': {
                    'save': {
                        'tick': self._last_tick,
                    },
                },
                'process': {
                    'callback': '_on_cronjob',
                },
            })
        return True
||
311 | |||
    # Fields fetched from taskdb when a selected task is sent to the fetcher.
    request_task_fields = [
        'taskid',
        'project',
        'url',
        'status',
        'schedule',
        'fetch',
        'process',
        'track',
        'lastcrawltime'
    ]
||
323 | |||
    def _check_select(self):
        '''Select tasks to fetch & process.

        Returns a dict mapping project name -> number of tasks selected.
        '''
        # first, retry tasks parked in the send buffer
        while self._send_buffer:
            _task = self._send_buffer.pop()
            try:
                # use force=False here to prevent automatic send_buffer append and get exception
                self.send_task(_task, False)
            except Queue.Full:
                self._send_buffer.append(_task)
                break

        if self.out_queue.full():
            # fetcher queue saturated; select nothing this round
            return {}

        taskids = []
        cnt = 0
        cnt_dict = dict()
        limit = self.LOOP_LIMIT
        for project, task_queue in iteritems(self.task_queue):
            if cnt >= limit:
                break

            # task queue
            self.task_queue[project].check_update()
            project_cnt = 0

            # each project gets at most a tenth of the global limit per round
            # check send_buffer here. when not empty, out_queue may blocked. Not sending tasks
            while cnt < limit and project_cnt < limit / 10:
                taskid = task_queue.get()
                if not taskid:
                    break

                taskids.append((project, taskid))
                project_cnt += 1
                cnt += 1
            cnt_dict[project] = project_cnt

        for project, taskid in taskids:
            self._load_put_task(project, taskid)

        return cnt_dict
||
365 | |||
366 | def _load_put_task(self, project, taskid): |
||
367 | task = self.taskdb.get_task(project, taskid, fields=self.request_task_fields) |
||
368 | if not task: |
||
369 | return |
||
370 | task = self.on_select_task(task) |
||
371 | |||
372 | def _print_counter_log(self): |
||
373 | # print top 5 active counters |
||
374 | keywords = ('pending', 'success', 'retry', 'failed') |
||
375 | total_cnt = {} |
||
376 | project_actives = [] |
||
377 | project_fails = [] |
||
378 | for key in keywords: |
||
379 | total_cnt[key] = 0 |
||
380 | for project, subcounter in iteritems(self._cnt['5m']): |
||
381 | actives = 0 |
||
382 | for key in keywords: |
||
383 | cnt = subcounter.get(key, None) |
||
384 | if cnt: |
||
385 | cnt = cnt.sum |
||
386 | total_cnt[key] += cnt |
||
387 | actives += cnt |
||
388 | |||
389 | project_actives.append((actives, project)) |
||
390 | |||
391 | fails = subcounter.get('failed', None) |
||
392 | if fails: |
||
393 | project_fails.append((fails.sum, project)) |
||
394 | |||
395 | top_2_fails = sorted(project_fails, reverse=True)[:2] |
||
396 | top_3_actives = sorted([x for x in project_actives if x[1] not in top_2_fails], |
||
397 | reverse=True)[:5 - len(top_2_fails)] |
||
398 | |||
399 | log_str = ("in 5m: new:%(pending)d,success:%(success)d," |
||
400 | "retry:%(retry)d,failed:%(failed)d" % total_cnt) |
||
401 | for _, project in itertools.chain(top_3_actives, top_2_fails): |
||
402 | subcounter = self._cnt['5m'][project].to_dict(get_value='sum') |
||
403 | log_str += " %s:%d,%d,%d,%d" % (project, |
||
404 | subcounter.get('pending', 0), |
||
405 | subcounter.get('success', 0), |
||
406 | subcounter.get('retry', 0), |
||
407 | subcounter.get('failed', 0)) |
||
408 | logger.info(log_str) |
||
409 | |||
410 | def _dump_cnt(self): |
||
411 | '''Dump counters to file''' |
||
412 | self._cnt['1h'].dump(os.path.join(self.data_path, 'scheduler.1h')) |
||
413 | self._cnt['1d'].dump(os.path.join(self.data_path, 'scheduler.1d')) |
||
414 | self._cnt['all'].dump(os.path.join(self.data_path, 'scheduler.all')) |
||
415 | |||
416 | def _try_dump_cnt(self): |
||
417 | '''Dump counters every 60 seconds''' |
||
418 | now = time.time() |
||
419 | if now - self._last_dump_cnt > 60: |
||
420 | self._last_dump_cnt = now |
||
421 | self._dump_cnt() |
||
422 | self._print_counter_log() |
||
423 | |||
    def _check_delete(self):
        '''Delete projects that are STOPped, tagged with the 'delete' group,
        and untouched for at least DELETE_TIME seconds.'''
        now = time.time()
        for project in list(itervalues(self.projects)):
            if project['status'] != 'STOP':
                continue
            if now - project['updatetime'] < self.DELETE_TIME:
                continue
            # only projects explicitly placed in the 'delete' group
            if 'delete' not in self.projectdb.split_group(project['group']):
                continue

            logger.warning("deleting project: %s!", project['name'])
            if project['name'] in self.task_queue:
                self.task_queue[project['name']].rate = 0
                self.task_queue[project['name']].burst = 0
                del self.task_queue[project['name']]
            del self.projects[project['name']]
            # drop all persisted data belonging to the project
            self.taskdb.drop(project['name'])
            self.projectdb.drop(project['name'])
            if self.resultdb:
                self.resultdb.drop(project['name'])
            for each in self._cnt.values():
                del each[project['name']]
||
447 | |||
448 | def __len__(self): |
||
449 | return sum(len(x) for x in itervalues(self.task_queue)) |
||
450 | |||
451 | def quit(self): |
||
452 | '''Set quit signal''' |
||
453 | self._quit = True |
||
454 | # stop xmlrpc server |
||
455 | if hasattr(self, 'xmlrpc_server'): |
||
456 | self.xmlrpc_ioloop.add_callback(self.xmlrpc_server.stop) |
||
457 | self.xmlrpc_ioloop.add_callback(self.xmlrpc_ioloop.stop) |
||
458 | |||
    def run_once(self):
        '''Consume queues and feed tasks to the fetcher, once.'''
        self._update_projects()
        self._check_task_done()
        self._check_request()
        # the tick may lag real time; catch up one tick at a time
        while self._check_cronjob():
            pass
        self._check_select()
        self._check_delete()
        self._try_dump_cnt()
||
470 | |||
    def run(self):
        '''Start scheduler loop; exits on quit(), Ctrl-C, or more than
        EXCEPTION_LIMIT consecutive failing iterations.'''
        logger.info("loading projects")

        while not self._quit:
            try:
                time.sleep(self.LOOP_INTERVAL)
                self.run_once()
                # a clean iteration resets the consecutive-exception counter
                self._exceptions = 0
            except KeyboardInterrupt:
                break
            except Exception as e:
                logger.exception(e)
                self._exceptions += 1
                if self._exceptions > self.EXCEPTION_LIMIT:
                    break
                continue

        logger.info("scheduler exiting...")
        # persist counters so the next run can restore them
        self._dump_cnt()
||
491 | |||
492 | def trigger_on_start(self, project): |
||
493 | '''trigger an on_start callback of project''' |
||
494 | self.newtask_queue.put({ |
||
495 | "project": project, |
||
496 | "taskid": "on_start", |
||
497 | "url": "data:,on_start", |
||
498 | "process": { |
||
499 | "callback": "on_start", |
||
500 | }, |
||
501 | }) |
||
502 | |||
    def xmlrpc_run(self, port=23333, bind='127.0.0.1', logRequests=False):
        '''Start the xmlrpc interface; blocks in the tornado ioloop until
        quit() stops it.'''
        from pyspider.libs.wsgi_xmlrpc import WSGIXMLRPCApplication

        application = WSGIXMLRPCApplication()

        application.register_function(self.quit, '_quit')
        application.register_function(self.__len__, 'size')

        def dump_counter(_time, _type):
            # expose a counter window as a dict, e.g. counter('5m', 'sum')
            try:
                return self._cnt[_time].to_dict(_type)
            except:
                # NOTE(review): bare except returns None to the caller and
                # only the log records what went wrong — confirm intended
                logger.exception('')
        application.register_function(dump_counter, 'counter')

        def new_task(task):
            # enqueue a task submitted over xmlrpc; False when invalid
            if self.task_verify(task):
                self.newtask_queue.put(task)
                return True
            return False
        application.register_function(new_task, 'newtask')

        def send_task(task):
            '''dispatch task to fetcher'''
            self.send_task(task)
            return True
        application.register_function(send_task, 'send_task')

        def update_project():
            # force a project refresh on the next loop iteration
            self._force_update_project = True
        application.register_function(update_project, 'update_project')

        def get_active_tasks(project=None, limit=100):
            # whitelist of task fields exposed over xmlrpc
            allowed_keys = set((
                'taskid',
                'project',
                'status',
                'url',
                'lastcrawltime',
                'updatetime',
                'track',
            ))
            track_allowed_keys = set((
                'ok',
                'time',
                'follows',
                'status_code',
            ))

            # merge the per-project active_tasks deques, most recent first
            iters = [iter(x['active_tasks']) for k, x in iteritems(self.projects)
                     if x and (k == project if project else True)]
            tasks = [next(x, None) for x in iters]
            result = []

            while len(result) < limit and tasks and not all(x is None for x in tasks):
                # pick the newest (updatetime, task) pair across all iterators
                updatetime, task = t = max(t for t in tasks if t)
                i = tasks.index(t)
                tasks[i] = next(iters[i], None)
                # strip fields that are not whitelisted
                for key in list(task):
                    if key == 'track':
                        for k in list(task[key].get('fetch', [])):
                            if k not in track_allowed_keys:
                                del task[key]['fetch'][k]
                        for k in list(task[key].get('process', [])):
                            if k not in track_allowed_keys:
                                del task[key]['process'][k]
                    if key in allowed_keys:
                        continue
                    del task[key]
                result.append(t)
            # fix for "<type 'exceptions.TypeError'>:dictionary key must be string"
            # have no idea why
            return json.loads(json.dumps(result))
        application.register_function(get_active_tasks, 'get_active_tasks')

        import tornado.wsgi
        import tornado.ioloop
        import tornado.httpserver

        container = tornado.wsgi.WSGIContainer(application)
        self.xmlrpc_ioloop = tornado.ioloop.IOLoop()
        self.xmlrpc_server = tornado.httpserver.HTTPServer(container, io_loop=self.xmlrpc_ioloop)
        self.xmlrpc_server.listen(port=port, address=bind)
        self.xmlrpc_ioloop.start()
||
588 | |||
589 | def on_request(self, task): |
||
590 | if self.INQUEUE_LIMIT and len(self.task_queue[task['project']]) >= self.INQUEUE_LIMIT: |
||
591 | logger.debug('overflow task %(project)s:%(taskid)s %(url)s', task) |
||
592 | return |
||
593 | |||
594 | oldtask = self.taskdb.get_task(task['project'], task['taskid'], |
||
595 | fields=self.merge_task_fields) |
||
596 | if oldtask: |
||
597 | return self.on_old_request(task, oldtask) |
||
598 | else: |
||
599 | return self.on_new_request(task) |
||
600 | |||
601 | def on_new_request(self, task): |
||
602 | '''Called when a new request is arrived''' |
||
603 | task['status'] = self.taskdb.ACTIVE |
||
604 | self.insert_task(task) |
||
605 | self.put_task(task) |
||
606 | |||
607 | project = task['project'] |
||
608 | self._cnt['5m'].event((project, 'pending'), +1) |
||
609 | self._cnt['1h'].event((project, 'pending'), +1) |
||
610 | self._cnt['1d'].event((project, 'pending'), +1) |
||
611 | self._cnt['all'].event((project, 'pending'), +1) |
||
612 | logger.info('new task %(project)s:%(taskid)s %(url)s', task) |
||
613 | return task |
||
614 | |||
    def on_old_request(self, task, old_task):
        '''Called when a crawled task is arrived.

        Restarts the known task when the itag changed, the last crawl is
        older than schedule.age, or force_update is set; otherwise ignores it.
        '''
        now = time.time()

        _schedule = task.get('schedule', self.default_schedule)
        old_schedule = old_task.get('schedule', {})

        restart = False
        schedule_age = _schedule.get('age', self.default_schedule['age'])
        if _schedule.get('itag') and _schedule['itag'] != old_schedule.get('itag'):
            restart = True
        elif schedule_age >= 0 and schedule_age + (old_task.get('lastcrawltime', 0) or 0) < now:
            restart = True
        elif _schedule.get('force_update'):
            restart = True

        if not restart:
            logger.debug('ignore newtask %(project)s:%(taskid)s %(url)s', task)
            return

        task['status'] = self.taskdb.ACTIVE
        self.update_task(task)
        self.put_task(task)

        project = task['project']
        # moving the task back to pending: adjust windowed counters, and move
        # the total counters from their previous terminal state to pending
        if old_task['status'] != self.taskdb.ACTIVE:
            self._cnt['5m'].event((project, 'pending'), +1)
            self._cnt['1h'].event((project, 'pending'), +1)
            self._cnt['1d'].event((project, 'pending'), +1)
        if old_task['status'] == self.taskdb.SUCCESS:
            self._cnt['all'].event((project, 'success'), -1).event((project, 'pending'), +1)
        elif old_task['status'] == self.taskdb.FAILED:
            self._cnt['all'].event((project, 'failed'), -1).event((project, 'pending'), +1)
        logger.info('restart task %(project)s:%(taskid)s %(url)s', task)
        return task
||
650 | |||
651 | def on_task_status(self, task): |
||
652 | '''Called when a status pack is arrived''' |
||
653 | try: |
||
654 | procesok = task['track']['process']['ok'] |
||
655 | if not self.task_queue[task['project']].done(task['taskid']): |
||
656 | logging.error('not processing pack: %(project)s:%(taskid)s %(url)s', task) |
||
657 | return None |
||
658 | except KeyError as e: |
||
659 | logger.error("Bad status pack: %s", e) |
||
660 | return None |
||
661 | |||
662 | if procesok: |
||
663 | ret = self.on_task_done(task) |
||
664 | else: |
||
665 | ret = self.on_task_failed(task) |
||
666 | |||
667 | if task['track']['fetch'].get('time'): |
||
668 | self._cnt['5m_time'].event((task['project'], 'fetch_time'), |
||
669 | task['track']['fetch']['time']) |
||
670 | if task['track']['process'].get('time'): |
||
671 | self._cnt['5m_time'].event((task['project'], 'process_time'), |
||
672 | task['track']['process'].get('time')) |
||
673 | self.projects[task['project']]['active_tasks'].appendleft((time.time(), task)) |
||
674 | return ret |
||
675 | |||
    def on_task_done(self, task):
        '''Called when a task is done and success, called by `on_task_status`'''
        task['status'] = self.taskdb.SUCCESS
        task['lastcrawltime'] = time.time()

        if 'schedule' in task:
            if task['schedule'].get('auto_recrawl') and 'age' in task['schedule']:
                # auto_recrawl: keep the task ACTIVE and requeue it to run
                # again after schedule.age seconds
                task['status'] = self.taskdb.ACTIVE
                next_exetime = task['schedule'].get('age')
                task['schedule']['exetime'] = time.time() + next_exetime
                self.put_task(task)
            else:
                del task['schedule']
        self.update_task(task)

        project = task['project']
        self._cnt['5m'].event((project, 'success'), +1)
        self._cnt['1h'].event((project, 'success'), +1)
        self._cnt['1d'].event((project, 'success'), +1)
        # totals: one more success, one fewer pending
        self._cnt['all'].event((project, 'success'), +1).event((project, 'pending'), -1)
        logger.info('task done %(project)s:%(taskid)s %(url)s', task)
        return task
||
698 | |||
699 | def on_task_failed(self, task): |
||
700 | '''Called when a task is failed, called by `on_task_status`''' |
||
701 | |||
702 | if 'schedule' not in task: |
||
703 | old_task = self.taskdb.get_task(task['project'], task['taskid'], fields=['schedule']) |
||
704 | if old_task is None: |
||
705 | logging.error('unknown status pack: %s' % task) |
||
706 | return |
||
707 | task['schedule'] = old_task.get('schedule', {}) |
||
708 | |||
709 | retries = task['schedule'].get('retries', self.default_schedule['retries']) |
||
710 | retried = task['schedule'].get('retried', 0) |
||
711 | |||
712 | project_info = self.projects.get(task['project'], {}) |
||
713 | retry_delay = project_info.get('retry_delay', None) or self.DEFAULT_RETRY_DELAY |
||
714 | next_exetime = retry_delay.get(retried, retry_delay.get('', self.DEFAULT_RETRY_DELAY[''])) |
||
715 | |||
716 | if task['schedule'].get('auto_recrawl') and 'age' in task['schedule']: |
||
717 | next_exetime = min(next_exetime, task['schedule'].get('age')) |
||
718 | else: |
||
719 | if retried >= retries: |
||
720 | next_exetime = -1 |
||
721 | elif 'age' in task['schedule'] and next_exetime > task['schedule'].get('age'): |
||
722 | next_exetime = task['schedule'].get('age') |
||
723 | |||
724 | if next_exetime < 0: |
||
725 | task['status'] = self.taskdb.FAILED |
||
726 | task['lastcrawltime'] = time.time() |
||
727 | self.update_task(task) |
||
728 | |||
729 | project = task['project'] |
||
730 | self._cnt['5m'].event((project, 'failed'), +1) |
||
731 | self._cnt['1h'].event((project, 'failed'), +1) |
||
732 | self._cnt['1d'].event((project, 'failed'), +1) |
||
733 | self._cnt['all'].event((project, 'failed'), +1).event((project, 'pending'), -1) |
||
734 | logger.info('task failed %(project)s:%(taskid)s %(url)s' % task) |
||
735 | return task |
||
736 | else: |
||
737 | task['schedule']['retried'] = retried + 1 |
||
738 | task['schedule']['exetime'] = time.time() + next_exetime |
||
739 | task['lastcrawltime'] = time.time() |
||
740 | self.update_task(task) |
||
741 | self.put_task(task) |
||
742 | |||
743 | project = task['project'] |
||
744 | self._cnt['5m'].event((project, 'retry'), +1) |
||
745 | self._cnt['1h'].event((project, 'retry'), +1) |
||
746 | self._cnt['1d'].event((project, 'retry'), +1) |
||
747 | # self._cnt['all'].event((project, 'retry'), +1) |
||
748 | logger.info('task retry %d/%d %%(project)s:%%(taskid)s %%(url)s' % ( |
||
749 | retried, retries), task) |
||
750 | return task |
||
751 | |||
752 | def on_select_task(self, task): |
||
753 | '''Called when a task is selected to fetch & process''' |
||
754 | # inject informations about project |
||
755 | logger.info('select %(project)s:%(taskid)s %(url)s', task) |
||
756 | |||
757 | project_info = self.projects.get(task['project']) |
||
758 | assert project_info, 'no such project' |
||
759 | task['group'] = project_info.get('group') |
||
760 | task['project_md5sum'] = project_info.get('md5sum') |
||
761 | task['project_updatetime'] = project_info.get('updatetime', 0) |
||
762 | project_info['active_tasks'].appendleft((time.time(), task)) |
||
763 | self.send_task(task) |
||
764 | return task |
||
765 | |||
1047 |