| 1 |  |  | #!/usr/bin/env python | 
            
                                                                                                            
                            
            
                                    
            
            
                | 2 |  |  | # -*- encoding: utf-8 -*- | 
            
                                                                                                            
                            
            
                                    
            
            
                | 3 |  |  | # vim: set et sw=4 ts=4 sts=4 ff=unix fenc=utf8: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 4 |  |  | # Author: Binux<[email protected]> | 
            
                                                                                                            
                            
            
                                    
            
            
                | 5 |  |  | #         http://binux.me | 
            
                                                                                                            
                            
            
                                    
            
            
                | 6 |  |  | # Created on 2014-02-07 17:05:11 | 
            
                                                                                                            
                            
            
                                    
            
            
                | 7 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 8 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 9 |  |  | import itertools | 
            
                                                                                                            
                            
            
                                    
            
            
                | 10 |  |  | import json | 
            
                                                                                                            
                            
            
                                    
            
            
                | 11 |  |  | import logging | 
            
                                                                                                            
                            
            
                                    
            
            
                | 12 |  |  | import os | 
            
                                                                                                            
                            
            
                                    
            
            
                | 13 |  |  | import time | 
            
                                                                                                            
                            
            
                                    
            
            
                | 14 |  |  | from collections import deque | 
            
                                                                                                            
                            
            
                                    
            
            
                | 15 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 16 |  |  | from six import iteritems, itervalues | 
            
                                                                                                            
                            
            
                                    
            
            
                | 17 |  |  | from six.moves import queue as Queue | 
            
                                                                                                            
                            
            
                                    
            
            
                | 18 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 19 |  |  | from pyspider.libs import counter, utils | 
            
                                                                                                            
                            
            
                                    
            
            
                | 20 |  |  | from pyspider.libs.base_handler import BaseHandler | 
            
                                                                                                            
                            
            
                                    
            
            
                | 21 |  |  | from .task_queue import TaskQueue | 
            
                                                                                                            
                            
            
                                    
            
            
                | 22 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                # Shared logger for this module; records are emitted under the 'scheduler' name.
                logger = logging.getLogger(name='scheduler')
            
                                                                                                            
                            
            
                                    
            
            
                | 24 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 25 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 26 |  |  | class Project(object): | 
            
                                                                                                            
                            
            
                                    
            
            
                | 27 |  |  |     ''' | 
            
                                                                                                            
                            
            
                                    
            
            
                | 28 |  |  |     project for scheduler | 
            
                                                                                                            
                            
            
                                    
            
            
                | 29 |  |  |     ''' | 
            
                                                                                                            
                            
            
                                    
            
            
                def __init__(self, scheduler, project_info):
                    '''
                    Build the scheduler-side state for a single project.

                    :param scheduler: the owning scheduler. ``scheduler.ACTIVE_TASKS``
                        caps the length of ``active_tasks``. The pause logic later reads
                        its other settings through ``self.scheduler``.
                    :param project_info: project record, passed to :meth:`update`. That
                        method fills in ``name``, ``group``, ``db_status``,
                        ``updatetime`` and ``md5sum``.
                    '''
                    self.scheduler = scheduler

                    # Task bookkeeping. active_tasks is a bounded history of (_, task)
                    # pairs, read by the ``paused`` property.
                    self.active_tasks = deque(maxlen=scheduler.ACTIVE_TASKS)
                    self.task_queue = TaskQueue()
                    self.task_loaded = False
                    self._selected_tasks = False  # selected tasks after recent pause
                    self._send_finished_event_wait = 0  # wait for scheduler.FAIL_PAUSE_NUM loop steps before sending the event

                    # Script tracking. ``update`` compares md5sum against the new
                    # script's hash and sets waiting_get_info again when it changes.
                    self.md5sum = None
                    self._send_on_get_info = False
                    self.waiting_get_info = True

                    # Pause state machine, driven by the ``paused`` property:
                    # (state, time it was paused, newest task seen when checking started).
                    self._paused, self._paused_time, self._unpause_last_seen = False, 0, None

                    self.update(project_info)
            
                                                                                                            
                            
            
                                    
            
            
                | 50 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 51 |  |  |     @property | 
            
                                                                                                            
                            
            
                                    
            
            
                | 52 |  |  |     def paused(self): | 
            
                                                                                                            
                            
            
                                    
            
            
                | 53 |  |  |         if self.scheduler.FAIL_PAUSE_NUM <= 0: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 54 |  |  |             return False | 
            
                                                                                                            
                            
            
                                    
            
            
                | 55 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 56 |  |  |         # unpaused --(last FAIL_PAUSE_NUM task failed)--> paused --(PAUSE_TIME)--> unpause_checking | 
            
                                                                                                            
                            
            
                                    
            
            
                | 57 |  |  |         #                         unpaused <--(last UNPAUSE_CHECK_NUM task have success)--| | 
            
                                                                                                            
                            
            
                                    
            
            
                | 58 |  |  |         #                             paused <--(last UNPAUSE_CHECK_NUM task no success)--| | 
            
                                                                                                            
                            
            
                                    
            
            
                | 59 |  |  |         if not self._paused: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 60 |  |  |             fail_cnt = 0 | 
            
                                                                                                            
                            
            
                                    
            
            
                | 61 |  |  |             for _, task in self.active_tasks: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 62 |  |  |                 # ignore select task | 
            
                                                                                                            
                            
            
                                    
            
            
                | 63 |  |  |                 if task.get('type') == self.scheduler.TASK_PACK: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 64 |  |  |                     continue | 
            
                                                                                                            
                            
            
                                    
            
            
                | 65 |  |  |                 if 'process' not in task['track']: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 66 |  |  |                     logger.error('process not in task, %r', task) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 67 |  |  |                 if task['track']['process']['ok']: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 68 |  |  |                     break | 
            
                                                                                                            
                            
            
                                    
            
            
                | 69 |  |  |                 else: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 70 |  |  |                     fail_cnt += 1 | 
            
                                                                                                            
                            
            
                                    
            
            
                | 71 |  |  |                 if fail_cnt >= self.scheduler.FAIL_PAUSE_NUM: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 72 |  |  |                     break | 
            
                                                                                                            
                            
            
                                    
            
            
                | 73 |  |  |             if fail_cnt >= self.scheduler.FAIL_PAUSE_NUM: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 74 |  |  |                 self._paused = True | 
            
                                                                                                            
                            
            
                                    
            
            
                | 75 |  |  |                 self._paused_time = time.time() | 
            
                                                                                                            
                            
            
                                    
            
            
                | 76 |  |  |         elif self._paused is True and (self._paused_time + self.scheduler.PAUSE_TIME < time.time()): | 
            
                                                                                                            
                            
            
                                    
            
            
                | 77 |  |  |             self._paused = 'checking' | 
            
                                                                                                            
                            
            
                                    
            
            
                | 78 |  |  |             self._unpause_last_seen = self.active_tasks[0][1] if len(self.active_tasks) else None | 
            
                                                                                                            
                            
            
                                    
            
            
                | 79 |  |  |         elif self._paused == 'checking': | 
            
                                                                                                            
                            
            
                                    
            
            
                | 80 |  |  |             cnt = 0 | 
            
                                                                                                            
                            
            
                                    
            
            
                | 81 |  |  |             fail_cnt = 0 | 
            
                                                                                                            
                            
            
                                    
            
            
                | 82 |  |  |             for _, task in self.active_tasks: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 83 |  |  |                 if task is self._unpause_last_seen: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 84 |  |  |                     break | 
            
                                                                                                            
                            
            
                                    
            
            
                | 85 |  |  |                 # ignore select task | 
            
                                                                                                            
                            
            
                                    
            
            
                | 86 |  |  |                 if task.get('type') == self.scheduler.TASK_PACK: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 87 |  |  |                     continue | 
            
                                                                                                            
                            
            
                                    
            
            
                | 88 |  |  |                 cnt += 1 | 
            
                                                                                                            
                            
            
                                    
            
            
                | 89 |  |  |                 if task['track']['process']['ok']: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 90 |  |  |                     # break with enough check cnt | 
            
                                                                                                            
                            
            
                                    
            
            
                | 91 |  |  |                     cnt = max(cnt, self.scheduler.UNPAUSE_CHECK_NUM) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 92 |  |  |                     break | 
            
                                                                                                            
                            
            
                                    
            
            
                | 93 |  |  |                 else: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 94 |  |  |                     fail_cnt += 1 | 
            
                                                                                                            
                            
            
                                    
            
            
                | 95 |  |  |             if cnt >= self.scheduler.UNPAUSE_CHECK_NUM: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 96 |  |  |                 if fail_cnt == cnt: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 97 |  |  |                     self._paused = True | 
            
                                                                                                            
                            
            
                                    
            
            
                | 98 |  |  |                     self._paused_time = time.time() | 
            
                                                                                                            
                            
            
                                    
            
            
                | 99 |  |  |                 else: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 100 |  |  |                     self._paused = False | 
            
                                                                                                            
                            
            
                                    
            
            
                | 101 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 102 |  |  |         return self._paused is True | 
            
                                                                                                            
                            
            
                                    
            
            
                | 103 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 104 |  |  |     def update(self, project_info): | 
            
                                                                                                            
                            
            
                                    
            
            
                | 105 |  |  |         self.project_info = project_info | 
            
                                                                                                            
                            
            
                                    
            
            
                | 106 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 107 |  |  |         self.name = project_info['name'] | 
            
                                                                                                            
                            
            
                                    
            
            
                | 108 |  |  |         self.group = project_info['group'] | 
            
                                                                                                            
                            
            
                                    
            
            
                | 109 |  |  |         self.db_status = project_info['status'] | 
            
                                                                                                            
                            
            
                                    
            
            
                | 110 |  |  |         self.updatetime = project_info['updatetime'] | 
            
                                                                                                            
                            
            
                                    
            
            
                | 111 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 112 |  |  |         md5sum = utils.md5string(project_info['script']) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 113 |  |  |         if self.md5sum != md5sum: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 114 |  |  |             self.waiting_get_info = True | 
            
                                                                                                            
                            
            
                                    
            
            
                | 115 |  |  |             self.md5sum = md5sum | 
            
                                                                                                            
                            
            
                                    
            
            
                | 116 |  |  |         if self.waiting_get_info and self.active: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 117 |  |  |             self._send_on_get_info = True | 
            
                                                                                                            
                            
            
                                    
            
            
                | 118 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 119 |  |  |         if self.active: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 120 |  |  |             self.task_queue.rate = project_info['rate'] | 
            
                                                                                                            
                            
            
                                    
            
            
                | 121 |  |  |             self.task_queue.burst = project_info['burst'] | 
            
                                                                                                            
                            
            
                                    
            
            
                | 122 |  |  |         else: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 123 |  |  |             self.task_queue.rate = 0 | 
            
                                                                                                            
                            
            
                                    
            
            
                | 124 |  |  |             self.task_queue.burst = 0 | 
            
                                                                                                            
                            
            
                                    
            
            
                | 125 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 126 |  |  |         logger.info('project %s updated, status:%s, paused:%s, %d tasks', | 
            
                                                                                                            
                            
            
                                    
            
            
                | 127 |  |  |                     self.name, self.db_status, self.paused, len(self.task_queue)) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 128 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 129 |  |  |     def on_get_info(self, info): | 
            
                                                                                                            
                            
            
                                    
            
            
                | 130 |  |  |         self.waiting_get_info = False | 
            
                                                                                                            
                            
            
                                    
            
            
                | 131 |  |  |         self.min_tick = info.get('min_tick', 0) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 132 |  |  |         self.retry_delay = info.get('retry_delay', {}) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 133 |  |  |         self.crawl_config = info.get('crawl_config', {}) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 134 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 135 |  |  |     @property | 
            
                                                                                                            
                            
            
                                    
            
            
                | 136 |  |  |     def active(self): | 
            
                                                                                                            
                            
            
                                    
            
            
                | 137 |  |  |         return self.db_status in ('RUNNING', 'DEBUG') | 
            
                                                                                                            
                            
            
                                    
            
            
                | 138 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 139 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 140 |  |  | class Scheduler(object): | 
            
                                                                                                            
                            
            
                                    
            
            
    # Seconds between two projectdb.check_update() polls; _update_projects()
    # compares it against time.time() and skips polling until it has elapsed.
    UPDATE_PROJECT_INTERVAL = 5 * 60
    # Default values for a task's schedule fields.
    # NOTE(review): presumably merged into each incoming task's 'schedule'
    # dict -- not visible here, confirm against callers.
    default_schedule = {
        'priority': 0,
        'retries': 3,
        'exetime': 0,
        'age': -1,
        'itag': None,
    }
    # NOTE(review): the constants below are not referenced in this part of the
    # file; the descriptions are inferred from their names -- confirm.
    LOOP_LIMIT = 1000  # presumably max items processed per loop pass
    LOOP_INTERVAL = 0.1  # presumably seconds slept between loop passes
    ACTIVE_TASKS = 100
    INQUEUE_LIMIT = 0
    EXCEPTION_LIMIT = 3
    DELETE_TIME = 24 * 60 * 60  # one day, assuming the unit is seconds
    # Presumably maps retry count -> delay (seconds) before the next retry,
    # with the '' key as the fallback for higher counts -- confirm.
    DEFAULT_RETRY_DELAY = {
        0: 30,
        1: 1*60*60,
        2: 6*60*60,
        3: 12*60*60,
        '': 24*60*60
    }
    FAIL_PAUSE_NUM = 10
    PAUSE_TIME = 5*60  # presumably seconds, like the other time constants
    UNPAUSE_CHECK_NUM = 3

    # Pack-type identifiers (presumably tags for packed message kinds).
    TASK_PACK = 1
    STATUS_PACK = 2  # currently not used
    REQUEST_PACK = 3  # currently not used
            
                                                                                                            
                            
            
                                    
            
            
                | 169 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 170 |  |  |     def __init__(self, taskdb, projectdb, newtask_queue, status_queue, | 
            
                                                                                                            
                            
            
                                    
            
            
                | 171 |  |  |                  out_queue, data_path='./data', resultdb=None): | 
            
                                                                                                            
                            
            
                                    
            
            
                | 172 |  |  |         self.taskdb = taskdb | 
            
                                                                                                            
                            
            
                                    
            
            
                | 173 |  |  |         self.projectdb = projectdb | 
            
                                                                                                            
                            
            
                                    
            
            
                | 174 |  |  |         self.resultdb = resultdb | 
            
                                                                                                            
                            
            
                                    
            
            
                | 175 |  |  |         self.newtask_queue = newtask_queue | 
            
                                                                                                            
                            
            
                                    
            
            
                | 176 |  |  |         self.status_queue = status_queue | 
            
                                                                                                            
                            
            
                                    
            
            
                | 177 |  |  |         self.out_queue = out_queue | 
            
                                                                                                            
                            
            
                                    
            
            
                | 178 |  |  |         self.data_path = data_path | 
            
                                                                                                            
                            
            
                                    
            
            
                | 179 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 180 |  |  |         self._send_buffer = deque() | 
            
                                                                                                            
                            
            
                                    
            
            
                | 181 |  |  |         self._quit = False | 
            
                                                                                                            
                            
            
                                    
            
            
                | 182 |  |  |         self._exceptions = 0 | 
            
                                                                                                            
                            
            
                                    
            
            
                | 183 |  |  |         self.projects = dict() | 
            
                                                                                                            
                            
            
                                    
            
            
                | 184 |  |  |         self._force_update_project = False | 
            
                                                                                                            
                            
            
                                    
            
            
                | 185 |  |  |         self._last_update_project = 0 | 
            
                                                                                                            
                            
            
                                    
            
            
                | 186 |  |  |         self._last_tick = int(time.time()) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 187 |  |  |         self._postpone_request = [] | 
            
                                                                                                            
                            
            
                                    
            
            
                | 188 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 189 |  |  |         self._cnt = { | 
            
                                                                                                            
                            
            
                                    
            
            
                | 190 |  |  |             "5m_time": counter.CounterManager( | 
            
                                                                                                            
                            
            
                                    
            
            
                | 191 |  |  |                 lambda: counter.TimebaseAverageEventCounter(30, 10)), | 
            
                                                                                                            
                            
            
                                    
            
            
                | 192 |  |  |             "5m": counter.CounterManager( | 
            
                                                                                                            
                            
            
                                    
            
            
                | 193 |  |  |                 lambda: counter.TimebaseAverageWindowCounter(30, 10)), | 
            
                                                                                                            
                            
            
                                    
            
            
                | 194 |  |  |             "1h": counter.CounterManager( | 
            
                                                                                                            
                            
            
                                    
            
            
                | 195 |  |  |                 lambda: counter.TimebaseAverageWindowCounter(60, 60)), | 
            
                                                                                                            
                            
            
                                    
            
            
                | 196 |  |  |             "1d": counter.CounterManager( | 
            
                                                                                                            
                            
            
                                    
            
            
                | 197 |  |  |                 lambda: counter.TimebaseAverageWindowCounter(10 * 60, 24 * 6)), | 
            
                                                                                                            
                            
            
                                    
            
            
                | 198 |  |  |             "all": counter.CounterManager( | 
            
                                                                                                            
                            
            
                                    
            
            
                | 199 |  |  |                 lambda: counter.TotalCounter()), | 
            
                                                                                                            
                            
            
                                    
            
            
                | 200 |  |  |         } | 
            
                                                                                                            
                            
            
                                    
            
            
                | 201 |  |  |         self._cnt['1h'].load(os.path.join(self.data_path, 'scheduler.1h')) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 202 |  |  |         self._cnt['1d'].load(os.path.join(self.data_path, 'scheduler.1d')) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 203 |  |  |         self._cnt['all'].load(os.path.join(self.data_path, 'scheduler.all')) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 204 |  |  |         self._last_dump_cnt = 0 | 
            
                                                                                                            
                            
            
                                    
            
            
                | 205 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 206 |  |  |     def _update_projects(self): | 
            
                                                                                                            
                            
            
                                    
            
            
                | 207 |  |  |         '''Check project update''' | 
            
                                                                                                            
                            
            
                                    
            
            
                | 208 |  |  |         now = time.time() | 
            
                                                                                                            
                            
            
                                    
            
            
                | 209 |  |  |         if ( | 
            
                                                                                                            
                            
            
                                    
            
            
                | 210 |  |  |                 not self._force_update_project | 
            
                                                                                                            
                            
            
                                    
            
            
                | 211 |  |  |                 and self._last_update_project + self.UPDATE_PROJECT_INTERVAL > now | 
            
                                                                                                            
                            
            
                                    
            
            
                | 212 |  |  |         ): | 
            
                                                                                                            
                            
            
                                    
            
            
                | 213 |  |  |             return | 
            
                                                                                                            
                            
            
                                    
            
            
                | 214 |  |  |         for project in self.projectdb.check_update(self._last_update_project): | 
            
                                                                                                            
                            
            
                                    
            
            
                | 215 |  |  |             self._update_project(project) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 216 |  |  |             logger.debug("project: %s updated.", project['name']) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 217 |  |  |         self._force_update_project = False | 
            
                                                                                                            
                            
            
                                    
            
            
                | 218 |  |  |         self._last_update_project = now | 
            
                                                                                                            
                            
            
                                    
            
            
                | 219 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 220 |  |  |     get_info_attributes = ['min_tick', 'retry_delay', 'crawl_config'] | 
            
                                                                                                            
                            
            
                                    
            
            
                | 221 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 222 |  |  |     def _update_project(self, project): | 
            
                                                                                                            
                            
            
                                    
            
            
                | 223 |  |  |         '''update one project''' | 
            
                                                                                                            
                            
            
                                    
            
            
                | 224 |  |  |         if project['name'] not in self.projects: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 225 |  |  |             self.projects[project['name']] = Project(self, project) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 226 |  |  |         else: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 227 |  |  |             self.projects[project['name']].update(project) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 228 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 229 |  |  |         project = self.projects[project['name']] | 
            
                                                                                                            
                            
            
                                    
            
            
                | 230 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 231 |  |  |         if project._send_on_get_info: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 232 |  |  |             # update project runtime info from processor by sending a _on_get_info | 
            
                                                                                                            
                            
            
                                    
            
            
                | 233 |  |  |             # request, result is in status_page.track.save | 
            
                                                                                                            
                            
            
                                    
            
            
                | 234 |  |  |             project._send_on_get_info = False | 
            
                                                                                                            
                            
            
                                    
            
            
                | 235 |  |  |             self.on_select_task({ | 
            
                                                                                                            
                            
            
                                    
            
            
                | 236 |  |  |                 'taskid': '_on_get_info', | 
            
                                                                                                            
                            
            
                                    
            
            
                | 237 |  |  |                 'project': project.name, | 
            
                                                                                                            
                            
            
                                    
            
            
                | 238 |  |  |                 'url': 'data:,_on_get_info', | 
            
                                                                                                            
                            
            
                                    
            
            
                | 239 |  |  |                 'status': self.taskdb.SUCCESS, | 
            
                                                                                                            
                            
            
                                    
            
            
                | 240 |  |  |                 'fetch': { | 
            
                                                                                                            
                            
            
                                    
            
            
                | 241 |  |  |                     'save': self.get_info_attributes, | 
            
                                                                                                            
                            
            
                                    
            
            
                | 242 |  |  |                 }, | 
            
                                                                                                            
                            
            
                                    
            
            
                | 243 |  |  |                 'process': { | 
            
                                                                                                            
                            
            
                                    
            
            
                | 244 |  |  |                     'callback': '_on_get_info', | 
            
                                                                                                            
                            
            
                                    
            
            
                | 245 |  |  |                 }, | 
            
                                                                                                            
                            
            
                                    
            
            
                | 246 |  |  |             }) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 247 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 248 |  |  |         # load task queue when project is running and delete task_queue when project is stoped | 
            
                                                                                                            
                            
            
                                    
            
            
                | 249 |  |  |         if project.active: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 250 |  |  |             if not project.task_loaded: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 251 |  |  |                 self._load_tasks(project) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 252 |  |  |                 project.task_loaded = True | 
            
                                                                                                            
                            
            
                                    
            
            
                | 253 |  |  |         else: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 254 |  |  |             if project.task_loaded: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 255 |  |  |                 project.task_queue = TaskQueue() | 
            
                                                                                                            
                            
            
                                    
            
            
                | 256 |  |  |                 project.task_loaded = False | 
            
                                                                                                            
                            
            
                                    
            
            
                | 257 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 258 |  |  |             if project not in self._cnt['all']: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 259 |  |  |                 self._update_project_cnt(project.name) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 260 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 261 |  |  |     scheduler_task_fields = ['taskid', 'project', 'schedule', ] | 
            
                                                                                                            
                            
            
                                    
            
            
                | 262 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 263 |  |  |     def _load_tasks(self, project): | 
            
                                                                                                            
                            
            
                                    
            
            
                | 264 |  |  |         '''load tasks from database''' | 
            
                                                                                                            
                            
            
                                    
            
            
                | 265 |  |  |         task_queue = project.task_queue | 
            
                                                                                                            
                            
            
                                    
            
            
                | 266 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 267 |  |  |         for task in self.taskdb.load_tasks( | 
            
                                                                                                            
                            
            
                                    
            
            
                | 268 |  |  |                 self.taskdb.ACTIVE, project.name, self.scheduler_task_fields | 
            
                                                                                                            
                            
            
                                    
            
            
                | 269 |  |  |         ): | 
            
                                                                                                            
                            
            
                                    
            
            
                | 270 |  |  |             taskid = task['taskid'] | 
            
                                                                                                            
                            
            
                                    
            
            
                | 271 |  |  |             _schedule = task.get('schedule', self.default_schedule) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 272 |  |  |             priority = _schedule.get('priority', self.default_schedule['priority']) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 273 |  |  |             exetime = _schedule.get('exetime', self.default_schedule['exetime']) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 274 |  |  |             task_queue.put(taskid, priority, exetime) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 275 |  |  |         project.task_loaded = True | 
            
                                                                                                            
                            
            
                                    
            
            
                | 276 |  |  |         logger.debug('project: %s loaded %d tasks.', project.name, len(task_queue)) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 277 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 278 |  |  |         if project not in self._cnt['all']: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 279 |  |  |             self._update_project_cnt(project.name) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 280 |  |  |         self._cnt['all'].value((project.name, 'pending'), len(project.task_queue)) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 281 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 282 |  |  |     def _update_project_cnt(self, project_name): | 
            
                                                                                                            
                            
            
                                    
            
            
                | 283 |  |  |         status_count = self.taskdb.status_count(project_name) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 284 |  |  |         self._cnt['all'].value( | 
            
                                                                                                            
                            
            
                                    
            
            
                | 285 |  |  |             (project_name, 'success'), | 
            
                                                                                                            
                            
            
                                    
            
            
                | 286 |  |  |             status_count.get(self.taskdb.SUCCESS, 0) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 287 |  |  |         ) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 288 |  |  |         self._cnt['all'].value( | 
            
                                                                                                            
                            
            
                                    
            
            
                | 289 |  |  |             (project_name, 'failed'), | 
            
                                                                                                            
                            
            
                                    
            
            
                | 290 |  |  |             status_count.get(self.taskdb.FAILED, 0) + status_count.get(self.taskdb.BAD, 0) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 291 |  |  |         ) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 292 |  |  |         self._cnt['all'].value( | 
            
                                                                                                            
                            
            
                                    
            
            
                | 293 |  |  |             (project_name, 'pending'), | 
            
                                                                                                            
                            
            
                                    
            
            
                | 294 |  |  |             status_count.get(self.taskdb.ACTIVE, 0) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 295 |  |  |         ) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 296 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 297 |  |  |     def task_verify(self, task): | 
            
                                                                                                            
                            
            
                                    
            
            
                | 298 |  |  |         ''' | 
            
                                                                                                            
                            
            
                                    
            
            
                | 299 |  |  |         return False if any of 'taskid', 'project', 'url' is not in task dict | 
            
                                                                                                            
                            
            
                                    
            
            
                | 300 |  |  |                         or project in not in task_queue | 
            
                                                                                                            
                            
            
                                    
            
            
                | 301 |  |  |         ''' | 
            
                                                                                                            
                            
            
                                    
            
            
                | 302 |  |  |         for each in ('taskid', 'project', 'url', ): | 
            
                                                                                                            
                            
            
                                    
            
            
                | 303 |  |  |             if each not in task or not task[each]: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 304 |  |  |                 logger.error('%s not in task: %.200r', each, task) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 305 |  |  |                 return False | 
            
                                                                                                            
                            
            
                                    
            
            
                | 306 |  |  |         if task['project'] not in self.projects: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 307 |  |  |             logger.error('unknown project: %s', task['project']) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 308 |  |  |             return False | 
            
                                                                                                            
                            
            
                                    
            
            
                | 309 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 310 |  |  |         project = self.projects[task['project']] | 
            
                                                                                                            
                            
            
                                    
            
            
                | 311 |  |  |         if not project.active: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 312 |  |  |             logger.error('project %s not started, please set status to RUNNING or DEBUG', | 
            
                                                                                                            
                            
            
                                    
            
            
                | 313 |  |  |                          task['project']) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 314 |  |  |             return False | 
            
                                                                                                            
                            
            
                                    
            
            
                | 315 |  |  |         return True | 
            
                                                                                                            
                            
            
                                    
            
            
                | 316 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 317 |  |  |     def insert_task(self, task): | 
            
                                                                                                            
                            
            
                                    
            
            
                | 318 |  |  |         '''insert task into database''' | 
            
                                                                                                            
                            
            
                                    
            
            
                | 319 |  |  |         return self.taskdb.insert(task['project'], task['taskid'], task) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 320 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 321 |  |  |     def update_task(self, task): | 
            
                                                                                                            
                            
            
                                    
            
            
                | 322 |  |  |         '''update task in database''' | 
            
                                                                                                            
                            
            
                                    
            
            
                | 323 |  |  |         return self.taskdb.update(task['project'], task['taskid'], task) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 324 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 325 |  |  |     def put_task(self, task): | 
            
                                                                                                            
                            
            
                                    
            
            
                | 326 |  |  |         '''put task to task queue''' | 
            
                                                                                                            
                            
            
                                    
            
            
                | 327 |  |  |         _schedule = task.get('schedule', self.default_schedule) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 328 |  |  |         self.projects[task['project']].task_queue.put( | 
            
                                                                                                            
                            
            
                                    
            
            
                | 329 |  |  |             task['taskid'], | 
            
                                                                                                            
                            
            
                                    
            
            
                | 330 |  |  |             priority=_schedule.get('priority', self.default_schedule['priority']), | 
            
                                                                                                            
                            
            
                                    
            
            
                | 331 |  |  |             exetime=_schedule.get('exetime', self.default_schedule['exetime']) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 332 |  |  |         ) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 333 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 334 |  |  |     def send_task(self, task, force=True): | 
            
                                                                                                            
                            
            
                                    
            
            
                | 335 |  |  |         ''' | 
            
                                                                                                            
                            
            
                                    
            
            
                | 336 |  |  |         dispatch task to fetcher | 
            
                                                                                                            
                            
            
                                    
            
            
                | 337 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 338 |  |  |         out queue may have size limit to prevent block, a send_buffer is used | 
            
                                                                                                            
                            
            
                                    
            
            
                | 339 |  |  |         ''' | 
            
                                                                                                            
                            
            
                                    
            
            
                | 340 |  |  |         try: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 341 |  |  |             self.out_queue.put_nowait(task) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 342 |  |  |         except Queue.Full: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 343 |  |  |             if force: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 344 |  |  |                 self._send_buffer.appendleft(task) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 345 |  |  |             else: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 346 |  |  |                 raise | 
            
                                                                                                            
                            
            
                                    
            
            
                | 347 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 348 |  |  |     def _check_task_done(self): | 
            
                                                                                                            
                            
            
                                    
            
            
                | 349 |  |  |         '''Check status queue''' | 
            
                                                                                                            
                            
            
                                    
            
            
                | 350 |  |  |         cnt = 0 | 
            
                                                                                                            
                            
            
                                    
            
            
                | 351 |  |  |         try: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 352 |  |  |             while True: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 353 |  |  |                 task = self.status_queue.get_nowait() | 
            
                                                                                                            
                            
            
                                    
            
            
                | 354 |  |  |                 # check _on_get_info result here | 
            
                                                                                                            
                            
            
                                    
            
            
                | 355 |  |  |                 if task.get('taskid') == '_on_get_info' and 'project' in task and 'track' in task: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 356 |  |  |                     if task['project'] not in self.projects: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 357 |  |  |                         continue | 
            
                                                                                                            
                            
            
                                    
            
            
                | 358 |  |  |                     project = self.projects[task['project']] | 
            
                                                                                                            
                            
            
                                    
            
            
                | 359 |  |  |                     project.on_get_info(task['track'].get('save') or {}) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 360 |  |  |                     logger.info( | 
            
                                                                                                            
                            
            
                                    
            
            
                | 361 |  |  |                         '%s on_get_info %r', task['project'], task['track'].get('save', {}) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 362 |  |  |                     ) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 363 |  |  |                     continue | 
            
                                                                                                            
                            
            
                                    
            
            
                | 364 |  |  |                 elif not self.task_verify(task): | 
            
                                                                                                            
                            
            
                                    
            
            
                | 365 |  |  |                     continue | 
            
                                                                                                            
                            
            
                                    
            
            
                | 366 |  |  |                 self.on_task_status(task) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 367 |  |  |                 cnt += 1 | 
            
                                                                                                            
                            
            
                                    
            
            
                | 368 |  |  |         except Queue.Empty: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 369 |  |  |             pass | 
            
                                                                                                            
                            
            
                                    
            
            
                | 370 |  |  |         return cnt | 
            
                                                                                                            
                            
            
                                    
            
            
                | 371 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
    # Class-level list of task keys.
    # NOTE(review): not referenced within this chunk; presumably the subset of
    # task fields kept/merged when a task record is combined with an update
    # elsewhere in this class — confirm against its users.
    merge_task_fields = ['taskid', 'project', 'url', 'status', 'schedule', 'lastcrawltime']
            
                                                                                                            
                            
            
                                    
            
            
                | 373 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 374 |  |  |     def _check_request(self): | 
            
                                                                                                            
                            
            
                                    
            
            
                | 375 |  |  |         '''Check new task queue''' | 
            
                                                                                                            
                            
            
                                    
            
            
                | 376 |  |  |         # check _postpone_request first | 
            
                                                                                                            
                            
            
                                    
            
            
                | 377 |  |  |         todo = [] | 
            
                                                                                                            
                            
            
                                    
            
            
                | 378 |  |  |         for task in self._postpone_request: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 379 |  |  |             if task['project'] not in self.projects: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 380 |  |  |                 continue | 
            
                                                                                                            
                            
            
                                    
            
            
                | 381 |  |  |             if self.projects[task['project']].task_queue.is_processing(task['taskid']): | 
            
                                                                                                            
                            
            
                                    
            
            
                | 382 |  |  |                 todo.append(task) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 383 |  |  |             else: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 384 |  |  |                 self.on_request(task) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 385 |  |  |         self._postpone_request = todo | 
            
                                                                                                            
                            
            
                                    
            
            
                | 386 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 387 |  |  |         tasks = {} | 
            
                                                                                                            
                            
            
                                    
            
            
                | 388 |  |  |         while len(tasks) < self.LOOP_LIMIT: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 389 |  |  |             try: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 390 |  |  |                 task = self.newtask_queue.get_nowait() | 
            
                                                                                                            
                            
            
                                    
            
            
                | 391 |  |  |             except Queue.Empty: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 392 |  |  |                 break | 
            
                                                                                                            
                            
            
                                    
            
            
                | 393 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 394 |  |  |             if isinstance(task, list): | 
            
                                                                                                            
                            
            
                                    
            
            
                | 395 |  |  |                 _tasks = task | 
            
                                                                                                            
                            
            
                                    
            
            
                | 396 |  |  |             else: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 397 |  |  |                 _tasks = (task, ) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 398 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 399 |  |  |             for task in _tasks: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 400 |  |  |                 if not self.task_verify(task): | 
            
                                                                                                            
                            
            
                                    
            
            
                | 401 |  |  |                     continue | 
            
                                                                                                            
                            
            
                                    
            
            
                | 402 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 403 |  |  |                 if task['taskid'] in self.projects[task['project']].task_queue: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 404 |  |  |                     if not task.get('schedule', {}).get('force_update', False): | 
            
                                                                                                            
                            
            
                                    
            
            
                | 405 |  |  |                         logger.debug('ignore newtask %(project)s:%(taskid)s %(url)s', task) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 406 |  |  |                         continue | 
            
                                                                                                            
                            
            
                                    
            
            
                | 407 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 408 |  |  |                 if task['taskid'] in tasks: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 409 |  |  |                     if not task.get('schedule', {}).get('force_update', False): | 
            
                                                                                                            
                            
            
                                    
            
            
                | 410 |  |  |                         continue | 
            
                                                                                                            
                            
            
                                    
            
            
                | 411 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 412 |  |  |                 tasks[task['taskid']] = task | 
            
                                                                                                            
                            
            
                                    
            
            
                | 413 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 414 |  |  |         for task in itervalues(tasks): | 
            
                                                                                                            
                            
            
                                    
            
            
                | 415 |  |  |             self.on_request(task) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 416 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 417 |  |  |         return len(tasks) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 418 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 419 |  |  |     def _check_cronjob(self): | 
            
                                                                                                            
                            
            
                                    
            
            
                | 420 |  |  |         """Check projects cronjob tick, return True when a new tick is sended""" | 
            
                                                                                                            
                            
            
                                    
            
            
                | 421 |  |  |         now = time.time() | 
            
                                                                                                            
                            
            
                                    
            
            
                | 422 |  |  |         self._last_tick = int(self._last_tick) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 423 |  |  |         if now - self._last_tick < 1: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 424 |  |  |             return False | 
            
                                                                                                            
                            
            
                                    
            
            
                | 425 |  |  |         self._last_tick += 1 | 
            
                                                                                                            
                            
            
                                    
            
            
                | 426 |  |  |         for project in itervalues(self.projects): | 
            
                                                                                                            
                            
            
                                    
            
            
                | 427 |  |  |             if not project.active: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 428 |  |  |                 continue | 
            
                                                                                                            
                            
            
                                    
            
            
                | 429 |  |  |             if project.waiting_get_info: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 430 |  |  |                 continue | 
            
                                                                                                            
                            
            
                                    
            
            
                | 431 |  |  |             if int(project.min_tick) == 0: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 432 |  |  |                 continue | 
            
                                                                                                            
                            
            
                                    
            
            
                | 433 |  |  |             if self._last_tick % int(project.min_tick) != 0: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 434 |  |  |                 continue | 
            
                                                                                                            
                            
            
                                    
            
            
                | 435 |  |  |             self.on_select_task({ | 
            
                                                                                                            
                            
            
                                    
            
            
                | 436 |  |  |                 'taskid': '_on_cronjob', | 
            
                                                                                                            
                            
            
                                    
            
            
                | 437 |  |  |                 'project': project.name, | 
            
                                                                                                            
                            
            
                                    
            
            
                | 438 |  |  |                 'url': 'data:,_on_cronjob', | 
            
                                                                                                            
                            
            
                                    
            
            
                | 439 |  |  |                 'status': self.taskdb.SUCCESS, | 
            
                                                                                                            
                            
            
                                    
            
            
                | 440 |  |  |                 'fetch': { | 
            
                                                                                                            
                            
            
                                    
            
            
                | 441 |  |  |                     'save': { | 
            
                                                                                                            
                            
            
                                    
            
            
                | 442 |  |  |                         'tick': self._last_tick, | 
            
                                                                                                            
                            
            
                                    
            
            
                | 443 |  |  |                     }, | 
            
                                                                                                            
                            
            
                                    
            
            
                | 444 |  |  |                 }, | 
            
                                                                                                            
                            
            
                                    
            
            
                | 445 |  |  |                 'process': { | 
            
                                                                                                            
                            
            
                                    
            
            
                | 446 |  |  |                     'callback': '_on_cronjob', | 
            
                                                                                                            
                            
            
                                    
            
            
                | 447 |  |  |                 }, | 
            
                                                                                                            
                            
            
                                    
            
            
                | 448 |  |  |             }) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 449 |  |  |         return True | 
            
                                                                                                            
                            
            
                                    
            
            
                | 450 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 451 |  |  |     request_task_fields = [ | 
            
                                                                                                            
                            
            
                                    
            
            
                | 452 |  |  |         'taskid', | 
            
                                                                                                            
                            
            
                                    
            
            
                | 453 |  |  |         'project', | 
            
                                                                                                            
                            
            
                                    
            
            
                | 454 |  |  |         'url', | 
            
                                                                                                            
                            
            
                                    
            
            
                | 455 |  |  |         'status', | 
            
                                                                                                            
                            
            
                                    
            
            
                | 456 |  |  |         'schedule', | 
            
                                                                                                            
                            
            
                                    
            
            
                | 457 |  |  |         'fetch', | 
            
                                                                                                            
                            
            
                                    
            
            
                | 458 |  |  |         'process', | 
            
                                                                                                            
                            
            
                                    
            
            
                | 459 |  |  |         'track', | 
            
                                                                                                            
                            
            
                                    
            
            
                | 460 |  |  |         'lastcrawltime' | 
            
                                                                                                            
                            
            
                                    
            
            
                | 461 |  |  |     ] | 
            
                                                                                                            
                            
            
                                    
            
            
                | 462 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 463 |  |  |     def _check_select(self): | 
            
                                                                                                            
                            
            
                                    
            
            
                | 464 |  |  |         '''Select task to fetch & process''' | 
            
                                                                                                            
                            
            
                                    
            
            
                | 465 |  |  |         while self._send_buffer: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 466 |  |  |             _task = self._send_buffer.pop() | 
            
                                                                                                            
                            
            
                                    
            
            
                | 467 |  |  |             try: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 468 |  |  |                 # use force=False here to prevent automatic send_buffer append and get exception | 
            
                                                                                                            
                            
            
                                    
            
            
                | 469 |  |  |                 self.send_task(_task, False) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 470 |  |  |             except Queue.Full: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 471 |  |  |                 self._send_buffer.append(_task) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 472 |  |  |                 break | 
            
                                                                                                            
                            
            
                                    
            
            
                | 473 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 474 |  |  |         if self.out_queue.full(): | 
            
                                                                                                            
                            
            
                                    
            
            
                | 475 |  |  |             return {} | 
            
                                                                                                            
                            
            
                                    
            
            
                | 476 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 477 |  |  |         taskids = [] | 
            
                                                                                                            
                            
            
                                    
            
            
                | 478 |  |  |         cnt = 0 | 
            
                                                                                                            
                            
            
                                    
            
            
                | 479 |  |  |         cnt_dict = dict() | 
            
                                                                                                            
                            
            
                                    
            
            
                | 480 |  |  |         limit = self.LOOP_LIMIT | 
            
                                                                                                            
                            
            
                                    
            
            
                | 481 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 482 |  |  |         # dynamic assign select limit for each project, use qsize as weight | 
            
                                                                                                            
                            
            
                                    
            
            
                | 483 |  |  |         project_weights, total_weight = dict(), 0 | 
            
                                                                                                            
                            
            
                                    
            
            
                | 484 |  |  |         for project in itervalues(self.projects):  # type:Project | 
            
                                                                                                            
                            
            
                                    
            
            
                | 485 |  |  |             if not project.active: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 486 |  |  |                 continue | 
            
                                                                                                            
                            
            
                                    
            
            
                | 487 |  |  |             # only check project pause when select new tasks, cronjob and new request still working | 
            
                                                                                                            
                            
            
                                    
            
            
                | 488 |  |  |             if project.paused: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 489 |  |  |                 continue | 
            
                                                                                                            
                            
            
                                    
            
            
                | 490 |  |  |             if project.waiting_get_info: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 491 |  |  |                 continue | 
            
                                                                                                            
                            
            
                                    
            
            
                | 492 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 493 |  |  |             # task queue | 
            
                                                                                                            
                            
            
                                    
            
            
                | 494 |  |  |             task_queue = project.task_queue  # type:TaskQueue | 
            
                                                                                                            
                            
            
                                    
            
            
                | 495 |  |  |             pro_weight = task_queue.size() | 
            
                                                                                                            
                            
            
                                    
            
            
                | 496 |  |  |             total_weight += pro_weight | 
            
                                                                                                            
                            
            
                                    
            
            
                | 497 |  |  |             project_weights[project.name] = pro_weight | 
            
                                                                                                            
                            
            
                                    
            
            
                | 498 |  |  |             pass | 
            
                                                                                                            
                            
            
                                    
            
            
                | 499 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 500 |  |  |         min_project_limit = int(limit / 10.)  # ensure minimum select limit for each project | 
            
                                                                                                            
                            
            
                                    
            
            
                | 501 |  |  |         max_project_limit = int(limit / 3.0)  # ensure maximum select limit for each project | 
            
                                                                                                            
                            
            
                                    
            
            
                | 502 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 503 |  |  |         for pro_name, pro_weight in iteritems(project_weights): | 
            
                                                                                                            
                            
            
                                    
            
            
                | 504 |  |  |             if cnt >= limit: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 505 |  |  |                 break | 
            
                                                                                                            
                            
            
                                    
            
            
                | 506 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 507 |  |  |             project = self.projects[pro_name]  # type:Project | 
            
                                                                                                            
                            
            
                                    
            
            
                | 508 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 509 |  |  |             # task queue | 
            
                                                                                                            
                            
            
                                    
            
            
                | 510 |  |  |             task_queue = project.task_queue | 
            
                                                                                                            
                            
            
                                    
            
            
                | 511 |  |  |             task_queue.check_update() | 
            
                                                                                                            
                            
            
                                    
            
            
                | 512 |  |  |             project_cnt = 0 | 
            
                                                                                                            
                            
            
                                    
            
            
                | 513 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 514 |  |  |             # calculate select limit for project | 
            
                                                                                                            
                            
            
                                    
            
            
                | 515 |  |  |             if total_weight < 1 or pro_weight < 1: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 516 |  |  |                 project_limit = min_project_limit | 
            
                                                                                                            
                            
            
                                    
            
            
                | 517 |  |  |             else: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 518 |  |  |                 project_limit = int((1.0 * pro_weight / total_weight) * limit) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 519 |  |  |                 if project_limit < min_project_limit: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 520 |  |  |                     project_limit = min_project_limit | 
            
                                                                                                            
                            
            
                                    
            
            
                | 521 |  |  |                 elif project_limit > max_project_limit: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 522 |  |  |                     project_limit = max_project_limit | 
            
                                                                                                            
                            
            
                                    
            
            
                | 523 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 524 |  |  |             # check send_buffer here. when not empty, out_queue may blocked. Not sending tasks | 
            
                                                                                                            
                            
            
                                    
            
            
                | 525 |  |  |             while cnt < limit and project_cnt < project_limit: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 526 |  |  |                 taskid = task_queue.get() | 
            
                                                                                                            
                            
            
                                    
            
            
                | 527 |  |  |                 if not taskid: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 528 |  |  |                     break | 
            
                                                                                                            
                            
            
                                    
            
            
                | 529 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 530 |  |  |                 taskids.append((project.name, taskid)) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 531 |  |  |                 if taskid != 'on_finished': | 
            
                                                                                                            
                            
            
                                    
            
            
                | 532 |  |  |                     project_cnt += 1 | 
            
                                                                                                            
                            
            
                                    
            
            
                | 533 |  |  |                 cnt += 1 | 
            
                                                                                                            
                            
            
                                    
            
            
                | 534 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 535 |  |  |             cnt_dict[project.name] = project_cnt | 
            
                                                                                                            
                            
            
                                    
            
            
                | 536 |  |  |             if project_cnt: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 537 |  |  |                 project._selected_tasks = True | 
            
                                                                                                            
                            
            
                                    
            
            
                | 538 |  |  |                 project._send_finished_event_wait = 0 | 
            
                                                                                                            
                            
            
                                    
            
            
                | 539 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 540 |  |  |             # check and send finished event to project | 
            
                                                                                                            
                            
            
                                    
            
            
                | 541 |  |  |             if not project_cnt and len(task_queue) == 0 and project._selected_tasks: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 542 |  |  |                 # wait for self.FAIL_PAUSE_NUM steps to make sure all tasks in queue have been processed | 
            
                                                                                                            
                            
            
                                    
            
            
                | 543 |  |  |                 if project._send_finished_event_wait < self.FAIL_PAUSE_NUM: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 544 |  |  |                     project._send_finished_event_wait += 1 | 
            
                                                                                                            
                            
            
                                    
            
            
                | 545 |  |  |                 else: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 546 |  |  |                     project._selected_tasks = False | 
            
                                                                                                            
                            
            
                                    
            
            
                | 547 |  |  |                     project._send_finished_event_wait = 0 | 
            
                                                                                                            
                            
            
                                    
            
            
                | 548 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 549 |  |  |                     self._postpone_request.append({ | 
            
                                                                                                            
                            
            
                                    
            
            
                | 550 |  |  |                         'project': project.name, | 
            
                                                                                                            
                            
            
                                    
            
            
                | 551 |  |  |                         'taskid': 'on_finished', | 
            
                                                                                                            
                            
            
                                    
            
            
                | 552 |  |  |                         'url': 'data:,on_finished', | 
            
                                                                                                            
                            
            
                                    
            
            
                | 553 |  |  |                         'process': { | 
            
                                                                                                            
                            
            
                                    
            
            
                | 554 |  |  |                             'callback': 'on_finished', | 
            
                                                                                                            
                            
            
                                    
            
            
                | 555 |  |  |                         }, | 
            
                                                                                                            
                            
            
                                    
            
            
                | 556 |  |  |                         "schedule": { | 
            
                                                                                                            
                            
            
                                    
            
            
                | 557 |  |  |                             "age": 0, | 
            
                                                                                                            
                            
            
                                    
            
            
                | 558 |  |  |                             "priority": 9, | 
            
                                                                                                            
                            
            
                                    
            
            
                | 559 |  |  |                             "force_update": True, | 
            
                                                                                                            
                            
            
                                    
            
            
                | 560 |  |  |                         }, | 
            
                                                                                                            
                            
            
                                    
            
            
                | 561 |  |  |                     }) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 562 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 563 |  |  |         for project, taskid in taskids: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 564 |  |  |             self._load_put_task(project, taskid) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 565 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 566 |  |  |         return cnt_dict | 
            
                                                                                                            
                            
            
                                    
            
            
                | 567 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 568 |  |  |     def _load_put_task(self, project, taskid): | 
            
                                                                                                            
                            
            
                                    
            
            
                | 569 |  |  |         try: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 570 |  |  |             task = self.taskdb.get_task(project, taskid, fields=self.request_task_fields) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 571 |  |  |         except ValueError: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 572 |  |  |             logger.error('bad task pack %s:%s', project, taskid) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 573 |  |  |             return | 
            
                                                                                                            
                            
            
                                    
            
            
                | 574 |  |  |         if not task: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 575 |  |  |             return | 
            
                                                                                                            
                            
            
                                    
            
            
                | 576 |  |  |         task = self.on_select_task(task) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 577 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 578 |  |  |     def _print_counter_log(self): | 
            
                                                                                                            
                            
            
                                    
            
            
                | 579 |  |  |         # print top 5 active counters | 
            
                                                                                                            
                            
            
                                    
            
            
                | 580 |  |  |         keywords = ('pending', 'success', 'retry', 'failed') | 
            
                                                                                                            
                            
            
                                    
            
            
                | 581 |  |  |         total_cnt = {} | 
            
                                                                                                            
                            
            
                                    
            
            
                | 582 |  |  |         project_actives = [] | 
            
                                                                                                            
                            
            
                                    
            
            
                | 583 |  |  |         project_fails = [] | 
            
                                                                                                            
                            
            
                                    
            
            
                | 584 |  |  |         for key in keywords: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 585 |  |  |             total_cnt[key] = 0 | 
            
                                                                                                            
                            
            
                                    
            
            
                | 586 |  |  |         for project, subcounter in iteritems(self._cnt['5m']): | 
            
                                                                                                            
                            
            
                                    
            
            
                | 587 |  |  |             actives = 0 | 
            
                                                                                                            
                            
            
                                    
            
            
                | 588 |  |  |             for key in keywords: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 589 |  |  |                 cnt = subcounter.get(key, None) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 590 |  |  |                 if cnt: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 591 |  |  |                     cnt = cnt.sum | 
            
                                                                                                            
                            
            
                                    
            
            
                | 592 |  |  |                     total_cnt[key] += cnt | 
            
                                                                                                            
                            
            
                                    
            
            
                | 593 |  |  |                     actives += cnt | 
            
                                                                                                            
                            
            
                                    
            
            
                | 594 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 595 |  |  |             project_actives.append((actives, project)) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 596 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 597 |  |  |             fails = subcounter.get('failed', None) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 598 |  |  |             if fails: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 599 |  |  |                 project_fails.append((fails.sum, project)) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 600 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 601 |  |  |         top_2_fails = sorted(project_fails, reverse=True)[:2] | 
            
                                                                                                            
                            
            
                                    
            
            
                | 602 |  |  |         top_3_actives = sorted([x for x in project_actives if x[1] not in top_2_fails], | 
            
                                                                                                            
                            
            
                                    
            
            
                | 603 |  |  |                                reverse=True)[:5 - len(top_2_fails)] | 
            
                                                                                                            
                            
            
                                    
            
            
                | 604 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 605 |  |  |         log_str = ("in 5m: new:%(pending)d,success:%(success)d," | 
            
                                                                                                            
                            
            
                                    
            
            
                | 606 |  |  |                    "retry:%(retry)d,failed:%(failed)d" % total_cnt) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 607 |  |  |         for _, project in itertools.chain(top_3_actives, top_2_fails): | 
            
                                                                                                            
                            
            
                                    
            
            
                | 608 |  |  |             subcounter = self._cnt['5m'][project].to_dict(get_value='sum') | 
            
                                                                                                            
                            
            
                                    
            
            
                | 609 |  |  |             log_str += " %s:%d,%d,%d,%d" % (project, | 
            
                                                                                                            
                            
            
                                    
            
            
                | 610 |  |  |                                             subcounter.get('pending', 0), | 
            
                                                                                                            
                            
            
                                    
            
            
                | 611 |  |  |                                             subcounter.get('success', 0), | 
            
                                                                                                            
                            
            
                                    
            
            
                | 612 |  |  |                                             subcounter.get('retry', 0), | 
            
                                                                                                            
                            
            
                                    
            
            
                | 613 |  |  |                                             subcounter.get('failed', 0)) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 614 |  |  |         logger.info(log_str) | 
            
                                                                                                            
                                                                
            
                                    
            
            
                | 615 |  |  |  | 
            
                                                                        
                            
            
                                    
            
            
                | 616 |  |  |     def _dump_cnt(self): | 
            
                                                                        
                            
            
                                    
            
            
                | 617 |  |  |         '''Dump counters to file''' | 
            
                                                                        
                            
            
                                    
            
            
                | 618 |  |  |         self._cnt['1h'].dump(os.path.join(self.data_path, 'scheduler.1h')) | 
            
                                                                        
                            
            
                                    
            
            
                | 619 |  |  |         self._cnt['1d'].dump(os.path.join(self.data_path, 'scheduler.1d')) | 
            
                                                                        
                            
            
                                    
            
            
                | 620 |  |  |         self._cnt['all'].dump(os.path.join(self.data_path, 'scheduler.all')) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 621 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 622 |  |  |     def _try_dump_cnt(self): | 
            
                                                                                                            
                            
            
                                    
            
            
                | 623 |  |  |         '''Dump counters every 60 seconds''' | 
            
                                                                                                            
                            
            
                                    
            
            
                | 624 |  |  |         now = time.time() | 
            
                                                                                                            
                            
            
                                    
            
            
                | 625 |  |  |         if now - self._last_dump_cnt > 60: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 626 |  |  |             self._last_dump_cnt = now | 
            
                                                                                                            
                            
            
                                    
            
            
                | 627 |  |  |             self._dump_cnt() | 
            
                                                                                                            
                            
            
                                    
            
            
                | 628 |  |  |             self._print_counter_log() | 
            
                                                                                                            
                            
            
                                    
            
            
                | 629 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 630 |  |  |     def _check_delete(self): | 
            
                                                                                                            
                            
            
                                    
            
            
                | 631 |  |  |         '''Check project delete''' | 
            
                                                                                                            
                            
            
                                    
            
            
                | 632 |  |  |         now = time.time() | 
            
                                                                                                            
                            
            
                                    
            
            
                | 633 |  |  |         for project in list(itervalues(self.projects)): | 
            
                                                                                                            
                            
            
                                    
            
            
                | 634 |  |  |             if project.db_status != 'STOP': | 
            
                                                                                                            
                            
            
                                    
            
            
                | 635 |  |  |                 continue | 
            
                                                                                                            
                            
            
                                    
            
            
                | 636 |  |  |             if now - project.updatetime < self.DELETE_TIME: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 637 |  |  |                 continue | 
            
                                                                                                            
                            
            
                                    
            
            
                | 638 |  |  |             if 'delete' not in self.projectdb.split_group(project.group): | 
            
                                                                                                            
                            
            
                                    
            
            
                | 639 |  |  |                 continue | 
            
                                                                                                            
                            
            
                                    
            
            
                | 640 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 641 |  |  |             logger.warning("deleting project: %s!", project.name) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 642 |  |  |             del self.projects[project.name] | 
            
                                                                                                            
                            
            
                                    
            
            
                | 643 |  |  |             self.taskdb.drop(project.name) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 644 |  |  |             self.projectdb.drop(project.name) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 645 |  |  |             if self.resultdb: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 646 |  |  |                 self.resultdb.drop(project.name) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 647 |  |  |             for each in self._cnt.values(): | 
            
                                                                                                            
                            
            
                                    
            
            
                | 648 |  |  |                 del each[project.name] | 
            
                                                                                                            
                            
            
                                    
            
            
                | 649 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 650 |  |  |     def __len__(self): | 
            
                                                                                                            
                            
            
                                    
            
            
                | 651 |  |  |         return sum(len(x.task_queue) for x in itervalues(self.projects)) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 652 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                def quit(self):
                    '''Set the quit flag that run() polls, and stop the XML-RPC server if present.

                    When an `xmlrpc_server` attribute exists, two callbacks are queued on
                    `xmlrpc_ioloop` via add_callback: first the server's stop, then the
                    ioloop's own stop.
                    '''
                    self._quit = True
                    if not hasattr(self, 'xmlrpc_server'):
                        return
                    loop = self.xmlrpc_ioloop
                    # order matters: stop the server before stopping its loop
                    for callback in (self.xmlrpc_server.stop, loop.stop):
                        loop.add_callback(callback)
            
                                                                                                            
                            
            
                                    
            
            
                | 660 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                def run_once(self):
                    '''Consume the queues and feed tasks to the fetcher, a single pass.

                    Steps run in a fixed order: update projects, check finished tasks,
                    check requests, run cron checks, select, delete, then maybe dump counters.
                    '''
                    self._update_projects()
                    self._check_task_done()
                    self._check_request()

                    # _check_cronjob() is called repeatedly until it returns a falsy value
                    keep_going = True
                    while keep_going:
                        keep_going = self._check_cronjob()

                    self._check_select()
                    self._check_delete()
                    self._try_dump_cnt()
            
                                                                                                            
                            
            
                                    
            
            
                | 672 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                def run(self):
                    '''Start the scheduler loop and block until it ends.

                    Each iteration sleeps LOOP_INTERVAL seconds and then calls run_once(),
                    until quit() sets `_quit`. A KeyboardInterrupt ends the loop. Any other
                    exception is logged and counted. A successful pass resets the count, so
                    the loop gives up after more than EXCEPTION_LIMIT failures in a row.
                    Counters are dumped with _dump_cnt() on the way out.
                    '''
                    logger.info("scheduler starting...")

                    while not self._quit:
                        try:
                            time.sleep(self.LOOP_INTERVAL)
                            self.run_once()
                        except KeyboardInterrupt:
                            break
                        except Exception as e:
                            logger.exception(e)
                            self._exceptions += 1
                            if self._exceptions > self.EXCEPTION_LIMIT:
                                break
                        else:
                            # a clean pass clears the consecutive-failure counter
                            self._exceptions = 0

                    logger.info("scheduler exiting...")
                    self._dump_cnt()
            
                                                                                                            
                            
            
                                    
            
            
                | 693 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                def trigger_on_start(self, project):
                    '''Queue a synthetic "on_start" task for `project` on newtask_queue.

                    The task uses taskid "on_start", a "data:" URL, and asks for the
                    "on_start" callback, which triggers the project's on_start callback.
                    '''
                    task = dict(
                        project=project,
                        taskid="on_start",
                        url="data:,on_start",
                        process=dict(callback="on_start"),
                    )
                    self.newtask_queue.put(task)
            
                                                                                                            
                            
            
                                    
            
            
                | 704 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 705 |  |  |     def xmlrpc_run(self, port=23333, bind='127.0.0.1', logRequests=False): | 
            
                                                                                                            
                            
            
                                    
            
            
                | 706 |  |  |         '''Start xmlrpc interface''' | 
            
                                                                                                            
                            
            
                                    
            
            
                | 707 |  |  |         from pyspider.libs.wsgi_xmlrpc import WSGIXMLRPCApplication | 
            
                                                                                                            
                            
            
                                    
            
            
                | 708 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 709 |  |  |         application = WSGIXMLRPCApplication() | 
            
                                                                                                            
                            
            
                                    
            
            
                | 710 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 711 |  |  |         application.register_function(self.quit, '_quit') | 
            
                                                                                                            
                            
            
                                    
            
            
                | 712 |  |  |         application.register_function(self.__len__, 'size') | 
            
                                                                                                            
                            
            
                                    
            
            
                | 713 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 714 |  |  |         def dump_counter(_time, _type): | 
            
                                                                                                            
                            
            
                                    
            
            
                | 715 |  |  |             try: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 716 |  |  |                 return self._cnt[_time].to_dict(_type) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 717 |  |  |             except: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 718 |  |  |                 logger.exception('') | 
            
                                                                                                            
                            
            
                                    
            
            
                | 719 |  |  |         application.register_function(dump_counter, 'counter') | 
            
                                                                                                            
                            
            
                                    
            
            
                | 720 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 721 |  |  |         def new_task(task): | 
            
                                                                                                            
                            
            
                                    
            
            
                | 722 |  |  |             if self.task_verify(task): | 
            
                                                                                                            
                            
            
                                    
            
            
                | 723 |  |  |                 self.newtask_queue.put(task) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 724 |  |  |                 return True | 
            
                                                                                                            
                            
            
                                    
            
            
                | 725 |  |  |             return False | 
            
                                                                                                            
                            
            
                                    
            
            
                | 726 |  |  |         application.register_function(new_task, 'newtask') | 
            
                                                                                                            
                            
            
                                    
            
            
                | 727 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 728 |  |  |         def send_task(task): | 
            
                                                                                                            
                            
            
                                    
            
            
                | 729 |  |  |             '''dispatch task to fetcher''' | 
            
                                                                                                            
                            
            
                                    
            
            
                | 730 |  |  |             self.send_task(task) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 731 |  |  |             return True | 
            
                                                                                                            
                            
            
                                    
            
            
                | 732 |  |  |         application.register_function(send_task, 'send_task') | 
            
                                                                                                            
                            
            
                                    
            
            
                | 733 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 734 |  |  |         def update_project(): | 
            
                                                                                                            
                            
            
                                    
            
            
                | 735 |  |  |             self._force_update_project = True | 
            
                                                                                                            
                            
            
                                    
            
            
                | 736 |  |  |         application.register_function(update_project, 'update_project') | 
            
                                                                                                            
                            
            
                                    
            
            
                | 737 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 738 |  |  |         def get_active_tasks(project=None, limit=100): | 
            
                                                                                                            
                            
            
                                    
            
            
                | 739 |  |  |             allowed_keys = set(( | 
            
                                                                                                            
                            
            
                                    
            
            
                | 740 |  |  |                 'type', | 
            
                                                                                                            
                            
            
                                    
            
            
                | 741 |  |  |                 'taskid', | 
            
                                                                                                            
                            
            
                                    
            
            
                | 742 |  |  |                 'project', | 
            
                                                                                                            
                            
            
                                    
            
            
                | 743 |  |  |                 'status', | 
            
                                                                                                            
                            
            
                                    
            
            
                | 744 |  |  |                 'url', | 
            
                                                                                                            
                            
            
                                    
            
            
                | 745 |  |  |                 'lastcrawltime', | 
            
                                                                                                            
                            
            
                                    
            
            
                | 746 |  |  |                 'updatetime', | 
            
                                                                                                            
                            
            
                                    
            
            
                | 747 |  |  |                 'track', | 
            
                                                                                                            
                            
            
                                    
            
            
                | 748 |  |  |             )) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 749 |  |  |             track_allowed_keys = set(( | 
            
                                                                                                            
                            
            
                                    
            
            
                | 750 |  |  |                 'ok', | 
            
                                                                                                            
                            
            
                                    
            
            
                | 751 |  |  |                 'time', | 
            
                                                                                                            
                            
            
                                    
            
            
                | 752 |  |  |                 'follows', | 
            
                                                                                                            
                            
            
                                    
            
            
                | 753 |  |  |                 'status_code', | 
            
                                                                                                            
                            
            
                                    
            
            
                | 754 |  |  |             )) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 755 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 756 |  |  |             iters = [iter(x.active_tasks) for k, x in iteritems(self.projects) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 757 |  |  |                      if x and (k == project if project else True)] | 
            
                                                                                                            
                            
            
                                    
            
            
                | 758 |  |  |             tasks = [next(x, None) for x in iters] | 
            
                                                                                                            
                            
            
                                    
            
            
                | 759 |  |  |             result = [] | 
            
                                                                                                            
                            
            
                                    
            
            
                | 760 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 761 |  |  |             while len(result) < limit and tasks and not all(x is None for x in tasks): | 
            
                                                                                                            
                            
            
                                    
            
            
                | 762 |  |  |                 updatetime, task = t = max(t for t in tasks if t) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 763 |  |  |                 i = tasks.index(t) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 764 |  |  |                 tasks[i] = next(iters[i], None) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 765 |  |  |                 for key in list(task): | 
            
                                                                                                            
                            
            
                                    
            
            
                | 766 |  |  |                     if key == 'track': | 
            
                                                                                                            
                            
            
                                    
            
            
                | 767 |  |  |                         for k in list(task[key].get('fetch', [])): | 
            
                                                                                                            
                            
            
                                    
            
            
                | 768 |  |  |                             if k not in track_allowed_keys: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 769 |  |  |                                 del task[key]['fetch'][k] | 
            
                                                                                                            
                            
            
                                    
            
            
                | 770 |  |  |                         for k in list(task[key].get('process', [])): | 
            
                                                                                                            
                            
            
                                    
            
            
                | 771 |  |  |                             if k not in track_allowed_keys: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 772 |  |  |                                 del task[key]['process'][k] | 
            
                                                                                                            
                            
            
                                    
            
            
                | 773 |  |  |                     if key in allowed_keys: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 774 |  |  |                         continue | 
            
                                                                                                            
                            
            
                                    
            
            
                | 775 |  |  |                     del task[key] | 
            
                                                                                                            
                            
            
                                    
            
            
                | 776 |  |  |                 result.append(t) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 777 |  |  |             # fix for "<type 'exceptions.TypeError'>:dictionary key must be string" | 
            
                                                                                                            
                            
            
                                    
            
            
                | 778 |  |  |             # have no idea why | 
            
                                                                                                            
                            
            
                                    
            
            
                | 779 |  |  |             return json.loads(json.dumps(result)) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 780 |  |  |         application.register_function(get_active_tasks, 'get_active_tasks') | 
            
                                                                                                            
                            
            
                                    
            
            
                | 781 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 782 |  |  |         def get_projects_pause_status(): | 
            
                                                                                                            
                            
            
                                    
            
            
                | 783 |  |  |             result = {} | 
            
                                                                                                            
                            
            
                                    
            
            
                | 784 |  |  |             for project_name, project in iteritems(self.projects): | 
            
                                                                                                            
                            
            
                                    
            
            
                | 785 |  |  |                 result[project_name] = project.paused | 
            
                                                                                                            
                            
            
                                    
            
            
                | 786 |  |  |             return result | 
            
                                                                                                            
                            
            
                                    
            
            
                | 787 |  |  |         application.register_function(get_projects_pause_status, 'get_projects_pause_status') | 
            
                                                                                                            
                            
            
                                    
            
            
                | 788 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 789 |  |  |         def webui_update(): | 
            
                                                                                                            
                            
            
                                    
            
            
                | 790 |  |  |             return { | 
            
                                                                                                            
                            
            
                                    
            
            
                | 791 |  |  |                 'pause_status': get_projects_pause_status(), | 
            
                                                                                                            
                            
            
                                    
            
            
                | 792 |  |  |                 'counter': { | 
            
                                                                                                            
                            
            
                                    
            
            
                | 793 |  |  |                     '5m_time': dump_counter('5m_time', 'avg'), | 
            
                                                                                                            
                            
            
                                    
            
            
                | 794 |  |  |                     '5m': dump_counter('5m', 'sum'), | 
            
                                                                                                            
                            
            
                                    
            
            
                | 795 |  |  |                     '1h': dump_counter('1h', 'sum'), | 
            
                                                                                                            
                            
            
                                    
            
            
                | 796 |  |  |                     '1d': dump_counter('1d', 'sum'), | 
            
                                                                                                            
                            
            
                                    
            
            
                | 797 |  |  |                     'all': dump_counter('all', 'sum'), | 
            
                                                                                                            
                            
            
                                    
            
            
                | 798 |  |  |                 }, | 
            
                                                                                                            
                            
            
                                    
            
            
                | 799 |  |  |             } | 
            
                                                                                                            
                            
            
                                    
            
            
                | 800 |  |  |         application.register_function(webui_update, 'webui_update') | 
            
                                                                                                            
                            
            
                                    
            
            
                | 801 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 802 |  |  |         import tornado.wsgi | 
            
                                                                                                            
                            
            
                                    
            
            
                | 803 |  |  |         import tornado.ioloop | 
            
                                                                                                            
                            
            
                                    
            
            
                | 804 |  |  |         import tornado.httpserver | 
            
                                                                                                            
                            
            
                                    
            
            
                | 805 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 806 |  |  |         container = tornado.wsgi.WSGIContainer(application) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 807 |  |  |         self.xmlrpc_ioloop = tornado.ioloop.IOLoop() | 
            
                                                                                                            
                            
            
                                    
            
            
                | 808 |  |  |         self.xmlrpc_server = tornado.httpserver.HTTPServer(container, io_loop=self.xmlrpc_ioloop) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 809 |  |  |         self.xmlrpc_server.listen(port=port, address=bind) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 810 |  |  |         logger.info('scheduler.xmlrpc listening on %s:%s', bind, port) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 811 |  |  |         self.xmlrpc_ioloop.start() | 
            
                                                                                                            
                            
            
                                    
            
            
                | 812 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 813 |  |  |     def on_request(self, task): | 
            
                                                                                                            
                            
            
                                    
            
            
                | 814 |  |  |         if self.INQUEUE_LIMIT and len(self.projects[task['project']].task_queue) >= self.INQUEUE_LIMIT: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 815 |  |  |             logger.debug('overflow task %(project)s:%(taskid)s %(url)s', task) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 816 |  |  |             return | 
            
                                                                                                            
                            
            
                                    
            
            
                | 817 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 818 |  |  |         oldtask = self.taskdb.get_task(task['project'], task['taskid'], | 
            
                                                                                                            
                            
            
                                    
            
            
                | 819 |  |  |                                        fields=self.merge_task_fields) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 820 |  |  |         if oldtask: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 821 |  |  |             return self.on_old_request(task, oldtask) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 822 |  |  |         else: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 823 |  |  |             return self.on_new_request(task) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 824 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 825 |  |  |     def on_new_request(self, task): | 
            
                                                                                                            
                            
            
                                    
            
            
                | 826 |  |  |         '''Called when a new request is arrived''' | 
            
                                                                                                            
                            
            
                                    
            
            
                | 827 |  |  |         task['status'] = self.taskdb.ACTIVE | 
            
                                                                                                            
                            
            
                                    
            
            
                | 828 |  |  |         self.insert_task(task) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 829 |  |  |         self.put_task(task) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 830 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 831 |  |  |         project = task['project'] | 
            
                                                                                                            
                            
            
                                    
            
            
                | 832 |  |  |         self._cnt['5m'].event((project, 'pending'), +1) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 833 |  |  |         self._cnt['1h'].event((project, 'pending'), +1) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 834 |  |  |         self._cnt['1d'].event((project, 'pending'), +1) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 835 |  |  |         self._cnt['all'].event((project, 'pending'), +1) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 836 |  |  |         logger.info('new task %(project)s:%(taskid)s %(url)s', task) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 837 |  |  |         return task | 
            
                                                                                                            
                            
            
                                    
            
            
                | 838 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 839 |  |  |     def on_old_request(self, task, old_task): | 
            
                                                                                                            
                            
            
                                    
            
            
                | 840 |  |  |         '''Called when a crawled task is arrived''' | 
            
                                                                                                            
                            
            
                                    
            
            
                | 841 |  |  |         now = time.time() | 
            
                                                                                                            
                            
            
                                    
            
            
                | 842 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 843 |  |  |         _schedule = task.get('schedule', self.default_schedule) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 844 |  |  |         old_schedule = old_task.get('schedule', {}) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 845 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 846 |  |  |         if _schedule.get('force_update') and self.projects[task['project']].task_queue.is_processing(task['taskid']): | 
            
                                                                                                            
                            
            
                                    
            
            
                | 847 |  |  |             # when a task is in processing, the modify may conflict with the running task. | 
            
                                                                                                            
                            
            
                                    
            
            
                | 848 |  |  |             # postpone the modify after task finished. | 
            
                                                                                                            
                            
            
                                    
            
            
                | 849 |  |  |             logger.info('postpone modify task %(project)s:%(taskid)s %(url)s', task) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 850 |  |  |             self._postpone_request.append(task) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 851 |  |  |             return | 
            
                                                                                                            
                            
            
                                    
            
            
                | 852 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 853 |  |  |         restart = False | 
            
                                                                                                            
                            
            
                                    
            
            
                | 854 |  |  |         schedule_age = _schedule.get('age', self.default_schedule['age']) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 855 |  |  |         if _schedule.get('itag') and _schedule['itag'] != old_schedule.get('itag'): | 
            
                                                                                                            
                            
            
                                    
            
            
                | 856 |  |  |             restart = True | 
            
                                                                                                            
                            
            
                                    
            
            
                | 857 |  |  |         elif schedule_age >= 0 and schedule_age + (old_task.get('lastcrawltime', 0) or 0) < now: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 858 |  |  |             restart = True | 
            
                                                                                                            
                            
            
                                    
            
            
                | 859 |  |  |         elif _schedule.get('force_update'): | 
            
                                                                                                            
                            
            
                                    
            
            
                | 860 |  |  |             restart = True | 
            
                                                                                                            
                            
            
                                    
            
            
                | 861 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 862 |  |  |         if not restart: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 863 |  |  |             logger.debug('ignore newtask %(project)s:%(taskid)s %(url)s', task) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 864 |  |  |             return | 
            
                                                                                                            
                            
            
                                    
            
            
                | 865 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 866 |  |  |         if _schedule.get('cancel'): | 
            
                                                                                                            
                            
            
                                    
            
            
                | 867 |  |  |             logger.info('cancel task %(project)s:%(taskid)s %(url)s', task) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 868 |  |  |             task['status'] = self.taskdb.BAD | 
            
                                                                                                            
                            
            
                                    
            
            
                | 869 |  |  |             self.update_task(task) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 870 |  |  |             self.projects[task['project']].task_queue.delete(task['taskid']) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 871 |  |  |             return task | 
            
                                                                                                            
                            
            
                                    
            
            
                | 872 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 873 |  |  |         task['status'] = self.taskdb.ACTIVE | 
            
                                                                                                            
                            
            
                                    
            
            
                | 874 |  |  |         self.update_task(task) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 875 |  |  |         self.put_task(task) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 876 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 877 |  |  |         project = task['project'] | 
            
                                                                                                            
                            
            
                                    
            
            
                | 878 |  |  |         if old_task['status'] != self.taskdb.ACTIVE: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 879 |  |  |             self._cnt['5m'].event((project, 'pending'), +1) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 880 |  |  |             self._cnt['1h'].event((project, 'pending'), +1) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 881 |  |  |             self._cnt['1d'].event((project, 'pending'), +1) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 882 |  |  |         if old_task['status'] == self.taskdb.SUCCESS: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 883 |  |  |             self._cnt['all'].event((project, 'success'), -1).event((project, 'pending'), +1) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 884 |  |  |         elif old_task['status'] == self.taskdb.FAILED: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 885 |  |  |             self._cnt['all'].event((project, 'failed'), -1).event((project, 'pending'), +1) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 886 |  |  |         logger.info('restart task %(project)s:%(taskid)s %(url)s', task) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 887 |  |  |         return task | 
            
                                                                                                            
                            
            
                                    
            
            
                | 888 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                def on_task_status(self, task):
                    '''Called when a status pack is arrived.

                    Marks the task as finished in its project's task queue, dispatches
                    it to `on_task_done` (processor reported ok) or `on_task_failed`,
                    records fetch/process timing in the '5m_time' counter and pushes
                    ``(time.time(), task)`` onto the front of the project's
                    `active_tasks`.

                    :param task: status pack dict; must contain 'project', 'taskid'
                        and task['track']['process']['ok']. 'url' is used for logging.
                    :returns: whatever `on_task_done` / `on_task_failed` returned, or
                        None when the pack is malformed (missing keys or unknown
                        project) or the task was not being processed.
                    '''
                    try:
                        procesok = task['track']['process']['ok']
                        project_info = self.projects[task['project']]
                        if not project_info.task_queue.done(task['taskid']):
                            # Use the module logger like every other message in the
                            # scheduler, not the root `logging` logger.
                            logger.error('not processing pack: %(project)s:%(taskid)s %(url)s', task)
                            return None
                    except KeyError as e:
                        logger.error("Bad status pack: %s", e)
                        return None

                    if procesok:
                        ret = self.on_task_done(task)
                    else:
                        ret = self.on_task_failed(task)

                    # The task has already been persisted by the handler above, so a
                    # missing 'fetch' section must not raise here and skip the
                    # active_tasks bookkeeping below.
                    track = task['track']
                    project = task['project']
                    fetch_time = (track.get('fetch') or {}).get('time')
                    if fetch_time:
                        self._cnt['5m_time'].event((project, 'fetch_time'), fetch_time)
                    process_time = track['process'].get('time')
                    if process_time:
                        self._cnt['5m_time'].event((project, 'process_time'), process_time)
                    project_info.active_tasks.appendleft((time.time(), task))
                    return ret
            
                                                                                                            
                            
            
                                    
            
            
                | 913 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                def on_task_done(self, task):
                    '''Handle a successfully processed task; invoked from `on_task_status`.

                    The task is marked SUCCESS and stamped with ``lastcrawltime``. If
                    it carries a schedule requesting ``auto_recrawl`` together with an
                    ``age``, it is instead kept ACTIVE, given an ``exetime`` ``age``
                    seconds from now and handed to `put_task`; any other schedule is
                    removed from the task. The task is then saved via `update_task`
                    and the success counters are bumped.

                    :param task: task dict with at least 'project', 'taskid' and 'url'.
                    :returns: the same, mutated, task dict.
                    '''
                    task['status'] = self.taskdb.SUCCESS
                    task['lastcrawltime'] = time.time()

                    if 'schedule' in task:
                        sched = task['schedule']
                        wants_recrawl = sched.get('auto_recrawl') and 'age' in sched
                        if not wants_recrawl:
                            del task['schedule']
                        else:
                            # Re-queue for another crawl after `age` seconds.
                            task['status'] = self.taskdb.ACTIVE
                            sched['exetime'] = time.time() + sched.get('age')
                            self.put_task(task)
                    self.update_task(task)

                    proj_name = task['project']
                    for window in ('5m', '1h', '1d'):
                        self._cnt[window].event((proj_name, 'success'), +1)
                    self._cnt['all'].event((proj_name, 'success'), +1).event((proj_name, 'pending'), -1)
                    logger.info('task done %(project)s:%(taskid)s %(url)s', task)
                    return task
            
                                                                                                            
                            
            
                                    
            
            
                | 936 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 937 |  |  |     def on_task_failed(self, task): | 
            
                                                                                                            
                            
            
                                    
            
            
                | 938 |  |  |         '''Called when a task is failed, called by `on_task_status`''' | 
            
                                                                                                            
                            
            
                                    
            
            
                | 939 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 940 |  |  |         if 'schedule' not in task: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 941 |  |  |             old_task = self.taskdb.get_task(task['project'], task['taskid'], fields=['schedule']) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 942 |  |  |             if old_task is None: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 943 |  |  |                 logging.error('unknown status pack: %s' % task) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 944 |  |  |                 return | 
            
                                                                                                            
                            
            
                                    
            
            
                | 945 |  |  |             task['schedule'] = old_task.get('schedule', {}) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 946 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 947 |  |  |         retries = task['schedule'].get('retries', self.default_schedule['retries']) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 948 |  |  |         retried = task['schedule'].get('retried', 0) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 949 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 950 |  |  |         project_info = self.projects[task['project']] | 
            
                                                                                                            
                            
            
                                    
            
            
                | 951 |  |  |         retry_delay = project_info.retry_delay or self.DEFAULT_RETRY_DELAY | 
            
                                                                                                            
                            
            
                                    
            
            
                | 952 |  |  |         next_exetime = retry_delay.get(retried, retry_delay.get('', self.DEFAULT_RETRY_DELAY[''])) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 953 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 954 |  |  |         if task['schedule'].get('auto_recrawl') and 'age' in task['schedule']: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 955 |  |  |             next_exetime = min(next_exetime, task['schedule'].get('age')) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 956 |  |  |         else: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 957 |  |  |             if retried >= retries: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 958 |  |  |                 next_exetime = -1 | 
            
                                                                                                            
                            
            
                                    
            
            
                | 959 |  |  |             elif 'age' in task['schedule'] and next_exetime > task['schedule'].get('age'): | 
            
                                                                                                            
                            
            
                                    
            
            
                | 960 |  |  |                 next_exetime = task['schedule'].get('age') | 
            
                                                                                                            
                            
            
                                    
            
            
                | 961 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 962 |  |  |         if next_exetime < 0: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 963 |  |  |             task['status'] = self.taskdb.FAILED | 
            
                                                                                                            
                            
            
                                    
            
            
                | 964 |  |  |             task['lastcrawltime'] = time.time() | 
            
                                                                                                            
                            
            
                                    
            
            
                | 965 |  |  |             self.update_task(task) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 966 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 967 |  |  |             project = task['project'] | 
            
                                                                                                            
                            
            
                                    
            
            
                | 968 |  |  |             self._cnt['5m'].event((project, 'failed'), +1) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 969 |  |  |             self._cnt['1h'].event((project, 'failed'), +1) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 970 |  |  |             self._cnt['1d'].event((project, 'failed'), +1) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 971 |  |  |             self._cnt['all'].event((project, 'failed'), +1).event((project, 'pending'), -1) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 972 |  |  |             logger.info('task failed %(project)s:%(taskid)s %(url)s' % task) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 973 |  |  |             return task | 
            
                                                                                                            
                            
            
                                    
            
            
                | 974 |  |  |         else: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 975 |  |  |             task['schedule']['retried'] = retried + 1 | 
            
                                                                                                            
                            
            
                                    
            
            
                | 976 |  |  |             task['schedule']['exetime'] = time.time() + next_exetime | 
            
                                                                                                            
                            
            
                                    
            
            
                | 977 |  |  |             task['lastcrawltime'] = time.time() | 
            
                                                                                                            
                            
            
                                    
            
            
                | 978 |  |  |             self.update_task(task) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 979 |  |  |             self.put_task(task) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 980 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 981 |  |  |             project = task['project'] | 
            
                                                                                                            
                            
            
                                    
            
            
                | 982 |  |  |             self._cnt['5m'].event((project, 'retry'), +1) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 983 |  |  |             self._cnt['1h'].event((project, 'retry'), +1) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 984 |  |  |             self._cnt['1d'].event((project, 'retry'), +1) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 985 |  |  |             # self._cnt['all'].event((project, 'retry'), +1) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 986 |  |  |             logger.info('task retry %d/%d %%(project)s:%%(taskid)s %%(url)s' % ( | 
            
                                                                                                            
                            
            
                                    
            
            
                | 987 |  |  |                 retried, retries), task) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 988 |  |  |             return task | 
            
                                                                                                            
                            
            
                                    
            
            
                | 989 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 990 |  |  |     def on_select_task(self, task): | 
            
                                                                                                            
                            
            
                                    
            
            
                | 991 |  |  |         '''Called when a task is selected to fetch & process''' | 
            
                                                                                                            
                            
            
                                    
            
            
                | 992 |  |  |         # inject informations about project | 
            
                                                                                                            
                            
            
                                    
            
            
                | 993 |  |  |         logger.info('select %(project)s:%(taskid)s %(url)s', task) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 994 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 995 |  |  |         project_info = self.projects.get(task['project']) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 996 |  |  |         assert project_info, 'no such project' | 
            
                                                                                                            
                            
            
                                    
            
            
                | 997 |  |  |         task['type'] = self.TASK_PACK | 
            
                                                                                                            
                            
            
                                    
            
            
                | 998 |  |  |         task['group'] = project_info.group | 
            
                                                                                                            
                            
            
                                    
            
            
                | 999 |  |  |         task['project_md5sum'] = project_info.md5sum | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1000 |  |  |         task['project_updatetime'] = project_info.updatetime | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1001 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1002 |  |  |         # lazy join project.crawl_config | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1003 |  |  |         if getattr(project_info, 'crawl_config', None): | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1004 |  |  |             task = BaseHandler.task_join_crawl_config(task, project_info.crawl_config) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1005 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1006 |  |  |         project_info.active_tasks.appendleft((time.time(), task)) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1007 |  |  |         self.send_task(task) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1008 |  |  |         return task | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1009 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1010 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1011 |  |  | from tornado import gen | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1012 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1013 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1014 |  |  | class OneScheduler(Scheduler): | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1015 |  |  |     """ | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1016 |  |  |     Scheduler Mixin class for one mode | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1017 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1018 |  |  |     overwirted send_task method | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1019 |  |  |     call processor.on_task(fetcher.fetch(task)) instead of consuming queue | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1020 |  |  |     """ | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1021 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1022 |  |  |     def _check_select(self): | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1023 |  |  |         """ | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1024 |  |  |         interactive mode of select tasks | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1025 |  |  |         """ | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1026 |  |  |         if not self.interactive: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1027 |  |  |             return super(OneScheduler, self)._check_select() | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1028 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1029 |  |  |         # waiting for running tasks | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1030 |  |  |         if self.running_task > 0: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1031 |  |  |             return | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1032 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1033 |  |  |         is_crawled = [] | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1034 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1035 |  |  |         def run(project=None): | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1036 |  |  |             return crawl('on_start', project=project) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1037 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1038 |  |  |         def crawl(url, project=None, **kwargs): | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1039 |  |  |             """ | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1040 |  |  |             Crawl given url, same parameters as BaseHandler.crawl | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1041 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1042 |  |  |             url - url or taskid, parameters will be used if in taskdb | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1043 |  |  |             project - can be ignored if only one project exists. | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1044 |  |  |             """ | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1045 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1046 |  |  |             # looking up the project instance | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1047 |  |  |             if project is None: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1048 |  |  |                 if len(self.projects) == 1: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1049 |  |  |                     project = list(self.projects.keys())[0] | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1050 |  |  |                 else: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1051 |  |  |                     raise LookupError('You need specify the project: %r' | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1052 |  |  |                                       % list(self.projects.keys())) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1053 |  |  |             project_data = self.processor.project_manager.get(project) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1054 |  |  |             if not project_data: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1055 |  |  |                 raise LookupError('no such project: %s' % project) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1056 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1057 |  |  |             # get task package | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1058 |  |  |             instance = project_data['instance'] | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1059 |  |  |             instance._reset() | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1060 |  |  |             task = instance.crawl(url, **kwargs) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1061 |  |  |             if isinstance(task, list): | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1062 |  |  |                 raise Exception('url list is not allowed in interactive mode') | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1063 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1064 |  |  |             # check task in taskdb | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1065 |  |  |             if not kwargs: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1066 |  |  |                 dbtask = self.taskdb.get_task(task['project'], task['taskid'], | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1067 |  |  |                                               fields=self.request_task_fields) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1068 |  |  |                 if not dbtask: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1069 |  |  |                     dbtask = self.taskdb.get_task(task['project'], task['url'], | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1070 |  |  |                                                   fields=self.request_task_fields) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1071 |  |  |                 if dbtask: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1072 |  |  |                     task = dbtask | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1073 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1074 |  |  |             # select the task | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1075 |  |  |             self.on_select_task(task) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1076 |  |  |             is_crawled.append(True) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1077 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1078 |  |  |             shell.ask_exit() | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1079 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1080 |  |  |         def quit_interactive(): | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1081 |  |  |             '''Quit interactive mode''' | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1082 |  |  |             is_crawled.append(True) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1083 |  |  |             self.interactive = False | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1084 |  |  |             shell.ask_exit() | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1085 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1086 |  |  |         def quit_pyspider(): | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1087 |  |  |             '''Close pyspider''' | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1088 |  |  |             is_crawled[:] = [] | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1089 |  |  |             shell.ask_exit() | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1090 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1091 |  |  |         shell = utils.get_python_console() | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1092 |  |  |         banner = ( | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1093 |  |  |             'pyspider shell - Select task\n' | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1094 |  |  |             'crawl(url, project=None, **kwargs) - same parameters as BaseHandler.crawl\n' | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1095 |  |  |             'quit_interactive() - Quit interactive mode\n' | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1096 |  |  |             'quit_pyspider() - Close pyspider' | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1097 |  |  |         ) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1098 |  |  |         if hasattr(shell, 'show_banner'): | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1099 |  |  |             shell.show_banner(banner) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1100 |  |  |             shell.interact() | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1101 |  |  |         else: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1102 |  |  |             shell.interact(banner) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1103 |  |  |         if not is_crawled: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1104 |  |  |             self.ioloop.add_callback(self.ioloop.stop) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1105 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1106 |  |  |     def __getattr__(self, name): | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1107 |  |  |         """patch for crawl(url, callback=self.index_page) API""" | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1108 |  |  |         if self.interactive: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1109 |  |  |             return name | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1110 |  |  |         raise AttributeError(name) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1111 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1112 |  |  |     def on_task_status(self, task): | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1113 |  |  |         """Ignore not processing error in interactive mode""" | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1114 |  |  |         if not self.interactive: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1115 |  |  |             super(OneScheduler, self).on_task_status(task) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1116 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1117 |  |  |         try: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1118 |  |  |             procesok = task['track']['process']['ok'] | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1119 |  |  |         except KeyError as e: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1120 |  |  |             logger.error("Bad status pack: %s", e) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1121 |  |  |             return None | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1122 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1123 |  |  |         if procesok: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1124 |  |  |             ret = self.on_task_done(task) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1125 |  |  |         else: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1126 |  |  |             ret = self.on_task_failed(task) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1127 |  |  |         if task['track']['fetch'].get('time'): | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1128 |  |  |             self._cnt['5m_time'].event((task['project'], 'fetch_time'), | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1129 |  |  |                                        task['track']['fetch']['time']) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1130 |  |  |         if task['track']['process'].get('time'): | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1131 |  |  |             self._cnt['5m_time'].event((task['project'], 'process_time'), | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1132 |  |  |                                        task['track']['process'].get('time')) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1133 |  |  |         self.projects[task['project']].active_tasks.appendleft((time.time(), task)) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1134 |  |  |         return ret | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1135 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1136 |  |  |     def init_one(self, ioloop, fetcher, processor, | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1137 |  |  |                  result_worker=None, interactive=False): | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1138 |  |  |         self.ioloop = ioloop | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1139 |  |  |         self.fetcher = fetcher | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1140 |  |  |         self.processor = processor | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1141 |  |  |         self.result_worker = result_worker | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1142 |  |  |         self.interactive = interactive | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1143 |  |  |         self.running_task = 0 | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1144 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1145 |  |  |     @gen.coroutine | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1146 |  |  |     def do_task(self, task): | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1147 |  |  |         self.running_task += 1 | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1148 |  |  |         result = yield gen.Task(self.fetcher.fetch, task) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1149 |  |  |         type, task, response = result.args | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1150 |  |  |         self.processor.on_task(task, response) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1151 |  |  |         # do with message | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1152 |  |  |         while not self.processor.inqueue.empty(): | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1153 |  |  |             _task, _response = self.processor.inqueue.get() | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1154 |  |  |             self.processor.on_task(_task, _response) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1155 |  |  |         # do with results | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1156 |  |  |         while not self.processor.result_queue.empty(): | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1157 |  |  |             _task, _result = self.processor.result_queue.get() | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1158 |  |  |             if self.result_worker: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1159 |  |  |                 self.result_worker.on_result(_task, _result) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1160 |  |  |         self.running_task -= 1 | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1161 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1162 |  |  |     def send_task(self, task, force=True): | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1163 |  |  |         if self.fetcher.http_client.free_size() <= 0: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1164 |  |  |             if force: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1165 |  |  |                 self._send_buffer.appendleft(task) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1166 |  |  |             else: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1167 |  |  |                 raise self.outqueue.Full | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1168 |  |  |         self.ioloop.add_future(self.do_task(task), lambda x: x.result()) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1169 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1170 |  |  |     def run(self): | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1171 |  |  |         import tornado.ioloop | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1172 |  |  |         tornado.ioloop.PeriodicCallback(self.run_once, 100, | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1173 |  |  |                                         io_loop=self.ioloop).start() | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1174 |  |  |         self.ioloop.start() | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1175 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1176 |  |  |     def quit(self): | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1177 |  |  |         self.ioloop.stop() | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1178 |  |  |         logger.info("scheduler exiting...") | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1179 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1180 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1181 |  |  | import random | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1182 |  |  | import threading | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1183 |  |  | from pyspider.database.sqlite.sqlitebase import SQLiteMixin | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1184 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 1185 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                class ThreadBaseScheduler(Scheduler):
                    """Scheduler variant that runs handler methods on worker threads.

                    Each worker thread owns one FIFO work queue and executes the
                    ``(method, args, kwargs)`` tuples put on it. Calls routed with a key
                    (``_i``, e.g. ``hash(taskid)``) always land on the same queue, so
                    calls for the same key run on one worker in submission order.

                    ``taskdb``, ``projectdb`` and ``resultdb`` are thread-local: each
                    thread lazily gets its own ``copy()`` of the handle that existed on
                    the constructing thread when ``__init__`` finished the base setup.
                    """

                    def __init__(self, threads=4, *args, **kwargs):
                        """
                        :param threads: number of worker threads; forced to 1 when the
                            taskdb is an ``SQLiteMixin`` instance.
                        Remaining positional/keyword arguments go to ``Scheduler``.
                        """
                        # Created before the base initializer, presumably because it
                        # assigns self.taskdb/projectdb/resultdb, which the property
                        # setters below store in self.local. TODO confirm in Scheduler.
                        self.local = threading.local()

                        super(ThreadBaseScheduler, self).__init__(*args, **kwargs)

                        if isinstance(self.taskdb, SQLiteMixin):
                            self.threads = 1
                        else:
                            self.threads = threads

                        # Handles seen by the constructing thread; other threads copy()
                        # these on first access through the properties.
                        self._taskdb = self.taskdb
                        self._projectdb = self.projectdb
                        self._resultdb = self.resultdb

                        self.thread_objs = []
                        self.thread_queues = []
                        self._start_threads()
                        assert len(self.thread_queues) > 0

                    @property
                    def taskdb(self):
                        """Per-thread taskdb handle (copied from ``_taskdb`` on first use)."""
                        if not hasattr(self.local, 'taskdb'):
                            self.taskdb = self._taskdb.copy()
                        return self.local.taskdb

                    @taskdb.setter
                    def taskdb(self, taskdb):
                        self.local.taskdb = taskdb

                    @property
                    def projectdb(self):
                        """Per-thread projectdb handle (copied from ``_projectdb`` on first use)."""
                        if not hasattr(self.local, 'projectdb'):
                            self.projectdb = self._projectdb.copy()
                        return self.local.projectdb

                    @projectdb.setter
                    def projectdb(self, projectdb):
                        self.local.projectdb = projectdb

                    @property
                    def resultdb(self):
                        """Per-thread resultdb handle (copied from ``_resultdb`` on first use)."""
                        if not hasattr(self.local, 'resultdb'):
                            self.resultdb = self._resultdb.copy()
                        return self.local.resultdb

                    @resultdb.setter
                    def resultdb(self, resultdb):
                        self.local.resultdb = resultdb

                    def _start_threads(self):
                        """Start ``self.threads`` daemon workers, each with its own queue."""
                        for _ in range(self.threads):
                            queue = Queue.Queue()
                            thread = threading.Thread(target=self._thread_worker, args=(queue, ))
                            thread.daemon = True
                            thread.start()
                            self.thread_objs.append(thread)
                            self.thread_queues.append(queue)

                    def _thread_worker(self, queue):
                        """Worker loop: execute queued ``(method, args, kwargs)`` calls forever.

                        Exceptions raised by a call are logged and do not stop the worker.
                        """
                        while True:
                            method, args, kwargs = queue.get()
                            try:
                                method(*args, **kwargs)
                            except Exception as e:
                                logger.exception(e)
                            finally:
                                # Mark the item done only after the call has returned, so
                                # _wait_thread can tell in-flight work from an empty queue.
                                queue.task_done()

                    def _run_in_thread(self, method, *args, **kwargs):
                        """Dispatch ``method(*args, **kwargs)`` to a worker thread.

                        Two reserved keyword arguments are removed before the call:

                        :param _i: routing key; when given, the call goes to queue
                            ``_i % len(self.thread_queues)`` (same key -> same worker).
                        :param _block: when True and ``_i`` is not given, wait until some
                            queue is empty before enqueuing; in all cases, wait for all
                            queued work to finish before returning.

                        Without ``_i`` the first empty queue is used; if none is empty and
                        ``_block`` is False, a random queue is used instead.
                        """
                        i = kwargs.pop('_i', None)
                        block = kwargs.pop('_block', False)

                        if i is not None:
                            queue = self.thread_queues[i % len(self.thread_queues)]
                        else:
                            while True:
                                queue = next((q for q in self.thread_queues if q.empty()), None)
                                if queue is not None:
                                    break
                                if not block:
                                    queue = random.choice(self.thread_queues)
                                    break
                                time.sleep(0.1)

                        queue.put((method, args, kwargs))

                        if block:
                            self._wait_thread()

                    def _wait_thread(self):
                        """Poll until every dispatched call has finished executing.

                        ``Queue.empty()`` is not sufficient: it becomes true as soon as a
                        worker dequeues an item, while that item may still be running.
                        ``unfinished_tasks`` only drops after the worker's ``task_done()``.
                        """
                        while any(queue.unfinished_tasks for queue in self.thread_queues):
                            time.sleep(0.1)

                    def _update_project(self, project):
                        """Run ``Scheduler._update_project`` on a worker thread."""
                        self._run_in_thread(Scheduler._update_project, self, project)

                    def on_task_status(self, task):
                        """Run ``Scheduler.on_task_status`` on the worker keyed by taskid."""
                        i = hash(task['taskid'])
                        self._run_in_thread(Scheduler.on_task_status, self, task, _i=i)

                    def on_request(self, task):
                        """Run ``Scheduler.on_request`` on the worker keyed by taskid."""
                        i = hash(task['taskid'])
                        self._run_in_thread(Scheduler.on_request, self, task, _i=i)

                    def _load_put_task(self, project, taskid):
                        """Run ``Scheduler._load_put_task`` on the worker keyed by taskid."""
                        i = hash(taskid)
                        self._run_in_thread(Scheduler._load_put_task, self, project, taskid, _i=i)

                    def run_once(self):
                        """Run one base-scheduler iteration, then wait for dispatched work."""
                        super(ThreadBaseScheduler, self).run_once()
                        self._wait_thread()
            
                                                        
            
                                    
            
            
                | 1301 |  |  |  |