Completed
Push to master (1aa254...d5cc3f) by Roy, created 01:59

pyspider.fetcher.Fetcher.phantomjs_fetch()   Rating: F

Complexity
    Conditions: 17

Size
    Total Lines: 91

Duplication
    Lines: 0
    Ratio: 0 %

Metric                         Value
cc  (cyclomatic complexity)    17
dl  (duplicated lines)         0
loc (lines of code)            91
rs                             2

How to fix

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include Extract Method, as sketched below.
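For instance, here is a small sketch of Extract Method (the function and variable names below are invented for illustration; they are not pyspider code). A comment inside a long function marks a block that can become its own, well-named helper:

DEFAULT_AGENT = "example-agent/1.0"

# Before: a comment marks the block that wants to be its own method.
def fetch_options_before(task):
    # build request headers
    headers = {"User-Agent": DEFAULT_AGENT}
    headers.update(task.get("headers", {}))
    return {"url": task["url"], "headers": headers}

# After: the commented block is extracted and the comment becomes the name.
def build_request_headers(task):
    headers = {"User-Agent": DEFAULT_AGENT}
    headers.update(task.get("headers", {}))
    return headers

def fetch_options(task):
    return {"url": task["url"], "headers": build_request_headers(task)}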

Complexity

Complex classes and methods like pyspider.fetcher.Fetcher.phantomjs_fetch() often do a lot of different things. To break such a unit down, we need to identify a cohesive component within it. A common approach to finding such a component is to look for fields/methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
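As an illustration only (the class names below are invented for this example and do not exist in pyspider), the proxy_username / proxy_password / proxy_host / proxy_port values assembled in this module share a prefix and could be grouped into their own class:

class ProxyConfig(object):
    '''Cohesive component extracted from a larger fetch-options structure.'''

    def __init__(self, host, port=8080, username=None, password=None):
        self.host = host
        self.port = port
        self.username = username
        self.password = password


class FetchOptions(object):
    def __init__(self, url, proxy=None):
        self.url = url
        self.proxy = proxy  # a ProxyConfig instance, or None when no proxy is used

The full source of the module under review follows.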

#!/usr/bin/env python
# -*- encoding: utf-8 -*-
# vim: set et sw=4 ts=4 sts=4 ff=unix fenc=utf8:
# Author: Binux<[email protected]>
#         http://binux.me
# Created on 2012-12-17 11:07:19

from __future__ import unicode_literals

import six
import copy
import time
import json
import logging
import functools
import threading
import tornado.ioloop
import tornado.httputil
import tornado.httpclient
import pyspider

from six.moves import queue, http_cookies
from six.moves.urllib.robotparser import RobotFileParser
from requests import cookies
from six.moves.urllib.parse import urljoin, urlsplit
from tornado import gen
from tornado.curl_httpclient import CurlAsyncHTTPClient
from tornado.simple_httpclient import SimpleAsyncHTTPClient
from pyspider.libs import utils, dataurl, counter
from .cookie_utils import extract_cookies_to_jar
logger = logging.getLogger('fetcher')


class MyCurlAsyncHTTPClient(CurlAsyncHTTPClient):
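    '''CurlAsyncHTTPClient that also reports how many connections in its pool are free or in use.'''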

    def free_size(self):
        return len(self._free_list)

    def size(self):
        return len(self._curls) - self.free_size()


class MySimpleAsyncHTTPClient(SimpleAsyncHTTPClient):
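    '''SimpleAsyncHTTPClient exposing the same free_size()/size() pool accounting as MyCurlAsyncHTTPClient.'''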

    def free_size(self):
        return self.max_clients - self.size()

    def size(self):
        return len(self.active)

fetcher_output = {
    "status_code": int,
    "orig_url": str,
    "url": str,
    "headers": dict,
    "content": str,
    "cookies": dict,
}


class Fetcher(object):
    user_agent = "pyspider/%s (+http://pyspider.org/)" % pyspider.__version__
    default_options = {
        'method': 'GET',
        'headers': {
        },
        'use_gzip': True,
        'timeout': 120,
    }
    phantomjs_proxy = None
    robot_txt_age = 60*60  # 1h

    def __init__(self, inqueue, outqueue, poolsize=100, proxy=None, async=True):
        self.inqueue = inqueue
        self.outqueue = outqueue

        self.poolsize = poolsize
        self._running = False
        self._quit = False
        self.proxy = proxy
        self.async = async
        self.ioloop = tornado.ioloop.IOLoop()

        self.robots_txt_cache = {}

        # binding io_loop to http_client here
        self.http_client = MyCurlAsyncHTTPClient(max_clients=self.poolsize,
                                                 io_loop=self.ioloop)

        self._cnt = {
            '5m': counter.CounterManager(
                lambda: counter.TimebaseAverageWindowCounter(30, 10)),
            '1h': counter.CounterManager(
                lambda: counter.TimebaseAverageWindowCounter(60, 60)),
        }

    def send_result(self, type, task, result):
        '''Send fetch result to processor'''
        if self.outqueue:
            try:
                self.outqueue.put((task, result))
            except Exception as e:
                logger.exception(e)

    def fetch(self, task, callback=None):
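        '''Fetch one task: returns a Future in async mode, otherwise blocks on the IOLoop until the fetch completes.'''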
        if self.async:
            return self.async_fetch(task, callback)
        else:
            return self.ioloop.run_sync(functools.partial(self.async_fetch, task, callback))

    @gen.coroutine
    def async_fetch(self, task, callback=None):
        '''Do one fetch'''
        url = task.get('url', 'data:,')
        if callback is None:
            callback = self.send_result

        try:
            if url.startswith('data:'):
                ret = yield gen.maybe_future(self.data_fetch(url, task, callback))
            elif task.get('fetch', {}).get('fetch_type') in ('js', 'phantomjs'):
                ret = yield self.phantomjs_fetch(url, task, callback)
            else:
                ret = yield self.http_fetch(url, task, callback)
        except Exception as e:
            logger.exception(e)
            raise e

        raise gen.Return(ret)

    def sync_fetch(self, task):
        '''Synchronous fetch, usually used in the xmlrpc thread'''
        if not self._running:
            return self.ioloop.run_sync(functools.partial(self.async_fetch, task, lambda t, _, r: True))

        wait_result = threading.Condition()
        _result = {}

        def callback(type, task, result):
            wait_result.acquire()
            _result['type'] = type
            _result['task'] = task
            _result['result'] = result
            wait_result.notify()
            wait_result.release()

        wait_result.acquire()
        self.fetch(task, callback=callback)
        while 'result' not in _result:
            wait_result.wait()
        wait_result.release()
        return _result['result']

    def data_fetch(self, url, task, callback):
        '''A fake fetcher for dataurl'''
        self.on_fetch('data', task)
        result = {}
        result['orig_url'] = url
        result['content'] = dataurl.decode(url)
        result['headers'] = {}
        result['status_code'] = 200
        result['url'] = url
        result['cookies'] = {}
        result['time'] = 0
        result['save'] = task.get('fetch', {}).get('save')
        if len(result['content']) < 70:
            logger.info("[200] %s:%s %s 0s", task.get('project'), task.get('taskid'), url)
        else:
            logger.info(
                "[200] %s:%s data:,%s...[content:%d] 0s",
                task.get('project'), task.get('taskid'),
                result['content'][:70],
                len(result['content'])
            )

        callback('data', task, result)
        self.on_result('data', task, result)
        return task, result

    def handle_error(self, type, url, task, start_time, callback, error):
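        '''Build an error result (HTTP status from the error, 599 by default), log it, and report it via callback and on_result.'''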
        result = {
            'status_code': getattr(error, 'code', 599),
            'error': utils.text(error),
            'content': "",
            'time': time.time() - start_time,
            'orig_url': url,
            'url': url,
        }
        logger.error("[%d] %s:%s %s, %r %.2fs",
                     result['status_code'], task.get('project'), task.get('taskid'),
                     url, error, result['time'])
        callback(type, task, result)
        self.on_result(type, task, result)
        return task, result

    allowed_options = ['method', 'data', 'timeout', 'cookies', 'use_gzip', 'validate_cert']

    def pack_tornado_request_parameters(self, url, task):
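        '''Build keyword arguments for tornado.httpclient.HTTPRequest from the
        default options, the task's fetch options, proxy settings and the
        conditional-request (If-None-Match / If-Modified-Since) headers.'''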
        fetch = copy.deepcopy(self.default_options)
        fetch['url'] = url
        fetch['headers'] = tornado.httputil.HTTPHeaders(fetch['headers'])
        fetch['headers']['User-Agent'] = self.user_agent
        task_fetch = task.get('fetch', {})
        for each in self.allowed_options:
            if each in task_fetch:
                fetch[each] = task_fetch[each]
        fetch['headers'].update(task_fetch.get('headers', {}))

        if task.get('track'):
            track_headers = tornado.httputil.HTTPHeaders(
                task.get('track', {}).get('fetch', {}).get('headers') or {})
            track_ok = task.get('track', {}).get('process', {}).get('ok', False)
        else:
            track_headers = {}
            track_ok = False
        # proxy
        proxy_string = None
        if isinstance(task_fetch.get('proxy'), six.string_types):
            proxy_string = task_fetch['proxy']
        elif self.proxy and task_fetch.get('proxy', True):
            proxy_string = self.proxy
        if proxy_string:
            if '://' not in proxy_string:
                proxy_string = 'http://' + proxy_string
            proxy_splited = urlsplit(proxy_string)
            if proxy_splited.username:
                fetch['proxy_username'] = proxy_splited.username
                if six.PY2:
                    fetch['proxy_username'] = fetch['proxy_username'].encode('utf8')
            if proxy_splited.password:
                fetch['proxy_password'] = proxy_splited.password
                if six.PY2:
                    fetch['proxy_password'] = fetch['proxy_password'].encode('utf8')
            fetch['proxy_host'] = proxy_splited.hostname.encode('utf8')
            if six.PY2:
                fetch['proxy_host'] = fetch['proxy_host'].encode('utf8')
            fetch['proxy_port'] = proxy_splited.port or 8080

        # etag
        if task_fetch.get('etag', True):
            _t = None
            if isinstance(task_fetch.get('etag'), six.string_types):
                _t = task_fetch.get('etag')
            elif track_ok:
                _t = track_headers.get('etag')
            if _t and 'If-None-Match' not in fetch['headers']:
                fetch['headers']['If-None-Match'] = _t
        # last modified
        if task_fetch.get('last_modified', True):
            _t = None
            if isinstance(task_fetch.get('last_modified'), six.string_types):
                _t = task_fetch.get('last_modified')
            elif track_ok:
                _t = track_headers.get('last-modified')
            if _t and 'If-Modified-Since' not in fetch['headers']:
                fetch['headers']['If-Modified-Since'] = _t
        # timeout
        if 'timeout' in fetch:
            fetch['connect_timeout'] = fetch['request_timeout'] = fetch['timeout']
            del fetch['timeout']
        # data rename to body
        if 'data' in fetch:
            fetch['body'] = fetch['data']
            del fetch['data']

        return fetch

    @gen.coroutine
    def can_fetch(self, user_agent, url):
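        '''Fetch and cache the domain's robots.txt, then report whether user_agent may fetch url.'''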
        parsed = urlsplit(url)
        domain = parsed.netloc
        if domain in self.robots_txt_cache:
            robot_txt = self.robots_txt_cache[domain]
            if time.time() - robot_txt.mtime() > self.robot_txt_age:
                robot_txt = None
        else:
            robot_txt = None

        if robot_txt is None:
            robot_txt = RobotFileParser()
            try:
                response = yield self.http_client.fetch(urljoin(url, '/robots.txt'),
                                                        connect_timeout=10, request_timeout=30)
                content = response.body
            except tornado.httpclient.HTTPError as e:
                logger.error('load robots.txt from %s error: %r', domain, e)
                content = ''

            try:
                content = content.decode('utf8', 'ignore')
            except UnicodeDecodeError:
                content = ''

            robot_txt.parse(content.splitlines())
            self.robots_txt_cache[domain] = robot_txt

        raise gen.Return(robot_txt.can_fetch(user_agent, url))

    def clear_robot_txt_cache(self):
        now = time.time()
        for domain, robot_txt in list(self.robots_txt_cache.items()):
            if now - robot_txt.mtime() > self.robot_txt_age:
                del self.robots_txt_cache[domain]

    @gen.coroutine
    def http_fetch(self, url, task, callback):
        '''HTTP fetcher'''
        start_time = time.time()

        self.on_fetch('http', task)
        handle_error = lambda x: self.handle_error('http', url, task, start_time, callback, x)

        # setup request parameters
        fetch = self.pack_tornado_request_parameters(url, task)
        task_fetch = task.get('fetch', {})

        session = cookies.RequestsCookieJar()
        # fix for tornado request obj
        if 'Cookie' in fetch['headers']:
            c = http_cookies.SimpleCookie()
            try:
                c.load(fetch['headers']['Cookie'])
            except AttributeError:
                c.load(utils.utf8(fetch['headers']['Cookie']))
            for key in c:
                session.set(key, c[key])
            del fetch['headers']['Cookie']
        if 'cookies' in fetch:
            session.update(fetch['cookies'])
            del fetch['cookies']

        max_redirects = task_fetch.get('max_redirects', 5)
        # we will handle redirects by hand to capture cookies
        fetch['follow_redirects'] = False

        # making requests
        while True:
            # robots.txt
            if task_fetch.get('robots_txt', False):
                can_fetch = yield self.can_fetch(fetch['headers']['User-Agent'], fetch['url'])
                if not can_fetch:
                    error = tornado.httpclient.HTTPError(403, 'Disallowed by robots.txt')
                    raise gen.Return(handle_error(error))

            try:
                request = tornado.httpclient.HTTPRequest(**fetch)
                cookie_header = cookies.get_cookie_header(session, request)
                if cookie_header:
                    request.headers['Cookie'] = cookie_header
            except Exception as e:
                logger.exception(fetch)
                raise gen.Return(handle_error(e))

            try:
                response = yield self.http_client.fetch(request)
            except tornado.httpclient.HTTPError as e:
                if e.response:
                    response = e.response
                else:
                    raise gen.Return(handle_error(e))

            extract_cookies_to_jar(session, response.request, response.headers)
            if (response.code in (301, 302, 303, 307)
                    and response.headers.get('Location')
                    and task_fetch.get('allow_redirects', True)):
                if max_redirects <= 0:
                    error = tornado.httpclient.HTTPError(
                        599, 'Maximum (%d) redirects followed' % task_fetch.get('max_redirects', 5),
                        response)
                    raise gen.Return(handle_error(error))
                if response.code in (302, 303):
                    fetch['method'] = 'GET'
                    if 'body' in fetch:
                        del fetch['body']
                fetch['url'] = urljoin(fetch['url'], response.headers['Location'])
                fetch['request_timeout'] -= time.time() - start_time
                if fetch['request_timeout'] < 0:
                    fetch['request_timeout'] = 0.1
                fetch['connect_timeout'] = fetch['request_timeout']
                max_redirects -= 1
                continue

            result = {}
            result['orig_url'] = url
            result['content'] = response.body or ''
            result['headers'] = dict(response.headers)
            result['status_code'] = response.code
            result['url'] = response.effective_url or url
            result['cookies'] = session.get_dict()
            result['time'] = time.time() - start_time
            result['save'] = task_fetch.get('save')
            if response.error:
                result['error'] = utils.text(response.error)
            if 200 <= response.code < 300:
                logger.info("[%d] %s:%s %s %.2fs", response.code,
                            task.get('project'), task.get('taskid'),
                            url, result['time'])
            else:
                logger.warning("[%d] %s:%s %s %.2fs", response.code,
                               task.get('project'), task.get('taskid'),
                               url, result['time'])

            callback('http', task, result)
            self.on_result('http', task, result)
            raise gen.Return((task, result))

    @gen.coroutine
    def phantomjs_fetch(self, url, task, callback):
        '''Fetch with phantomjs proxy'''
        start_time = time.time()

        self.on_fetch('phantomjs', task)
        handle_error = lambda x: self.handle_error('phantomjs', url, task, start_time, callback, x)

        # check phantomjs proxy is enabled
        if not self.phantomjs_proxy:
            result = {
                "orig_url": url,
                "content": "phantomjs is not enabled.",
                "headers": {},
                "status_code": 501,
                "url": url,
                "cookies": {},
                "time": 0,
                "save": task.get('fetch', {}).get('save')
            }
            logger.warning("[501] %s:%s %s 0s", task.get('project'), task.get('taskid'), url)
            callback('http', task, result)
            self.on_result('http', task, result)
            raise gen.Return((task, result))

        # setup request parameters
        fetch = self.pack_tornado_request_parameters(url, task)
        task_fetch = task.get('fetch', {})
        for each in task_fetch:
            if each not in fetch:
                fetch[each] = task_fetch[each]

        # robots.txt
        if task_fetch.get('robots_txt', False):
            user_agent = fetch['headers']['User-Agent']
            can_fetch = yield self.can_fetch(user_agent, url)
            if not can_fetch:
                error = tornado.httpclient.HTTPError(403, 'Disallowed by robots.txt')
                raise gen.Return(handle_error(error))

        request_conf = {
            'follow_redirects': False
        }
        if 'timeout' in task_fetch:
            request_conf['connect_timeout'] = task_fetch['timeout']
            request_conf['request_timeout'] = task_fetch['timeout'] + 1

        session = cookies.RequestsCookieJar()
        request = tornado.httpclient.HTTPRequest(url=fetch['url'])
        if fetch.get('cookies'):
            session.update(fetch['cookies'])
            if 'Cookie' in request.headers:
                del request.headers['Cookie']
            fetch['headers']['Cookie'] = cookies.get_cookie_header(session, request)

        # making requests
        fetch['headers'] = dict(fetch['headers'])
        try:
            request = tornado.httpclient.HTTPRequest(
                url="%s" % self.phantomjs_proxy, method="POST",
                body=json.dumps(fetch), **request_conf)
        except Exception as e:
            raise gen.Return(handle_error(e))

        try:
            response = yield self.http_client.fetch(request)
        except tornado.httpclient.HTTPError as e:
            if e.response:
                response = e.response
            else:
                raise gen.Return(handle_error(e))

        if not response.body:
            raise gen.Return(handle_error(Exception('no response from phantomjs')))

        try:
            result = json.loads(utils.text(response.body))
            if response.error:
                result['error'] = utils.text(response.error)
        except Exception as e:
            raise gen.Return(handle_error(e))

        if result.get('status_code', 200):
            logger.info("[%d] %s:%s %s %.2fs", result['status_code'],
                        task.get('project'), task.get('taskid'), url, result['time'])
        else:
            logger.error("[%d] %s:%s %s, %r %.2fs", result['status_code'],
                         task.get('project'), task.get('taskid'),
                         url, result['content'], result['time'])

        callback('phantomjs', task, result)
        self.on_result('phantomjs', task, result)
        raise gen.Return((task, result))

    def run(self):
        '''Run loop'''
        logger.info("fetcher starting...")

        def queue_loop():
            if not self.outqueue or not self.inqueue:
                return
            while not self._quit:
                try:
                    if self.outqueue.full():
                        break
                    if self.http_client.free_size() <= 0:
                        break
                    task = self.inqueue.get_nowait()
                    # FIXME: decode_unicode_obj should be used after the data is
                    # selected from the database; it's done here for performance
                    task = utils.decode_unicode_obj(task)
                    self.fetch(task)
                except queue.Empty:
                    break
                except KeyboardInterrupt:
                    break
                except Exception as e:
                    logger.exception(e)
                    break

        tornado.ioloop.PeriodicCallback(queue_loop, 100, io_loop=self.ioloop).start()
        tornado.ioloop.PeriodicCallback(self.clear_robot_txt_cache, 10000, io_loop=self.ioloop).start()
        self._running = True

        try:
            self.ioloop.start()
        except KeyboardInterrupt:
            pass

        logger.info("fetcher exiting...")

    def quit(self):
        '''Quit fetcher'''
        self._running = False
        self._quit = True
        self.ioloop.stop()

    def size(self):
        return self.http_client.size()

    def xmlrpc_run(self, port=24444, bind='127.0.0.1', logRequests=False):
        '''Run xmlrpc server'''
        import umsgpack
        try:
            from xmlrpc.server import SimpleXMLRPCServer
            from xmlrpc.client import Binary
        except ImportError:
            from SimpleXMLRPCServer import SimpleXMLRPCServer
            from xmlrpclib import Binary

        server = SimpleXMLRPCServer((bind, port), allow_none=True, logRequests=logRequests)
        server.register_introspection_functions()
        server.register_multicall_functions()

        server.register_function(self.quit, '_quit')
        server.register_function(self.size)

        def sync_fetch(task):
            result = self.sync_fetch(task)
            result = Binary(umsgpack.packb(result))
            return result
        server.register_function(sync_fetch, 'fetch')

        def dump_counter(_time, _type):
            return self._cnt[_time].to_dict(_type)
        server.register_function(dump_counter, 'counter')

        server.timeout = 0.5
        while not self._quit:
            server.handle_request()
        server.server_close()

    def on_fetch(self, type, task):
        '''Called before task fetch'''
        pass

    def on_result(self, type, task, result):
        '''Called after task fetched'''
        status_code = result.get('status_code', 599)
        if status_code != 599:
            status_code = (int(status_code) // 100 * 100)
        self._cnt['5m'].event((task.get('project'), status_code), +1)
        self._cnt['1h'].event((task.get('project'), status_code), +1)

        if type == 'http' and result.get('time'):
            content_len = len(result.get('content', ''))
            self._cnt['5m'].event((task.get('project'), 'speed'),
                                  float(content_len) / result.get('time'))
            self._cnt['1h'].event((task.get('project'), 'speed'),
                                  float(content_len) / result.get('time'))
            self._cnt['5m'].event((task.get('project'), 'time'), result.get('time'))
            self._cnt['1h'].event((task.get('project'), 'time'), result.get('time'))