Push — master (a019cb...c89357) by Roy

pyspider.fetcher.Fetcher — Rating: F

Complexity

Total Complexity: 104

Size/Duplication

Total Lines: 474
Duplicated Lines: 0%

Metric                              Value
dl (duplicated lines)               0
loc (lines of code)                 474
rs                                  1.5789
wmc (weighted methods per class)    104

19 Methods

Rating  Name                                Duplication  Size  Complexity
A       send_result()                       0            7     3
A       callback()                          0            7     1
A       fetch()                             0            5     2
A       __init__()                          0            20    3
A       async_fetch()                       0            11    4
F       http_fetch()                        0            95    20
F       pack_tornado_request_parameters()   0            69    26
B       data_fetch()                        0            25    2
A       handle_error()                      0            15    1
A       quit()                              0            5     1
B       xmlrpc_run()                        0            31    5
F       run()                               0            35    11
A       dump_counter()                      0            2     1
C       queue_loop()                        0            21    9
A       size()                              0            2     1
A       on_result()                         0            16    4
F       phantomjs_fetch()                   0            84    15
A       sync_fetch()                        0            22    1
A       on_fetch()                          0            3     1

How to fix: Complexity

Complex Class

Complex classes like pyspider.fetcher.Fetcher often do a lot of different things. To break such a class down, we need to identify a cohesive component within the class. A common approach to finding such a component is to look for fields and methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring, as sketched below. If the component makes sense as a subclass, Extract Subclass is also a candidate, and is often faster.
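For example, pack_tornado_request_parameters() (complexity 26) mixes proxy handling, cache-validation headers, and option renaming. A minimal sketch of Extract Class applied to just its proxy branch might look like the following; ProxyConfig and apply_to are illustrative names, not part of pyspider, and the Python 2 byte-encoding is elided for brevity:

# Hypothetical sketch: pull the proxy-parsing branch of
# pack_tornado_request_parameters() into its own small class.
from six.moves.urllib.parse import urlsplit


class ProxyConfig(object):
    """Parses a proxy string once; fills tornado fetch kwargs on demand."""

    def __init__(self, proxy_string):
        if '://' not in proxy_string:
            proxy_string = 'http://' + proxy_string
        self.parts = urlsplit(proxy_string)

    def apply_to(self, fetch):
        # mirror the field names tornado's curl client expects
        if self.parts.username:
            fetch['proxy_username'] = self.parts.username
        if self.parts.password:
            fetch['proxy_password'] = self.parts.password
        fetch['proxy_host'] = self.parts.hostname
        fetch['proxy_port'] = self.parts.port or 8080

pack_tornado_request_parameters() would then only choose the proxy string and call ProxyConfig(proxy_string).apply_to(fetch); the etag and last-modified blocks are candidates for the same move. The full source under review follows.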

#!/usr/bin/env python
# -*- encoding: utf-8 -*-
# vim: set et sw=4 ts=4 sts=4 ff=unix fenc=utf8:
# Author: Binux<[email protected]>
#         http://binux.me
# Created on 2012-12-17 11:07:19

from __future__ import unicode_literals

import six
import copy
import time
import json
import logging
import functools
import threading
import tornado.ioloop
import tornado.httputil
import tornado.httpclient
import pyspider

from six.moves import queue, http_cookies
from requests import cookies
from six.moves.urllib.parse import urljoin, urlsplit
from tornado import gen
from tornado.curl_httpclient import CurlAsyncHTTPClient
from tornado.simple_httpclient import SimpleAsyncHTTPClient
from pyspider.libs import utils, dataurl, counter
from .cookie_utils import extract_cookies_to_jar

logger = logging.getLogger('fetcher')

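
# Thin subclasses that expose how many slots of tornado's HTTP client pool are
# free, so the fetch loop can stop pulling tasks once the pool is saturated.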
class MyCurlAsyncHTTPClient(CurlAsyncHTTPClient):

    def free_size(self):
        return len(self._free_list)

    def size(self):
        return len(self._curls) - self.free_size()


class MySimpleAsyncHTTPClient(SimpleAsyncHTTPClient):

    def free_size(self):
        return self.max_clients - self.size()

    def size(self):
        return len(self.active)

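# Informal schema of the result dict a fetch produces; kept as documentation
# only and never enforced at runtime.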
fetcher_output = {
    "status_code": int,
    "orig_url": str,
    "url": str,
    "headers": dict,
    "content": str,
    "cookies": dict,
}


class Fetcher(object):
    user_agent = "pyspider/%s (+http://pyspider.org/)" % pyspider.__version__
    default_options = {
        'method': 'GET',
        'headers': {
        },
        'use_gzip': True,
        'timeout': 120,
    }
    phantomjs_proxy = None

    # NOTE: 'async' became a reserved word in Python 3.7; this signature only
    # parses on the Python 2 / early Python 3 interpreters targeted here.
    def __init__(self, inqueue, outqueue, poolsize=100, proxy=None, async=True):
        self.inqueue = inqueue
        self.outqueue = outqueue

        self.poolsize = poolsize
        self._running = False
        self._quit = False
        self.proxy = proxy
        self.async = async
        self.ioloop = tornado.ioloop.IOLoop()

        # binding io_loop to http_client here
        self.http_client = MyCurlAsyncHTTPClient(max_clients=self.poolsize,
                                                 io_loop=self.ioloop)

        self._cnt = {
            '5m': counter.CounterManager(
                lambda: counter.TimebaseAverageWindowCounter(30, 10)),
            '1h': counter.CounterManager(
                lambda: counter.TimebaseAverageWindowCounter(60, 60)),
        }

    def send_result(self, type, task, result):
        '''Send fetch result to processor'''
        if self.outqueue:
            try:
                self.outqueue.put((task, result))
            except Exception as e:
                logger.exception(e)

    def fetch(self, task, callback=None):
        if self.async:
            return self.async_fetch(task, callback)
        else:
            return self.ioloop.run_sync(functools.partial(self.async_fetch, task, callback))

    def async_fetch(self, task, callback=None):
        '''Do one fetch'''
        url = task.get('url', 'data:,')
        if callback is None:
            callback = self.send_result
        if url.startswith('data:'):
            return gen.maybe_future(self.data_fetch(url, task, callback))
        elif task.get('fetch', {}).get('fetch_type') in ('js', 'phantomjs'):
            return gen.maybe_future(self.phantomjs_fetch(url, task, callback))
        else:
            return gen.maybe_future(self.http_fetch(url, task, callback))

    def sync_fetch(self, task):
        '''Synchronous fetch, usually used in the xmlrpc thread'''
        if not self._running:
            return self.ioloop.run_sync(functools.partial(self.async_fetch, task, lambda t, _, r: True))

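        # The fetch itself runs on the ioloop thread; park this (xmlrpc) thread
        # on a Condition until the callback below hands the result over.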
        wait_result = threading.Condition()
        _result = {}

        def callback(type, task, result):
            wait_result.acquire()
            _result['type'] = type
            _result['task'] = task
            _result['result'] = result
            wait_result.notify()
            wait_result.release()

        wait_result.acquire()
        self.fetch(task, callback=callback)
        while 'result' not in _result:
            wait_result.wait()
        wait_result.release()
        return _result['result']

    def data_fetch(self, url, task, callback):
        '''A fake fetcher for dataurl'''
        self.on_fetch('data', task)
        result = {}
        result['orig_url'] = url
        result['content'] = dataurl.decode(url)
        result['headers'] = {}
        result['status_code'] = 200
        result['url'] = url
        result['cookies'] = {}
        result['time'] = 0
        result['save'] = task.get('fetch', {}).get('save')
        if len(result['content']) < 70:
            logger.info("[200] %s:%s %s 0s", task.get('project'), task.get('taskid'), url)
        else:
            logger.info(
                "[200] %s:%s data:,%s...[content:%d] 0s",
                task.get('project'), task.get('taskid'),
                result['content'][:70],
                len(result['content'])
            )

        callback('data', task, result)
        self.on_result('data', task, result)
        return task, result

    def handle_error(self, type, url, task, start_time, callback, error):
        result = {
            'status_code': getattr(error, 'code', 599),
            'error': utils.text(error),
            'content': "",
            'time': time.time() - start_time,
            'orig_url': url,
            'url': url,
        }
        logger.error("[%d] %s:%s %s, %r %.2fs",
                     result['status_code'], task.get('project'), task.get('taskid'),
                     url, error, result['time'])
        callback(type, task, result)
        self.on_result(type, task, result)
        return task, result

    allowed_options = ['method', 'data', 'timeout', 'cookies', 'use_gzip', 'validate_cert']

    def pack_tornado_request_parameters(self, url, task):
        fetch = copy.deepcopy(self.default_options)
        fetch['url'] = url
        fetch['headers'] = tornado.httputil.HTTPHeaders(fetch['headers'])
        fetch['headers']['User-Agent'] = self.user_agent
        task_fetch = task.get('fetch', {})
        for each in self.allowed_options:
            if each in task_fetch:
                fetch[each] = task_fetch[each]
        fetch['headers'].update(task_fetch.get('headers', {}))

        if task.get('track'):
            track_headers = tornado.httputil.HTTPHeaders(
                task.get('track', {}).get('fetch', {}).get('headers') or {})
            track_ok = task.get('track', {}).get('process', {}).get('ok', False)
        else:
            track_headers = {}
            track_ok = False
        # proxy
        proxy_string = None
        if isinstance(task_fetch.get('proxy'), six.string_types):
            proxy_string = task_fetch['proxy']
        elif self.proxy and task_fetch.get('proxy', True):
            proxy_string = self.proxy
        if proxy_string:
            if '://' not in proxy_string:
                proxy_string = 'http://' + proxy_string
            proxy_splited = urlsplit(proxy_string)
            if proxy_splited.username:
                fetch['proxy_username'] = proxy_splited.username
                if six.PY2:
                    fetch['proxy_username'] = fetch['proxy_username'].encode('utf8')
            if proxy_splited.password:
                fetch['proxy_password'] = proxy_splited.password
                if six.PY2:
                    fetch['proxy_password'] = fetch['proxy_password'].encode('utf8')
            # mirror the username/password handling above: encode to bytes
            # only on Python 2
            fetch['proxy_host'] = proxy_splited.hostname
            if six.PY2:
                fetch['proxy_host'] = fetch['proxy_host'].encode('utf8')
            fetch['proxy_port'] = proxy_splited.port or 8080

        # etag
        if task_fetch.get('etag', True):
            _t = None
            if isinstance(task_fetch.get('etag'), six.string_types):
                _t = task_fetch.get('etag')
            elif track_ok:
                _t = track_headers.get('etag')
            if _t and 'If-None-Match' not in fetch['headers']:
                fetch['headers']['If-None-Match'] = _t
        # last modified
        if task_fetch.get('last_modified', True):
            _t = None
            if isinstance(task_fetch.get('last_modified'), six.string_types):
                _t = task_fetch.get('last_modified')
            elif track_ok:
                _t = track_headers.get('last-modified')
            if _t and 'If-Modified-Since' not in fetch['headers']:
                fetch['headers']['If-Modified-Since'] = _t
        # timeout
        if 'timeout' in fetch:
            fetch['connect_timeout'] = fetch['request_timeout'] = fetch['timeout']
            del fetch['timeout']
        # data rename to body
        if 'data' in fetch:
            fetch['body'] = fetch['data']
            del fetch['data']

        return fetch

    @gen.coroutine
    def http_fetch(self, url, task, callback):
        '''HTTP fetcher'''
        start_time = time.time()

        self.on_fetch('http', task)

        # setup request parameters
        fetch = self.pack_tornado_request_parameters(url, task)
        task_fetch = task.get('fetch', {})

        session = cookies.RequestsCookieJar()
        # fix for tornado request obj
        if 'Cookie' in fetch['headers']:
            c = http_cookies.SimpleCookie()
            try:
                c.load(fetch['headers']['Cookie'])
            except AttributeError:
                c.load(utils.utf8(fetch['headers']['Cookie']))
            for key in c:
                session.set(key, c[key])
            del fetch['headers']['Cookie']
        if 'cookies' in fetch:
            session.update(fetch['cookies'])
            del fetch['cookies']

        max_redirects = task_fetch.get('max_redirects', 5)
        # we will handle redirects by hand to capture cookies
        fetch['follow_redirects'] = False

        handle_error = lambda x: self.handle_error('http', url, task, start_time, callback, x)

        # making requests
        while True:
            try:
                request = tornado.httpclient.HTTPRequest(**fetch)
                cookie_header = cookies.get_cookie_header(session, request)
                if cookie_header:
                    request.headers['Cookie'] = cookie_header
            except Exception as e:
                logger.exception(fetch)
                raise gen.Return(handle_error(e))

            try:
                response = yield self.http_client.fetch(request)
            except tornado.httpclient.HTTPError as e:
                if e.response:
                    response = e.response
                else:
                    raise gen.Return(handle_error(e))

            extract_cookies_to_jar(session, response.request, response.headers)
            if (response.code in (301, 302, 303, 307)
                    and response.headers.get('Location')
                    and task_fetch.get('allow_redirects', True)):
                if max_redirects <= 0:
                    error = tornado.httpclient.HTTPError(
                        599, 'Maximum (%d) redirects followed' % task_fetch.get('max_redirects', 5),
                        response)
                    raise gen.Return(handle_error(error))
                if response.code in (302, 303):
                    fetch['method'] = 'GET'
                    if 'body' in fetch:
                        del fetch['body']
                fetch['url'] = urljoin(fetch['url'], response.headers['Location'])
                fetch['request_timeout'] -= time.time() - start_time
                if fetch['request_timeout'] < 0:
                    fetch['request_timeout'] = 0.1
                fetch['connect_timeout'] = fetch['request_timeout']
                max_redirects -= 1
                continue

            result = {}
            result['orig_url'] = url
            result['content'] = response.body or ''
            result['headers'] = dict(response.headers)
            result['status_code'] = response.code
            result['url'] = response.effective_url or url
            result['cookies'] = session.get_dict()
            result['time'] = time.time() - start_time
            result['save'] = task_fetch.get('save')
            if response.error:
                result['error'] = utils.text(response.error)
            if 200 <= response.code < 300:
                logger.info("[%d] %s:%s %s %.2fs", response.code,
                            task.get('project'), task.get('taskid'),
                            url, result['time'])
            else:
                logger.warning("[%d] %s:%s %s %.2fs", response.code,
                               task.get('project'), task.get('taskid'),
                               url, result['time'])

            callback('http', task, result)
            self.on_result('http', task, result)
            raise gen.Return((task, result))

    @gen.coroutine
    def phantomjs_fetch(self, url, task, callback):
        '''Fetch with phantomjs proxy'''
        start_time = time.time()

        self.on_fetch('phantomjs', task)

        # check phantomjs proxy is enabled
        if not self.phantomjs_proxy:
            result = {
                "orig_url": url,
                "content": "phantomjs is not enabled.",
                "headers": {},
                "status_code": 501,
                "url": url,
                "cookies": {},
                "time": 0,
                "save": task.get('fetch', {}).get('save')
            }
            logger.warning("[501] %s:%s %s 0s", task.get('project'), task.get('taskid'), url)
            callback('http', task, result)
            self.on_result('http', task, result)
            raise gen.Return((task, result))

        # setup request parameters
        fetch = self.pack_tornado_request_parameters(url, task)
        task_fetch = task.get('fetch', {})
        for each in task_fetch:
            if each not in fetch:
                fetch[each] = task_fetch[each]

        request_conf = {
            'follow_redirects': False
        }
        if 'timeout' in task_fetch:
            request_conf['connect_timeout'] = task_fetch['timeout']
            request_conf['request_timeout'] = task_fetch['timeout'] + 1

        session = cookies.RequestsCookieJar()
        request = tornado.httpclient.HTTPRequest(url=fetch['url'])
        if fetch.get('cookies'):
            session.update(fetch['cookies'])
            if 'Cookie' in request.headers:
                del request.headers['Cookie']
            fetch['headers']['Cookie'] = cookies.get_cookie_header(session, request)

        handle_error = lambda x: self.handle_error('phantomjs', url, task, start_time, callback, x)

        # making requests
        fetch['headers'] = dict(fetch['headers'])
        try:
            request = tornado.httpclient.HTTPRequest(
                url="%s" % self.phantomjs_proxy, method="POST",
                body=json.dumps(fetch), **request_conf)
        except Exception as e:
            raise gen.Return(handle_error(e))

        try:
            response = yield self.http_client.fetch(request)
        except tornado.httpclient.HTTPError as e:
            if e.response:
                response = e.response
            else:
                # no response object at all; fail the task instead of letting
                # the response checks below raise a NameError
                raise gen.Return(handle_error(e))

        if not response.body:
            raise gen.Return(handle_error(Exception('no response from phantomjs')))

        try:
            result = json.loads(utils.text(response.body))
            if response.error:
                result['error'] = utils.text(response.error)
        except Exception as e:
            raise gen.Return(handle_error(e))

        if result.get('status_code', 200):
            logger.info("[%d] %s:%s %s %.2fs", result['status_code'],
                        task.get('project'), task.get('taskid'), url, result['time'])
        else:
            logger.error("[%d] %s:%s %s, %r %.2fs", result['status_code'],
                         task.get('project'), task.get('taskid'),
                         url, result['content'], result['time'])

        callback('phantomjs', task, result)
        self.on_result('phantomjs', task, result)
        raise gen.Return((task, result))

    def run(self):
        '''Run loop'''
        logger.info("fetcher starting...")

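        # Pull tasks from the inqueue, but only while the outqueue has room
        # and the HTTP client pool has a free slot; polled every 100 ms below.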
        def queue_loop():
            if not self.outqueue or not self.inqueue:
                return
            while not self._quit:
                try:
                    if self.outqueue.full():
                        break
                    if self.http_client.free_size() <= 0:
                        break
                    task = self.inqueue.get_nowait()
                    # FIXME: decode_unicode_obj should be applied after data is
                    # selected from the database; it is done here for performance
                    task = utils.decode_unicode_obj(task)
                    self.fetch(task)
                except queue.Empty:
                    break
                except KeyboardInterrupt:
                    break
                except Exception as e:
                    logger.exception(e)
                    break

        tornado.ioloop.PeriodicCallback(queue_loop, 100, io_loop=self.ioloop).start()
        self._running = True

        try:
            self.ioloop.start()
        except KeyboardInterrupt:
            pass

        logger.info("fetcher exiting...")

    def quit(self):
        '''Quit fetcher'''
        self._running = False
        self._quit = True
        self.ioloop.stop()

    def size(self):
        return self.http_client.size()

    def xmlrpc_run(self, port=24444, bind='127.0.0.1', logRequests=False):
        '''Run xmlrpc server'''
        import umsgpack
        try:
            from xmlrpc.server import SimpleXMLRPCServer
            from xmlrpc.client import Binary
        except ImportError:
            from SimpleXMLRPCServer import SimpleXMLRPCServer
            from xmlrpclib import Binary

        server = SimpleXMLRPCServer((bind, port), allow_none=True, logRequests=logRequests)
        server.register_introspection_functions()
        server.register_multicall_functions()

        server.register_function(self.quit, '_quit')
        server.register_function(self.size)

        def sync_fetch(task):
            result = self.sync_fetch(task)
            result = Binary(umsgpack.packb(result))
            return result
        server.register_function(sync_fetch, 'fetch')

        def dump_counter(_time, _type):
            return self._cnt[_time].to_dict(_type)
        server.register_function(dump_counter, 'counter')

        server.timeout = 0.5
        while not self._quit:
            server.handle_request()
        server.server_close()

    def on_fetch(self, type, task):
        '''Called before task fetch'''
        pass

    def on_result(self, type, task, result):
        '''Called after task fetched'''
        status_code = result.get('status_code', 599)
        if status_code != 599:
            # bucket into hundreds (e.g. 404 -> 400); floor division keeps the
            # result an int under Python 3's true division
            status_code = int(status_code) // 100 * 100
        self._cnt['5m'].event((task.get('project'), status_code), +1)
        self._cnt['1h'].event((task.get('project'), status_code), +1)

        if type == 'http' and result.get('time'):
            content_len = len(result.get('content', ''))
            self._cnt['5m'].event((task.get('project'), 'speed'),
                                  float(content_len) / result.get('time'))
            self._cnt['1h'].event((task.get('project'), 'speed'),
                                  float(content_len) / result.get('time'))
            self._cnt['5m'].event((task.get('project'), 'time'), result.get('time'))
            self._cnt['1h'].event((task.get('project'), 'time'), result.get('time'))
534