Completed: Push — ghost.py (a374b0) by Roy, 04:41 (queued 02:49)

pyspider.fetcher.Fetcher.phantomjs_fetch() (grade: F)

Complexity
  Conditions: 17

Size
  Total Lines: 88

Duplication
  Lines: 0
  Ratio: 0%

Metric                        Value
cc (cyclomatic complexity)    17
dl (duplicated lines)         0
loc (lines of code)           88
rs                            2
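For context: cyclomatic complexity (cc) counts the linearly independent paths through a function. The body starts at 1 and each branch point (if, elif, loop, except, boolean operator) adds one. A minimal illustration with hypothetical code, not taken from this module:

def classify(response):                      # cc starts at 1
    if response is None:                     # +1 -> 2
        return 'error'
    if 200 <= response.code < 300:           # +1 -> 3
        return 'ok'
    elif response.code in (301, 302):        # +1 -> 4
        return 'redirect'
    for header in response.headers:          # +1 -> 5
        if header.lower() == 'retry-after':  # +1 -> 6
            return 'retry'
    return 'failed'

This toy function scores 6; by the same counting, phantomjs_fetch() accumulates 17, which is why the report flags it.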

How to fix

Long Method

Small methods make your code easier to understand, particularly when combined with a good name. Moreover, when a method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, that is usually a sign that you should extract the commented part into a new method and use the comment as a starting point for its name.

Commonly applied refactorings include Extract Method, sketched below.
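As a hedged illustration against the code below: http_fetch() and ghost_fetch() both inline the same proxy-string parsing beneath a '# proxy' comment. Extract Method would turn that comment into a name; the helper below is my own suggestion, not part of pyspider:

from six.moves.urllib.parse import urlsplit

def split_proxy_string(proxy_string):
    '''Parse '[scheme://][user[:pass]@]host[:port]' (hypothetical helper, not pyspider API).'''
    if '://' not in proxy_string:
        proxy_string = 'http://' + proxy_string
    parts = urlsplit(proxy_string)
    return parts.username, parts.password, parts.hostname, parts.port or 8080

Each call site then shrinks to a single unpacking assignment, and the '# proxy' comment is replaced by the method name.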

Complexity

Complex methods like pyspider.fetcher.Fetcher.phantomjs_fetch() often do a lot of different things. To break such a method down, we need to identify a cohesive component within it. A common approach to finding such a component is to look for fields and methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
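As a hedged sketch of Extract Class applied to the code below: Fetcher's _cnt fields and the counting logic in on_result() form one cohesive component that could live in its own class. The class and method names here are invented for illustration:

class FetchCounters(object):
    '''Hypothetical extraction of Fetcher's _cnt bookkeeping.'''

    def __init__(self):
        self._cnt = {
            '5m': counter.CounterManager(
                lambda: counter.TimebaseAverageWindowCounter(30, 10)),
            '1h': counter.CounterManager(
                lambda: counter.TimebaseAverageWindowCounter(60, 60)),
        }

    def record_status(self, project, status_code):
        # bucket non-599 codes into 200/300/400/500
        if status_code != 599:
            status_code = int(status_code) // 100 * 100
        for cnt in self._cnt.values():
            cnt.event((project, status_code), +1)

    def dump(self, _time, _type):
        return self._cnt[_time].to_dict(_type)

Fetcher would then hold a single counters field, and both on_result() and xmlrpc_run()'s dump_counter would delegate to it.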

#!/usr/bin/env python
# -*- encoding: utf-8 -*-
# vim: set et sw=4 ts=4 sts=4 ff=unix fenc=utf8:
# Author: Binux<[email protected]>
#         http://binux.me
# Created on 2012-12-17 11:07:19

from __future__ import unicode_literals

import six
import copy
import time
import json
import logging
import threading
import tornado.ioloop
import tornado.httputil
import tornado.httpclient
import pyspider

from six.moves import queue, http_cookies
from requests import cookies
from six.moves.urllib.parse import urljoin, urlsplit
from tornado.curl_httpclient import CurlAsyncHTTPClient
from tornado.simple_httpclient import SimpleAsyncHTTPClient
from pyspider.libs import utils, dataurl, counter
from .cookie_utils import extract_cookies_to_jar

logger = logging.getLogger('fetcher')

try:
    from ghost import Ghost, TimeoutError
except ImportError:
    Ghost = None
    TimeoutError = None


class MyCurlAsyncHTTPClient(CurlAsyncHTTPClient):

    def free_size(self):
        return len(self._free_list)

    def size(self):
        return len(self._curls) - self.free_size()


class MySimpleAsyncHTTPClient(SimpleAsyncHTTPClient):

    def free_size(self):
        return self.max_clients - self.size()

    def size(self):
        return len(self.active)


fetcher_output = {
    "status_code": int,
    "orig_url": str,
    "url": str,
    "headers": dict,
    "content": str,
    "cookies": dict,
}


class Fetcher(object):
    user_agent = "pyspider/%s (+http://pyspider.org/)" % pyspider.__version__
    default_options = {
        'method': 'GET',
        'headers': {
        },
        'use_gzip': True,
        'timeout': 120,
    }
    phantomjs_proxy = None

    def __init__(self, inqueue, outqueue, poolsize=100, proxy=None, async=True):
        self.inqueue = inqueue
        self.outqueue = outqueue

        self.poolsize = poolsize
        self._running = False
        self._quit = False
        self.proxy = proxy
        self.async = async
        self.ioloop = tornado.ioloop.IOLoop()
        if Ghost:
            self.ghost = Ghost()
        else:
            self.ghost = None

        # binding io_loop to http_client here
        if self.async:
            self.http_client = MyCurlAsyncHTTPClient(max_clients=self.poolsize,
                                                     io_loop=self.ioloop)
        else:
            self.http_client = tornado.httpclient.HTTPClient(
                MyCurlAsyncHTTPClient, max_clients=self.poolsize
            )

        self._cnt = {
            '5m': counter.CounterManager(
                lambda: counter.TimebaseAverageWindowCounter(30, 10)),
            '1h': counter.CounterManager(
                lambda: counter.TimebaseAverageWindowCounter(60, 60)),
        }

    def send_result(self, type, task, result):
        '''Send fetch result to processor'''
        if self.outqueue:
            try:
                self.outqueue.put((task, result))
            except Exception as e:
                logger.exception(e)

    def fetch(self, task, callback=None):
        '''Do one fetch'''
        url = task.get('url', 'data:,')
        if callback is None:
            callback = self.send_result
        if url.startswith('data:'):
            return self.data_fetch(url, task, callback)
        elif task.get('fetch', {}).get('fetch_type') in ('js', ):
            if self.ghost:
                return self.ghost_fetch(url, task, callback)
            else:
                return self.phantomjs_fetch(url, task, callback)
        elif task.get('fetch', {}).get('fetch_type') in ('ghost', ):
            return self.ghost_fetch(url, task, callback)
        elif task.get('fetch', {}).get('fetch_type') in ('phantomjs', ):
            return self.phantomjs_fetch(url, task, callback)
        else:
            return self.http_fetch(url, task, callback)

    def sync_fetch(self, task):
        '''Synchronous fetch'''
        wait_result = threading.Condition()
        _result = {}

        def callback(type, task, result):
            wait_result.acquire()
            _result['type'] = type
            _result['task'] = task
            _result['result'] = result
            wait_result.notify()
            wait_result.release()

        wait_result.acquire()
        self.fetch(task, callback=callback)
        while 'result' not in _result:
            wait_result.wait()
        wait_result.release()
        return _result['result']

    def data_fetch(self, url, task, callback):
        '''A fake fetcher for dataurl'''
        self.on_fetch('data', task)
        result = {}
        result['orig_url'] = url
        result['content'] = dataurl.decode(url)
        result['headers'] = {}
        result['status_code'] = 200
        result['url'] = url
        result['cookies'] = {}
        result['time'] = 0
        result['save'] = task.get('fetch', {}).get('save')
        if len(result['content']) < 70:
            logger.info("[200] %s:%s %s 0s", task.get('project'), task.get('taskid'), url)
        else:
            logger.info(
                "[200] %s:%s data:,%s...[content:%d] 0s",
                task.get('project'), task.get('taskid'),
                result['content'][:70],
                len(result['content'])
            )

        callback('data', task, result)
        self.on_result('data', task, result)
        return task, result

    def handle_error(self, type, url, task, start_time, callback, error):
        result = {
            'status_code': getattr(error, 'code', 599),
            'error': utils.text(error),
            'content': "",
            'time': time.time() - start_time,
            'orig_url': url,
            'url': url,
        }
        logger.error("[%d] %s:%s %s, %r %.2fs",
                     result['status_code'], task.get('project'), task.get('taskid'),
                     url, error, result['time'])
        callback(type, task, result)
        self.on_result(type, task, result)
        return task, result

    allowed_options = ['method', 'data', 'timeout', 'cookies', 'use_gzip', 'validate_cert']

    def http_fetch(self, url, task, callback):
        '''HTTP fetcher'''
        start_time = time.time()

        self.on_fetch('http', task)
        fetch = copy.deepcopy(self.default_options)
        fetch['url'] = url
        fetch['headers'] = tornado.httputil.HTTPHeaders(fetch['headers'])
        fetch['headers']['User-Agent'] = self.user_agent
        task_fetch = task.get('fetch', {})
        for each in self.allowed_options:
            if each in task_fetch:
                fetch[each] = task_fetch[each]
        fetch['headers'].update(task_fetch.get('headers', {}))

        if task.get('track'):
            track_headers = tornado.httputil.HTTPHeaders(
                task.get('track', {}).get('fetch', {}).get('headers') or {})
            track_ok = task.get('track', {}).get('process', {}).get('ok', False)
        else:
            track_headers = {}
            track_ok = False
        # proxy
        proxy_string = None
        if isinstance(task_fetch.get('proxy'), six.string_types):
            proxy_string = task_fetch['proxy']
        elif self.proxy and task_fetch.get('proxy', True):
            proxy_string = self.proxy
        if proxy_string:
            if '://' not in proxy_string:
                proxy_string = 'http://' + proxy_string
            proxy_splited = urlsplit(proxy_string)
            if proxy_splited.username:
                fetch['proxy_username'] = proxy_splited.username
                if six.PY2:
                    fetch['proxy_username'] = fetch['proxy_username'].encode('utf8')
            if proxy_splited.password:
                fetch['proxy_password'] = proxy_splited.password
                if six.PY2:
                    fetch['proxy_password'] = fetch['proxy_password'].encode('utf8')
            # encode the hostname only on PY2, matching username/password above
            fetch['proxy_host'] = proxy_splited.hostname
            if six.PY2:
                fetch['proxy_host'] = fetch['proxy_host'].encode('utf8')
            fetch['proxy_port'] = proxy_splited.port or 8080

        # etag
        if task_fetch.get('etag', True):
            _t = None
            if isinstance(task_fetch.get('etag'), six.string_types):
                _t = task_fetch.get('etag')
            elif track_ok:
                _t = track_headers.get('etag')
            if _t and 'If-None-Match' not in fetch['headers']:
                fetch['headers']['If-None-Match'] = _t
        # last modified
        if task_fetch.get('last_modified', True):
            _t = None
            if isinstance(task_fetch.get('last_modified'), six.string_types):
                _t = task_fetch.get('last_modified')
            elif track_ok:
                _t = track_headers.get('last-modified')
            if _t and 'If-Modified-Since' not in fetch['headers']:
                fetch['headers']['If-Modified-Since'] = _t

        session = cookies.RequestsCookieJar()

        # fix for tornado request obj
        if 'Cookie' in fetch['headers']:
            c = http_cookies.SimpleCookie()
            try:
                c.load(fetch['headers']['Cookie'])
            except AttributeError:
                c.load(utils.utf8(fetch['headers']['Cookie']))
            for key in c:
                session.set(key, c[key])
            del fetch['headers']['Cookie']
        fetch['follow_redirects'] = False
        if 'timeout' in fetch:
            fetch['connect_timeout'] = fetch['request_timeout'] = fetch['timeout']
            del fetch['timeout']
        if 'data' in fetch:
            fetch['body'] = fetch['data']
            del fetch['data']
        if 'cookies' in fetch:
            session.update(fetch['cookies'])
            del fetch['cookies']

        store = {}
        store['max_redirects'] = task_fetch.get('max_redirects', 5)

        def handle_response(response):
            extract_cookies_to_jar(session, response.request, response.headers)
            if (response.code in (301, 302, 303, 307)
                    and response.headers.get('Location')
                    and task_fetch.get('allow_redirects', True)):
                if store['max_redirects'] <= 0:
                    error = tornado.httpclient.HTTPError(
                        599, 'Maximum (%d) redirects followed' % task_fetch.get('max_redirects', 5),
                        response)
                    return handle_error(error)
                if response.code in (302, 303):
                    fetch['method'] = 'GET'
                    if 'body' in fetch:
                        del fetch['body']
                fetch['url'] = urljoin(fetch['url'], response.headers['Location'])
                fetch['request_timeout'] -= time.time() - start_time
                if fetch['request_timeout'] < 0:
                    fetch['request_timeout'] = 0.1
                fetch['connect_timeout'] = fetch['request_timeout']
                store['max_redirects'] -= 1
                return make_request(fetch)

            result = {}
            result['orig_url'] = url
            result['content'] = response.body or ''
            result['headers'] = dict(response.headers)
            result['status_code'] = response.code
            result['url'] = response.effective_url or url
            result['cookies'] = session.get_dict()
            result['time'] = time.time() - start_time
            result['save'] = task_fetch.get('save')
            if response.error:
                result['error'] = utils.text(response.error)
            if 200 <= response.code < 300:
                logger.info("[%d] %s:%s %s %.2fs", response.code,
                            task.get('project'), task.get('taskid'),
                            url, result['time'])
            else:
                logger.warning("[%d] %s:%s %s %.2fs", response.code,
                               task.get('project'), task.get('taskid'),
                               url, result['time'])
            callback('http', task, result)
            self.on_result('http', task, result)
            return task, result

        handle_error = lambda x: self.handle_error('http',
                                                   url, task, start_time, callback, x)

        def make_request(fetch):
            try:
                request = tornado.httpclient.HTTPRequest(**fetch)
                cookie_header = cookies.get_cookie_header(session, request)
                if cookie_header:
                    request.headers['Cookie'] = cookie_header
                if self.async:
                    self.http_client.fetch(request, handle_response)
                else:
                    return handle_response(self.http_client.fetch(request))
            except tornado.httpclient.HTTPError as e:
                if e.response:
                    return handle_response(e.response)
                else:
                    return handle_error(e)
            except Exception as e:
                logger.exception(fetch)
                return handle_error(e)

        return make_request(fetch)

    def ghost_fetch(self, url, task, callback):
        '''Fetch with ghost.py'''
        start_time = time.time()

        self.on_fetch('ghost', task)
        if not self.ghost:
            result = {
                "orig_url": url,
                "content": "ghost is not enabled.",
                "headers": {},
                "status_code": 501,
                "url": url,
                "cookies": {},
                "time": 0,
                "save": task.get('fetch', {}).get('save')
            }
            logger.warning("[501] %s:%s %s 0s", task.get('project'), task.get('taskid'), url)
            callback('http', task, result)
            self.on_result('http', task, result)
            return task, result

        fetch = copy.deepcopy(self.default_options)
        fetch['url'] = url
        fetch['headers'] = tornado.httputil.HTTPHeaders(fetch['headers'])
        fetch['headers']['User-Agent'] = self.user_agent
        task_fetch = task.get('fetch', {})
        for each in task_fetch:
            if each != 'headers':
                fetch[each] = task_fetch[each]
        fetch['headers'].update(task_fetch.get('headers', {}))

        ghost_config = {
            'user_agent': fetch['headers']['User-Agent'],
            # viewport_size is (width, height)
            'viewport_size': (fetch.get('js_viewport_width', 1024),
                              fetch.get('js_viewport_height', 768*3)),
            'wait_timeout': 0,
            'display': False,
            'ignore_ssl_errors': True,
            'download_images': fetch.get('load_images', False),
        }

        def handle_response(session):
            page = get_page_from_session(session)
            if not page:
                return handle_error('Unable to load requested page')

            result = {
                'orig_url': url,
                'status_code': page.http_status,
                'error': None,
                'content': session.content,
                'headers': page.headers,
                'url': page.url,
                'cookies': session.cookies,
                'time': time.time() - start_time,
                'js_script_result': session.js_script_result,
                'save': task_fetch.get('save'),
            }
            session.exit()

            if 200 <= result['status_code'] < 300:
                logger.info("[%d] %s:%s %s %.2fs", result['status_code'],
                            task.get('project'), task.get('taskid'),
                            url, result['time'])
            else:
                logger.warning("[%d] %s:%s %s %.2fs", result['status_code'],
                               task.get('project'), task.get('taskid'),
                               url, result['time'])
            callback('ghost', task, result)
            self.on_result('ghost', task, result)
            return task, result

        handle_error = lambda x: self.handle_error('ghost', url, task, start_time, callback, x)

        def check_output(session):
            if time.time() - start_time > fetch.get('timeout', 120) or session.loaded:
                if fetch.get('js_script', None) and fetch.get('js_run_at', 'document-end') != 'document-start' \
                        and not getattr(session, 'js_run', False):
                    session.js_script_result, resources = session.evaluate(fetch.get('js_script', None))
                    session.http_resources = resources
                    session.js_run = True
                    self.ioloop.call_later(1, check_output, session)
                    return
                return handle_response(session)
            self.ioloop.call_later(1, check_output, session)

        def get_page_from_session(session):
            resources = session.http_resources

            # shadows the outer url on purpose: compare against the frame's current URL
            url = session.main_frame.url().toString()
            url_without_hash = url.split("#")[0]

            for resource in resources:
                if url == resource.url or url_without_hash == resource.url:
                    return resource

        session = self.ghost.start(**ghost_config)

        try:
            # proxy
            proxy_string = None
            if isinstance(task_fetch.get('proxy'), six.string_types):
                proxy_string = task_fetch['proxy']
            elif self.proxy and task_fetch.get('proxy', True):
                proxy_string = self.proxy
            if proxy_string:
                if '://' not in proxy_string:
                    proxy_string = 'http://' + proxy_string
                proxy_splited = urlsplit(proxy_string)
                session.set_proxy(proxy_splited.scheme, host=proxy_splited.hostname, port=(proxy_splited.port or 8080),
                                  user=proxy_splited.username, password=proxy_splited.password)

            session.js_script_result = None
            session.open(fetch['url'], method=fetch['method'], headers=dict(fetch['headers']),
                         body=fetch.get('data', None), wait=False, user_agent=fetch['headers']['User-Agent'])

            # document-start
            if fetch.get('js_script', None) and fetch.get('js_run_at', 'document-end') == 'document-start':
                session.js_script_result, resources = session.evaluate(fetch.get('js_script', None))
                session.js_run = True

            if self.async:
                check_output(session)
            else:
                session.wait_for(lambda: session.loaded, 'Unable to load requested page', fetch.get('timeout', 120))
                if fetch.get('js_script', None) and fetch.get('js_run_at', 'document-end') != 'document-start':
                    session.js_script_result, resources = session.evaluate(fetch.get('js_script', None))
                    session.http_resources = resources
                    session.js_run = True
                time.sleep(1)
                session.wait_for(lambda: session.loaded, 'Unable to load requested page',
                                 fetch.get('timeout', 120) - (time.time() - start_time))
                return handle_response(session)
        except TimeoutError:
            return handle_response(session)
        except Exception as e:
            session.exit()
            return handle_error(e)

    def phantomjs_fetch(self, url, task, callback):
        '''Fetch with phantomjs proxy'''
        start_time = time.time()

        self.on_fetch('phantomjs', task)
        if not self.phantomjs_proxy:
            result = {
                "orig_url": url,
                "content": "phantomjs is not enabled.",
                "headers": {},
                "status_code": 501,
                "url": url,
                "cookies": {},
                "time": 0,
                "save": task.get('fetch', {}).get('save')
            }
            logger.warning("[501] %s:%s %s 0s", task.get('project'), task.get('taskid'), url)
            callback('http', task, result)
            self.on_result('http', task, result)
            return task, result

        request_conf = {
            'follow_redirects': False
        }

        fetch = copy.deepcopy(self.default_options)
        fetch['url'] = url
        fetch['headers'] = tornado.httputil.HTTPHeaders(fetch['headers'])
        fetch['headers']['User-Agent'] = self.user_agent
        task_fetch = task.get('fetch', {})
        for each in task_fetch:
            if each != 'headers':
                fetch[each] = task_fetch[each]
        fetch['headers'].update(task_fetch.get('headers', {}))

        if 'timeout' in fetch:
            request_conf['connect_timeout'] = fetch['timeout']
            request_conf['request_timeout'] = fetch['timeout'] + 1

        session = cookies.RequestsCookieJar()
        request = tornado.httpclient.HTTPRequest(url=fetch['url'])
        if fetch.get('cookies'):
            session.update(fetch['cookies'])
            if 'Cookie' in request.headers:
                del request.headers['Cookie']
            fetch['headers']['Cookie'] = cookies.get_cookie_header(session, request)

        def handle_response(response):
            if not response.body:
                return handle_error(Exception('no response from phantomjs'))

            try:
                result = json.loads(utils.text(response.body))
                if response.error:
                    result['error'] = utils.text(response.error)
            except Exception as e:
                return handle_error(e)

            if result.get('status_code', 200):
                logger.info("[%d] %s:%s %s %.2fs", result['status_code'],
                            task.get('project'), task.get('taskid'), url, result['time'])
            else:
                logger.error("[%d] %s:%s %s, %r %.2fs", result['status_code'],
                             task.get('project'), task.get('taskid'),
                             url, result['content'], result['time'])
            callback('phantomjs', task, result)
            self.on_result('phantomjs', task, result)
            return task, result

        handle_error = lambda x: self.handle_error('phantomjs',
                                                   url, task, start_time, callback, x)

        fetch['headers'] = dict(fetch['headers'])
        try:
            request = tornado.httpclient.HTTPRequest(
                url="%s" % self.phantomjs_proxy, method="POST",
                body=json.dumps(fetch), **request_conf)
            if self.async:
                self.http_client.fetch(request, handle_response)
            else:
                return handle_response(self.http_client.fetch(request))
        except tornado.httpclient.HTTPError as e:
            if e.response:
                return handle_response(e.response)
            else:
                return handle_error(e)
        except Exception as e:
            return handle_error(e)

    def run(self):
        '''Run loop'''
        logger.info("fetcher starting...")

        def queue_loop():
            if not self.outqueue or not self.inqueue:
                return
            while not self._quit:
                try:
                    if self.outqueue.full():
                        break
                    if self.http_client.free_size() <= 0:
                        break
                    task = self.inqueue.get_nowait()
                    # FIXME: decode_unicode_obj should be applied right after data is
                    # selected from the database; it is done here for performance
                    task = utils.decode_unicode_obj(task)
                    self.fetch(task)
                except queue.Empty:
                    break
                except KeyboardInterrupt:
                    break
                except Exception as e:
                    logger.exception(e)
                    break

        tornado.ioloop.PeriodicCallback(queue_loop, 100, io_loop=self.ioloop).start()
        self._running = True

        try:
            self.ioloop.start()
        except KeyboardInterrupt:
            pass

        logger.info("fetcher exiting...")

    def quit(self):
        '''Quit fetcher'''
        self._running = False
        self._quit = True
        self.ioloop.stop()

    def size(self):
        return self.http_client.size()

    def xmlrpc_run(self, port=24444, bind='127.0.0.1', logRequests=False):
        '''Run xmlrpc server'''
        import umsgpack
        try:
            from xmlrpc.server import SimpleXMLRPCServer
            from xmlrpc.client import Binary
        except ImportError:
            from SimpleXMLRPCServer import SimpleXMLRPCServer
            from xmlrpclib import Binary

        server = SimpleXMLRPCServer((bind, port), allow_none=True, logRequests=logRequests)
        server.register_introspection_functions()
        server.register_multicall_functions()

        server.register_function(self.quit, '_quit')
        server.register_function(self.size)

        def sync_fetch(task):
            result = self.sync_fetch(task)
            result = Binary(umsgpack.packb(result))
            return result
        server.register_function(sync_fetch, 'fetch')

        def dump_counter(_time, _type):
            return self._cnt[_time].to_dict(_type)
        server.register_function(dump_counter, 'counter')

        server.timeout = 0.5
        while not self._quit:
            server.handle_request()
        server.server_close()

    def on_fetch(self, type, task):
        '''Called before task fetch'''
        pass

    def on_result(self, type, task, result):
        '''Called after task fetched'''
        status_code = result.get('status_code', 599)
        if status_code != 599:
            # integer division buckets codes into 200/300/400/500
            status_code = int(status_code) // 100 * 100
        self._cnt['5m'].event((task.get('project'), status_code), +1)
        self._cnt['1h'].event((task.get('project'), status_code), +1)

        if type == 'http' and result.get('time'):
            content_len = len(result.get('content', ''))
            self._cnt['5m'].event((task.get('project'), 'speed'),
                                  float(content_len) / result.get('time'))
            self._cnt['1h'].event((task.get('project'), 'speed'),
                                  float(content_len) / result.get('time'))
            self._cnt['5m'].event((task.get('project'), 'time'), result.get('time'))
            self._cnt['1h'].event((task.get('project'), 'time'), result.get('time'))
680