Completed
Push — splash (1aefd6) by Roy, created 01:14

Fetcher.splash_fetch()   F

Complexity

Conditions 18

Size

Total Lines 96

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 18
dl 0
loc 96
rs 2
c 0
b 0
f 0
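
For orientation, cc is cyclomatic complexity, which matches the 18 conditions counted above: one plus the number of branch points in the method. A toy illustration of the usual counting rules (a sketch, not this analyzer's exact algorithm):

# Cyclomatic complexity of classify() is 4:
# 1 (base path) + 1 (for) + 1 (if) + 1 (elif).
def classify(values):
    labels = []
    for v in values:     # +1
        if v < 0:        # +1
            labels.append('neg')
        elif v == 0:     # +1
            labels.append('zero')
        else:            # else adds no new branch point
            labels.append('pos')
    return labels

print(classify([-1, 0, 2]))  # ['neg', 'zero', 'pos']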

How to fix: Long Method, Complexity

Long Method

Small methods make your code easier to understand, especially when combined with a good name. What's more, when a method is small, finding a good name for it is usually much easier.

For example, if you find yourself adding comments to a method's body, that is usually a sign that the commented part should be extracted into a new method, with the comment serving as a starting point for the new method's name.

Commonly applied refactorings include Extract Method, Replace Temp with Query, and Decompose Conditional.
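
For instance, Fetcher.splash_fetch() below marks its phases with comments ("setup request parameters", "robots.txt", "making requests"); each one is a ready-made seed for Extract Method. A minimal, self-contained sketch of the technique on a hypothetical example (ReportBuilder is not pyspider code):

class ReportBuilder(object):
    # Before: one method whose phases are only distinguishable by comments.
    def build(self, records):
        # filter out empty records
        rows = [r for r in records if r.strip()]
        # normalise whitespace
        rows = [" ".join(r.split()) for r in rows]
        # join into a report body
        return "\n".join(rows)


class RefactoredReportBuilder(object):
    # After: each former comment became a well-named method.
    def build(self, records):
        return self._join(self._normalise(self._filter_empty(records)))

    def _filter_empty(self, records):
        return [r for r in records if r.strip()]

    def _normalise(self, rows):
        return [" ".join(r.split()) for r in rows]

    def _join(self, rows):
        return "\n".join(rows)


data = ["  hello   world ", "", "foo\tbar"]
assert ReportBuilder().build(data) == RefactoredReportBuilder().build(data)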

Complexity

Complex methods like Fetcher.splash_fetch() often do a lot of different things. To break such a method (or its class) down, we need to identify a cohesive component within it. A common approach to finding such a component is to look for fields and methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a subclass, Extract Subclass is also a candidate, and is often faster.
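
As a rough sketch of Extract Class under that "shared prefix" heuristic (hypothetical names, not pyspider's API): Fetcher carries splash_endpoint and splash_lua_source plus splash-only request logic, which could move out together:

class SplashClient(object):
    '''Owns everything splash-specific that previously lived on the fetcher.'''

    def __init__(self, endpoint, lua_source):
        self.endpoint = endpoint
        self.lua_source = lua_source

    def build_payload(self, fetch_options):
        # attach the rendering script to the per-request options
        payload = dict(fetch_options)
        payload['lua_source'] = self.lua_source
        return payload


class LeanFetcher(object):
    def __init__(self, splash_client):
        # the fetcher now delegates instead of carrying splash fields itself
        self.splash = splash_client


client = SplashClient('http://localhost:8050/execute', '-- lua source here')
fetcher = LeanFetcher(client)
print(fetcher.splash.build_payload({'url': 'http://example.com'}))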

#!/usr/bin/env python
# -*- encoding: utf-8 -*-
# vim: set et sw=4 ts=4 sts=4 ff=unix fenc=utf8:
# Author: Binux<[email protected]>
#         http://binux.me
# Created on 2012-12-17 11:07:19

from __future__ import unicode_literals

import os
import six
import copy
import time
import json
import logging
import traceback
import functools
import threading
import tornado.ioloop
import tornado.httputil
import tornado.httpclient
import pyspider

from six.moves import queue, http_cookies
from six.moves.urllib.robotparser import RobotFileParser
from requests import cookies
from six.moves.urllib.parse import urljoin, urlsplit
from tornado import gen
from tornado.curl_httpclient import CurlAsyncHTTPClient
from tornado.simple_httpclient import SimpleAsyncHTTPClient

from pyspider.libs import utils, dataurl, counter
from pyspider.libs.url import quote_chinese
from .cookie_utils import extract_cookies_to_jar

logger = logging.getLogger('fetcher')


class MyCurlAsyncHTTPClient(CurlAsyncHTTPClient):

    def free_size(self):
        return len(self._free_list)

    def size(self):
        return len(self._curls) - self.free_size()


class MySimpleAsyncHTTPClient(SimpleAsyncHTTPClient):

    def free_size(self):
        return self.max_clients - self.size()

    def size(self):
        return len(self.active)


fetcher_output = {
    "status_code": int,
    "orig_url": str,
    "url": str,
    "headers": dict,
    "content": str,
    "cookies": dict,
}


class Fetcher(object):
    user_agent = "pyspider/%s (+http://pyspider.org/)" % pyspider.__version__
    default_options = {
        'method': 'GET',
        'headers': {
        },
        'use_gzip': True,
        'timeout': 120,
        'connect_timeout': 20,
    }
    phantomjs_proxy = None
    splash_endpoint = None
    splash_lua_source = open(os.path.join(os.path.dirname(__file__), "splash_fetcher.lua")).read()
    robot_txt_age = 60*60  # 1h

    # `async_mode` (rather than `async`) keeps the signature valid on
    # Python 3.7+, where `async` is a reserved word
    def __init__(self, inqueue, outqueue, poolsize=100, proxy=None, async_mode=True):
        self.inqueue = inqueue
        self.outqueue = outqueue

        self.poolsize = poolsize
        self._running = False
        self._quit = False
        self.proxy = proxy
        self.async_mode = async_mode
        self.ioloop = tornado.ioloop.IOLoop()

        self.robots_txt_cache = {}

        # binding io_loop to http_client here
        if self.async_mode:
            self.http_client = MyCurlAsyncHTTPClient(max_clients=self.poolsize,
                                                     io_loop=self.ioloop)
        else:
            self.http_client = tornado.httpclient.HTTPClient(MyCurlAsyncHTTPClient, max_clients=self.poolsize)

        self._cnt = {
            '5m': counter.CounterManager(
                lambda: counter.TimebaseAverageWindowCounter(30, 10)),
            '1h': counter.CounterManager(
                lambda: counter.TimebaseAverageWindowCounter(60, 60)),
        }

    def send_result(self, type, task, result):
        '''Send fetch result to processor'''
        if self.outqueue:
            try:
                self.outqueue.put((task, result))
            except Exception as e:
                logger.exception(e)

    def fetch(self, task, callback=None):
        if self.async_mode:
            return self.async_fetch(task, callback)
        else:
            return self.async_fetch(task, callback).result()

    @gen.coroutine
    def async_fetch(self, task, callback=None):
        '''Do one fetch'''
        url = task.get('url', 'data:,')
        if callback is None:
            callback = self.send_result

        type = 'None'
        start_time = time.time()
        try:
            if url.startswith('data:'):
                type = 'data'
                result = yield gen.maybe_future(self.data_fetch(url, task))
            elif task.get('fetch', {}).get('fetch_type') in ('js', 'phantomjs'):
                type = 'phantomjs'
                result = yield self.phantomjs_fetch(url, task)
            elif task.get('fetch', {}).get('fetch_type') in ('splash', ):
                type = 'splash'
                result = yield self.splash_fetch(url, task)
            else:
                type = 'http'
                result = yield self.http_fetch(url, task)
        except Exception as e:
            logger.exception(e)
            result = self.handle_error(type, url, task, start_time, e)

        callback(type, task, result)
        self.on_result(type, task, result)
        raise gen.Return(result)

    def sync_fetch(self, task):
        '''Synchronous fetch, usually used in the xmlrpc thread'''
        if not self._running:
            return self.ioloop.run_sync(functools.partial(self.async_fetch, task, lambda t, _, r: True))

        wait_result = threading.Condition()
        _result = {}

        def callback(type, task, result):
            wait_result.acquire()
            _result['type'] = type
            _result['task'] = task
            _result['result'] = result
            wait_result.notify()
            wait_result.release()

        wait_result.acquire()
        self.ioloop.add_callback(self.fetch, task, callback)
        while 'result' not in _result:
            wait_result.wait()
        wait_result.release()
        return _result['result']

    def data_fetch(self, url, task):
        '''A fake fetcher for dataurl'''
        self.on_fetch('data', task)
        result = {}
        result['orig_url'] = url
        result['content'] = dataurl.decode(url)
        result['headers'] = {}
        result['status_code'] = 200
        result['url'] = url
        result['cookies'] = {}
        result['time'] = 0
        result['save'] = task.get('fetch', {}).get('save')
        if len(result['content']) < 70:
            logger.info("[200] %s:%s %s 0s", task.get('project'), task.get('taskid'), url)
        else:
            logger.info(
                "[200] %s:%s data:,%s...[content:%d] 0s",
                task.get('project'), task.get('taskid'),
                result['content'][:70],
                len(result['content'])
            )

        return result

    def handle_error(self, type, url, task, start_time, error):
        result = {
            'status_code': getattr(error, 'code', 599),
            'error': utils.text(error),
            'traceback': traceback.format_exc(),
            'content': "",
            'time': time.time() - start_time,
            'orig_url': url,
            'url': url,
            "save": task.get('fetch', {}).get('save')
        }
        logger.error("[%d] %s:%s %s, %r %.2fs",
                     result['status_code'], task.get('project'), task.get('taskid'),
                     url, error, result['time'])
        return result

    allowed_options = ['method', 'data', 'timeout', 'cookies', 'use_gzip', 'validate_cert']

    def pack_tornado_request_parameters(self, url, task):
        fetch = copy.deepcopy(self.default_options)
        fetch['url'] = url
        fetch['headers'] = tornado.httputil.HTTPHeaders(fetch['headers'])
        fetch['headers']['User-Agent'] = self.user_agent
        task_fetch = task.get('fetch', {})
        for each in self.allowed_options:
            if each in task_fetch:
                fetch[each] = task_fetch[each]
        fetch['headers'].update(task_fetch.get('headers', {}))

        if task.get('track'):
            track_headers = tornado.httputil.HTTPHeaders(
                task.get('track', {}).get('fetch', {}).get('headers') or {})
            track_ok = task.get('track', {}).get('process', {}).get('ok', False)
        else:
            track_headers = {}
            track_ok = False
        # proxy
        proxy_string = None
        if isinstance(task_fetch.get('proxy'), six.string_types):
            proxy_string = task_fetch['proxy']
        elif self.proxy and task_fetch.get('proxy', True):
            proxy_string = self.proxy
        if proxy_string:
            if '://' not in proxy_string:
                proxy_string = 'http://' + proxy_string
            proxy_splited = urlsplit(proxy_string)
            fetch['proxy_host'] = proxy_splited.hostname
            if proxy_splited.username:
                fetch['proxy_username'] = proxy_splited.username
            if proxy_splited.password:
                fetch['proxy_password'] = proxy_splited.password
            if six.PY2:
                for key in ('proxy_host', 'proxy_username', 'proxy_password'):
                    if key in fetch:
                        fetch[key] = fetch[key].encode('utf8')
            fetch['proxy_port'] = proxy_splited.port or 8080

        # etag
        if task_fetch.get('etag', True):
            _t = None
            if isinstance(task_fetch.get('etag'), six.string_types):
                _t = task_fetch.get('etag')
            elif track_ok:
                _t = track_headers.get('etag')
            if _t and 'If-None-Match' not in fetch['headers']:
                fetch['headers']['If-None-Match'] = _t
        # last modified (the misspelled 'last_modifed' key is read as a fallback)
        if task_fetch.get('last_modified', task_fetch.get('last_modifed', True)):
            last_modified = task_fetch.get('last_modified', task_fetch.get('last_modifed', True))
            _t = None
            if isinstance(last_modified, six.string_types):
                _t = last_modified
            elif track_ok:
                _t = track_headers.get('last-modified')
            if _t and 'If-Modified-Since' not in fetch['headers']:
                fetch['headers']['If-Modified-Since'] = _t
        # timeout
        if 'timeout' in fetch:
            fetch['request_timeout'] = fetch['timeout']
            del fetch['timeout']
        # data rename to body
        if 'data' in fetch:
            fetch['body'] = fetch['data']
            del fetch['data']

        return fetch

    @gen.coroutine
    def can_fetch(self, user_agent, url):
        parsed = urlsplit(url)
        domain = parsed.netloc
        if domain in self.robots_txt_cache:
            robot_txt = self.robots_txt_cache[domain]
            if time.time() - robot_txt.mtime() > self.robot_txt_age:
                robot_txt = None
        else:
            robot_txt = None

        if robot_txt is None:
            robot_txt = RobotFileParser()
            try:
                response = yield gen.maybe_future(self.http_client.fetch(
                    urljoin(url, '/robots.txt'), connect_timeout=10, request_timeout=30))
                content = response.body
            except tornado.httpclient.HTTPError as e:
                logger.error('load robots.txt from %s error: %r', domain, e)
                content = ''

            try:
                content = content.decode('utf8', 'ignore')
            except UnicodeDecodeError:
                content = ''

            robot_txt.parse(content.splitlines())
            self.robots_txt_cache[domain] = robot_txt

        raise gen.Return(robot_txt.can_fetch(user_agent, url))

    def clear_robot_txt_cache(self):
        now = time.time()
        # copy items() so entries can be deleted while iterating (Python 3
        # raises RuntimeError when a dict changes size during iteration)
        for domain, robot_txt in list(self.robots_txt_cache.items()):
            if now - robot_txt.mtime() > self.robot_txt_age:
                del self.robots_txt_cache[domain]

    @gen.coroutine
    def http_fetch(self, url, task):
        '''HTTP fetcher'''
        start_time = time.time()
        self.on_fetch('http', task)
        handle_error = lambda x: self.handle_error('http', url, task, start_time, x)

        # setup request parameters
        fetch = self.pack_tornado_request_parameters(url, task)
        task_fetch = task.get('fetch', {})

        session = cookies.RequestsCookieJar()
        # fix for tornado request obj
        if 'Cookie' in fetch['headers']:
            c = http_cookies.SimpleCookie()
            try:
                c.load(fetch['headers']['Cookie'])
            except AttributeError:
                c.load(utils.utf8(fetch['headers']['Cookie']))
            for key in c:
                session.set(key, c[key])
            del fetch['headers']['Cookie']
        if 'cookies' in fetch:
            session.update(fetch['cookies'])
            del fetch['cookies']

        max_redirects = task_fetch.get('max_redirects', 5)
        # we will handle redirects by hand to capture cookies
        fetch['follow_redirects'] = False

        # making requests
        while True:
            # robots.txt
            if task_fetch.get('robots_txt', False):
                can_fetch = yield self.can_fetch(fetch['headers']['User-Agent'], fetch['url'])
                if not can_fetch:
                    error = tornado.httpclient.HTTPError(403, 'Disallowed by robots.txt')
                    raise gen.Return(handle_error(error))

            try:
                request = tornado.httpclient.HTTPRequest(**fetch)
                # if cookie already in header, get_cookie_header wouldn't work
                old_cookie_header = request.headers.get('Cookie')
                if old_cookie_header:
                    del request.headers['Cookie']
                cookie_header = cookies.get_cookie_header(session, request)
                if cookie_header:
                    request.headers['Cookie'] = cookie_header
                elif old_cookie_header:
                    request.headers['Cookie'] = old_cookie_header
            except Exception as e:
                logger.exception(fetch)
                raise gen.Return(handle_error(e))

            try:
                response = yield gen.maybe_future(self.http_client.fetch(request))
            except tornado.httpclient.HTTPError as e:
                if e.response:
                    response = e.response
                else:
                    raise gen.Return(handle_error(e))

            extract_cookies_to_jar(session, response.request, response.headers)
            if (response.code in (301, 302, 303, 307)
                    and response.headers.get('Location')
                    and task_fetch.get('allow_redirects', True)):
                if max_redirects <= 0:
                    error = tornado.httpclient.HTTPError(
                        599, 'Maximum (%d) redirects followed' % task_fetch.get('max_redirects', 5),
                        response)
                    raise gen.Return(handle_error(error))
                if response.code in (302, 303):
                    fetch['method'] = 'GET'
                    if 'body' in fetch:
                        del fetch['body']
                fetch['url'] = quote_chinese(urljoin(fetch['url'], response.headers['Location']))
                fetch['request_timeout'] -= time.time() - start_time
                if fetch['request_timeout'] < 0:
                    fetch['request_timeout'] = 0.1
                max_redirects -= 1
                continue

            result = {}
            result['orig_url'] = url
            result['content'] = response.body or ''
            result['headers'] = dict(response.headers)
            result['status_code'] = response.code
            result['url'] = response.effective_url or url
            result['time'] = time.time() - start_time
            result['cookies'] = session.get_dict()
            result['save'] = task_fetch.get('save')
            if response.error:
                result['error'] = utils.text(response.error)
            if 200 <= response.code < 300:
                logger.info("[%d] %s:%s %s %.2fs", response.code,
                            task.get('project'), task.get('taskid'),
                            url, result['time'])
            else:
                logger.warning("[%d] %s:%s %s %.2fs", response.code,
                               task.get('project'), task.get('taskid'),
                               url, result['time'])

            raise gen.Return(result)

    @gen.coroutine
    def phantomjs_fetch(self, url, task):
        '''Fetch with phantomjs proxy'''
        start_time = time.time()
        self.on_fetch('phantomjs', task)
        handle_error = lambda x: self.handle_error('phantomjs', url, task, start_time, x)

        # check phantomjs proxy is enabled
        if not self.phantomjs_proxy:
            result = {
                "orig_url": url,
                "content": "phantomjs is not enabled.",
                "headers": {},
                "status_code": 501,
                "url": url,
                "time": time.time() - start_time,
                "cookies": {},
                "save": task.get('fetch', {}).get('save')
            }
            logger.warning("[501] %s:%s %s 0s", task.get('project'), task.get('taskid'), url)
            raise gen.Return(result)

        # setup request parameters
        fetch = self.pack_tornado_request_parameters(url, task)
        task_fetch = task.get('fetch', {})
        for each in task_fetch:
            if each not in fetch:
                fetch[each] = task_fetch[each]

        # robots.txt
        if task_fetch.get('robots_txt', False):
            user_agent = fetch['headers']['User-Agent']
            can_fetch = yield self.can_fetch(user_agent, url)
            if not can_fetch:
                error = tornado.httpclient.HTTPError(403, 'Disallowed by robots.txt')
                raise gen.Return(handle_error(error))

        request_conf = {
            'follow_redirects': False
        }
        request_conf['connect_timeout'] = fetch.get('connect_timeout', 20)
        request_conf['request_timeout'] = fetch.get('request_timeout', 120) + 1

        session = cookies.RequestsCookieJar()
        request = tornado.httpclient.HTTPRequest(url=fetch['url'])
        if fetch.get('cookies'):
            session.update(fetch['cookies'])
            if 'Cookie' in request.headers:
                del request.headers['Cookie']
            fetch['headers']['Cookie'] = cookies.get_cookie_header(session, request)

        # making requests
        fetch['headers'] = dict(fetch['headers'])
        try:
            request = tornado.httpclient.HTTPRequest(
                url=self.phantomjs_proxy, method="POST",
                body=json.dumps(fetch), **request_conf)
        except Exception as e:
            raise gen.Return(handle_error(e))

        try:
            response = yield gen.maybe_future(self.http_client.fetch(request))
        except tornado.httpclient.HTTPError as e:
            if e.response:
                response = e.response
            else:
                raise gen.Return(handle_error(e))

        if not response.body:
            raise gen.Return(handle_error(Exception('no response from phantomjs')))

        result = {}
        try:
            result = json.loads(utils.text(response.body))
            assert 'status_code' in result, result
        except Exception as e:
            if response.error:
                result['error'] = utils.text(response.error)
            raise gen.Return(handle_error(e))

        if result.get('status_code', 200):
            logger.info("[%d] %s:%s %s %.2fs", result['status_code'],
                        task.get('project'), task.get('taskid'), url, result['time'])
        else:
            logger.error("[%d] %s:%s %s, %r %.2fs", result['status_code'],
                         task.get('project'), task.get('taskid'),
                         url, result['content'], result['time'])

        raise gen.Return(result)

    @gen.coroutine
    def splash_fetch(self, url, task):
        '''Fetch with splash'''
        start_time = time.time()
        self.on_fetch('splash', task)
        handle_error = lambda x: self.handle_error('splash', url, task, start_time, x)

        # check splash endpoint is enabled
        if not self.splash_endpoint:
            result = {
                "orig_url": url,
                "content": "splash is not enabled.",
                "headers": {},
                "status_code": 501,
                "url": url,
                "time": time.time() - start_time,
                "cookies": {},
                "save": task.get('fetch', {}).get('save')
            }
            logger.warning("[501] %s:%s %s 0s", task.get('project'), task.get('taskid'), url)
            raise gen.Return(result)

        # setup request parameters
        fetch = self.pack_tornado_request_parameters(url, task)
        task_fetch = task.get('fetch', {})
        for each in task_fetch:
            if each not in fetch:
                fetch[each] = task_fetch[each]

        # robots.txt
        if task_fetch.get('robots_txt', False):
            user_agent = fetch['headers']['User-Agent']
            can_fetch = yield self.can_fetch(user_agent, url)
            if not can_fetch:
                error = tornado.httpclient.HTTPError(403, 'Disallowed by robots.txt')
                raise gen.Return(handle_error(error))

        request_conf = {
            'follow_redirects': False,
            'headers': {
                'Content-Type': 'application/json',
            }
        }
        request_conf['connect_timeout'] = fetch.get('connect_timeout', 20)
        request_conf['request_timeout'] = fetch.get('request_timeout', 120) + 1

        session = cookies.RequestsCookieJar()
        request = tornado.httpclient.HTTPRequest(url=fetch['url'])
        if fetch.get('cookies'):
            session.update(fetch['cookies'])
            if 'Cookie' in request.headers:
                del request.headers['Cookie']
            fetch['headers']['Cookie'] = cookies.get_cookie_header(session, request)

        # making requests
        fetch['lua_source'] = self.splash_lua_source
        fetch['headers'] = dict(fetch['headers'])
        try:
            request = tornado.httpclient.HTTPRequest(
                url=self.splash_endpoint, method="POST",
                body=json.dumps(fetch), **request_conf)
        except Exception as e:
            raise gen.Return(handle_error(e))

        try:
            response = yield gen.maybe_future(self.http_client.fetch(request))
        except tornado.httpclient.HTTPError as e:
            if e.response:
                response = e.response
            else:
                raise gen.Return(handle_error(e))

        if not response.body:
            raise gen.Return(handle_error(Exception('no response from splash')))

        result = {}
        try:
            result = json.loads(utils.text(response.body))
            assert 'status_code' in result, result
        except ValueError as e:
            logger.error("result is not json: %r", response.body[:500])
            raise gen.Return(handle_error(e))
        except Exception as e:
            if response.error:
                result['error'] = utils.text(response.error)
            raise gen.Return(handle_error(e))

        if result.get('status_code', 200):
            logger.info("[%d] %s:%s %s %.2fs", result['status_code'],
                        task.get('project'), task.get('taskid'), url, result['time'])
        else:
            logger.error("[%d] %s:%s %s, %r %.2fs", result['status_code'],
                         task.get('project'), task.get('taskid'),
                         url, result['content'], result['time'])

        raise gen.Return(result)

    def run(self):
        '''Run loop'''
        logger.info("fetcher starting...")

        def queue_loop():
            if not self.outqueue or not self.inqueue:
                return
            while not self._quit:
                try:
                    if self.outqueue.full():
                        break
                    if self.http_client.free_size() <= 0:
                        break
                    task = self.inqueue.get_nowait()
                    # FIXME: decode_unicode_obj should be used after data is
                    # selected from the database; it's used here for performance
                    task = utils.decode_unicode_obj(task)
                    self.fetch(task)
                except queue.Empty:
                    break
                except KeyboardInterrupt:
                    break
                except Exception as e:
                    logger.exception(e)
                    break

        tornado.ioloop.PeriodicCallback(queue_loop, 100, io_loop=self.ioloop).start()
        tornado.ioloop.PeriodicCallback(self.clear_robot_txt_cache, 10000, io_loop=self.ioloop).start()
        self._running = True

        try:
            self.ioloop.start()
        except KeyboardInterrupt:
            pass

        logger.info("fetcher exiting...")

    def quit(self):
        '''Quit fetcher'''
        self._running = False
        self._quit = True
        self.ioloop.add_callback(self.ioloop.stop)
        if hasattr(self, 'xmlrpc_server'):
            self.xmlrpc_ioloop.add_callback(self.xmlrpc_server.stop)
            self.xmlrpc_ioloop.add_callback(self.xmlrpc_ioloop.stop)

    def size(self):
        return self.http_client.size()

    def xmlrpc_run(self, port=24444, bind='127.0.0.1', logRequests=False):
        '''Run xmlrpc server'''
        import umsgpack
        from pyspider.libs.wsgi_xmlrpc import WSGIXMLRPCApplication
        try:
            from xmlrpc.client import Binary
        except ImportError:
            from xmlrpclib import Binary

        application = WSGIXMLRPCApplication()

        application.register_function(self.quit, '_quit')
        application.register_function(self.size)

        def sync_fetch(task):
            result = self.sync_fetch(task)
            result = Binary(umsgpack.packb(result))
            return result
        application.register_function(sync_fetch, 'fetch')

        def dump_counter(_time, _type):
            return self._cnt[_time].to_dict(_type)
        application.register_function(dump_counter, 'counter')

        import tornado.wsgi
        import tornado.ioloop
        import tornado.httpserver

        container = tornado.wsgi.WSGIContainer(application)
        self.xmlrpc_ioloop = tornado.ioloop.IOLoop()
        self.xmlrpc_server = tornado.httpserver.HTTPServer(container, io_loop=self.xmlrpc_ioloop)
        self.xmlrpc_server.listen(port=port, address=bind)
        logger.info('fetcher.xmlrpc listening on %s:%s', bind, port)
        self.xmlrpc_ioloop.start()

    def on_fetch(self, type, task):
        '''Called before task fetch'''
        pass

    def on_result(self, type, task, result):
        '''Called after task fetched'''
        status_code = result.get('status_code', 599)
        if status_code != 599:
            # floor to the status class (2xx -> 200, 3xx -> 300, ...);
            # `//` keeps this an integer division under Python 3 as well
            status_code = int(status_code) // 100 * 100
        self._cnt['5m'].event((task.get('project'), status_code), +1)
        self._cnt['1h'].event((task.get('project'), status_code), +1)

        if type in ('http', 'phantomjs') and result.get('time'):
            content_len = len(result.get('content', ''))
            self._cnt['5m'].event((task.get('project'), 'speed'),
                                  float(content_len) / result.get('time'))
            self._cnt['1h'].event((task.get('project'), 'speed'),
                                  float(content_len) / result.get('time'))
            self._cnt['5m'].event((task.get('project'), 'time'), result.get('time'))
            self._cnt['1h'].event((task.get('project'), 'time'), result.get('time'))
717