| Total Complexity | 168 |
| Total Lines | 756 |
| Duplicated Lines | 0 % |
Complex classes like pyspider.scheduler.Scheduler often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to finding such a component is to look for fields/methods that share the same prefixes or suffixes.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
| 1 | #!/usr/bin/env python |
||
class Scheduler(object):
    """Dispatch crawl tasks: consume new-task/status queues, keep per-project
    task queues, and feed selected tasks to the fetcher via out_queue."""

    # seconds between polls of projectdb for updated project records
    UPDATE_PROJECT_INTERVAL = 5 * 60

    # fallback values merged in when a task carries no 'schedule' dict
    default_schedule = {
        'priority': 0,
        'retries': 3,
        'exetime': 0,
        'age': -1,       # -1: never considered stale
        'itag': None,
    }
    LOOP_LIMIT = 1000        # max tasks handled per scheduling pass
    LOOP_INTERVAL = 0.1      # seconds slept between run_once() passes
    ACTIVE_TASKS = 100       # per-project 'active_tasks' deque capacity
    INQUEUE_LIMIT = 0        # per-project queue cap; 0 means unlimited
    EXCEPTION_LIMIT = 3      # consecutive loop failures tolerated before exit
    DELETE_TIME = 24 * 60 * 60   # seconds a STOPed project must be stale before deletion
    # retry back-off by retry count; '' is the fallback for higher counts
    DEFAULT_RETRY_DELAY = {
        0: 30,
        1: 1*60*60,
        2: 6*60*60,
        3: 12*60*60,
        '': 24*60*60
    }
||
| 47 | |||
    def __init__(self, taskdb, projectdb, newtask_queue, status_queue,
                 out_queue, data_path='./data', resultdb=None):
        """Wire the scheduler to its storage backends and message queues.

        :param taskdb:        task storage backend
        :param projectdb:     project storage backend
        :param newtask_queue: incoming new tasks (from the processor)
        :param status_queue:  incoming task status packs (from the processor)
        :param out_queue:     tasks dispatched to the fetcher
        :param data_path:     directory where counter dumps are persisted
        :param resultdb:      optional result storage (dropped with a project)
        """
        self.taskdb = taskdb
        self.projectdb = projectdb
        self.resultdb = resultdb
        self.newtask_queue = newtask_queue
        self.status_queue = status_queue
        self.out_queue = out_queue
        self.data_path = data_path

        self._send_buffer = deque()         # tasks that did not fit into out_queue
        self._quit = False                  # set by quit() to stop run()
        self._exceptions = 0                # consecutive run_once() failures
        self.projects = dict()              # project name -> project info dict
        self._force_update_project = False  # set via the xmlrpc 'update_project' hook
        self._last_update_project = 0       # timestamp of the last projectdb poll
        self.task_queue = dict()            # project name -> TaskQueue
        self._last_tick = int(time.time())  # whole-second cursor for cronjob ticks
        self._sent_finished_event = dict()  # project name -> 'need' | 'sent'

        # counters: '5m_time' averages fetch/process durations; '5m'/'1h'/'1d'
        # are sliding-window event counters; 'all' is a monotonic total.
        self._cnt = {
            "5m_time": counter.CounterManager(
                lambda: counter.TimebaseAverageEventCounter(30, 10)),
            "5m": counter.CounterManager(
                lambda: counter.TimebaseAverageWindowCounter(30, 10)),
            "1h": counter.CounterManager(
                lambda: counter.TimebaseAverageWindowCounter(60, 60)),
            "1d": counter.CounterManager(
                lambda: counter.TimebaseAverageWindowCounter(10 * 60, 24 * 6)),
            "all": counter.CounterManager(
                lambda: counter.TotalCounter()),
        }
        # long-lived counters survive restarts through dump files (see _dump_cnt)
        self._cnt['1h'].load(os.path.join(self.data_path, 'scheduler.1h'))
        self._cnt['1d'].load(os.path.join(self.data_path, 'scheduler.1d'))
        self._cnt['all'].load(os.path.join(self.data_path, 'scheduler.all'))
        self._last_dump_cnt = 0  # timestamp of the last counter dump
||
| 84 | |||
| 85 | def _update_projects(self): |
||
| 86 | '''Check project update''' |
||
| 87 | now = time.time() |
||
| 88 | if ( |
||
| 89 | not self._force_update_project |
||
| 90 | and self._last_update_project + self.UPDATE_PROJECT_INTERVAL > now |
||
| 91 | ): |
||
| 92 | return |
||
| 93 | for project in self.projectdb.check_update(self._last_update_project): |
||
| 94 | self._update_project(project) |
||
| 95 | logger.debug("project: %s updated.", project['name']) |
||
| 96 | self._force_update_project = False |
||
| 97 | self._last_update_project = now |
||
| 98 | |||
| 99 | def _update_project(self, project): |
||
| 100 | '''update one project''' |
||
| 101 | if project['name'] not in self.projects: |
||
| 102 | self.projects[project['name']] = {} |
||
| 103 | self.projects[project['name']].update(project) |
||
| 104 | self.projects[project['name']]['md5sum'] = utils.md5string(project['script']) |
||
| 105 | if not self.projects[project['name']].get('active_tasks', None): |
||
| 106 | self.projects[project['name']]['active_tasks'] = deque(maxlen=self.ACTIVE_TASKS) |
||
| 107 | |||
| 108 | # load task queue when project is running and delete task_queue when project is stoped |
||
| 109 | if project['status'] in ('RUNNING', 'DEBUG'): |
||
| 110 | if project['name'] not in self.task_queue: |
||
| 111 | self._load_tasks(project['name']) |
||
| 112 | self.task_queue[project['name']].rate = project['rate'] |
||
| 113 | self.task_queue[project['name']].burst = project['burst'] |
||
| 114 | |||
| 115 | # update project runtime info from processor by sending a _on_get_info |
||
| 116 | # request, result is in status_page.track.save |
||
| 117 | self.on_select_task({ |
||
| 118 | 'taskid': '_on_get_info', |
||
| 119 | 'project': project['name'], |
||
| 120 | 'url': 'data:,_on_get_info', |
||
| 121 | 'status': self.taskdb.SUCCESS, |
||
| 122 | 'fetch': { |
||
| 123 | 'save': ['min_tick', 'retry_delay'], |
||
| 124 | }, |
||
| 125 | 'process': { |
||
| 126 | 'callback': '_on_get_info', |
||
| 127 | }, |
||
| 128 | }) |
||
| 129 | else: |
||
| 130 | if project['name'] in self.task_queue: |
||
| 131 | self.task_queue[project['name']].rate = 0 |
||
| 132 | self.task_queue[project['name']].burst = 0 |
||
| 133 | del self.task_queue[project['name']] |
||
| 134 | |||
| 135 | if project not in self._cnt['all']: |
||
| 136 | self._update_project_cnt(project['name']) |
||
| 137 | |||
    # fields fetched from taskdb when (re)building a project's queue
    scheduler_task_fields = ['taskid', 'project', 'schedule', ]

    def _load_tasks(self, project):
        """Rebuild the in-memory TaskQueue for *project* from ACTIVE taskdb rows."""
        self.task_queue[project] = TaskQueue(rate=0, burst=0)
        for task in self.taskdb.load_tasks(
                self.taskdb.ACTIVE, project, self.scheduler_task_fields
        ):
            taskid = task['taskid']
            _schedule = task.get('schedule', self.default_schedule)
            priority = _schedule.get('priority', self.default_schedule['priority'])
            exetime = _schedule.get('exetime', self.default_schedule['exetime'])
            self.task_queue[project].put(taskid, priority, exetime)
        logger.debug('project: %s loaded %d tasks.', project, len(self.task_queue[project]))

        # only running projects get a real rate/burst; stopped ones stay throttled
        if self.projects[project]['status'] in ('RUNNING', 'DEBUG'):
            self.task_queue[project].rate = self.projects[project]['rate']
            self.task_queue[project].burst = self.projects[project]['burst']
        else:
            self.task_queue[project].rate = 0
            self.task_queue[project].burst = 0

        # seed counters on first sight of the project, then record the pending backlog
        if project not in self._cnt['all']:
            self._update_project_cnt(project)
        self._cnt['all'].value((project, 'pending'), len(self.task_queue[project]))
||
| 163 | |||
| 164 | def _update_project_cnt(self, project): |
||
| 165 | status_count = self.taskdb.status_count(project) |
||
| 166 | self._cnt['all'].value( |
||
| 167 | (project, 'success'), |
||
| 168 | status_count.get(self.taskdb.SUCCESS, 0) |
||
| 169 | ) |
||
| 170 | self._cnt['all'].value( |
||
| 171 | (project, 'failed'), |
||
| 172 | status_count.get(self.taskdb.FAILED, 0) + status_count.get(self.taskdb.BAD, 0) |
||
| 173 | ) |
||
| 174 | self._cnt['all'].value( |
||
| 175 | (project, 'pending'), |
||
| 176 | status_count.get(self.taskdb.ACTIVE, 0) |
||
| 177 | ) |
||
| 178 | |||
| 179 | def task_verify(self, task): |
||
| 180 | ''' |
||
| 181 | return False if any of 'taskid', 'project', 'url' is not in task dict |
||
| 182 | or project in not in task_queue |
||
| 183 | ''' |
||
| 184 | for each in ('taskid', 'project', 'url', ): |
||
| 185 | if each not in task or not task[each]: |
||
| 186 | logger.error('%s not in task: %.200r', each, task) |
||
| 187 | return False |
||
| 188 | if task['project'] not in self.task_queue: |
||
| 189 | logger.error('unknown project: %s', task['project']) |
||
| 190 | return False |
||
| 191 | return True |
||
| 192 | |||
| 193 | def insert_task(self, task): |
||
| 194 | '''insert task into database''' |
||
| 195 | return self.taskdb.insert(task['project'], task['taskid'], task) |
||
| 196 | |||
| 197 | def update_task(self, task): |
||
| 198 | '''update task in database''' |
||
| 199 | return self.taskdb.update(task['project'], task['taskid'], task) |
||
| 200 | |||
| 201 | def put_task(self, task): |
||
| 202 | '''put task to task queue''' |
||
| 203 | _schedule = task.get('schedule', self.default_schedule) |
||
| 204 | self.task_queue[task['project']].put( |
||
| 205 | task['taskid'], |
||
| 206 | priority=_schedule.get('priority', self.default_schedule['priority']), |
||
| 207 | exetime=_schedule.get('exetime', self.default_schedule['exetime']) |
||
| 208 | ) |
||
| 209 | |||
| 210 | def send_task(self, task, force=True): |
||
| 211 | ''' |
||
| 212 | dispatch task to fetcher |
||
| 213 | |||
| 214 | out queue may have size limit to prevent block, a send_buffer is used |
||
| 215 | ''' |
||
| 216 | try: |
||
| 217 | self.out_queue.put_nowait(task) |
||
| 218 | except Queue.Full: |
||
| 219 | if force: |
||
| 220 | self._send_buffer.appendleft(task) |
||
| 221 | else: |
||
| 222 | raise |
||
| 223 | |||
    def _check_task_done(self):
        """Drain the status queue; return the number of status packs handled."""
        cnt = 0
        try:
            while True:
                task = self.status_queue.get_nowait()
                # check _on_get_info result here: merge the reported runtime
                # info (min_tick, retry_delay, ...) into the cached project
                if task.get('taskid') == '_on_get_info' and 'project' in task and 'track' in task:
                    if task['project'] not in self.projects:
                        continue
                    self.projects[task['project']].update(task['track'].get('save') or {})
                    logger.info(
                        '%s on_get_info %r', task['project'], task['track'].get('save', {})
                    )
                    continue
                elif not self.task_verify(task):
                    continue
                self.on_task_status(task)
                cnt += 1
        except Queue.Empty:
            # queue drained
            pass
        return cnt
||
| 246 | |||
    # fields fetched to decide whether an incoming request restarts an old task
    merge_task_fields = ['taskid', 'project', 'url', 'status', 'schedule', 'lastcrawltime']

    def _check_request(self):
        """Drain (up to LOOP_LIMIT) the new-task queue, dedupe, and dispatch
        each surviving task through on_request; return the number handled."""
        tasks = {}
        while len(tasks) < self.LOOP_LIMIT:
            try:
                task = self.newtask_queue.get_nowait()
            except Queue.Empty:
                break

            # queue items may be a single task dict or a batch (list) of them
            if isinstance(task, list):
                _tasks = task
            else:
                _tasks = (task, )

            for task in _tasks:
                if not self.task_verify(task):
                    continue

                # already queued for its project: skip unless force_update is set
                if task['taskid'] in self.task_queue[task['project']]:
                    if not task.get('schedule', {}).get('force_update', False):
                        logger.debug('ignore newtask %(project)s:%(taskid)s %(url)s', task)
                        continue

                # duplicate within this batch: first one wins unless forced
                if task['taskid'] in tasks:
                    if not task.get('schedule', {}).get('force_update', False):
                        continue

                tasks[task['taskid']] = task

        for task in itervalues(tasks):
            self.on_request(task)

        return len(tasks)
||
| 282 | |||
    def _check_cronjob(self):
        """Advance the per-second tick and fire _on_cronjob pseudo-tasks for
        every running project whose min_tick divides the tick.

        Returns True when a tick was consumed (the caller loops until the
        cursor catches up with wall-clock time), False otherwise.
        """
        now = time.time()
        self._last_tick = int(self._last_tick)
        if now - self._last_tick < 1:
            return False
        self._last_tick += 1  # consume exactly one second per call
        for project in itervalues(self.projects):
            if project['status'] not in ('DEBUG', 'RUNNING'):
                continue
            # min_tick == 0 means the project declared no cron schedule
            if project.get('min_tick', 0) == 0:
                continue
            if self._last_tick % int(project['min_tick']) != 0:
                continue
            self.on_select_task({
                'taskid': '_on_cronjob',
                'project': project['name'],
                'url': 'data:,_on_cronjob',
                'status': self.taskdb.SUCCESS,
                'fetch': {
                    'save': {
                        'tick': self._last_tick,
                    },
                },
                'process': {
                    'callback': '_on_cronjob',
                },
            })
        return True
||
| 312 | |||
    # fields loaded when a selected task is sent to the fetcher
    request_task_fields = [
        'taskid',
        'project',
        'url',
        'status',
        'schedule',
        'fetch',
        'process',
        'track',
        'lastcrawltime'
    ]

    def _check_select(self):
        """Select tasks to fetch & process.

        First retries buffered sends, then pulls up to LOOP_LIMIT taskids
        across projects (at most LOOP_LIMIT/10 per project), emitting an
        ``on_finished`` pseudo-task once a project's queue drains.  Returns a
        dict of project -> number of tasks selected this pass.
        """
        # flush tasks parked by send_task() when out_queue was full
        while self._send_buffer:
            _task = self._send_buffer.pop()
            try:
                # use force=False here to prevent automatic send_buffer append and get exception
                self.send_task(_task, False)
            except Queue.Full:
                self._send_buffer.append(_task)
                break

        if self.out_queue.full():
            # no room downstream; select nothing this pass
            return {}

        taskids = []
        cnt = 0
        cnt_dict = dict()
        limit = self.LOOP_LIMIT
        for project, task_queue in iteritems(self.task_queue):
            if cnt >= limit:
                break

            # task queue
            self.task_queue[project].check_update()
            project_cnt = 0

            # check send_buffer here. when not empty, out_queue may blocked. Not sending tasks
            while cnt < limit and project_cnt < limit / 10:
                taskid = task_queue.get()
                if not taskid:
                    break

                taskids.append((project, taskid))
                project_cnt += 1
                cnt += 1

            cnt_dict[project] = project_cnt
            if project_cnt:
                # work was selected, so a finished event will be owed later
                self._sent_finished_event[project] = 'need'
            # check and send finished event to project
            elif len(task_queue) == 0 and self._sent_finished_event.get(project) == 'need':
                self._sent_finished_event[project] = 'sent'
                self.on_select_task({
                    'taskid': 'on_finished',
                    'project': project,
                    'url': 'data:,on_finished',
                    'status': self.taskdb.SUCCESS,
                    'process': {
                        'callback': 'on_finished',
                    },
                })

        for project, taskid in taskids:
            self._load_put_task(project, taskid)

        return cnt_dict
||
| 381 | |||
| 382 | def _load_put_task(self, project, taskid): |
||
| 383 | task = self.taskdb.get_task(project, taskid, fields=self.request_task_fields) |
||
| 384 | if not task: |
||
| 385 | return |
||
| 386 | task = self.on_select_task(task) |
||
| 387 | |||
| 388 | def _print_counter_log(self): |
||
| 389 | # print top 5 active counters |
||
| 390 | keywords = ('pending', 'success', 'retry', 'failed') |
||
| 391 | total_cnt = {} |
||
| 392 | project_actives = [] |
||
| 393 | project_fails = [] |
||
| 394 | for key in keywords: |
||
| 395 | total_cnt[key] = 0 |
||
| 396 | for project, subcounter in iteritems(self._cnt['5m']): |
||
| 397 | actives = 0 |
||
| 398 | for key in keywords: |
||
| 399 | cnt = subcounter.get(key, None) |
||
| 400 | if cnt: |
||
| 401 | cnt = cnt.sum |
||
| 402 | total_cnt[key] += cnt |
||
| 403 | actives += cnt |
||
| 404 | |||
| 405 | project_actives.append((actives, project)) |
||
| 406 | |||
| 407 | fails = subcounter.get('failed', None) |
||
| 408 | if fails: |
||
| 409 | project_fails.append((fails.sum, project)) |
||
| 410 | |||
| 411 | top_2_fails = sorted(project_fails, reverse=True)[:2] |
||
| 412 | top_3_actives = sorted([x for x in project_actives if x[1] not in top_2_fails], |
||
| 413 | reverse=True)[:5 - len(top_2_fails)] |
||
| 414 | |||
| 415 | log_str = ("in 5m: new:%(pending)d,success:%(success)d," |
||
| 416 | "retry:%(retry)d,failed:%(failed)d" % total_cnt) |
||
| 417 | for _, project in itertools.chain(top_3_actives, top_2_fails): |
||
| 418 | subcounter = self._cnt['5m'][project].to_dict(get_value='sum') |
||
| 419 | log_str += " %s:%d,%d,%d,%d" % (project, |
||
| 420 | subcounter.get('pending', 0), |
||
| 421 | subcounter.get('success', 0), |
||
| 422 | subcounter.get('retry', 0), |
||
| 423 | subcounter.get('failed', 0)) |
||
| 424 | logger.info(log_str) |
||
| 425 | |||
| 426 | def _dump_cnt(self): |
||
| 427 | '''Dump counters to file''' |
||
| 428 | self._cnt['1h'].dump(os.path.join(self.data_path, 'scheduler.1h')) |
||
| 429 | self._cnt['1d'].dump(os.path.join(self.data_path, 'scheduler.1d')) |
||
| 430 | self._cnt['all'].dump(os.path.join(self.data_path, 'scheduler.all')) |
||
| 431 | |||
| 432 | def _try_dump_cnt(self): |
||
| 433 | '''Dump counters every 60 seconds''' |
||
| 434 | now = time.time() |
||
| 435 | if now - self._last_dump_cnt > 60: |
||
| 436 | self._last_dump_cnt = now |
||
| 437 | self._dump_cnt() |
||
| 438 | self._print_counter_log() |
||
| 439 | |||
    def _check_delete(self):
        """Purge projects that are STOPed, stale for DELETE_TIME seconds, and
        whose group contains the 'delete' marker: drop their queue, cached
        info, taskdb/projectdb (and resultdb) data, and counters."""
        now = time.time()
        # iterate over a copy: entries are deleted from self.projects below
        for project in list(itervalues(self.projects)):
            if project['status'] != 'STOP':
                continue
            if now - project['updatetime'] < self.DELETE_TIME:
                continue
            if 'delete' not in self.projectdb.split_group(project['group']):
                continue

            logger.warning("deleting project: %s!", project['name'])
            if project['name'] in self.task_queue:
                self.task_queue[project['name']].rate = 0
                self.task_queue[project['name']].burst = 0
                del self.task_queue[project['name']]
            del self.projects[project['name']]
            self.taskdb.drop(project['name'])
            self.projectdb.drop(project['name'])
            if self.resultdb:
                self.resultdb.drop(project['name'])
            for each in self._cnt.values():
                del each[project['name']]
||
| 463 | |||
| 464 | def __len__(self): |
||
| 465 | return sum(len(x) for x in itervalues(self.task_queue)) |
||
| 466 | |||
| 467 | def quit(self): |
||
| 468 | '''Set quit signal''' |
||
| 469 | self._quit = True |
||
| 470 | # stop xmlrpc server |
||
| 471 | if hasattr(self, 'xmlrpc_server'): |
||
| 472 | self.xmlrpc_ioloop.add_callback(self.xmlrpc_server.stop) |
||
| 473 | self.xmlrpc_ioloop.add_callback(self.xmlrpc_ioloop.stop) |
||
| 474 | |||
    def run_once(self):
        """Consume queues and feed tasks to the fetcher — one scheduling pass."""

        self._update_projects()    # refresh project records from projectdb
        self._check_task_done()    # apply status packs from the processor
        self._check_request()      # pull newly submitted tasks
        # consume cron ticks until the tick cursor catches up with wall clock
        while self._check_cronjob():
            pass
        self._check_select()       # dispatch tasks to the fetcher
        self._check_delete()       # purge stale STOPed projects
        self._try_dump_cnt()       # persist counters (at most once a minute)
||
| 486 | |||
    def run(self):
        """Run the scheduler loop until quit() or repeated failures.

        Each pass sleeps LOOP_INTERVAL then calls run_once(); the loop exits
        on KeyboardInterrupt or after more than EXCEPTION_LIMIT consecutive
        failing passes.  Counters are dumped on the way out.
        """
        logger.info("loading projects")

        while not self._quit:
            try:
                time.sleep(self.LOOP_INTERVAL)
                self.run_once()
                self._exceptions = 0  # a clean pass resets the failure streak
            except KeyboardInterrupt:
                break
            except Exception as e:
                logger.exception(e)
                self._exceptions += 1
                if self._exceptions > self.EXCEPTION_LIMIT:
                    break
                continue

        logger.info("scheduler exiting...")
        self._dump_cnt()
||
| 507 | |||
| 508 | def trigger_on_start(self, project): |
||
| 509 | '''trigger an on_start callback of project''' |
||
| 510 | self.newtask_queue.put({ |
||
| 511 | "project": project, |
||
| 512 | "taskid": "on_start", |
||
| 513 | "url": "data:,on_start", |
||
| 514 | "process": { |
||
| 515 | "callback": "on_start", |
||
| 516 | }, |
||
| 517 | }) |
||
| 518 | |||
    def xmlrpc_run(self, port=23333, bind='127.0.0.1', logRequests=False):
        """Start the blocking xmlrpc control interface on *bind*:*port*.

        Registers: _quit, size, counter, newtask, send_task, update_project
        and get_active_tasks, then serves via a tornado HTTP server/ioloop
        (stored on self so quit() can stop them).
        """
        from pyspider.libs.wsgi_xmlrpc import WSGIXMLRPCApplication

        application = WSGIXMLRPCApplication()

        application.register_function(self.quit, '_quit')
        application.register_function(self.__len__, 'size')

        def dump_counter(_time, _type):
            # returns the requested counter window as a dict
            # NOTE(review): the bare except swallows every error and yields
            # None to the rpc caller — only the log records the failure
            try:
                return self._cnt[_time].to_dict(_type)
            except:
                logger.exception('')
        application.register_function(dump_counter, 'counter')

        def new_task(task):
            # submit a task; False means it failed verification
            if self.task_verify(task):
                self.newtask_queue.put(task)
                return True
            return False
        application.register_function(new_task, 'newtask')

        def send_task(task):
            '''dispatch task to fetcher'''
            self.send_task(task)
            return True
        application.register_function(send_task, 'send_task')

        def update_project():
            # force the next run_once() pass to poll projectdb
            self._force_update_project = True
        application.register_function(update_project, 'update_project')

        def get_active_tasks(project=None, limit=100):
            # most recent active tasks (all projects unless one is named),
            # filtered down to a whitelisted subset of fields
            allowed_keys = set((
                'taskid',
                'project',
                'status',
                'url',
                'lastcrawltime',
                'updatetime',
                'track',
            ))
            track_allowed_keys = set((
                'ok',
                'time',
                'follows',
                'status_code',
            ))

            iters = [iter(x['active_tasks']) for k, x in iteritems(self.projects)
                     if x and (k == project if project else True)]
            tasks = [next(x, None) for x in iters]
            result = []

            # k-way merge: repeatedly take the newest (max updatetime) head
            while len(result) < limit and tasks and not all(x is None for x in tasks):
                updatetime, task = t = max(t for t in tasks if t)
                i = tasks.index(t)
                tasks[i] = next(iters[i], None)
                for key in list(task):
                    if key == 'track':
                        for k in list(task[key].get('fetch', [])):
                            if k not in track_allowed_keys:
                                del task[key]['fetch'][k]
                        for k in list(task[key].get('process', [])):
                            if k not in track_allowed_keys:
                                del task[key]['process'][k]
                    if key in allowed_keys:
                        continue
                    del task[key]
                result.append(t)
            # fix for "<type 'exceptions.TypeError'>:dictionary key must be string"
            # have no idea why
            return json.loads(json.dumps(result))
        application.register_function(get_active_tasks, 'get_active_tasks')

        import tornado.wsgi
        import tornado.ioloop
        import tornado.httpserver

        container = tornado.wsgi.WSGIContainer(application)
        self.xmlrpc_ioloop = tornado.ioloop.IOLoop()
        self.xmlrpc_server = tornado.httpserver.HTTPServer(container, io_loop=self.xmlrpc_ioloop)
        self.xmlrpc_server.listen(port=port, address=bind)
        self.xmlrpc_ioloop.start()
||
| 604 | |||
| 605 | def on_request(self, task): |
||
| 606 | if self.INQUEUE_LIMIT and len(self.task_queue[task['project']]) >= self.INQUEUE_LIMIT: |
||
| 607 | logger.debug('overflow task %(project)s:%(taskid)s %(url)s', task) |
||
| 608 | return |
||
| 609 | |||
| 610 | oldtask = self.taskdb.get_task(task['project'], task['taskid'], |
||
| 611 | fields=self.merge_task_fields) |
||
| 612 | if oldtask: |
||
| 613 | return self.on_old_request(task, oldtask) |
||
| 614 | else: |
||
| 615 | return self.on_new_request(task) |
||
| 616 | |||
| 617 | def on_new_request(self, task): |
||
| 618 | '''Called when a new request is arrived''' |
||
| 619 | task['status'] = self.taskdb.ACTIVE |
||
| 620 | self.insert_task(task) |
||
| 621 | self.put_task(task) |
||
| 622 | |||
| 623 | project = task['project'] |
||
| 624 | self._cnt['5m'].event((project, 'pending'), +1) |
||
| 625 | self._cnt['1h'].event((project, 'pending'), +1) |
||
| 626 | self._cnt['1d'].event((project, 'pending'), +1) |
||
| 627 | self._cnt['all'].event((project, 'pending'), +1) |
||
| 628 | logger.info('new task %(project)s:%(taskid)s %(url)s', task) |
||
| 629 | return task |
||
| 630 | |||
    def on_old_request(self, task, old_task):
        """Handle a request whose taskid already exists in taskdb.

        Restart the task when its itag changed, its age expired, or
        force_update is set; otherwise ignore it.  Counters are adjusted when
        a non-ACTIVE task transitions back to pending.
        """
        now = time.time()

        _schedule = task.get('schedule', self.default_schedule)
        old_schedule = old_task.get('schedule', {})

        restart = False
        schedule_age = _schedule.get('age', self.default_schedule['age'])
        if _schedule.get('itag') and _schedule['itag'] != old_schedule.get('itag'):
            restart = True
        elif schedule_age >= 0 and schedule_age + (old_task.get('lastcrawltime', 0) or 0) < now:
            # last crawl is older than the allowed age
            restart = True
        elif _schedule.get('force_update'):
            restart = True

        if not restart:
            logger.debug('ignore newtask %(project)s:%(taskid)s %(url)s', task)
            return

        task['status'] = self.taskdb.ACTIVE
        self.update_task(task)
        self.put_task(task)

        project = task['project']
        if old_task['status'] != self.taskdb.ACTIVE:
            self._cnt['5m'].event((project, 'pending'), +1)
            self._cnt['1h'].event((project, 'pending'), +1)
            self._cnt['1d'].event((project, 'pending'), +1)
        # 'all' is a total counter: move the old terminal state back to pending
        if old_task['status'] == self.taskdb.SUCCESS:
            self._cnt['all'].event((project, 'success'), -1).event((project, 'pending'), +1)
        elif old_task['status'] == self.taskdb.FAILED:
            self._cnt['all'].event((project, 'failed'), -1).event((project, 'pending'), +1)
        logger.info('restart task %(project)s:%(taskid)s %(url)s', task)
        return task
||
| 666 | |||
| 667 | def on_task_status(self, task): |
||
| 668 | '''Called when a status pack is arrived''' |
||
| 669 | try: |
||
| 670 | procesok = task['track']['process']['ok'] |
||
| 671 | if not self.task_queue[task['project']].done(task['taskid']): |
||
| 672 | logging.error('not processing pack: %(project)s:%(taskid)s %(url)s', task) |
||
| 673 | return None |
||
| 674 | except KeyError as e: |
||
| 675 | logger.error("Bad status pack: %s", e) |
||
| 676 | return None |
||
| 677 | |||
| 678 | if procesok: |
||
| 679 | ret = self.on_task_done(task) |
||
| 680 | else: |
||
| 681 | ret = self.on_task_failed(task) |
||
| 682 | |||
| 683 | if task['track']['fetch'].get('time'): |
||
| 684 | self._cnt['5m_time'].event((task['project'], 'fetch_time'), |
||
| 685 | task['track']['fetch']['time']) |
||
| 686 | if task['track']['process'].get('time'): |
||
| 687 | self._cnt['5m_time'].event((task['project'], 'process_time'), |
||
| 688 | task['track']['process'].get('time')) |
||
| 689 | self.projects[task['project']]['active_tasks'].appendleft((time.time(), task)) |
||
| 690 | return ret |
||
| 691 | |||
    def on_task_done(self, task):
        """Called when a task is done and success, called by `on_task_status`.

        Marks the task SUCCESS (or re-queues it as ACTIVE when auto_recrawl
        is on), persists it, and bumps the success counters.
        """
        task['status'] = self.taskdb.SUCCESS
        task['lastcrawltime'] = time.time()

        if 'schedule' in task:
            if task['schedule'].get('auto_recrawl') and 'age' in task['schedule']:
                # auto-recrawl: stay ACTIVE and run again after 'age' seconds
                task['status'] = self.taskdb.ACTIVE
                next_exetime = task['schedule'].get('age')
                task['schedule']['exetime'] = time.time() + next_exetime
                self.put_task(task)
            else:
                # one-shot task: the schedule has served its purpose
                del task['schedule']
        self.update_task(task)

        project = task['project']
        self._cnt['5m'].event((project, 'success'), +1)
        self._cnt['1h'].event((project, 'success'), +1)
        self._cnt['1d'].event((project, 'success'), +1)
        self._cnt['all'].event((project, 'success'), +1).event((project, 'pending'), -1)
        logger.info('task done %(project)s:%(taskid)s %(url)s', task)
        return task
||
| 714 | |||
| 715 | def on_task_failed(self, task): |
||
| 716 | '''Called when a task is failed, called by `on_task_status`''' |
||
| 717 | |||
| 718 | if 'schedule' not in task: |
||
| 719 | old_task = self.taskdb.get_task(task['project'], task['taskid'], fields=['schedule']) |
||
| 720 | if old_task is None: |
||
| 721 | logging.error('unknown status pack: %s' % task) |
||
| 722 | return |
||
| 723 | task['schedule'] = old_task.get('schedule', {}) |
||
| 724 | |||
| 725 | retries = task['schedule'].get('retries', self.default_schedule['retries']) |
||
| 726 | retried = task['schedule'].get('retried', 0) |
||
| 727 | |||
| 728 | project_info = self.projects.get(task['project'], {}) |
||
| 729 | retry_delay = project_info.get('retry_delay', None) or self.DEFAULT_RETRY_DELAY |
||
| 730 | next_exetime = retry_delay.get(retried, retry_delay.get('', self.DEFAULT_RETRY_DELAY[''])) |
||
| 731 | |||
| 732 | if task['schedule'].get('auto_recrawl') and 'age' in task['schedule']: |
||
| 733 | next_exetime = min(next_exetime, task['schedule'].get('age')) |
||
| 734 | else: |
||
| 735 | if retried >= retries: |
||
| 736 | next_exetime = -1 |
||
| 737 | elif 'age' in task['schedule'] and next_exetime > task['schedule'].get('age'): |
||
| 738 | next_exetime = task['schedule'].get('age') |
||
| 739 | |||
| 740 | if next_exetime < 0: |
||
| 741 | task['status'] = self.taskdb.FAILED |
||
| 742 | task['lastcrawltime'] = time.time() |
||
| 743 | self.update_task(task) |
||
| 744 | |||
| 745 | project = task['project'] |
||
| 746 | self._cnt['5m'].event((project, 'failed'), +1) |
||
| 747 | self._cnt['1h'].event((project, 'failed'), +1) |
||
| 748 | self._cnt['1d'].event((project, 'failed'), +1) |
||
| 749 | self._cnt['all'].event((project, 'failed'), +1).event((project, 'pending'), -1) |
||
| 750 | logger.info('task failed %(project)s:%(taskid)s %(url)s' % task) |
||
| 751 | return task |
||
| 752 | else: |
||
| 753 | task['schedule']['retried'] = retried + 1 |
||
| 754 | task['schedule']['exetime'] = time.time() + next_exetime |
||
| 755 | task['lastcrawltime'] = time.time() |
||
| 756 | self.update_task(task) |
||
| 757 | self.put_task(task) |
||
| 758 | |||
| 759 | project = task['project'] |
||
| 760 | self._cnt['5m'].event((project, 'retry'), +1) |
||
| 761 | self._cnt['1h'].event((project, 'retry'), +1) |
||
| 762 | self._cnt['1d'].event((project, 'retry'), +1) |
||
| 763 | # self._cnt['all'].event((project, 'retry'), +1) |
||
| 764 | logger.info('task retry %d/%d %%(project)s:%%(taskid)s %%(url)s' % ( |
||
| 765 | retried, retries), task) |
||
| 766 | return task |
||
| 767 | |||
    def on_select_task(self, task):
        """Called when a task is selected to fetch & process.

        Injects project metadata (group, script md5sum, updatetime) into the
        task, records it in the project's active_tasks ring, and dispatches it
        to the fetcher.  Asserts the project is known.
        """
        # inject informations about project
        logger.info('select %(project)s:%(taskid)s %(url)s', task)

        project_info = self.projects.get(task['project'])
        assert project_info, 'no such project'
        task['group'] = project_info.get('group')
        task['project_md5sum'] = project_info.get('md5sum')
        task['project_updatetime'] = project_info.get('updatetime', 0)
        project_info['active_tasks'].appendleft((time.time(), task))
        self.send_task(task)
        return task
| 781 | |||
| 1063 |