Completed
Push — ihipop-master (1c53cc) by Roy
08:35 queued, 07:05 created

pyspider.fetcher.Fetcher.handle_response() (rated C)

Complexity: Conditions 10
Size: Total Lines 21
Duplication: Lines 0, Ratio 0 %
Metric                        Value
cc (cyclomatic complexity)    10
dl (duplicated lines)         0
loc (lines of code)           21
rs                            5.2414
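
The cc value is cyclomatic complexity: one for the function entry plus one per decision point. In handle_response() below, the redirect test contributes three (the if plus its two and clauses), and the max_redirects, 302/303, 'body' in fetch, negative-timeout, response.error and 2xx checks contribute one each, giving 1 + 9 = 10, matching the Conditions figure above.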

How to fix: Complexity

Complex methods like pyspider.fetcher.Fetcher.handle_response() often do a lot of different things. To break such a method down, we need to identify a cohesive component within it. A common approach to finding such a component is to look for fields/methods that share the same prefixes or suffixes.

Once you have determined the pieces that belong together, you can apply the Extract Method refactoring; if the component carries its own state, Extract Class is also a candidate.
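
As a concrete illustration, the redirect-following branch is the most self-contained component of handle_response() and can be pulled out first. The sketch below is hypothetical (the helper name prepare_redirect and its signature are not part of pyspider); it rewrites the pending request for the next hop, or returns an error once the redirect budget is spent:

import time
import tornado.httpclient
from six.moves.urllib.parse import urljoin

def prepare_redirect(fetch, response, store, task_fetch, start_time):
    # Hypothetical helper extracted from handle_response(): rewrites `fetch`
    # in place for the next hop and returns None, or returns an HTTPError
    # once the redirect budget is spent. On None, the caller re-issues
    # make_request(fetch).
    if store['max_redirects'] <= 0:
        return tornado.httpclient.HTTPError(
            599, 'Maximum (%d) redirects followed' % task_fetch.get('max_redirects', 5),
            response)
    if response.code in (302, 303):
        fetch['method'] = 'GET'   # 302/303 switch the follow-up request to GET
        fetch.pop('body', None)   # ...and drop any request body
    fetch['url'] = urljoin(fetch['url'], response.headers['Location'])
    fetch['request_timeout'] -= time.time() - start_time
    if fetch['request_timeout'] < 0:
        fetch['request_timeout'] = 0.1
    fetch['connect_timeout'] = fetch['request_timeout']
    store['max_redirects'] -= 1
    return None

This leaves handle_response() with only the redirect test and the result construction, roughly halving its condition count.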

#!/usr/bin/env python
# -*- encoding: utf-8 -*-
# vim: set et sw=4 ts=4 sts=4 ff=unix fenc=utf8:
# Author: Binux<[email protected]>
#         http://binux.me
# Created on 2012-12-17 11:07:19

from __future__ import unicode_literals

import six
import copy
import time
import json
import logging
import threading
import tornado.ioloop
import tornado.httputil
import tornado.httpclient
import pyspider

from six.moves import http_cookies
from requests import cookies
from six.moves.urllib.parse import urljoin, urlsplit
from tornado.curl_httpclient import CurlAsyncHTTPClient
from tornado.simple_httpclient import SimpleAsyncHTTPClient
from pyspider.libs import utils, dataurl, counter
from pyspider.libs.queue import Queue as queue
from .cookie_utils import extract_cookies_to_jar

logger = logging.getLogger('fetcher')


class MyCurlAsyncHTTPClient(CurlAsyncHTTPClient):
    # expose pool occupancy so the run loop can throttle new fetches

    def free_size(self):
        return len(self._free_list)

    def size(self):
        return len(self._curls) - self.free_size()


class MySimpleAsyncHTTPClient(SimpleAsyncHTTPClient):
    # same accounting interface as MyCurlAsyncHTTPClient

    def free_size(self):
        return self.max_clients - self.size()

    def size(self):
        return len(self.active)

# expected shape of a fetch result handed to the processor
fetcher_output = {
    "status_code": int,
    "orig_url": str,
    "url": str,
    "headers": dict,
    "content": str,
    "cookies": dict,
}
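# Note: the fetch handlers below also attach 'time', 'save' and, on failure,
# 'error' keys on top of this schema; see data_fetch(), handle_response()
# and handle_error().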


class Fetcher(object):
    user_agent = "pyspider/%s (+http://pyspider.org/)" % pyspider.__version__
    default_options = {
        'method': 'GET',
        'headers': {
        },
        'use_gzip': True,
        'timeout': 120,
    }
    phantomjs_proxy = None

    def __init__(self, inqueue, outqueue, poolsize=100, proxy=None, async=True):
        self.inqueue = inqueue
        self.outqueue = outqueue

        self.poolsize = poolsize
        self._running = False
        self._quit = False
        self.proxy = proxy
        self.async = async
        self.ioloop = tornado.ioloop.IOLoop()

        # bind the HTTP client to this fetcher's own IOLoop
        if self.async:
            self.http_client = MyCurlAsyncHTTPClient(max_clients=self.poolsize,
                                                     io_loop=self.ioloop)
        else:
            self.http_client = tornado.httpclient.HTTPClient(
                MyCurlAsyncHTTPClient, max_clients=self.poolsize
            )

        self._cnt = {
            '5m': counter.CounterManager(
                lambda: counter.TimebaseAverageWindowCounter(30, 10)),
            '1h': counter.CounterManager(
                lambda: counter.TimebaseAverageWindowCounter(60, 60)),
        }

    def send_result(self, type, task, result):
        '''Send fetch result to processor'''
        if self.outqueue:
            try:
                self.outqueue.put((task, result))
            except Exception as e:
                logger.exception(e)

    def fetch(self, task, callback=None):
        '''Do one fetch'''
        url = task.get('url', 'data:,')
        if callback is None:
            callback = self.send_result
        if url.startswith('data:'):
            return self.data_fetch(url, task, callback)
        elif task.get('fetch', {}).get('fetch_type') in ('js', 'phantomjs'):
            return self.phantomjs_fetch(url, task, callback)
        else:
            return self.http_fetch(url, task, callback)

    def sync_fetch(self, task):
        '''Synchronous fetch: block until the fetch result is ready'''
        wait_result = threading.Condition()
        _result = {}

        def callback(type, task, result):
            wait_result.acquire()
            _result['type'] = type
            _result['task'] = task
            _result['result'] = result
            wait_result.notify()
            wait_result.release()

        wait_result.acquire()
        self.fetch(task, callback=callback)
        while 'result' not in _result:
            wait_result.wait()
        wait_result.release()
        return _result['result']

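    # Note: this also terminates when fetch() completes synchronously on the
    # calling thread (e.g. the data: handler fires the callback immediately),
    # since Condition's default lock is an RLock and may be re-acquired.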
    def data_fetch(self, url, task, callback):
        '''A fake fetcher for dataurl'''
        self.on_fetch('data', task)
        result = {}
        result['orig_url'] = url
        result['content'] = dataurl.decode(url)
        result['headers'] = {}
        result['status_code'] = 200
        result['url'] = url
        result['cookies'] = {}
        result['time'] = 0
        result['save'] = task.get('fetch', {}).get('save')
        if len(result['content']) < 70:
            logger.info("[200] %s:%s %s 0s", task.get('project'), task.get('taskid'), url)
        else:
            logger.info(
                "[200] %s:%s data:,%s...[content:%d] 0s",
                task.get('project'), task.get('taskid'),
                result['content'][:70],
                len(result['content'])
            )

        callback('data', task, result)
        self.on_result('data', task, result)
        return task, result

    def handle_error(self, type, url, task, start_time, callback, error):
        '''Build an error result and report it to the processor'''
        result = {
            'status_code': getattr(error, 'code', 599),
            'error': utils.text(error),
            'content': "",
            'time': time.time() - start_time,
            'orig_url': url,
            'url': url,
        }
        logger.error("[%d] %s:%s %s, %r %.2fs",
                     result['status_code'], task.get('project'), task.get('taskid'),
                     url, error, result['time'])
        callback(type, task, result)
        self.on_result(type, task, result)
        return task, result

    allowed_options = ['method', 'data', 'timeout', 'cookies', 'use_gzip', 'validate_cert']

    def http_fetch(self, url, task, callback):
        '''HTTP fetcher'''
        start_time = time.time()

        self.on_fetch('http', task)
        fetch = copy.deepcopy(self.default_options)
        fetch['url'] = url
        fetch['headers'] = tornado.httputil.HTTPHeaders(fetch['headers'])
        fetch['headers']['User-Agent'] = self.user_agent
        task_fetch = task.get('fetch', {})
        for each in self.allowed_options:
            if each in task_fetch:
                fetch[each] = task_fetch[each]
        fetch['headers'].update(task_fetch.get('headers', {}))

        if task.get('track'):
            track_headers = tornado.httputil.HTTPHeaders(
                task.get('track', {}).get('fetch', {}).get('headers') or {})
            track_ok = task.get('track', {}).get('process', {}).get('ok', False)
        else:
            track_headers = {}
            track_ok = False
        # proxy
        proxy_string = None
        if isinstance(task_fetch.get('proxy'), six.string_types):
            proxy_string = task_fetch['proxy']
        elif self.proxy and task_fetch.get('proxy', True):
            proxy_string = self.proxy
        if proxy_string:
            if '://' not in proxy_string:
                proxy_string = 'http://' + proxy_string
            proxy_splited = urlsplit(proxy_string)
            if proxy_splited.username:
                fetch['proxy_username'] = proxy_splited.username
                if six.PY2:
                    fetch['proxy_username'] = fetch['proxy_username'].encode('utf8')
            if proxy_splited.password:
                fetch['proxy_password'] = proxy_splited.password
                if six.PY2:
                    fetch['proxy_password'] = fetch['proxy_password'].encode('utf8')
            fetch['proxy_host'] = proxy_splited.hostname
            if six.PY2:
                fetch['proxy_host'] = fetch['proxy_host'].encode('utf8')
            fetch['proxy_port'] = proxy_splited.port or 8080
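            # e.g. proxy='user:[email protected]:8080' (values illustrative) yields
            # proxy_username='user', proxy_password='pass',
            # proxy_host='10.0.0.1', proxy_port=8080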

        # etag
        if task_fetch.get('etag', True):
            _t = None
            if isinstance(task_fetch.get('etag'), six.string_types):
                _t = task_fetch.get('etag')
            elif track_ok:
                _t = track_headers.get('etag')
            if _t:
                fetch['headers'].setdefault('If-None-Match', _t)
        # last modified
        if task_fetch.get('last_modified', True):
            _t = None
            if isinstance(task_fetch.get('last_modified'), six.string_types):
                _t = task_fetch.get('last_modified')
            elif track_ok:
                _t = track_headers.get('last-modified')
            if _t:
                fetch['headers'].setdefault('If-Modified-Since', _t)

        session = cookies.RequestsCookieJar()

        # tornado's HTTPRequest has no cookie handling of its own; fold any
        # Cookie header into the jar and re-emit it per request in make_request()
        if 'Cookie' in fetch['headers']:
            c = http_cookies.SimpleCookie()
            try:
                c.load(fetch['headers']['Cookie'])
            except AttributeError:
                c.load(utils.utf8(fetch['headers']['Cookie']))
            for key in c:
                session.set(key, c[key])
            del fetch['headers']['Cookie']
        fetch['follow_redirects'] = False
        if 'timeout' in fetch:
            fetch['connect_timeout'] = fetch['request_timeout'] = fetch['timeout']
            del fetch['timeout']
        if 'data' in fetch:
            fetch['body'] = fetch['data']
            del fetch['data']
        if 'cookies' in fetch:
            session.update(fetch['cookies'])
            del fetch['cookies']

        store = {}
        store['max_redirects'] = task_fetch.get('max_redirects', 5)

        def handle_response(response):
            extract_cookies_to_jar(session, response.request, response.headers)
            if (response.code in (301, 302, 303, 307)
                    and response.headers.get('Location')
                    and task_fetch.get('allow_redirects', True)):
                if store['max_redirects'] <= 0:
                    error = tornado.httpclient.HTTPError(
                        599, 'Maximum (%d) redirects followed' % task_fetch.get('max_redirects', 5),
                        response)
                    return handle_error(error)
                if response.code in (302, 303):
                    # a 302/303 redirect switches the method to GET and drops the body
                    fetch['method'] = 'GET'
                    if 'body' in fetch:
                        del fetch['body']
                fetch['url'] = urljoin(fetch['url'], response.headers['Location'])
                fetch['request_timeout'] -= time.time() - start_time
                if fetch['request_timeout'] < 0:
                    fetch['request_timeout'] = 0.1
                fetch['connect_timeout'] = fetch['request_timeout']
                store['max_redirects'] -= 1
                return make_request(fetch)

            result = {}
            result['orig_url'] = url
            result['content'] = response.body or ''
            result['headers'] = dict(response.headers)
            result['status_code'] = response.code
            result['url'] = response.effective_url or url
            result['cookies'] = session.get_dict()
            result['time'] = time.time() - start_time
            result['save'] = task_fetch.get('save')
            if response.error:
                result['error'] = utils.text(response.error)
            if 200 <= response.code < 300:
                logger.info("[%d] %s:%s %s %.2fs", response.code,
                            task.get('project'), task.get('taskid'),
                            url, result['time'])
            else:
                logger.warning("[%d] %s:%s %s %.2fs", response.code,
                               task.get('project'), task.get('taskid'),
                               url, result['time'])
            callback('http', task, result)
            self.on_result('http', task, result)
            return task, result

        handle_error = lambda x: self.handle_error('http',
                                                   url, task, start_time, callback, x)

        def make_request(fetch):
            try:
                request = tornado.httpclient.HTTPRequest(**fetch)
                cookie_header = cookies.get_cookie_header(session, request)
                if cookie_header:
                    request.headers['Cookie'] = cookie_header
                if self.async:
                    self.http_client.fetch(request, handle_response)
                else:
                    return handle_response(self.http_client.fetch(request))
            except tornado.httpclient.HTTPError as e:
                if e.response:
                    return handle_response(e.response)
                else:
                    return handle_error(e)
            except Exception as e:
                logger.exception(fetch)
                return handle_error(e)

        return make_request(fetch)

    def phantomjs_fetch(self, url, task, callback):
        '''Fetch with phantomjs proxy'''
        start_time = time.time()

        self.on_fetch('phantomjs', task)
        if not self.phantomjs_proxy:
            result = {
                "orig_url": url,
                "content": "phantomjs is not enabled.",
                "headers": {},
                "status_code": 501,
                "url": url,
                "cookies": {},
                "time": 0,
                "save": task.get('fetch', {}).get('save')
            }
            logger.warning("[501] %s:%s %s 0s", task.get('project'), task.get('taskid'), url)
            callback('http', task, result)
            self.on_result('http', task, result)
            return task, result

        request_conf = {
            'follow_redirects': False
        }

        fetch = copy.deepcopy(self.default_options)
        fetch['url'] = url
        fetch['headers'] = tornado.httputil.HTTPHeaders(fetch['headers'])
        fetch['headers']['User-Agent'] = self.user_agent
        task_fetch = task.get('fetch', {})
        for each in task_fetch:
            if each != 'headers':
                fetch[each] = task_fetch[each]
        fetch['headers'].update(task_fetch.get('headers', {}))

        if 'timeout' in fetch:
            request_conf['connect_timeout'] = fetch['timeout']
            request_conf['request_timeout'] = fetch['timeout'] + 1

        session = cookies.RequestsCookieJar()
        request = tornado.httpclient.HTTPRequest(url=fetch['url'])
        if fetch.get('cookies'):
            session.update(fetch['cookies'])
            if 'Cookie' in request.headers:
                del request.headers['Cookie']
            fetch['headers']['Cookie'] = cookies.get_cookie_header(session, request)

        def handle_response(response):
            if not response.body:
                return handle_error(Exception('no response from phantomjs'))

            try:
                result = json.loads(utils.text(response.body))
                if response.error:
                    result['error'] = utils.text(response.error)
            except Exception as e:
                return handle_error(e)

            if result.get('status_code', 200):
                logger.info("[%d] %s:%s %s %.2fs", result['status_code'],
                            task.get('project'), task.get('taskid'), url, result['time'])
            else:
                logger.error("[%d] %s:%s %s, %r %.2fs", result['status_code'],
                             task.get('project'), task.get('taskid'),
                             url, result['content'], result['time'])
            callback('phantomjs', task, result)
            self.on_result('phantomjs', task, result)
            return task, result

        handle_error = lambda x: self.handle_error('phantomjs',
                                                   url, task, start_time, callback, x)

        fetch['headers'] = dict(fetch['headers'])
        try:
            request = tornado.httpclient.HTTPRequest(
                url="%s" % self.phantomjs_proxy, method="POST",
                body=json.dumps(fetch), **request_conf)
            if self.async:
                self.http_client.fetch(request, handle_response)
            else:
                return handle_response(self.http_client.fetch(request))
        except tornado.httpclient.HTTPError as e:
            if e.response:
                return handle_response(e.response)
            else:
                return handle_error(e)
        except Exception as e:
            return handle_error(e)

    def run(self):
        '''Run loop'''
        logger.info("fetcher starting...")

        def queue_loop():
            if not self.outqueue or not self.inqueue:
                return
            while not self._quit:
                try:
                    if self.outqueue.full():
                        break
                    if self.http_client.free_size() <= 0:
                        break
                    task = self.inqueue.get_nowait()
                    # FIXME: decode_unicode_obj should be applied when data is
                    # read back from the database; it is done here for performance
                    task = utils.decode_unicode_obj(task)
                    self.fetch(task)
                except queue.Empty:
                    break
                except KeyboardInterrupt:
                    break
                except Exception as e:
                    logger.exception(e)
                    break

        tornado.ioloop.PeriodicCallback(queue_loop, 100, io_loop=self.ioloop).start()
        self._running = True

        try:
            self.ioloop.start()
        except KeyboardInterrupt:
            pass

        logger.info("fetcher exiting...")

    def quit(self):
        '''Quit fetcher'''
        self._running = False
        self._quit = True
        self.ioloop.stop()

    def size(self):
        '''Number of fetches currently in flight'''
        return self.http_client.size()

    def xmlrpc_run(self, port=24444, bind='127.0.0.1', logRequests=False):
        '''Run xmlrpc server'''
        import umsgpack
        try:
            from xmlrpc.server import SimpleXMLRPCServer
            from xmlrpc.client import Binary
        except ImportError:
            from SimpleXMLRPCServer import SimpleXMLRPCServer
            from xmlrpclib import Binary

        server = SimpleXMLRPCServer((bind, port), allow_none=True, logRequests=logRequests)
        server.register_introspection_functions()
        server.register_multicall_functions()

        server.register_function(self.quit, '_quit')
        server.register_function(self.size)

        def sync_fetch(task):
            result = self.sync_fetch(task)
            result = Binary(umsgpack.packb(result))
            return result
        server.register_function(sync_fetch, 'fetch')

        def dump_counter(_time, _type):
            return self._cnt[_time].to_dict(_type)
        server.register_function(dump_counter, 'counter')

        server.timeout = 0.5
        while not self._quit:
            server.handle_request()
        server.server_close()
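
    # Client-side sketch (hypothetical values): the registered 'fetch' endpoint
    # returns the result dict msgpack-packed inside an xmlrpc Binary, e.g.
    #
    #   from six.moves.xmlrpc_client import ServerProxy
    #   import umsgpack
    #   proxy = ServerProxy('http://127.0.0.1:24444')
    #   packed = proxy.fetch({'url': 'data:,hello'})
    #   result = umsgpack.unpackb(packed.data)
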
    def on_fetch(self, type, task):
        '''Called before task fetch'''
        pass

    def on_result(self, type, task, result):
        '''Called after task fetched'''
        status_code = result.get('status_code', 599)
        if status_code != 599:
            # bucket into the hundreds: 2xx -> 200, 4xx -> 400, ...
            status_code = (int(status_code) // 100 * 100)
        self._cnt['5m'].event((task.get('project'), status_code), +1)
        self._cnt['1h'].event((task.get('project'), status_code), +1)

        if type == 'http' and result.get('time'):
            content_len = len(result.get('content', ''))
            self._cnt['5m'].event((task.get('project'), 'speed'),
                                  float(content_len) / result.get('time'))
            self._cnt['1h'].event((task.get('project'), 'speed'),
                                  float(content_len) / result.get('time'))
            self._cnt['5m'].event((task.get('project'), 'time'), result.get('time'))
            self._cnt['1h'].event((task.get('project'), 'time'), result.get('time'))
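
For orientation, a minimal way to exercise the class end to end is a synchronous fetch of a data: URL, which stays on the data_fetch() path and needs no network. The import path and task values below are illustrative, and pyspider's pycurl-backed client must be installed for Fetcher() to construct:

from pyspider.fetcher.tornado_fetcher import Fetcher

fetcher = Fetcher(inqueue=None, outqueue=None, poolsize=10)
task = {
    'taskid': 'demo',
    'project': 'demo',
    'url': 'data:,Hello%2C%20World!',
    'fetch': {},
}
result = fetcher.sync_fetch(task)                  # blocks until the callback fires
print(result['status_code'], result['content'])    # 200 Hello, World!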