Completed
Push — master ( ba437b...1aa254 ) by Roy, created at 01:07

pack_tornado_request_parameters()   (grade: F)

Complexity
Conditions: 26

Size
Total Lines: 69

Duplication
Lines: 0
Ratio: 0%
Metric                       Value
cc (cyclomatic complexity)   26
dl (duplicated lines)        0
loc (lines of code)          69
rs                           2.7193
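The cc and loc rows line up with the Conditions and Total Lines panels above and can be cross-checked locally. The sketch below is an assumption-laden example using the radon package and a guessed pyspider/fetcher/tornado_fetcher.py path for this module; the report itself does not say which tool produced its numbers.

# Hedged local cross-check of the metrics above with radon (assumed tooling).
from radon.complexity import cc_visit, cc_rank
from radon.raw import analyze

path = 'pyspider/fetcher/tornado_fetcher.py'  # assumed location of this module
with open(path) as f:
    source = f.read()

for block in cc_visit(source):
    if block.name == 'pack_tornado_request_parameters':
        # expect a cyclomatic complexity near the 26 reported above
        print(block.name, block.complexity, cc_rank(block.complexity))

print('file loc:', analyze(source).loc)  # raw line count for the whole file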

How to fix: Long Method, Complexity

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include: Extract Method.
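As a concrete illustration applied to this finding: the '# proxy', '# etag' and '# last modified' comments inside pack_tornado_request_parameters() (see the listing below) mark exactly such extraction points. Here is a minimal Extract Method sketch, assuming the module's existing imports; the helper name is hypothetical, and the six.PY2 re-encoding of the proxy values is omitted for brevity:

import six
from six.moves.urllib.parse import urlsplit

class Fetcher(object):
    # only the new, illustrative helper is shown; the real class has many more members
    proxy = None

    def _pack_proxy_parameters(self, fetch, task_fetch):
        '''Fill fetch['proxy_*'] from the task options or the fetcher-wide default.'''
        proxy_string = None
        if isinstance(task_fetch.get('proxy'), six.string_types):
            proxy_string = task_fetch['proxy']
        elif self.proxy and task_fetch.get('proxy', True):
            proxy_string = self.proxy
        if not proxy_string:
            return
        if '://' not in proxy_string:
            proxy_string = 'http://' + proxy_string
        parts = urlsplit(proxy_string)
        if parts.username:
            fetch['proxy_username'] = parts.username
        if parts.password:
            fetch['proxy_password'] = parts.password
        fetch['proxy_host'] = parts.hostname
        fetch['proxy_port'] = parts.port or 8080

The call site then shrinks to a single self._pack_proxy_parameters(fetch, task_fetch), and the etag and last-modified blocks can be extracted the same way.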

Complexity

Complex classes and methods like pyspider.fetcher.Fetcher.pack_tornado_request_parameters() often do a lot of different things. To break such code down, we need to identify a cohesive component within it. A common approach to finding such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
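Applied to this report's method rather than a class, the same prefix/suffix heuristic points at the proxy_* keys, the etag/If-None-Match pair and the last-modified/If-Modified-Since pair. Below is a hedged sketch of pack_tornado_request_parameters() delegating to per-concern helpers; every _unpack_*/_pack_*/_adapt_* name is illustrative (not existing pyspider API), and their bodies would be the commented blocks from the listing that follows:

    def pack_tornado_request_parameters(self, url, task):
        # unchanged preamble from the current implementation
        fetch = copy.deepcopy(self.default_options)
        fetch['url'] = url
        fetch['headers'] = tornado.httputil.HTTPHeaders(fetch['headers'])
        fetch['headers']['User-Agent'] = self.user_agent
        task_fetch = task.get('fetch', {})
        for each in self.allowed_options:
            if each in task_fetch:
                fetch[each] = task_fetch[each]
        fetch['headers'].update(task_fetch.get('headers', {}))

        # one call per concern instead of one long run of conditionals
        track_headers, track_ok = self._unpack_track(task)
        self._pack_proxy_parameters(fetch, task_fetch)
        self._pack_conditional_headers(fetch, task_fetch, track_headers, track_ok)
        self._adapt_tornado_keys(fetch)  # timeout split, data -> body rename
        return fetch

Each helper ends up with only a handful of conditions of its own, and the orchestrating method keeps little more than the option-copying loop above.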

#!/usr/bin/env python
# -*- encoding: utf-8 -*-
# vim: set et sw=4 ts=4 sts=4 ff=unix fenc=utf8:
# Author: Binux<[email protected]>
#         http://binux.me
# Created on 2012-12-17 11:07:19

from __future__ import unicode_literals

import six
import copy
import time
import json
import logging
import functools
import threading
import tornado.ioloop
import tornado.httputil
import tornado.httpclient
import pyspider

from six.moves import queue, http_cookies
from six.moves.urllib.robotparser import RobotFileParser
from requests import cookies
from six.moves.urllib.parse import urljoin, urlsplit
from tornado import gen
from tornado.curl_httpclient import CurlAsyncHTTPClient
from tornado.simple_httpclient import SimpleAsyncHTTPClient
from pyspider.libs import utils, dataurl, counter
from .cookie_utils import extract_cookies_to_jar
logger = logging.getLogger('fetcher')

class MyCurlAsyncHTTPClient(CurlAsyncHTTPClient):

    def free_size(self):
        return len(self._free_list)

    def size(self):
        return len(self._curls) - self.free_size()


class MySimpleAsyncHTTPClient(SimpleAsyncHTTPClient):

    def free_size(self):
        return self.max_clients - self.size()

    def size(self):
        return len(self.active)

fetcher_output = {
    "status_code": int,
    "orig_url": str,
    "url": str,
    "headers": dict,
    "content": str,
    "cookies": dict,
}

class Fetcher(object):
    user_agent = "pyspider/%s (+http://pyspider.org/)" % pyspider.__version__
    default_options = {
        'method': 'GET',
        'headers': {
        },
        'use_gzip': True,
        'timeout': 120,
    }
    phantomjs_proxy = None
    robot_txt_age = 60*60  # 1h

    def __init__(self, inqueue, outqueue, poolsize=100, proxy=None, async=True):
        self.inqueue = inqueue
        self.outqueue = outqueue

        self.poolsize = poolsize
        self._running = False
        self._quit = False
        self.proxy = proxy
        self.async = async
        self.ioloop = tornado.ioloop.IOLoop()

        self.robots_txt_cache = {}

        # binding io_loop to http_client here
        self.http_client = MyCurlAsyncHTTPClient(max_clients=self.poolsize,
                                                 io_loop=self.ioloop)

        self._cnt = {
            '5m': counter.CounterManager(
                lambda: counter.TimebaseAverageWindowCounter(30, 10)),
            '1h': counter.CounterManager(
                lambda: counter.TimebaseAverageWindowCounter(60, 60)),
        }

    def send_result(self, type, task, result):
        '''Send fetch result to processor'''
        if self.outqueue:
            try:
                self.outqueue.put((task, result))
            except Exception as e:
                logger.exception(e)

    def fetch(self, task, callback=None):
        if self.async:
            return self.async_fetch(task, callback)
        else:
            return self.ioloop.run_sync(functools.partial(self.async_fetch, task, callback))

    def async_fetch(self, task, callback=None):
        '''Do one fetch'''
        url = task.get('url', 'data:,')
        if callback is None:
            callback = self.send_result
        if url.startswith('data:'):
            return gen.maybe_future(self.data_fetch(url, task, callback))
        elif task.get('fetch', {}).get('fetch_type') in ('js', 'phantomjs'):
            return gen.maybe_future(self.phantomjs_fetch(url, task, callback))
        else:
            return gen.maybe_future(self.http_fetch(url, task, callback))

    def sync_fetch(self, task):
        '''Synchronization fetch, usually used in xmlrpc thread'''
        if not self._running:
            return self.ioloop.run_sync(functools.partial(self.async_fetch, task, lambda t, _, r: True))

        wait_result = threading.Condition()
        _result = {}

        def callback(type, task, result):
            wait_result.acquire()
            _result['type'] = type
            _result['task'] = task
            _result['result'] = result
            wait_result.notify()
            wait_result.release()

        wait_result.acquire()
        self.fetch(task, callback=callback)
        while 'result' not in _result:
            wait_result.wait()
        wait_result.release()
        return _result['result']

    def data_fetch(self, url, task, callback):
        '''A fake fetcher for dataurl'''
        self.on_fetch('data', task)
        result = {}
        result['orig_url'] = url
        result['content'] = dataurl.decode(url)
        result['headers'] = {}
        result['status_code'] = 200
        result['url'] = url
        result['cookies'] = {}
        result['time'] = 0
        result['save'] = task.get('fetch', {}).get('save')
        if len(result['content']) < 70:
            logger.info("[200] %s:%s %s 0s", task.get('project'), task.get('taskid'), url)
        else:
            logger.info(
                "[200] %s:%s data:,%s...[content:%d] 0s",
                task.get('project'), task.get('taskid'),
                result['content'][:70],
                len(result['content'])
            )

        callback('data', task, result)
        self.on_result('data', task, result)
        return task, result

    def handle_error(self, type, url, task, start_time, callback, error):
        result = {
            'status_code': getattr(error, 'code', 599),
            'error': utils.text(error),
            'content': "",
            'time': time.time() - start_time,
            'orig_url': url,
            'url': url,
        }
        logger.error("[%d] %s:%s %s, %r %.2fs",
                     result['status_code'], task.get('project'), task.get('taskid'),
                     url, error, result['time'])
        callback(type, task, result)
        self.on_result(type, task, result)
        return task, result

    allowed_options = ['method', 'data', 'timeout', 'cookies', 'use_gzip', 'validate_cert']

    def pack_tornado_request_parameters(self, url, task):
        fetch = copy.deepcopy(self.default_options)
        fetch['url'] = url
        fetch['headers'] = tornado.httputil.HTTPHeaders(fetch['headers'])
        fetch['headers']['User-Agent'] = self.user_agent
        task_fetch = task.get('fetch', {})
        for each in self.allowed_options:
            if each in task_fetch:
                fetch[each] = task_fetch[each]
        fetch['headers'].update(task_fetch.get('headers', {}))

        if task.get('track'):
            track_headers = tornado.httputil.HTTPHeaders(
                task.get('track', {}).get('fetch', {}).get('headers') or {})
            track_ok = task.get('track', {}).get('process', {}).get('ok', False)
        else:
            track_headers = {}
            track_ok = False
        # proxy
        proxy_string = None
        if isinstance(task_fetch.get('proxy'), six.string_types):
            proxy_string = task_fetch['proxy']
        elif self.proxy and task_fetch.get('proxy', True):
            proxy_string = self.proxy
        if proxy_string:
            if '://' not in proxy_string:
                proxy_string = 'http://' + proxy_string
            proxy_splited = urlsplit(proxy_string)
            if proxy_splited.username:
                fetch['proxy_username'] = proxy_splited.username
                if six.PY2:
                    fetch['proxy_username'] = fetch['proxy_username'].encode('utf8')
            if proxy_splited.password:
                fetch['proxy_password'] = proxy_splited.password
                if six.PY2:
                    fetch['proxy_password'] = fetch['proxy_password'].encode('utf8')
            fetch['proxy_host'] = proxy_splited.hostname.encode('utf8')
            if six.PY2:
                fetch['proxy_host'] = fetch['proxy_host'].encode('utf8')
            fetch['proxy_port'] = proxy_splited.port or 8080

        # etag
        if task_fetch.get('etag', True):
            _t = None
            if isinstance(task_fetch.get('etag'), six.string_types):
                _t = task_fetch.get('etag')
            elif track_ok:
                _t = track_headers.get('etag')
            if _t and 'If-None-Match' not in fetch['headers']:
                fetch['headers']['If-None-Match'] = _t
        # last modified
        if task_fetch.get('last_modified', True):
            _t = None
            if isinstance(task_fetch.get('last_modified'), six.string_types):
                _t = task_fetch.get('last_modified')
            elif track_ok:
                _t = track_headers.get('last-modified')
            if _t and 'If-Modified-Since' not in fetch['headers']:
                fetch['headers']['If-Modified-Since'] = _t
        # timeout
        if 'timeout' in fetch:
            fetch['connect_timeout'] = fetch['request_timeout'] = fetch['timeout']
            del fetch['timeout']
        # data rename to body
        if 'data' in fetch:
            fetch['body'] = fetch['data']
            del fetch['data']

        return fetch

    @gen.coroutine
    def can_fetch(self, user_agent, url):
        parsed = urlsplit(url)
        domain = parsed.netloc
        if domain in self.robots_txt_cache:
            robot_txt = self.robots_txt_cache[domain]
            if time.time() - robot_txt.mtime() > self.robot_txt_age:
                robot_txt = None
        else:
            robot_txt = None

        if robot_txt is None:
            robot_txt = RobotFileParser()
            try:
                response = yield self.http_client.fetch(urljoin(url, '/robots.txt'),
                                                        connect_timeout=10, request_timeout=30)
                content = response.body
            except tornado.httpclient.HTTPError as e:
                logger.error('load robots.txt from %s error: %r', domain, e)
                content = ''

            robot_txt.parse(content.splitlines())
            self.robots_txt_cache[domain] = robot_txt

        raise gen.Return(robot_txt.can_fetch(user_agent, url))

    def clear_robot_txt_cache(self):
        now = time.time()
        # iterate over a copy so entries can be deleted while looping
        for domain, robot_txt in list(self.robots_txt_cache.items()):
            if now - robot_txt.mtime() > self.robot_txt_age:
                del self.robots_txt_cache[domain]

    @gen.coroutine
    def http_fetch(self, url, task, callback):
        '''HTTP fetcher'''
        start_time = time.time()

        self.on_fetch('http', task)
        handle_error = lambda x: self.handle_error('http', url, task, start_time, callback, x)

        # setup request parameters
        fetch = self.pack_tornado_request_parameters(url, task)
        task_fetch = task.get('fetch', {})

        session = cookies.RequestsCookieJar()
        # fix for tornado request obj
        if 'Cookie' in fetch['headers']:
            c = http_cookies.SimpleCookie()
            try:
                c.load(fetch['headers']['Cookie'])
            except AttributeError:
                c.load(utils.utf8(fetch['headers']['Cookie']))
            for key in c:
                session.set(key, c[key])
            del fetch['headers']['Cookie']
        if 'cookies' in fetch:
            session.update(fetch['cookies'])
            del fetch['cookies']

        max_redirects = task_fetch.get('max_redirects', 5)
        # we will handle redirects by hand to capture cookies
        fetch['follow_redirects'] = False

        # making requests
        while True:
            # robots.txt
            if task_fetch.get('robots_txt', False):
                can_fetch = yield self.can_fetch(fetch['headers']['User-Agent'], fetch['url'])
                if not can_fetch:
                    error = tornado.httpclient.HTTPError(403, 'Disallowed by robots.txt')
                    raise gen.Return(handle_error(error))

            try:
                request = tornado.httpclient.HTTPRequest(**fetch)
                cookie_header = cookies.get_cookie_header(session, request)
                if cookie_header:
                    request.headers['Cookie'] = cookie_header
            except Exception as e:
                logger.exception(fetch)
                raise gen.Return(handle_error(e))

            try:
                response = yield self.http_client.fetch(request)
            except tornado.httpclient.HTTPError as e:
                if e.response:
                    response = e.response
                else:
                    raise gen.Return(handle_error(e))

            extract_cookies_to_jar(session, response.request, response.headers)
            if (response.code in (301, 302, 303, 307)
                    and response.headers.get('Location')
                    and task_fetch.get('allow_redirects', True)):
                if max_redirects <= 0:
                    error = tornado.httpclient.HTTPError(
                        599, 'Maximum (%d) redirects followed' % task_fetch.get('max_redirects', 5),
                        response)
                    raise gen.Return(handle_error(error))
                if response.code in (302, 303):
                    fetch['method'] = 'GET'
                    if 'body' in fetch:
                        del fetch['body']
                fetch['url'] = urljoin(fetch['url'], response.headers['Location'])
                fetch['request_timeout'] -= time.time() - start_time
                if fetch['request_timeout'] < 0:
                    fetch['request_timeout'] = 0.1
                fetch['connect_timeout'] = fetch['request_timeout']
                max_redirects -= 1
                continue

            result = {}
            result['orig_url'] = url
            result['content'] = response.body or ''
            result['headers'] = dict(response.headers)
            result['status_code'] = response.code
            result['url'] = response.effective_url or url
            result['cookies'] = session.get_dict()
            result['time'] = time.time() - start_time
            result['save'] = task_fetch.get('save')
            if response.error:
                result['error'] = utils.text(response.error)
            if 200 <= response.code < 300:
                logger.info("[%d] %s:%s %s %.2fs", response.code,
                            task.get('project'), task.get('taskid'),
                            url, result['time'])
            else:
                logger.warning("[%d] %s:%s %s %.2fs", response.code,
                               task.get('project'), task.get('taskid'),
                               url, result['time'])

            callback('http', task, result)
            self.on_result('http', task, result)
            raise gen.Return((task, result))

    @gen.coroutine
    def phantomjs_fetch(self, url, task, callback):
        '''Fetch with phantomjs proxy'''
        start_time = time.time()

        self.on_fetch('phantomjs', task)
        handle_error = lambda x: self.handle_error('phantomjs', url, task, start_time, callback, x)

        # check phantomjs proxy is enabled
        if not self.phantomjs_proxy:
            result = {
                "orig_url": url,
                "content": "phantomjs is not enabled.",
                "headers": {},
                "status_code": 501,
                "url": url,
                "cookies": {},
                "time": 0,
                "save": task.get('fetch', {}).get('save')
            }
            logger.warning("[501] %s:%s %s 0s", task.get('project'), task.get('taskid'), url)
            callback('http', task, result)
            self.on_result('http', task, result)
            raise gen.Return((task, result))

        # setup request parameters
        fetch = self.pack_tornado_request_parameters(url, task)
        task_fetch = task.get('fetch', {})
        for each in task_fetch:
            if each not in fetch:
                fetch[each] = task_fetch[each]

        # robots.txt
        if task_fetch.get('robots_txt', False):
            user_agent = fetch['headers']['User-Agent']
            can_fetch = yield self.can_fetch(user_agent, url)
            if not can_fetch:
                error = tornado.httpclient.HTTPError(403, 'Disallowed by robots.txt')
                raise gen.Return(handle_error(error))

        request_conf = {
            'follow_redirects': False
        }
        if 'timeout' in task_fetch:
            request_conf['connect_timeout'] = task_fetch['timeout']
            request_conf['request_timeout'] = task_fetch['timeout'] + 1

        session = cookies.RequestsCookieJar()
        request = tornado.httpclient.HTTPRequest(url=fetch['url'])
        if fetch.get('cookies'):
            session.update(fetch['cookies'])
            if 'Cookie' in request.headers:
                del request.headers['Cookie']
            fetch['headers']['Cookie'] = cookies.get_cookie_header(session, request)

        # making requests
        fetch['headers'] = dict(fetch['headers'])
        try:
            request = tornado.httpclient.HTTPRequest(
                url="%s" % self.phantomjs_proxy, method="POST",
                body=json.dumps(fetch), **request_conf)
        except Exception as e:
            raise gen.Return(handle_error(e))

        try:
            response = yield self.http_client.fetch(request)
        except tornado.httpclient.HTTPError as e:
            if e.response:
                response = e.response
            else:
                raise gen.Return(handle_error(e))

        if not response.body:
            raise gen.Return(handle_error(Exception('no response from phantomjs')))

        try:
            result = json.loads(utils.text(response.body))
            if response.error:
                result['error'] = utils.text(response.error)
        except Exception as e:
            raise gen.Return(handle_error(e))

        if result.get('status_code', 200):
            logger.info("[%d] %s:%s %s %.2fs", result['status_code'],
                        task.get('project'), task.get('taskid'), url, result['time'])
        else:
            logger.error("[%d] %s:%s %s, %r %.2fs", result['status_code'],
                         task.get('project'), task.get('taskid'),
                         url, result['content'], result['time'])

        callback('phantomjs', task, result)
        self.on_result('phantomjs', task, result)
        raise gen.Return((task, result))

    def run(self):
        '''Run loop'''
        logger.info("fetcher starting...")

        def queue_loop():
            if not self.outqueue or not self.inqueue:
                return
            while not self._quit:
                try:
                    if self.outqueue.full():
                        break
                    if self.http_client.free_size() <= 0:
                        break
                    task = self.inqueue.get_nowait()
                    # FIXME: decode_unicode_obj should be used after data is selected from
                    # the database; it's used here for performance
                    task = utils.decode_unicode_obj(task)
                    self.fetch(task)
                except queue.Empty:
                    break
                except KeyboardInterrupt:
                    break
                except Exception as e:
                    logger.exception(e)
                    break

        tornado.ioloop.PeriodicCallback(queue_loop, 100, io_loop=self.ioloop).start()
        tornado.ioloop.PeriodicCallback(self.clear_robot_txt_cache, 10000, io_loop=self.ioloop).start()
        self._running = True

        try:
            self.ioloop.start()
        except KeyboardInterrupt:
            pass

        logger.info("fetcher exiting...")

    def quit(self):
        '''Quit fetcher'''
        self._running = False
        self._quit = True
        self.ioloop.stop()

    def size(self):
        return self.http_client.size()

    def xmlrpc_run(self, port=24444, bind='127.0.0.1', logRequests=False):
        '''Run xmlrpc server'''
        import umsgpack
        try:
            from xmlrpc.server import SimpleXMLRPCServer
            from xmlrpc.client import Binary
        except ImportError:
            from SimpleXMLRPCServer import SimpleXMLRPCServer
            from xmlrpclib import Binary

        server = SimpleXMLRPCServer((bind, port), allow_none=True, logRequests=logRequests)
        server.register_introspection_functions()
        server.register_multicall_functions()

        server.register_function(self.quit, '_quit')
        server.register_function(self.size)

        def sync_fetch(task):
            result = self.sync_fetch(task)
            result = Binary(umsgpack.packb(result))
            return result
        server.register_function(sync_fetch, 'fetch')

        def dump_counter(_time, _type):
            return self._cnt[_time].to_dict(_type)
        server.register_function(dump_counter, 'counter')

        server.timeout = 0.5
        while not self._quit:
            server.handle_request()
        server.server_close()

    def on_fetch(self, type, task):
        '''Called before task fetch'''
        pass

    def on_result(self, type, task, result):
        '''Called after task fetched'''
        status_code = result.get('status_code', 599)
        if status_code != 599:
            status_code = (int(status_code) // 100 * 100)
        self._cnt['5m'].event((task.get('project'), status_code), +1)
        self._cnt['1h'].event((task.get('project'), status_code), +1)

        if type == 'http' and result.get('time'):
            content_len = len(result.get('content', ''))
            self._cnt['5m'].event((task.get('project'), 'speed'),
                                  float(content_len) / result.get('time'))
            self._cnt['1h'].event((task.get('project'), 'speed'),
                                  float(content_len) / result.get('time'))
            self._cnt['5m'].event((task.get('project'), 'time'), result.get('time'))
            self._cnt['1h'].event((task.get('project'), 'time'), result.get('time'))