Completed
Push — master (e6dbce...addc19)
created 02:40 by Roy

Fetcher (rated F)

Complexity

Total Complexity 120

Size/Duplication

Total Lines 547
Duplicated Lines 0%

Importance

Changes 13
Bugs 7
Features 0

Metric                            Value
c   (changes)                     13
b   (bugs)                        7
f   (features)                    0
dl  (duplicated lines)            0
loc (lines of code)               547
rs                                1.5789
wmc (weighted methods per class)  120

21 Methods

Rating   Name   Duplication   Size   Complexity  
B can_fetch() 0 30 6
B __init__() 0 25 4
B async_fetch() 0 25 5
A fetch() 0 5 2
A handle_error() 0 14 1
A data_fetch() 0 23 2
A sync_fetch() 0 22 1
A send_result() 0 7 3
A callback() 0 7 1
C queue_loop() 0 21 9
F pack_tornado_request_parameters() 0 68 26
A dump_counter() 0 2 1
A clear_robot_txt_cache() 0 5 3
F http_fetch() 0 103 24
F run() 0 36 11
A quit() 0 8 2
A size() 0 2 1
F phantomjs_fetch() 0 88 16
B xmlrpc_run() 0 34 4
A on_fetch() 0 3 1
A on_result() 0 16 4
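Per-method cyclomatic complexity figures like the ones above can be reproduced locally with a tool such as radon (scores may differ slightly between tool versions; the file path below is an assumption about where the class lives in the repository):

    pip install radon
    radon cc -s pyspider/fetcher/tornado_fetcher.py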

How to fix: Complexity

Complex Class

Complex classes like Fetcher often do a lot of different things. To break such a class down, we need to identify a cohesive component within the class. A common approach to finding such a component is to look for fields and methods that share the same prefixes or suffixes.

Once you have determined which fields and methods belong together, you can apply the Extract Class refactoring. If the component makes sense as a subclass, Extract Subclass is also a candidate, and is often faster.
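As an illustration only (a hedged sketch, not code from pyspider): in Fetcher, the robots.txt members share a prefix — robots_txt_cache, robot_txt_age, can_fetch(), clear_robot_txt_cache() — and could be extracted into one cohesive class along these lines. Names such as RobotsTxtChecker are hypothetical, and the sketch is synchronous for brevity where the real can_fetch() is a Tornado coroutine.

import time
from six.moves.urllib.parse import urljoin, urlsplit
from six.moves.urllib.robotparser import RobotFileParser


class RobotsTxtChecker(object):
    '''Hypothetical extract-class target owning the robots.txt cache.'''

    def __init__(self, http_client, max_age=60 * 60):
        self.http_client = http_client   # shared HTTP client, injected by Fetcher
        self.max_age = max_age           # cache lifetime, mirrors Fetcher.robot_txt_age
        self.cache = {}                  # domain -> RobotFileParser

    def clear_expired(self):
        '''Drop cache entries older than max_age (what clear_robot_txt_cache does).'''
        now = time.time()
        for domain in list(self.cache):
            if now - self.cache[domain].mtime() > self.max_age:
                del self.cache[domain]

    def can_fetch(self, user_agent, url):
        '''Return True if robots.txt for url's domain allows user_agent.'''
        domain = urlsplit(url).netloc
        parser = self.cache.get(domain)
        if parser is None or time.time() - parser.mtime() > self.max_age:
            parser = RobotFileParser()
            response = self.http_client.fetch(urljoin(url, '/robots.txt'))
            parser.parse(response.body.decode('utf8', 'ignore').splitlines())
            parser.modified()  # record fetch time so the entry can expire
            self.cache[domain] = parser
        return parser.can_fetch(user_agent, url)

Fetcher would then hold a single self.robots = RobotsTxtChecker(self.http_client) and delegate to it, moving the robots.txt branching (and its share of the complexity) out of the class.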

#!/usr/bin/env python
# -*- encoding: utf-8 -*-
# vim: set et sw=4 ts=4 sts=4 ff=unix fenc=utf8:
# Author: Binux<[email protected]>
#         http://binux.me
# Created on 2012-12-17 11:07:19

from __future__ import unicode_literals

import six
import copy
import time
import json
import logging
import functools
import threading
import tornado.ioloop
import tornado.httputil
import tornado.httpclient
import pyspider

from six.moves import queue, http_cookies
from six.moves.urllib.robotparser import RobotFileParser
from requests import cookies
from six.moves.urllib.parse import urljoin, urlsplit
from tornado import gen
from tornado.curl_httpclient import CurlAsyncHTTPClient
from tornado.simple_httpclient import SimpleAsyncHTTPClient

from pyspider.libs import utils, dataurl, counter
from pyspider.libs.url import quote_chinese
from .cookie_utils import extract_cookies_to_jar
logger = logging.getLogger('fetcher')


class MyCurlAsyncHTTPClient(CurlAsyncHTTPClient):

    def free_size(self):
        return len(self._free_list)

    def size(self):
        return len(self._curls) - self.free_size()


class MySimpleAsyncHTTPClient(SimpleAsyncHTTPClient):

    def free_size(self):
        return self.max_clients - self.size()

    def size(self):
        return len(self.active)

fetcher_output = {
    "status_code": int,
    "orig_url": str,
    "url": str,
    "headers": dict,
    "content": str,
    "cookies": dict,
}


class Fetcher(object):
    user_agent = "pyspider/%s (+http://pyspider.org/)" % pyspider.__version__
    default_options = {
        'method': 'GET',
        'headers': {
        },
        'use_gzip': True,
        'timeout': 120,
        'connect_timeout': 20,
    }
    phantomjs_proxy = None
    robot_txt_age = 60*60  # 1h

    def __init__(self, inqueue, outqueue, poolsize=100, proxy=None, async=True):
        self.inqueue = inqueue
        self.outqueue = outqueue

        self.poolsize = poolsize
        self._running = False
        self._quit = False
        self.proxy = proxy
        self.async = async
        self.ioloop = tornado.ioloop.IOLoop()

        self.robots_txt_cache = {}

        # binding io_loop to http_client here
        if self.async:
            self.http_client = MyCurlAsyncHTTPClient(max_clients=self.poolsize,
                                                     io_loop=self.ioloop)
        else:
            self.http_client = tornado.httpclient.HTTPClient(MyCurlAsyncHTTPClient, max_clients=self.poolsize)

        self._cnt = {
            '5m': counter.CounterManager(
                lambda: counter.TimebaseAverageWindowCounter(30, 10)),
            '1h': counter.CounterManager(
                lambda: counter.TimebaseAverageWindowCounter(60, 60)),
        }

    def send_result(self, type, task, result):
        '''Send fetch result to processor'''
        if self.outqueue:
            try:
                self.outqueue.put((task, result))
            except Exception as e:
                logger.exception(e)

    def fetch(self, task, callback=None):
        if self.async:
            return self.async_fetch(task, callback)
        else:
            return self.async_fetch(task, callback).result()

    @gen.coroutine
    def async_fetch(self, task, callback=None):
        '''Do one fetch'''
        url = task.get('url', 'data:,')
        if callback is None:
            callback = self.send_result

        type = 'None'
        start_time = time.time()
        try:
            if url.startswith('data:'):
                type = 'data'
                result = yield gen.maybe_future(self.data_fetch(url, task))
            elif task.get('fetch', {}).get('fetch_type') in ('js', 'phantomjs'):
                type = 'phantomjs'
                result = yield self.phantomjs_fetch(url, task)
            else:
                type = 'http'
                result = yield self.http_fetch(url, task)
        except Exception as e:
            logger.exception(e)
            result = self.handle_error(type, url, task, start_time, e)

        callback(type, task, result)
        self.on_result(type, task, result)
        raise gen.Return(result)

    def sync_fetch(self, task):
        '''Synchronous fetch, usually used in the xmlrpc thread'''
        if not self._running:
            return self.ioloop.run_sync(functools.partial(self.async_fetch, task, lambda t, _, r: True))

        wait_result = threading.Condition()
        _result = {}

        def callback(type, task, result):
            wait_result.acquire()
            _result['type'] = type
            _result['task'] = task
            _result['result'] = result
            wait_result.notify()
            wait_result.release()

        wait_result.acquire()
        self.ioloop.add_callback(self.fetch, task, callback)
        while 'result' not in _result:
            wait_result.wait()
        wait_result.release()
        return _result['result']

    def data_fetch(self, url, task):
        '''A fake fetcher for dataurl'''
        self.on_fetch('data', task)
        result = {}
        result['orig_url'] = url
        result['content'] = dataurl.decode(url)
        result['headers'] = {}
        result['status_code'] = 200
        result['url'] = url
        result['cookies'] = {}
        result['time'] = 0
        result['save'] = task.get('fetch', {}).get('save')
        if len(result['content']) < 70:
            logger.info("[200] %s:%s %s 0s", task.get('project'), task.get('taskid'), url)
        else:
            logger.info(
                "[200] %s:%s data:,%s...[content:%d] 0s",
                task.get('project'), task.get('taskid'),
                result['content'][:70],
                len(result['content'])
            )

        return result

    def handle_error(self, type, url, task, start_time, error):
        '''Build the error result for a failed fetch and log it'''
        result = {
            'status_code': getattr(error, 'code', 599),
            'error': utils.text(error),
            'content': "",
            'time': time.time() - start_time,
            'orig_url': url,
            'url': url,
            "save": task.get('fetch', {}).get('save')
        }
        logger.error("[%d] %s:%s %s, %r %.2fs",
                     result['status_code'], task.get('project'), task.get('taskid'),
                     url, error, result['time'])
        return result

    allowed_options = ['method', 'data', 'timeout', 'cookies', 'use_gzip', 'validate_cert']

    def pack_tornado_request_parameters(self, url, task):
        '''Build the keyword arguments for tornado.httpclient.HTTPRequest from a task'''
        fetch = copy.deepcopy(self.default_options)
        fetch['url'] = url
        fetch['headers'] = tornado.httputil.HTTPHeaders(fetch['headers'])
        fetch['headers']['User-Agent'] = self.user_agent
        task_fetch = task.get('fetch', {})
        for each in self.allowed_options:
            if each in task_fetch:
                fetch[each] = task_fetch[each]
        fetch['headers'].update(task_fetch.get('headers', {}))

        if task.get('track'):
            track_headers = tornado.httputil.HTTPHeaders(
                task.get('track', {}).get('fetch', {}).get('headers') or {})
            track_ok = task.get('track', {}).get('process', {}).get('ok', False)
        else:
            track_headers = {}
            track_ok = False
        # proxy
        proxy_string = None
        if isinstance(task_fetch.get('proxy'), six.string_types):
            proxy_string = task_fetch['proxy']
        elif self.proxy and task_fetch.get('proxy', True):
            proxy_string = self.proxy
        if proxy_string:
            if '://' not in proxy_string:
                proxy_string = 'http://' + proxy_string
            proxy_splited = urlsplit(proxy_string)
            fetch['proxy_host'] = proxy_splited.hostname
            if proxy_splited.username:
                fetch['proxy_username'] = proxy_splited.username
            if proxy_splited.password:
                fetch['proxy_password'] = proxy_splited.password
            if six.PY2:
                for key in ('proxy_host', 'proxy_username', 'proxy_password'):
                    if key in fetch:
                        fetch[key] = fetch[key].encode('utf8')
            fetch['proxy_port'] = proxy_splited.port or 8080

        # etag
        if task_fetch.get('etag', True):
            _t = None
            if isinstance(task_fetch.get('etag'), six.string_types):
                _t = task_fetch.get('etag')
            elif track_ok:
                _t = track_headers.get('etag')
            if _t and 'If-None-Match' not in fetch['headers']:
                fetch['headers']['If-None-Match'] = _t
        # last modified ('last_modifed' is read too, for backward compatibility with the old misspelled option)
        if task_fetch.get('last_modified', task_fetch.get('last_modifed', True)):
            last_modified = task_fetch.get('last_modified', task_fetch.get('last_modifed', True))
            _t = None
            if isinstance(last_modified, six.string_types):
                _t = last_modified
            elif track_ok:
                _t = track_headers.get('last-modified')
            if _t and 'If-Modified-Since' not in fetch['headers']:
                fetch['headers']['If-Modified-Since'] = _t
        # timeout
        if 'timeout' in fetch:
            fetch['request_timeout'] = fetch['timeout']
            del fetch['timeout']
        # data rename to body
        if 'data' in fetch:
            fetch['body'] = fetch['data']
            del fetch['data']

        return fetch

    @gen.coroutine
    def can_fetch(self, user_agent, url):
        '''Check robots.txt (cached per domain) to see whether user_agent may fetch url'''
        parsed = urlsplit(url)
        domain = parsed.netloc
        if domain in self.robots_txt_cache:
            robot_txt = self.robots_txt_cache[domain]
            if time.time() - robot_txt.mtime() > self.robot_txt_age:
                robot_txt = None
        else:
            robot_txt = None

        if robot_txt is None:
            robot_txt = RobotFileParser()
            try:
                response = yield gen.maybe_future(self.http_client.fetch(
                    urljoin(url, '/robots.txt'), connect_timeout=10, request_timeout=30))
                content = response.body
            except tornado.httpclient.HTTPError as e:
                logger.error('load robots.txt from %s error: %r', domain, e)
                content = ''

            try:
                content = content.decode('utf8', 'ignore')
            except UnicodeDecodeError:
                content = ''

            robot_txt.parse(content.splitlines())
            robot_txt.modified()  # record the fetch time; otherwise mtime() stays 0 and the cache never hits
            self.robots_txt_cache[domain] = robot_txt

        raise gen.Return(robot_txt.can_fetch(user_agent, url))

    def clear_robot_txt_cache(self):
        now = time.time()
        # copy the items so entries can be removed while iterating (required on Python 3)
        for domain, robot_txt in list(self.robots_txt_cache.items()):
            if now - robot_txt.mtime() > self.robot_txt_age:
                del self.robots_txt_cache[domain]

    @gen.coroutine
    def http_fetch(self, url, task):
        '''HTTP fetcher'''
        start_time = time.time()
        self.on_fetch('http', task)
        handle_error = lambda x: self.handle_error('http', url, task, start_time, x)

        # setup request parameters
        fetch = self.pack_tornado_request_parameters(url, task)
        task_fetch = task.get('fetch', {})

        session = cookies.RequestsCookieJar()
        # fix for tornado request obj
        if 'Cookie' in fetch['headers']:
            c = http_cookies.SimpleCookie()
            try:
                c.load(fetch['headers']['Cookie'])
            except AttributeError:
                c.load(utils.utf8(fetch['headers']['Cookie']))
            for key in c:
                session.set(key, c[key])
            del fetch['headers']['Cookie']
        if 'cookies' in fetch:
            session.update(fetch['cookies'])
            del fetch['cookies']

        max_redirects = task_fetch.get('max_redirects', 5)
        # we will handle redirects by hand to capture cookies
        fetch['follow_redirects'] = False

        # making requests
        while True:
            # robots.txt
            if task_fetch.get('robots_txt', False):
                can_fetch = yield self.can_fetch(fetch['headers']['User-Agent'], fetch['url'])
                if not can_fetch:
                    error = tornado.httpclient.HTTPError(403, 'Disallowed by robots.txt')
                    raise gen.Return(handle_error(error))

            try:
                request = tornado.httpclient.HTTPRequest(**fetch)
                # if cookie already in header, get_cookie_header wouldn't work
                old_cookie_header = request.headers.get('Cookie')
                if old_cookie_header:
                    del request.headers['Cookie']
                cookie_header = cookies.get_cookie_header(session, request)
                if cookie_header:
                    request.headers['Cookie'] = cookie_header
                elif old_cookie_header:
                    request.headers['Cookie'] = old_cookie_header
            except Exception as e:
                logger.exception(fetch)
                raise gen.Return(handle_error(e))

            try:
                response = yield gen.maybe_future(self.http_client.fetch(request))
            except tornado.httpclient.HTTPError as e:
                if e.response:
                    response = e.response
                else:
                    raise gen.Return(handle_error(e))

            extract_cookies_to_jar(session, response.request, response.headers)
            if (response.code in (301, 302, 303, 307)
                    and response.headers.get('Location')
                    and task_fetch.get('allow_redirects', True)):
                if max_redirects <= 0:
                    error = tornado.httpclient.HTTPError(
                        599, 'Maximum (%d) redirects followed' % task_fetch.get('max_redirects', 5),
                        response)
                    raise gen.Return(handle_error(error))
                if response.code in (302, 303):
                    fetch['method'] = 'GET'
                    if 'body' in fetch:
                        del fetch['body']
                fetch['url'] = quote_chinese(urljoin(fetch['url'], response.headers['Location']))
                fetch['request_timeout'] -= time.time() - start_time
                if fetch['request_timeout'] < 0:
                    fetch['request_timeout'] = 0.1
                max_redirects -= 1
                continue

            result = {}
            result['orig_url'] = url
            result['content'] = response.body or ''
            result['headers'] = dict(response.headers)
            result['status_code'] = response.code
            result['url'] = response.effective_url or url
            result['time'] = time.time() - start_time
            result['cookies'] = session.get_dict()
            result['save'] = task_fetch.get('save')
            if response.error:
                result['error'] = utils.text(response.error)
            if 200 <= response.code < 300:
                logger.info("[%d] %s:%s %s %.2fs", response.code,
                            task.get('project'), task.get('taskid'),
                            url, result['time'])
            else:
                logger.warning("[%d] %s:%s %s %.2fs", response.code,
                               task.get('project'), task.get('taskid'),
                               url, result['time'])

            raise gen.Return(result)

    @gen.coroutine
    def phantomjs_fetch(self, url, task):
        '''Fetch with phantomjs proxy'''
        start_time = time.time()
        self.on_fetch('phantomjs', task)
        handle_error = lambda x: self.handle_error('phantomjs', url, task, start_time, x)

        # check phantomjs proxy is enabled
        if not self.phantomjs_proxy:
            result = {
                "orig_url": url,
                "content": "phantomjs is not enabled.",
                "headers": {},
                "status_code": 501,
                "url": url,
                "time": time.time() - start_time,
                "cookies": {},
                "save": task.get('fetch', {}).get('save')
            }
            logger.warning("[501] %s:%s %s 0s", task.get('project'), task.get('taskid'), url)
            raise gen.Return(result)

        # setup request parameters
        fetch = self.pack_tornado_request_parameters(url, task)
        task_fetch = task.get('fetch', {})
        for each in task_fetch:
            if each not in fetch:
                fetch[each] = task_fetch[each]

        # robots.txt
        if task_fetch.get('robots_txt', False):
            user_agent = fetch['headers']['User-Agent']
            can_fetch = yield self.can_fetch(user_agent, url)
            if not can_fetch:
                error = tornado.httpclient.HTTPError(403, 'Disallowed by robots.txt')
                raise gen.Return(handle_error(error))

        request_conf = {
            'follow_redirects': False
        }
        request_conf['connect_timeout'] = fetch.get('connect_timeout', 20)
        request_conf['request_timeout'] = fetch.get('request_timeout', 120) + 1

        session = cookies.RequestsCookieJar()
        request = tornado.httpclient.HTTPRequest(url=fetch['url'])
        if fetch.get('cookies'):
            session.update(fetch['cookies'])
            if 'Cookie' in request.headers:
                del request.headers['Cookie']
            fetch['headers']['Cookie'] = cookies.get_cookie_header(session, request)

        # making requests
        fetch['headers'] = dict(fetch['headers'])
        try:
            request = tornado.httpclient.HTTPRequest(
                url="%s" % self.phantomjs_proxy, method="POST",
                body=json.dumps(fetch), **request_conf)
        except Exception as e:
            raise gen.Return(handle_error(e))

        try:
            response = yield gen.maybe_future(self.http_client.fetch(request))
        except tornado.httpclient.HTTPError as e:
            if e.response:
                response = e.response
            else:
                raise gen.Return(handle_error(e))

        if not response.body:
            raise gen.Return(handle_error(Exception('no response from phantomjs')))

        result = {}
        try:
            result = json.loads(utils.text(response.body))
        except Exception as e:
            if response.error:
                result['error'] = utils.text(response.error)
            raise gen.Return(handle_error(e))

        if result.get('status_code', 200):
            logger.info("[%d] %s:%s %s %.2fs", result['status_code'],
                        task.get('project'), task.get('taskid'), url, result['time'])
        else:
            logger.error("[%d] %s:%s %s, %r %.2fs", result['status_code'],
                         task.get('project'), task.get('taskid'),
                         url, result['content'], result['time'])

        raise gen.Return(result)

    def run(self):
        '''Run loop'''
        logger.info("fetcher starting...")

        def queue_loop():
            if not self.outqueue or not self.inqueue:
                return
            while not self._quit:
                try:
                    if self.outqueue.full():
                        break
                    if self.http_client.free_size() <= 0:
                        break
                    task = self.inqueue.get_nowait()
                    # FIXME: decode_unicode_obj should be used after data is selected from
                    # the database; it is used here for performance
                    task = utils.decode_unicode_obj(task)
                    self.fetch(task)
                except queue.Empty:
                    break
                except KeyboardInterrupt:
                    break
                except Exception as e:
                    logger.exception(e)
                    break

        tornado.ioloop.PeriodicCallback(queue_loop, 100, io_loop=self.ioloop).start()
        tornado.ioloop.PeriodicCallback(self.clear_robot_txt_cache, 10000, io_loop=self.ioloop).start()
        self._running = True

        try:
            self.ioloop.start()
        except KeyboardInterrupt:
            pass

        logger.info("fetcher exiting...")

    def quit(self):
        '''Quit fetcher'''
        self._running = False
        self._quit = True
        self.ioloop.add_callback(self.ioloop.stop)
        if hasattr(self, 'xmlrpc_server'):
            self.xmlrpc_ioloop.add_callback(self.xmlrpc_server.stop)
            self.xmlrpc_ioloop.add_callback(self.xmlrpc_ioloop.stop)

    def size(self):
        '''Number of requests currently being fetched'''
        return self.http_client.size()

    def xmlrpc_run(self, port=24444, bind='127.0.0.1', logRequests=False):
        '''Run xmlrpc server'''
        import umsgpack
        from pyspider.libs.wsgi_xmlrpc import WSGIXMLRPCApplication
        try:
            from xmlrpc.client import Binary
        except ImportError:
            from xmlrpclib import Binary

        application = WSGIXMLRPCApplication()

        application.register_function(self.quit, '_quit')
        application.register_function(self.size)

        def sync_fetch(task):
            result = self.sync_fetch(task)
            result = Binary(umsgpack.packb(result))
            return result
        application.register_function(sync_fetch, 'fetch')

        def dump_counter(_time, _type):
            return self._cnt[_time].to_dict(_type)
        application.register_function(dump_counter, 'counter')

        import tornado.wsgi
        import tornado.ioloop
        import tornado.httpserver

        container = tornado.wsgi.WSGIContainer(application)
        self.xmlrpc_ioloop = tornado.ioloop.IOLoop()
        self.xmlrpc_server = tornado.httpserver.HTTPServer(container, io_loop=self.xmlrpc_ioloop)
        self.xmlrpc_server.listen(port=port, address=bind)
        logger.info('fetcher.xmlrpc listening on %s:%s', bind, port)
        self.xmlrpc_ioloop.start()

    def on_fetch(self, type, task):
        '''Called before task fetch'''
        pass

    def on_result(self, type, task, result):
        '''Called after task fetched'''
        status_code = result.get('status_code', 599)
        if status_code != 599:
            # bucket by hundreds (204 -> 200, 404 -> 400, ...); floor division keeps it an int on Python 3
            status_code = int(status_code) // 100 * 100
        self._cnt['5m'].event((task.get('project'), status_code), +1)
        self._cnt['1h'].event((task.get('project'), status_code), +1)

        if type in ('http', 'phantomjs') and result.get('time'):
            content_len = len(result.get('content', ''))
            self._cnt['5m'].event((task.get('project'), 'speed'),
                                  float(content_len) / result.get('time'))
            self._cnt['1h'].event((task.get('project'), 'speed'),
                                  float(content_len) / result.get('time'))
            self._cnt['5m'].event((task.get('project'), 'time'), result.get('time'))
            self._cnt['1h'].event((task.get('project'), 'time'), result.get('time'))
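For orientation, a minimal sketch of driving this class by hand (not how pyspider normally wires it up; there the queues and the phantomjs proxy address come from pyspider's runner). The queue objects and the example task below are assumptions for illustration.

import threading
from six.moves import queue as std_queue

inqueue = std_queue.Queue()    # tasks from the scheduler
outqueue = std_queue.Queue()   # (task, result) tuples for the processor

fetcher = Fetcher(inqueue, outqueue, poolsize=10)

# a minimal task dict with the fields Fetcher reads above
inqueue.put({
    'taskid': 'example-task',
    'project': 'demo',
    'url': 'http://example.com/',
    'fetch': {'method': 'GET'},
})

# run() blocks on its IOLoop, so start it in a thread
thread = threading.Thread(target=fetcher.run)
thread.daemon = True
thread.start()

task, result = outqueue.get(timeout=60)
print(result['status_code'], result['url'])
fetcher.quit()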