Completed — push to master (8f71e0...468202) by Roy

pyspider.fetcher.Fetcher.http_fetch() — grade F

Complexity
    Conditions: 22
Size
    Total Lines: 101
Duplication
    Lines: 0
    Ratio: 0 %

Metric  Value
cc      22
dl      0
loc     101
rs      2

How to fix

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, that is usually a good sign that you should extract the commented part into a new method and use the comment as a starting point for naming it.

Commonly applied refactorings include Extract Method, as sketched below.
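A minimal sketch of that move against this report's subject, using the module's existing imports: the cookie-handling block near the top of http_fetch() could become a named helper (the name _cookie_jar_from_headers is illustrative, not an existing pyspider method):

    def _cookie_jar_from_headers(self, headers):
        # Build a RequestsCookieJar from a raw Cookie header, then drop the
        # header so the redirect loop in http_fetch() manages cookies itself.
        session = cookies.RequestsCookieJar()
        if 'Cookie' in headers:
            c = http_cookies.SimpleCookie()
            try:
                c.load(headers['Cookie'])
            except AttributeError:
                c.load(utils.utf8(headers['Cookie']))
            for key in c:
                session.set(key, c[key])
            del headers['Cookie']
        return session

http_fetch() would then open its cookie setup with session = self._cookie_jar_from_headers(fetch['headers']), and the inline comment that currently labels the block becomes the method name.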

Complexity

Complex code like pyspider.fetcher.Fetcher.http_fetch() often does a lot of different things. To break it down, we need to identify a cohesive component within the enclosing class or method. A common approach to finding such a component is to look for fields and methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
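As a hypothetical sketch against this code, reusing the module's existing urlsplit import: the proxy_* fields that pack_tornado_request_parameters() assembles all share a prefix, so they could live in a small component of their own (the ProxyConfig name and its methods are illustrative, not part of pyspider):

    class ProxyConfig(object):
        # Cohesive holder for the proxy_* request fields used by the fetch paths.

        def __init__(self, proxy_string):
            if '://' not in proxy_string:
                proxy_string = 'http://' + proxy_string
            self.parts = urlsplit(proxy_string)

        def apply_to(self, fetch):
            # Copy host, port and credentials into a tornado HTTPRequest kwargs dict.
            if self.parts.username:
                fetch['proxy_username'] = self.parts.username
            if self.parts.password:
                fetch['proxy_password'] = self.parts.password
            fetch['proxy_host'] = self.parts.hostname
            fetch['proxy_port'] = self.parts.port or 8080
            return fetch

pack_tornado_request_parameters() would then reduce its proxy branch to ProxyConfig(proxy_string).apply_to(fetch), keeping the proxy logic testable on its own. For reference, the source listing analyzed by this report follows.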

#!/usr/bin/env python
# -*- encoding: utf-8 -*-
# vim: set et sw=4 ts=4 sts=4 ff=unix fenc=utf8:
# Author: Binux<[email protected]>
#         http://binux.me
# Created on 2012-12-17 11:07:19

from __future__ import unicode_literals

import six
import copy
import time
import json
import logging
import functools
import threading
import tornado.ioloop
import tornado.httputil
import tornado.httpclient
import pyspider

from six.moves import queue, http_cookies
from six.moves.urllib.robotparser import RobotFileParser
from requests import cookies
from six.moves.urllib.parse import urljoin, urlsplit
from tornado import gen
from tornado.curl_httpclient import CurlAsyncHTTPClient
from tornado.simple_httpclient import SimpleAsyncHTTPClient
from pyspider.libs import utils, dataurl, counter
from .cookie_utils import extract_cookies_to_jar
logger = logging.getLogger('fetcher')
class MyCurlAsyncHTTPClient(CurlAsyncHTTPClient):

    def free_size(self):
        return len(self._free_list)

    def size(self):
        return len(self._curls) - self.free_size()


class MySimpleAsyncHTTPClient(SimpleAsyncHTTPClient):

    def free_size(self):
        return self.max_clients - self.size()

    def size(self):
        return len(self.active)

fetcher_output = {
    "status_code": int,
    "orig_url": str,
    "url": str,
    "headers": dict,
    "content": str,
    "cookies": dict,
}


class Fetcher(object):
    user_agent = "pyspider/%s (+http://pyspider.org/)" % pyspider.__version__
    default_options = {
        'method': 'GET',
        'headers': {
        },
        'use_gzip': True,
        'timeout': 120,
    }
    phantomjs_proxy = None
    robot_txt_age = 60*60  # 1h
    def __init__(self, inqueue, outqueue, poolsize=100, proxy=None, async=True):
        self.inqueue = inqueue
        self.outqueue = outqueue

        self.poolsize = poolsize
        self._running = False
        self._quit = False
        self.proxy = proxy
        self.async = async
        self.ioloop = tornado.ioloop.IOLoop()

        self.robots_txt_cache = {}

        # binding io_loop to http_client here
        if self.async:
            self.http_client = MyCurlAsyncHTTPClient(max_clients=self.poolsize,
                                                     io_loop=self.ioloop)
        else:
            self.http_client = tornado.httpclient.HTTPClient(MyCurlAsyncHTTPClient, max_clients=self.poolsize)

        self._cnt = {
            '5m': counter.CounterManager(
                lambda: counter.TimebaseAverageWindowCounter(30, 10)),
            '1h': counter.CounterManager(
                lambda: counter.TimebaseAverageWindowCounter(60, 60)),
        }
    def send_result(self, type, task, result):
        '''Send fetch result to processor'''
        if self.outqueue:
            try:
                self.outqueue.put((task, result))
            except Exception as e:
                logger.exception(e)

    def fetch(self, task, callback=None):
        if self.async:
            return self.async_fetch(task, callback)
        else:
            return self.async_fetch(task, callback).result()

    @gen.coroutine
    def async_fetch(self, task, callback=None):
        '''Do one fetch'''
        url = task.get('url', 'data:,')
        if callback is None:
            callback = self.send_result

        try:
            if url.startswith('data:'):
                ret = yield gen.maybe_future(self.data_fetch(url, task, callback))
            elif task.get('fetch', {}).get('fetch_type') in ('js', 'phantomjs'):
                ret = yield self.phantomjs_fetch(url, task, callback)
            else:
                ret = yield self.http_fetch(url, task, callback)
        except Exception as e:
            logger.exception(e)
            raise e

        raise gen.Return(ret)
    def sync_fetch(self, task):
        '''Synchronization fetch, usually used in xmlrpc thread'''
        if not self._running:
            return self.ioloop.run_sync(functools.partial(self.async_fetch, task, lambda t, _, r: True))

        wait_result = threading.Condition()
        _result = {}

        def callback(type, task, result):
            wait_result.acquire()
            _result['type'] = type
            _result['task'] = task
            _result['result'] = result
            wait_result.notify()
            wait_result.release()

        wait_result.acquire()
        self.fetch(task, callback=callback)
        while 'result' not in _result:
            wait_result.wait()
        wait_result.release()
        return _result['result']
    def data_fetch(self, url, task, callback):
        '''A fake fetcher for dataurl'''
        self.on_fetch('data', task)
        result = {}
        result['orig_url'] = url
        result['content'] = dataurl.decode(url)
        result['headers'] = {}
        result['status_code'] = 200
        result['url'] = url
        result['cookies'] = {}
        result['time'] = 0
        result['save'] = task.get('fetch', {}).get('save')
        if len(result['content']) < 70:
            logger.info("[200] %s:%s %s 0s", task.get('project'), task.get('taskid'), url)
        else:
            logger.info(
                "[200] %s:%s data:,%s...[content:%d] 0s",
                task.get('project'), task.get('taskid'),
                result['content'][:70],
                len(result['content'])
            )

        callback('data', task, result)
        self.on_result('data', task, result)
        return task, result
    def handle_error(self, type, url, task, start_time, callback, error):
        result = {
            'status_code': getattr(error, 'code', 599),
            'error': utils.text(error),
            'content': "",
            'time': time.time() - start_time,
            'orig_url': url,
            'url': url,
        }
        logger.error("[%d] %s:%s %s, %r %.2fs",
                     result['status_code'], task.get('project'), task.get('taskid'),
                     url, error, result['time'])
        callback(type, task, result)
        self.on_result(type, task, result)
        return task, result

    allowed_options = ['method', 'data', 'timeout', 'cookies', 'use_gzip', 'validate_cert']
    def pack_tornado_request_parameters(self, url, task):
        fetch = copy.deepcopy(self.default_options)
        fetch['url'] = url
        fetch['headers'] = tornado.httputil.HTTPHeaders(fetch['headers'])
        fetch['headers']['User-Agent'] = self.user_agent
        task_fetch = task.get('fetch', {})
        for each in self.allowed_options:
            if each in task_fetch:
                fetch[each] = task_fetch[each]
        fetch['headers'].update(task_fetch.get('headers', {}))

        if task.get('track'):
            track_headers = tornado.httputil.HTTPHeaders(
                task.get('track', {}).get('fetch', {}).get('headers') or {})
            track_ok = task.get('track', {}).get('process', {}).get('ok', False)
        else:
            track_headers = {}
            track_ok = False
        # proxy
        proxy_string = None
        if isinstance(task_fetch.get('proxy'), six.string_types):
            proxy_string = task_fetch['proxy']
        elif self.proxy and task_fetch.get('proxy', True):
            proxy_string = self.proxy
        if proxy_string:
            if '://' not in proxy_string:
                proxy_string = 'http://' + proxy_string
            proxy_splited = urlsplit(proxy_string)
            if proxy_splited.username:
                fetch['proxy_username'] = proxy_splited.username
                if six.PY2:
                    fetch['proxy_username'] = fetch['proxy_username'].encode('utf8')
            if proxy_splited.password:
                fetch['proxy_password'] = proxy_splited.password
                if six.PY2:
                    fetch['proxy_password'] = fetch['proxy_password'].encode('utf8')
            fetch['proxy_host'] = proxy_splited.hostname
            if six.PY2:
                fetch['proxy_host'] = fetch['proxy_host'].encode('utf8')
            fetch['proxy_port'] = proxy_splited.port or 8080

        # etag
        if task_fetch.get('etag', True):
            _t = None
            if isinstance(task_fetch.get('etag'), six.string_types):
                _t = task_fetch.get('etag')
            elif track_ok:
                _t = track_headers.get('etag')
            if _t and 'If-None-Match' not in fetch['headers']:
                fetch['headers']['If-None-Match'] = _t
        # last modified
        if task_fetch.get('last_modified', True):
            _t = None
            if isinstance(task_fetch.get('last_modified'), six.string_types):
                _t = task_fetch.get('last_modified')
            elif track_ok:
                _t = track_headers.get('last-modified')
            if _t and 'If-Modified-Since' not in fetch['headers']:
                fetch['headers']['If-Modified-Since'] = _t
        # timeout
        if 'timeout' in fetch:
            fetch['connect_timeout'] = fetch['request_timeout'] = fetch['timeout']
            del fetch['timeout']
        # data rename to body
        if 'data' in fetch:
            fetch['body'] = fetch['data']
            del fetch['data']

        return fetch
    @gen.coroutine
    def can_fetch(self, user_agent, url):
        parsed = urlsplit(url)
        domain = parsed.netloc
        if domain in self.robots_txt_cache:
            robot_txt = self.robots_txt_cache[domain]
            if time.time() - robot_txt.mtime() > self.robot_txt_age:
                robot_txt = None
        else:
            robot_txt = None

        if robot_txt is None:
            robot_txt = RobotFileParser()
            try:
                response = yield gen.maybe_future(self.http_client.fetch(
                    urljoin(url, '/robots.txt'), connect_timeout=10, request_timeout=30))
                content = response.body
            except tornado.httpclient.HTTPError as e:
                logger.error('load robots.txt from %s error: %r', domain, e)
                content = ''

            try:
                content = content.decode('utf8', 'ignore')
            except UnicodeDecodeError:
                content = ''

            robot_txt.parse(content.splitlines())
            self.robots_txt_cache[domain] = robot_txt

        raise gen.Return(robot_txt.can_fetch(user_agent, url))
    def clear_robot_txt_cache(self):
        now = time.time()
        # iterate over a copy so entries can be deleted while looping
        for domain, robot_txt in list(self.robots_txt_cache.items()):
            if now - robot_txt.mtime() > self.robot_txt_age:
                del self.robots_txt_cache[domain]
    @gen.coroutine
    def http_fetch(self, url, task, callback):
        '''HTTP fetcher'''
        start_time = time.time()

        self.on_fetch('http', task)
        handle_error = lambda x: self.handle_error('http', url, task, start_time, callback, x)

        # setup request parameters
        fetch = self.pack_tornado_request_parameters(url, task)
        task_fetch = task.get('fetch', {})

        session = cookies.RequestsCookieJar()
        # fix for tornado request obj
        if 'Cookie' in fetch['headers']:
            c = http_cookies.SimpleCookie()
            try:
                c.load(fetch['headers']['Cookie'])
            except AttributeError:
                c.load(utils.utf8(fetch['headers']['Cookie']))
            for key in c:
                session.set(key, c[key])
            del fetch['headers']['Cookie']
        if 'cookies' in fetch:
            session.update(fetch['cookies'])
            del fetch['cookies']
        max_redirects = task_fetch.get('max_redirects', 5)
        # we will handle redirects by hand to capture cookies
        fetch['follow_redirects'] = False

        # making requests
        while True:
            # robots.txt
            if task_fetch.get('robots_txt', False):
                can_fetch = yield self.can_fetch(fetch['headers']['User-Agent'], fetch['url'])
                if not can_fetch:
                    error = tornado.httpclient.HTTPError(403, 'Disallowed by robots.txt')
                    raise gen.Return(handle_error(error))

            try:
                request = tornado.httpclient.HTTPRequest(**fetch)
                cookie_header = cookies.get_cookie_header(session, request)
                if cookie_header:
                    request.headers['Cookie'] = cookie_header
            except Exception as e:
                logger.exception(fetch)
                raise gen.Return(handle_error(e))

            try:
                response = yield gen.maybe_future(self.http_client.fetch(request))
            except tornado.httpclient.HTTPError as e:
                if e.response:
                    response = e.response
                else:
                    raise gen.Return(handle_error(e))

            extract_cookies_to_jar(session, response.request, response.headers)
            if (response.code in (301, 302, 303, 307)
                    and response.headers.get('Location')
                    and task_fetch.get('allow_redirects', True)):
                if max_redirects <= 0:
                    error = tornado.httpclient.HTTPError(
                        599, 'Maximum (%d) redirects followed' % task_fetch.get('max_redirects', 5),
                        response)
                    raise gen.Return(handle_error(error))
                if response.code in (302, 303):
                    fetch['method'] = 'GET'
                    if 'body' in fetch:
                        del fetch['body']
                fetch['url'] = urljoin(fetch['url'], response.headers['Location'])
                fetch['request_timeout'] -= time.time() - start_time
                if fetch['request_timeout'] < 0:
                    fetch['request_timeout'] = 0.1
                fetch['connect_timeout'] = fetch['request_timeout']
                max_redirects -= 1
                continue

            result = {}
            result['orig_url'] = url
            result['content'] = response.body or ''
            result['headers'] = dict(response.headers)
            result['status_code'] = response.code
            result['url'] = response.effective_url or url
            result['cookies'] = session.get_dict()
            result['time'] = time.time() - start_time
            result['save'] = task_fetch.get('save')
            if response.error:
                result['error'] = utils.text(response.error)
            if 200 <= response.code < 300:
                logger.info("[%d] %s:%s %s %.2fs", response.code,
                            task.get('project'), task.get('taskid'),
                            url, result['time'])
            else:
                logger.warning("[%d] %s:%s %s %.2fs", response.code,
                               task.get('project'), task.get('taskid'),
                               url, result['time'])

            callback('http', task, result)
            self.on_result('http', task, result)
            raise gen.Return((task, result))
    @gen.coroutine
    def phantomjs_fetch(self, url, task, callback):
        '''Fetch with phantomjs proxy'''
        start_time = time.time()

        self.on_fetch('phantomjs', task)
        handle_error = lambda x: self.handle_error('phantomjs', url, task, start_time, callback, x)

        # check phantomjs proxy is enabled
        if not self.phantomjs_proxy:
            result = {
                "orig_url": url,
                "content": "phantomjs is not enabled.",
                "headers": {},
                "status_code": 501,
                "url": url,
                "cookies": {},
                "time": 0,
                "save": task.get('fetch', {}).get('save')
            }
            logger.warning("[501] %s:%s %s 0s", task.get('project'), task.get('taskid'), url)
            callback('http', task, result)
            self.on_result('http', task, result)
            raise gen.Return((task, result))

        # setup request parameters
        fetch = self.pack_tornado_request_parameters(url, task)
        task_fetch = task.get('fetch', {})
        for each in task_fetch:
            if each not in fetch:
                fetch[each] = task_fetch[each]

        # robots.txt
        if task_fetch.get('robots_txt', False):
            user_agent = fetch['headers']['User-Agent']
            can_fetch = yield self.can_fetch(user_agent, url)
            if not can_fetch:
                error = tornado.httpclient.HTTPError(403, 'Disallowed by robots.txt')
                raise gen.Return(handle_error(error))

        request_conf = {
            'follow_redirects': False
        }
        request_conf['connect_timeout'] = fetch.get('connect_timeout', 120)
        request_conf['request_timeout'] = fetch.get('request_timeout', 120)

        session = cookies.RequestsCookieJar()
        request = tornado.httpclient.HTTPRequest(url=fetch['url'])
        if fetch.get('cookies'):
            session.update(fetch['cookies'])
            if 'Cookie' in request.headers:
                del request.headers['Cookie']
            fetch['headers']['Cookie'] = cookies.get_cookie_header(session, request)

        # making requests
        fetch['headers'] = dict(fetch['headers'])
        try:
            request = tornado.httpclient.HTTPRequest(
                url="%s" % self.phantomjs_proxy, method="POST",
                body=json.dumps(fetch), **request_conf)
        except Exception as e:
            raise gen.Return(handle_error(e))

        try:
            response = yield gen.maybe_future(self.http_client.fetch(request))
        except tornado.httpclient.HTTPError as e:
            if e.response:
                response = e.response
            else:
                raise gen.Return(handle_error(e))

        if not response.body:
            raise gen.Return(handle_error(Exception('no response from phantomjs')))
        result = {}
        try:
            result = json.loads(utils.text(response.body))
        except Exception as e:
            if response.error:
                result['error'] = utils.text(response.error)
            raise gen.Return(handle_error(e))
        if result.get('status_code', 200):
            logger.info("[%d] %s:%s %s %.2fs", result['status_code'],
                        task.get('project'), task.get('taskid'), url, result['time'])
        else:
            logger.error("[%d] %s:%s %s, %r %.2fs", result['status_code'],
                         task.get('project'), task.get('taskid'),
                         url, result['content'], result['time'])

        callback('phantomjs', task, result)
        self.on_result('phantomjs', task, result)
        raise gen.Return((task, result))
    def run(self):
        '''Run loop'''
        logger.info("fetcher starting...")

        def queue_loop():
            if not self.outqueue or not self.inqueue:
                return
            while not self._quit:
                try:
                    if self.outqueue.full():
                        break
                    if self.http_client.free_size() <= 0:
                        break
                    task = self.inqueue.get_nowait()
                    # FIXME: decode_unicode_obj should be used after data is selected
                    # from the database; it's used here for performance
                    task = utils.decode_unicode_obj(task)
                    self.fetch(task)
                except queue.Empty:
                    break
                except KeyboardInterrupt:
                    break
                except Exception as e:
                    logger.exception(e)
                    break

        tornado.ioloop.PeriodicCallback(queue_loop, 100, io_loop=self.ioloop).start()
        tornado.ioloop.PeriodicCallback(self.clear_robot_txt_cache, 10000, io_loop=self.ioloop).start()
        self._running = True

        try:
            self.ioloop.start()
        except KeyboardInterrupt:
            pass

        logger.info("fetcher exiting...")
    def quit(self):
        '''Quit fetcher'''
        self._running = False
        self._quit = True
        self.ioloop.stop()

    def size(self):
        return self.http_client.size()
    def xmlrpc_run(self, port=24444, bind='127.0.0.1', logRequests=False):
        '''Run xmlrpc server'''
        import umsgpack
        try:
            from xmlrpc.server import SimpleXMLRPCServer
            from xmlrpc.client import Binary
        except ImportError:
            from SimpleXMLRPCServer import SimpleXMLRPCServer
            from xmlrpclib import Binary

        server = SimpleXMLRPCServer((bind, port), allow_none=True, logRequests=logRequests)
        server.register_introspection_functions()
        server.register_multicall_functions()

        server.register_function(self.quit, '_quit')
        server.register_function(self.size)

        def sync_fetch(task):
            result = self.sync_fetch(task)
            result = Binary(umsgpack.packb(result))
            return result
        server.register_function(sync_fetch, 'fetch')

        def dump_counter(_time, _type):
            return self._cnt[_time].to_dict(_type)
        server.register_function(dump_counter, 'counter')

        server.timeout = 0.5
        while not self._quit:
            server.handle_request()
        server.server_close()
    def on_fetch(self, type, task):
        '''Called before task fetch'''
        pass

    def on_result(self, type, task, result):
        '''Called after task fetched'''
        status_code = result.get('status_code', 599)
        if status_code != 599:
            # integer division: bucket e.g. 304 into the 300 group
            status_code = int(status_code) // 100 * 100
        self._cnt['5m'].event((task.get('project'), status_code), +1)
        self._cnt['1h'].event((task.get('project'), status_code), +1)
        if type == 'http' and result.get('time'):
            content_len = len(result.get('content', ''))
            self._cnt['5m'].event((task.get('project'), 'speed'),
                                  float(content_len) / result.get('time'))
            self._cnt['1h'].event((task.get('project'), 'speed'),
                                  float(content_len) / result.get('time'))
            self._cnt['5m'].event((task.get('project'), 'time'), result.get('time'))
            self._cnt['1h'].event((task.get('project'), 'time'), result.get('time'))