| Metric | Value |
| --- | --- |
| Total Complexity | 102 |
| Total Lines | 467 |
| Duplicated Lines | 0% |
Complex classes like pyspider.fetcher.Fetcher often do a lot of different things. To break such a class down, we need to identify a cohesive component within it. A common way to find such a component is to look for fields and methods that share the same prefixes or suffixes.

Once you have determined which fields belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often the faster option. A sketch of Extract Class applied to this class follows below.
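In `Fetcher`, the `proxy_username`/`proxy_password`/`proxy_host`/`proxy_port` settings assembled inline in `http_fetch` share the `proxy_` prefix, so they are a natural candidate. Here is a minimal sketch of Extract Class under that assumption; `ProxyConfig` and its methods are hypothetical names, not part of pyspider:

```python
from urllib.parse import urlsplit


class ProxyConfig(object):
    """Hypothetical extracted class: owns the proxy_* fields that
    Fetcher.http_fetch currently assembles inline."""

    def __init__(self, proxy_string):
        # Mirror http_fetch's normalization: assume http:// when no scheme is given.
        if '://' not in proxy_string:
            proxy_string = 'http://' + proxy_string
        parts = urlsplit(proxy_string)
        self.username = parts.username
        self.password = parts.password
        self.host = parts.hostname
        self.port = parts.port or 8080

    def apply_to(self, fetch):
        """Copy the settings into the kwargs dict passed to HTTPRequest."""
        if self.username:
            fetch['proxy_username'] = self.username
        if self.password:
            fetch['proxy_password'] = self.password
        fetch['proxy_host'] = self.host
        fetch['proxy_port'] = self.port
        return fetch


if __name__ == '__main__':
    # The proxy block in http_fetch would then shrink to:
    #     if proxy_string:
    #         ProxyConfig(proxy_string).apply_to(fetch)
    print(ProxyConfig('user:pass@proxy.example.com:3128').apply_to({}))
```

The `phantomjs_proxy` field and `phantomjs_fetch` method form a second prefix group; if the PhantomJS path keeps growing, Extract Subclass (a PhantomJS-capable `Fetcher`) would fit it as well.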
The code under review:

```python
#!/usr/bin/env python
# NOTE: the report elides the original file header between the shebang and the
# class definition. The imports below are reconstructed from usage; that header
# also defines the MyCurlAsyncHTTPClient subclass used in __init__.
import copy
import json
import logging
import threading
import time

import six
import tornado.httpclient
import tornado.httputil
import tornado.ioloop
from requests import cookies
from requests.cookies import extract_cookies_to_jar
from six.moves import http_cookies, queue
from six.moves.urllib.parse import urljoin, urlsplit

import pyspider
from pyspider.libs import counter, dataurl, utils

logger = logging.getLogger('fetcher')


class Fetcher(object):
    user_agent = "pyspider/%s (+http://pyspider.org/)" % pyspider.__version__
    default_options = {
        'method': 'GET',
        'headers': {
        },
        'use_gzip': True,
        'timeout': 120,
    }
    phantomjs_proxy = None

    # 'async' became a reserved word in Python 3.7; later pyspider releases rename it.
    def __init__(self, inqueue, outqueue, poolsize=100, proxy=None, async=True):
        self.inqueue = inqueue
        self.outqueue = outqueue

        self.poolsize = poolsize
        self._running = False
        self._quit = False
        self.proxy = proxy
        self.async = async
        self.ioloop = tornado.ioloop.IOLoop()

        # binding io_loop to http_client here
        if self.async:
            self.http_client = MyCurlAsyncHTTPClient(max_clients=self.poolsize,
                                                     io_loop=self.ioloop)
        else:
            self.http_client = tornado.httpclient.HTTPClient(
                MyCurlAsyncHTTPClient, max_clients=self.poolsize
            )

        self._cnt = {
            '5m': counter.CounterManager(
                lambda: counter.TimebaseAverageWindowCounter(30, 10)),
            '1h': counter.CounterManager(
                lambda: counter.TimebaseAverageWindowCounter(60, 60)),
        }

    def send_result(self, type, task, result):
        '''Send fetch result to processor'''
        if self.outqueue:
            try:
                self.outqueue.put((task, result))
            except Exception as e:
                logger.exception(e)

    def fetch(self, task, callback=None):
        '''Do one fetch'''
        url = task.get('url', 'data:,')
        if callback is None:
            callback = self.send_result
        if url.startswith('data:'):
            return self.data_fetch(url, task, callback)
        elif task.get('fetch', {}).get('fetch_type') in ('js', 'phantomjs'):
            return self.phantomjs_fetch(url, task, callback)
        else:
            return self.http_fetch(url, task, callback)

    def sync_fetch(self, task):
        '''Synchronous fetch'''
        wait_result = threading.Condition()
        _result = {}

        def callback(type, task, result):
            wait_result.acquire()
            _result['type'] = type
            _result['task'] = task
            _result['result'] = result
            wait_result.notify()
            wait_result.release()

        wait_result.acquire()
        self.fetch(task, callback=callback)
        while 'result' not in _result:
            wait_result.wait()
        wait_result.release()
        return _result['result']

    def data_fetch(self, url, task, callback):
        '''A fake fetcher for dataurl'''
        self.on_fetch('data', task)
        result = {}
        result['orig_url'] = url
        result['content'] = dataurl.decode(url)
        result['headers'] = {}
        result['status_code'] = 200
        result['url'] = url
        result['cookies'] = {}
        result['time'] = 0
        result['save'] = task.get('fetch', {}).get('save')
        if len(result['content']) < 70:
            logger.info("[200] %s:%s %s 0s", task.get('project'), task.get('taskid'), url)
        else:
            logger.info(
                "[200] %s:%s data:,%s...[content:%d] 0s",
                task.get('project'), task.get('taskid'),
                result['content'][:70],
                len(result['content'])
            )

        callback('data', task, result)
        self.on_result('data', task, result)
        return task, result

    def handle_error(self, type, url, task, start_time, callback, error):
        result = {
            'status_code': getattr(error, 'code', 599),
            'error': utils.text(error),
            'content': "",
            'time': time.time() - start_time,
            'orig_url': url,
            'url': url,
        }
        logger.error("[%d] %s:%s %s, %r %.2fs",
                     result['status_code'], task.get('project'), task.get('taskid'),
                     url, error, result['time'])
        callback(type, task, result)
        self.on_result(type, task, result)
        return task, result

    allowed_options = ['method', 'data', 'timeout', 'cookies', 'use_gzip', 'validate_cert']

    def http_fetch(self, url, task, callback):
        '''HTTP fetcher'''
        start_time = time.time()

        self.on_fetch('http', task)
        fetch = copy.deepcopy(self.default_options)
        fetch['url'] = url
        fetch['headers'] = tornado.httputil.HTTPHeaders(fetch['headers'])
        fetch['headers']['User-Agent'] = self.user_agent
        task_fetch = task.get('fetch', {})
        for each in self.allowed_options:
            if each in task_fetch:
                fetch[each] = task_fetch[each]
        fetch['headers'].update(task_fetch.get('headers', {}))

        if task.get('track'):
            track_headers = tornado.httputil.HTTPHeaders(
                task.get('track', {}).get('fetch', {}).get('headers') or {})
            track_ok = task.get('track', {}).get('process', {}).get('ok', False)
        else:
            track_headers = {}
            track_ok = False
        # proxy
        proxy_string = None
        if isinstance(task_fetch.get('proxy'), six.string_types):
            proxy_string = task_fetch['proxy']
        elif self.proxy and task_fetch.get('proxy', True):
            proxy_string = self.proxy
        if proxy_string:
            if '://' not in proxy_string:
                proxy_string = 'http://' + proxy_string
            proxy_splited = urlsplit(proxy_string)
            if proxy_splited.username:
                fetch['proxy_username'] = proxy_splited.username
                if six.PY2:
                    fetch['proxy_username'] = fetch['proxy_username'].encode('utf8')
            if proxy_splited.password:
                fetch['proxy_password'] = proxy_splited.password
                if six.PY2:
                    fetch['proxy_password'] = fetch['proxy_password'].encode('utf8')
            fetch['proxy_host'] = proxy_splited.hostname
            if six.PY2:
                fetch['proxy_host'] = fetch['proxy_host'].encode('utf8')
            fetch['proxy_port'] = proxy_splited.port or 8080

        # etag
        if task_fetch.get('etag', True):
            _t = None
            if isinstance(task_fetch.get('etag'), six.string_types):
                _t = task_fetch.get('etag')
            elif track_ok:
                _t = track_headers.get('etag')
            if _t:
                fetch['headers'].setdefault('If-None-Match', _t)
        # last modified
        if task_fetch.get('last_modified', True):
            _t = None
            if isinstance(task_fetch.get('last_modified'), six.string_types):
                _t = task_fetch.get('last_modified')
            elif track_ok:
                _t = track_headers.get('last-modified')
            if _t:
                fetch['headers'].setdefault('If-Modified-Since', _t)

        session = cookies.RequestsCookieJar()

        # fix for tornado request obj
        if 'Cookie' in fetch['headers']:
            c = http_cookies.SimpleCookie()
            try:
                c.load(fetch['headers']['Cookie'])
            except AttributeError:
                c.load(utils.utf8(fetch['headers']['Cookie']))
            for key in c:
                session.set(key, c[key])
            del fetch['headers']['Cookie']
        fetch['follow_redirects'] = False
        if 'timeout' in fetch:
            fetch['connect_timeout'] = fetch['request_timeout'] = fetch['timeout']
            del fetch['timeout']
        if 'data' in fetch:
            fetch['body'] = fetch['data']
            del fetch['data']
        if 'cookies' in fetch:
            session.update(fetch['cookies'])
            del fetch['cookies']

        store = {}
        store['max_redirects'] = task_fetch.get('max_redirects', 5)

        def handle_response(response):
            extract_cookies_to_jar(session, response.request, response.headers)
            if (response.code in (301, 302, 303, 307)
                    and response.headers.get('Location')
                    and task_fetch.get('allow_redirects', True)):
                if store['max_redirects'] <= 0:
                    error = tornado.httpclient.HTTPError(
                        599, 'Maximum (%d) redirects followed' % task_fetch.get('max_redirects', 5),
                        response)
                    return handle_error(error)
                if response.code in (302, 303):
                    fetch['method'] = 'GET'
                    if 'body' in fetch:
                        del fetch['body']
                fetch['url'] = urljoin(fetch['url'], response.headers['Location'])
                fetch['request_timeout'] -= time.time() - start_time
                if fetch['request_timeout'] < 0:
                    fetch['request_timeout'] = 0.1
                fetch['connect_timeout'] = fetch['request_timeout']
                store['max_redirects'] -= 1
                return make_request(fetch)

            result = {}
            result['orig_url'] = url
            result['content'] = response.body or ''
            result['headers'] = dict(response.headers)
            result['status_code'] = response.code
            result['url'] = response.effective_url or url
            result['cookies'] = session.get_dict()
            result['time'] = time.time() - start_time
            result['save'] = task_fetch.get('save')
            if response.error:
                result['error'] = utils.text(response.error)
            if 200 <= response.code < 300:
                logger.info("[%d] %s:%s %s %.2fs", response.code,
                            task.get('project'), task.get('taskid'),
                            url, result['time'])
            else:
                logger.warning("[%d] %s:%s %s %.2fs", response.code,
                               task.get('project'), task.get('taskid'),
                               url, result['time'])
            callback('http', task, result)
            self.on_result('http', task, result)
            return task, result

        handle_error = lambda x: self.handle_error('http',
                                                   url, task, start_time, callback, x)

        def make_request(fetch):
            try:
                request = tornado.httpclient.HTTPRequest(**fetch)
                cookie_header = cookies.get_cookie_header(session, request)
                if cookie_header:
                    request.headers['Cookie'] = cookie_header
                if self.async:
                    self.http_client.fetch(request, handle_response)
                else:
                    return handle_response(self.http_client.fetch(request))
            except tornado.httpclient.HTTPError as e:
                if e.response:
                    return handle_response(e.response)
                else:
                    return handle_error(e)
            except Exception as e:
                logger.exception(fetch)
                return handle_error(e)

        return make_request(fetch)

    def phantomjs_fetch(self, url, task, callback):
        '''Fetch with phantomjs proxy'''
        start_time = time.time()

        self.on_fetch('phantomjs', task)
        if not self.phantomjs_proxy:
            result = {
                "orig_url": url,
                "content": "phantomjs is not enabled.",
                "headers": {},
                "status_code": 501,
                "url": url,
                "cookies": {},
                "time": 0,
                "save": task.get('fetch', {}).get('save')
            }
            logger.warning("[501] %s:%s %s 0s", task.get('project'), task.get('taskid'), url)
            callback('http', task, result)
            self.on_result('http', task, result)
            return task, result

        request_conf = {
            'follow_redirects': False
        }

        fetch = copy.deepcopy(self.default_options)
        fetch['url'] = url
        fetch['headers'] = tornado.httputil.HTTPHeaders(fetch['headers'])
        fetch['headers']['User-Agent'] = self.user_agent
        task_fetch = task.get('fetch', {})
        for each in task_fetch:
            if each != 'headers':
                fetch[each] = task_fetch[each]
        fetch['headers'].update(task_fetch.get('headers', {}))

        if 'timeout' in fetch:
            request_conf['connect_timeout'] = fetch['timeout']
            request_conf['request_timeout'] = fetch['timeout'] + 1

        session = cookies.RequestsCookieJar()
        request = tornado.httpclient.HTTPRequest(url=fetch['url'])
        if fetch.get('cookies'):
            session.update(fetch['cookies'])
            if 'Cookie' in request.headers:
                del request.headers['Cookie']
            fetch['headers']['Cookie'] = cookies.get_cookie_header(session, request)

        def handle_response(response):
            if not response.body:
                return handle_error(Exception('no response from phantomjs'))

            try:
                result = json.loads(utils.text(response.body))
                if response.error:
                    result['error'] = utils.text(response.error)
            except Exception as e:
                return handle_error(e)

            if result.get('status_code', 200):
                logger.info("[%d] %s:%s %s %.2fs", result['status_code'],
                            task.get('project'), task.get('taskid'), url, result['time'])
            else:
                logger.error("[%d] %s:%s %s, %r %.2fs", result['status_code'],
                             task.get('project'), task.get('taskid'),
                             url, result['content'], result['time'])
            callback('phantomjs', task, result)
            self.on_result('phantomjs', task, result)
            return task, result

        handle_error = lambda x: self.handle_error('phantomjs',
                                                   url, task, start_time, callback, x)

        fetch['headers'] = dict(fetch['headers'])
        try:
            request = tornado.httpclient.HTTPRequest(
                url="%s" % self.phantomjs_proxy, method="POST",
                body=json.dumps(fetch), **request_conf)
            if self.async:
                self.http_client.fetch(request, handle_response)
            else:
                return handle_response(self.http_client.fetch(request))
        except tornado.httpclient.HTTPError as e:
            if e.response:
                return handle_response(e.response)
            else:
                return handle_error(e)
        except Exception as e:
            return handle_error(e)

    def run(self):
        '''Run loop'''
        logger.info("fetcher starting...")

        def queue_loop():
            if not self.outqueue or not self.inqueue:
                return
            while not self._quit:
                try:
                    if self.outqueue.full():
                        break
                    if self.http_client.free_size() <= 0:
                        break
                    task = self.inqueue.get_nowait()
                    # FIXME: decode_unicode_obj should be applied when data is
                    # selected from the database; it is done here for performance
                    task = utils.decode_unicode_obj(task)
                    self.fetch(task)
                except queue.Empty:
                    break
                except KeyboardInterrupt:
                    break
                except Exception as e:
                    logger.exception(e)
                    break

        tornado.ioloop.PeriodicCallback(queue_loop, 100, io_loop=self.ioloop).start()
        self._running = True

        try:
            self.ioloop.start()
        except KeyboardInterrupt:
            pass

        logger.info("fetcher exiting...")

    def quit(self):
        '''Quit fetcher'''
        self._running = False
        self._quit = True
        self.ioloop.stop()

    def size(self):
        return self.http_client.size()

    def xmlrpc_run(self, port=24444, bind='127.0.0.1', logRequests=False):
        '''Run xmlrpc server'''
        import umsgpack
        try:
            from xmlrpc.server import SimpleXMLRPCServer
            from xmlrpc.client import Binary
        except ImportError:
            from SimpleXMLRPCServer import SimpleXMLRPCServer
            from xmlrpclib import Binary

        server = SimpleXMLRPCServer((bind, port), allow_none=True, logRequests=logRequests)
        server.register_introspection_functions()
        server.register_multicall_functions()

        server.register_function(self.quit, '_quit')
        server.register_function(self.size)

        def sync_fetch(task):
            result = self.sync_fetch(task)
            result = Binary(umsgpack.packb(result))
            return result
        server.register_function(sync_fetch, 'fetch')

        def dump_counter(_time, _type):
            return self._cnt[_time].to_dict(_type)
        server.register_function(dump_counter, 'counter')

        server.timeout = 0.5
        while not self._quit:
            server.handle_request()
        server.server_close()

    def on_fetch(self, type, task):
        '''Called before task fetch'''
        pass

    def on_result(self, type, task, result):
        '''Called after task fetched'''
        status_code = result.get('status_code', 599)
        if status_code != 599:
            # floor to the 100s bucket; '//' keeps this an int on Python 3
            status_code = (int(status_code) // 100 * 100)
        self._cnt['5m'].event((task.get('project'), status_code), +1)
        self._cnt['1h'].event((task.get('project'), status_code), +1)

        if type == 'http' and result.get('time'):
            content_len = len(result.get('content', ''))
            self._cnt['5m'].event((task.get('project'), 'speed'),
                                  float(content_len) / result.get('time'))
            self._cnt['1h'].event((task.get('project'), 'speed'),
                                  float(content_len) / result.get('time'))
            self._cnt['5m'].event((task.get('project'), 'time'), result.get('time'))
            self._cnt['1h'].event((task.get('project'), 'time'), result.get('time'))
```