Fetcher.run()   F
last analyzed

Complexity
    Conditions: 11

Size
    Total Lines: 36

Duplication
    Lines: 0
    Ratio: 0%

Importance
    Changes: 0

Metric   Value
cc       11
dl       0
loc      36
rs       3.1764
c        0
b        0
f        0

1 Method

Rating   Name                   Duplication   Size   Complexity
C        Fetcher.queue_loop()   0             21     9

How to fix   Complexity

Complexity

Complex classes like Fetcher.run() often do a lot of different things. To break such a class down, we need to identify a cohesive component within it. A common way to find one is to look for fields and methods that share the same prefixes or suffixes.

Once you have determined which fields and methods belong together, you can apply the Extract Class refactoring. If the component makes sense as a subclass, Extract Subclass is also a candidate, and is often quicker to apply. A minimal sketch of Extract Class follows.
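
For example, the Cookie-header handling in this module is repeated verbatim in http_fetch(), phantomjs_fetch(), and splash_fetch() (the blocks flagged as duplicated below). Pulling it into its own class might look like this; the CookieSession name and its methods are illustrative, not part of pyspider:

from requests import cookies
from six.moves import http_cookies

from pyspider.libs import utils


class CookieSession(object):
    '''Hypothetical helper owning the cookie jar that each fetch method
    currently rebuilds inline.'''

    def __init__(self):
        self.jar = cookies.RequestsCookieJar()

    def absorb(self, fetch):
        '''Move a raw Cookie header and the 'cookies' option out of the
        request parameters and into the jar.'''
        if 'Cookie' in fetch['headers']:
            c = http_cookies.SimpleCookie()
            try:
                c.load(fetch['headers']['Cookie'])
            except AttributeError:
                c.load(utils.utf8(fetch['headers']['Cookie']))
            for key in c:
                self.jar.set(key, c[key])
            del fetch['headers']['Cookie']
        if 'cookies' in fetch:
            self.jar.update(fetch['cookies'])
            del fetch['cookies']

Each fetch method would then create one CookieSession, call absorb(fetch), and use session.jar where it currently manipulates the inline jar, collapsing three duplicated blocks into one testable component. The full source of the analyzed module follows.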

#!/usr/bin/env python
# -*- encoding: utf-8 -*-
# vim: set et sw=4 ts=4 sts=4 ff=unix fenc=utf8:
# Author: Binux<[email protected]>
#         http://binux.me
# Created on 2012-12-17 11:07:19

from __future__ import unicode_literals

import os
import sys
import six
import copy
import time
import json
import logging
import traceback
import functools
import threading
import tornado.ioloop
import tornado.httputil
import tornado.httpclient
import pyspider

from six.moves import queue, http_cookies
from six.moves.urllib.robotparser import RobotFileParser
from requests import cookies
from six.moves.urllib.parse import urljoin, urlsplit
from tornado import gen
from tornado.curl_httpclient import CurlAsyncHTTPClient
from tornado.simple_httpclient import SimpleAsyncHTTPClient

from pyspider.libs import utils, dataurl, counter
from pyspider.libs.url import quote_chinese
from .cookie_utils import extract_cookies_to_jar

logger = logging.getLogger('fetcher')


class MyCurlAsyncHTTPClient(CurlAsyncHTTPClient):
    # expose pool occupancy so the run loop can apply back-pressure

    def free_size(self):
        return len(self._free_list)

    def size(self):
        return len(self._curls) - self.free_size()


class MySimpleAsyncHTTPClient(SimpleAsyncHTTPClient):
    # same free_size()/size() interface for the non-curl client

    def free_size(self):
        return self.max_clients - self.size()

    def size(self):
        return len(self.active)


# shape of the result dict produced by the fetch methods
# (documentation only; not referenced elsewhere in this module)
fetcher_output = {
    "status_code": int,
    "orig_url": str,
    "url": str,
    "headers": dict,
    "content": str,
    "cookies": dict,
}


class Fetcher(object):
    user_agent = "pyspider/%s (+http://pyspider.org/)" % pyspider.__version__
    default_options = {
        'method': 'GET',
        'headers': {
        },
        'use_gzip': True,
        'timeout': 120,
        'connect_timeout': 20,
    }
    phantomjs_proxy = None
    splash_endpoint = None
    splash_lua_source = open(os.path.join(os.path.dirname(__file__), "splash_fetcher.lua")).read()
    robot_txt_age = 60 * 60  # 1h

    def __init__(self, inqueue, outqueue, poolsize=100, proxy=None, async_mode=True):
        self.inqueue = inqueue
        self.outqueue = outqueue

        self.poolsize = poolsize
        self._running = False
        self._quit = False
        self.proxy = proxy
        # `async` is a reserved word in Python 3.7+, so the attribute is
        # named async_mode to match the constructor argument
        self.async_mode = async_mode
        self.ioloop = tornado.ioloop.IOLoop()

        self.robots_txt_cache = {}

        # binding io_loop to http_client here
        if self.async_mode:
            self.http_client = MyCurlAsyncHTTPClient(max_clients=self.poolsize,
                                                     io_loop=self.ioloop)
        else:
            self.http_client = tornado.httpclient.HTTPClient(MyCurlAsyncHTTPClient, max_clients=self.poolsize)

        self._cnt = {
            '5m': counter.CounterManager(
                lambda: counter.TimebaseAverageWindowCounter(30, 10)),
            '1h': counter.CounterManager(
                lambda: counter.TimebaseAverageWindowCounter(60, 60)),
        }

    def send_result(self, type, task, result):
        '''Send fetch result to processor'''
        if self.outqueue:
            try:
                self.outqueue.put((task, result))
            except Exception as e:
                logger.exception(e)

    def fetch(self, task, callback=None):
        if self.async_mode:
            return self.async_fetch(task, callback)
        else:
            return self.async_fetch(task, callback).result()

    @gen.coroutine
    def async_fetch(self, task, callback=None):
        '''Do one fetch'''
        url = task.get('url', 'data:,')
        if callback is None:
            callback = self.send_result

        type = 'None'
        start_time = time.time()
        try:
            if url.startswith('data:'):
                type = 'data'
                result = yield gen.maybe_future(self.data_fetch(url, task))
            elif task.get('fetch', {}).get('fetch_type') in ('js', 'phantomjs'):
                type = 'phantomjs'
                result = yield self.phantomjs_fetch(url, task)
            elif task.get('fetch', {}).get('fetch_type') in ('splash', ):
                type = 'splash'
                result = yield self.splash_fetch(url, task)
            else:
                type = 'http'
                result = yield self.http_fetch(url, task)
        except Exception as e:
            logger.exception(e)
            result = self.handle_error(type, url, task, start_time, e)

        callback(type, task, result)
        self.on_result(type, task, result)
        raise gen.Return(result)

    def sync_fetch(self, task):
        '''Synchronous fetch, usually used in the xmlrpc thread'''
        if not self._running:
            return self.ioloop.run_sync(functools.partial(self.async_fetch, task, lambda t, _, r: True))

        wait_result = threading.Condition()
        _result = {}

        def callback(type, task, result):
            wait_result.acquire()
            _result['type'] = type
            _result['task'] = task
            _result['result'] = result
            wait_result.notify()
            wait_result.release()

        wait_result.acquire()
        self.ioloop.add_callback(self.fetch, task, callback)
        while 'result' not in _result:
            wait_result.wait()
        wait_result.release()
        return _result['result']

    def data_fetch(self, url, task):
        '''A fake fetcher for dataurl'''
        self.on_fetch('data', task)
        result = {}
        result['orig_url'] = url
        result['content'] = dataurl.decode(url)
        result['headers'] = {}
        result['status_code'] = 200
        result['url'] = url
        result['cookies'] = {}
        result['time'] = 0
        result['save'] = task.get('fetch', {}).get('save')
        if len(result['content']) < 70:
            logger.info("[200] %s:%s %s 0s", task.get('project'), task.get('taskid'), url)
        else:
            logger.info(
                "[200] %s:%s data:,%s...[content:%d] 0s",
                task.get('project'), task.get('taskid'),
                result['content'][:70],
                len(result['content'])
            )

        return result

    def handle_error(self, type, url, task, start_time, error):
        result = {
            'status_code': getattr(error, 'code', 599),
            'error': utils.text(error),
            'traceback': traceback.format_exc() if sys.exc_info()[0] else None,
            'content': "",
            'time': time.time() - start_time,
            'orig_url': url,
            'url': url,
            "save": task.get('fetch', {}).get('save')
        }
        logger.error("[%d] %s:%s %s, %r %.2fs",
                     result['status_code'], task.get('project'), task.get('taskid'),
                     url, error, result['time'])
        return result

    allowed_options = ['method', 'data', 'connect_timeout', 'timeout', 'cookies', 'use_gzip', 'validate_cert']

    def pack_tornado_request_parameters(self, url, task):
        fetch = copy.deepcopy(self.default_options)
        fetch['url'] = url
        fetch['headers'] = tornado.httputil.HTTPHeaders(fetch['headers'])
        fetch['headers']['User-Agent'] = self.user_agent
        task_fetch = task.get('fetch', {})
        for each in self.allowed_options:
            if each in task_fetch:
                fetch[each] = task_fetch[each]
        fetch['headers'].update(task_fetch.get('headers', {}))

        if task.get('track'):
            track_headers = tornado.httputil.HTTPHeaders(
                task.get('track', {}).get('fetch', {}).get('headers') or {})
            track_ok = task.get('track', {}).get('process', {}).get('ok', False)
        else:
            track_headers = {}
            track_ok = False
        # proxy
        proxy_string = None
        if isinstance(task_fetch.get('proxy'), six.string_types):
            proxy_string = task_fetch['proxy']
        elif self.proxy and task_fetch.get('proxy', True):
            proxy_string = self.proxy
        if proxy_string:
            if '://' not in proxy_string:
                proxy_string = 'http://' + proxy_string
            proxy_split = urlsplit(proxy_string)
            fetch['proxy_host'] = proxy_split.hostname
            if proxy_split.username:
                fetch['proxy_username'] = proxy_split.username
            if proxy_split.password:
                fetch['proxy_password'] = proxy_split.password
            if six.PY2:
                for key in ('proxy_host', 'proxy_username', 'proxy_password'):
                    if key in fetch:
                        fetch[key] = fetch[key].encode('utf8')
            fetch['proxy_port'] = proxy_split.port or 8080

        # etag
        if task_fetch.get('etag', True):
            _t = None
            if isinstance(task_fetch.get('etag'), six.string_types):
                _t = task_fetch.get('etag')
            elif track_ok:
                _t = track_headers.get('etag')
            if _t and 'If-None-Match' not in fetch['headers']:
                fetch['headers']['If-None-Match'] = _t
        # last modified; the misspelled 'last_modifed' key is still accepted
        if task_fetch.get('last_modified', task_fetch.get('last_modifed', True)):
            last_modified = task_fetch.get('last_modified', task_fetch.get('last_modifed', True))
            _t = None
            if isinstance(last_modified, six.string_types):
                _t = last_modified
            elif track_ok:
                _t = track_headers.get('last-modified')
            if _t and 'If-Modified-Since' not in fetch['headers']:
                fetch['headers']['If-Modified-Since'] = _t
        # timeout
        if 'timeout' in fetch:
            fetch['request_timeout'] = fetch['timeout']
            del fetch['timeout']
        # data rename to body
        if 'data' in fetch:
            fetch['body'] = fetch['data']
            del fetch['data']

        return fetch

    @gen.coroutine
    def can_fetch(self, user_agent, url):
        parsed = urlsplit(url)
        domain = parsed.netloc
        if domain in self.robots_txt_cache:
            robot_txt = self.robots_txt_cache[domain]
            if time.time() - robot_txt.mtime() > self.robot_txt_age:
                robot_txt = None
        else:
            robot_txt = None

        if robot_txt is None:
            robot_txt = RobotFileParser()
            try:
                response = yield gen.maybe_future(self.http_client.fetch(
                    urljoin(url, '/robots.txt'), connect_timeout=10, request_timeout=30))
                content = response.body
            except tornado.httpclient.HTTPError as e:
                logger.error('load robots.txt from %s error: %r', domain, e)
                content = b''  # bytes, so the decode below works on Python 3

            try:
                content = content.decode('utf8', 'ignore')
            except UnicodeDecodeError:
                content = ''

            robot_txt.parse(content.splitlines())
            self.robots_txt_cache[domain] = robot_txt

        raise gen.Return(robot_txt.can_fetch(user_agent, url))

    def clear_robot_txt_cache(self):
        now = time.time()
        # copy items() so expired entries can be deleted while iterating;
        # mutating a dict during iteration raises RuntimeError on Python 3
        for domain, robot_txt in list(self.robots_txt_cache.items()):
            if now - robot_txt.mtime() > self.robot_txt_age:
                del self.robots_txt_cache[domain]

    @gen.coroutine
    def http_fetch(self, url, task):
        '''HTTP fetcher'''
        start_time = time.time()
        self.on_fetch('http', task)
        handle_error = lambda x: self.handle_error('http', url, task, start_time, x)

        # setup request parameters
        fetch = self.pack_tornado_request_parameters(url, task)
        task_fetch = task.get('fetch', {})

        session = cookies.RequestsCookieJar()
        # fix for tornado request obj
        if 'Cookie' in fetch['headers']:
            c = http_cookies.SimpleCookie()
            try:
                c.load(fetch['headers']['Cookie'])
            except AttributeError:
                c.load(utils.utf8(fetch['headers']['Cookie']))
            for key in c:
                session.set(key, c[key])
            del fetch['headers']['Cookie']
        if 'cookies' in fetch:
            session.update(fetch['cookies'])
            del fetch['cookies']

        max_redirects = task_fetch.get('max_redirects', 5)
        # we will handle redirects by hand to capture cookies
        fetch['follow_redirects'] = False

        # making requests
        while True:
            # robots.txt
            if task_fetch.get('robots_txt', False):
                can_fetch = yield self.can_fetch(fetch['headers']['User-Agent'], fetch['url'])
                if not can_fetch:
                    error = tornado.httpclient.HTTPError(403, 'Disallowed by robots.txt')
                    raise gen.Return(handle_error(error))

            try:
                request = tornado.httpclient.HTTPRequest(**fetch)
                # if cookie already in header, get_cookie_header wouldn't work
                old_cookie_header = request.headers.get('Cookie')
                if old_cookie_header:
                    del request.headers['Cookie']
                cookie_header = cookies.get_cookie_header(session, request)
                if cookie_header:
                    request.headers['Cookie'] = cookie_header
                elif old_cookie_header:
                    request.headers['Cookie'] = old_cookie_header
            except Exception as e:
                logger.exception(fetch)
                raise gen.Return(handle_error(e))

            try:
                response = yield gen.maybe_future(self.http_client.fetch(request))
            except tornado.httpclient.HTTPError as e:
                if e.response:
                    response = e.response
                else:
                    raise gen.Return(handle_error(e))

            extract_cookies_to_jar(session, response.request, response.headers)
            if (response.code in (301, 302, 303, 307)
                    and response.headers.get('Location')
                    and task_fetch.get('allow_redirects', True)):
                if max_redirects <= 0:
                    error = tornado.httpclient.HTTPError(
                        599, 'Maximum (%d) redirects followed' % task_fetch.get('max_redirects', 5),
                        response)
                    raise gen.Return(handle_error(error))
                if response.code in (302, 303):
                    fetch['method'] = 'GET'
                    if 'body' in fetch:
                        del fetch['body']
                fetch['url'] = quote_chinese(urljoin(fetch['url'], response.headers['Location']))
                fetch['request_timeout'] -= time.time() - start_time
                if fetch['request_timeout'] < 0:
                    fetch['request_timeout'] = 0.1
                max_redirects -= 1
                continue

            result = {}
            result['orig_url'] = url
            result['content'] = response.body or ''
            result['headers'] = dict(response.headers)
            result['status_code'] = response.code
            result['url'] = response.effective_url or url
            result['time'] = time.time() - start_time
            result['cookies'] = session.get_dict()
            result['save'] = task_fetch.get('save')
            if response.error:
                result['error'] = utils.text(response.error)
            if 200 <= response.code < 300:
                logger.info("[%d] %s:%s %s %.2fs", response.code,
                            task.get('project'), task.get('taskid'),
                            url, result['time'])
            else:
                logger.warning("[%d] %s:%s %s %.2fs", response.code,
                               task.get('project'), task.get('taskid'),
                               url, result['time'])

            raise gen.Return(result)

    @gen.coroutine
    def phantomjs_fetch(self, url, task):
        '''Fetch with phantomjs proxy'''
        start_time = time.time()
        self.on_fetch('phantomjs', task)
        handle_error = lambda x: self.handle_error('phantomjs', url, task, start_time, x)

        # check phantomjs proxy is enabled
        if not self.phantomjs_proxy:
            result = {
                "orig_url": url,
                "content": "phantomjs is not enabled.",
                "headers": {},
                "status_code": 501,
                "url": url,
                "time": time.time() - start_time,
                "cookies": {},
                "save": task.get('fetch', {}).get('save')
            }
            logger.warning("[501] %s:%s %s 0s", task.get('project'), task.get('taskid'), url)
            raise gen.Return(result)

        # setup request parameters
        fetch = self.pack_tornado_request_parameters(url, task)
        task_fetch = task.get('fetch', {})
        for each in task_fetch:
            if each not in fetch:
                fetch[each] = task_fetch[each]

        # robots.txt
        if task_fetch.get('robots_txt', False):
            user_agent = fetch['headers']['User-Agent']
            can_fetch = yield self.can_fetch(user_agent, url)
            if not can_fetch:
                error = tornado.httpclient.HTTPError(403, 'Disallowed by robots.txt')
                raise gen.Return(handle_error(error))

        request_conf = {
            'follow_redirects': False
        }
        request_conf['connect_timeout'] = fetch.get('connect_timeout', 20)
        request_conf['request_timeout'] = fetch.get('request_timeout', 120) + 1

        session = cookies.RequestsCookieJar()
        if 'Cookie' in fetch['headers']:
            c = http_cookies.SimpleCookie()
            try:
                c.load(fetch['headers']['Cookie'])
            except AttributeError:
                c.load(utils.utf8(fetch['headers']['Cookie']))
            for key in c:
                session.set(key, c[key])
            del fetch['headers']['Cookie']
        if 'cookies' in fetch:
            session.update(fetch['cookies'])
            del fetch['cookies']

        request = tornado.httpclient.HTTPRequest(url=fetch['url'])
        cookie_header = cookies.get_cookie_header(session, request)
        if cookie_header:
            fetch['headers']['Cookie'] = cookie_header

        # making requests
        fetch['headers'] = dict(fetch['headers'])
        try:
            request = tornado.httpclient.HTTPRequest(
                url=self.phantomjs_proxy, method="POST",
                body=json.dumps(fetch), **request_conf)
        except Exception as e:
            raise gen.Return(handle_error(e))

        try:
            response = yield gen.maybe_future(self.http_client.fetch(request))
        except tornado.httpclient.HTTPError as e:
            if e.response:
                response = e.response
            else:
                raise gen.Return(handle_error(e))

        if not response.body:
            raise gen.Return(handle_error(Exception('no response from phantomjs: %r' % response)))

        result = {}
        try:
            result = json.loads(utils.text(response.body))
            assert 'status_code' in result, result
        except Exception as e:
            if response.error:
                result['error'] = utils.text(response.error)
            raise gen.Return(handle_error(e))

        if result.get('status_code', 200):
            logger.info("[%d] %s:%s %s %.2fs", result['status_code'],
                        task.get('project'), task.get('taskid'), url, result['time'])
        else:
            logger.error("[%d] %s:%s %s, %r %.2fs", result['status_code'],
                         task.get('project'), task.get('taskid'),
                         url, result['content'], result['time'])

        raise gen.Return(result)

    @gen.coroutine
    def splash_fetch(self, url, task):
        '''Fetch with splash'''
        start_time = time.time()
        self.on_fetch('splash', task)
        handle_error = lambda x: self.handle_error('splash', url, task, start_time, x)

        # check splash endpoint is enabled
        if not self.splash_endpoint:
            result = {
                "orig_url": url,
                "content": "splash is not enabled.",
                "headers": {},
                "status_code": 501,
                "url": url,
                "time": time.time() - start_time,
                "cookies": {},
                "save": task.get('fetch', {}).get('save')
            }
            logger.warning("[501] %s:%s %s 0s", task.get('project'), task.get('taskid'), url)
            raise gen.Return(result)

        # setup request parameters
        fetch = self.pack_tornado_request_parameters(url, task)
        task_fetch = task.get('fetch', {})
        for each in task_fetch:
            if each not in fetch:
                fetch[each] = task_fetch[each]

        # robots.txt
        if task_fetch.get('robots_txt', False):
            user_agent = fetch['headers']['User-Agent']
            can_fetch = yield self.can_fetch(user_agent, url)
            if not can_fetch:
                error = tornado.httpclient.HTTPError(403, 'Disallowed by robots.txt')
                raise gen.Return(handle_error(error))

        request_conf = {
            'follow_redirects': False,
            'headers': {
                'Content-Type': 'application/json',
            }
        }
        request_conf['connect_timeout'] = fetch.get('connect_timeout', 20)
        request_conf['request_timeout'] = fetch.get('request_timeout', 120) + 1

        session = cookies.RequestsCookieJar()
        if 'Cookie' in fetch['headers']:
            c = http_cookies.SimpleCookie()
            try:
                c.load(fetch['headers']['Cookie'])
            except AttributeError:
                c.load(utils.utf8(fetch['headers']['Cookie']))
            for key in c:
                session.set(key, c[key])
            del fetch['headers']['Cookie']
        if 'cookies' in fetch:
            session.update(fetch['cookies'])
            del fetch['cookies']

        request = tornado.httpclient.HTTPRequest(url=fetch['url'])
        cookie_header = cookies.get_cookie_header(session, request)
        if cookie_header:
            fetch['headers']['Cookie'] = cookie_header

        # making requests
        fetch['lua_source'] = self.splash_lua_source
        fetch['headers'] = dict(fetch['headers'])
        try:
            request = tornado.httpclient.HTTPRequest(
                url=self.splash_endpoint, method="POST",
                body=json.dumps(fetch), **request_conf)
        except Exception as e:
            raise gen.Return(handle_error(e))

        try:
            response = yield gen.maybe_future(self.http_client.fetch(request))
        except tornado.httpclient.HTTPError as e:
            if e.response:
                response = e.response
            else:
                raise gen.Return(handle_error(e))

        if not response.body:
            raise gen.Return(handle_error(Exception('no response from splash')))

        result = {}
        try:
            result = json.loads(utils.text(response.body))
            assert 'status_code' in result, result
        except ValueError as e:
            logger.error("result is not json: %r", response.body[:500])
            raise gen.Return(handle_error(e))
        except Exception as e:
            if response.error:
                result['error'] = utils.text(response.error)
            raise gen.Return(handle_error(e))

        if result.get('status_code', 200):
            logger.info("[%d] %s:%s %s %.2fs", result['status_code'],
                        task.get('project'), task.get('taskid'), url, result['time'])
        else:
            logger.error("[%d] %s:%s %s, %r %.2fs", result['status_code'],
                         task.get('project'), task.get('taskid'),
                         url, result['content'], result['time'])

        raise gen.Return(result)

    def run(self):
        '''Run loop'''
        logger.info("fetcher starting...")

        def queue_loop():
            if not self.outqueue or not self.inqueue:
                return
            while not self._quit:
                try:
                    if self.outqueue.full():
                        break
                    if self.http_client.free_size() <= 0:
                        break
                    task = self.inqueue.get_nowait()
                    # FIXME: decode_unicode_obj should be applied after data is
                    # selected from the database; it is done here for performance
                    task = utils.decode_unicode_obj(task)
                    self.fetch(task)
                except queue.Empty:
                    break
                except KeyboardInterrupt:
                    break
                except Exception as e:
                    logger.exception(e)
                    break

        tornado.ioloop.PeriodicCallback(queue_loop, 100, io_loop=self.ioloop).start()
        tornado.ioloop.PeriodicCallback(self.clear_robot_txt_cache, 10000, io_loop=self.ioloop).start()
        self._running = True

        try:
            self.ioloop.start()
        except KeyboardInterrupt:
            pass

        logger.info("fetcher exiting...")

    def quit(self):
        '''Quit fetcher'''
        self._running = False
        self._quit = True
        self.ioloop.add_callback(self.ioloop.stop)
        if hasattr(self, 'xmlrpc_server'):
            self.xmlrpc_ioloop.add_callback(self.xmlrpc_server.stop)
            self.xmlrpc_ioloop.add_callback(self.xmlrpc_ioloop.stop)

    def size(self):
        return self.http_client.size()

    def xmlrpc_run(self, port=24444, bind='127.0.0.1', logRequests=False):
        '''Run xmlrpc server'''
        import umsgpack
        from pyspider.libs.wsgi_xmlrpc import WSGIXMLRPCApplication
        try:
            from xmlrpc.client import Binary
        except ImportError:
            from xmlrpclib import Binary

        application = WSGIXMLRPCApplication()

        application.register_function(self.quit, '_quit')
        application.register_function(self.size)

        def sync_fetch(task):
            result = self.sync_fetch(task)
            result = Binary(umsgpack.packb(result))
            return result
        application.register_function(sync_fetch, 'fetch')

        def dump_counter(_time, _type):
            return self._cnt[_time].to_dict(_type)
        application.register_function(dump_counter, 'counter')

        import tornado.wsgi
        import tornado.ioloop
        import tornado.httpserver

        container = tornado.wsgi.WSGIContainer(application)
        self.xmlrpc_ioloop = tornado.ioloop.IOLoop()
        self.xmlrpc_server = tornado.httpserver.HTTPServer(container, io_loop=self.xmlrpc_ioloop)
        self.xmlrpc_server.listen(port=port, address=bind)
        logger.info('fetcher.xmlrpc listening on %s:%s', bind, port)
        self.xmlrpc_ioloop.start()

    def on_fetch(self, type, task):
        '''Called before task fetch'''
        pass

    def on_result(self, type, task, result):
        '''Called after task fetched'''
        status_code = result.get('status_code', 599)
        if status_code != 599:
            # floor to the status-class bucket (200, 300, ...); use integer
            # division so the bucket stays an int on Python 3
            status_code = int(status_code) // 100 * 100
        self._cnt['5m'].event((task.get('project'), status_code), +1)
        self._cnt['1h'].event((task.get('project'), status_code), +1)

        if type in ('http', 'phantomjs') and result.get('time'):
            content_len = len(result.get('content', ''))
            self._cnt['5m'].event((task.get('project'), 'speed'),
                                  float(content_len) / result.get('time'))
            self._cnt['1h'].event((task.get('project'), 'speed'),
                                  float(content_len) / result.get('time'))
            self._cnt['5m'].event((task.get('project'), 'time'), result.get('time'))
            self._cnt['1h'].event((task.get('project'), 'time'), result.get('time'))