Completed
Push to master (39eece...c8d455) by Roy, created 01:11

Fetcher.on_fetch()   A

Complexity: Conditions 1
Size: Total Lines 3
Duplication: Lines 0, Ratio 0 %

Metric  Value
cc      1
dl      0
loc     3
rs      10
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
# vim: set et sw=4 ts=4 sts=4 ff=unix fenc=utf8:
# Author: Binux<[email protected]>
#         http://binux.me
# Created on 2012-12-17 11:07:19

from __future__ import unicode_literals

import six
import copy
import time
import json
import logging
import functools
import threading
import tornado.ioloop
import tornado.httputil
import tornado.httpclient
import pyspider

from six.moves import queue, http_cookies
from six.moves.urllib.robotparser import RobotFileParser
from requests import cookies
from six.moves.urllib.parse import urljoin, urlsplit
from tornado import gen
from tornado.curl_httpclient import CurlAsyncHTTPClient
from tornado.simple_httpclient import SimpleAsyncHTTPClient

from pyspider.libs import utils, dataurl, counter
from pyspider.libs.url import quote_chinese
from .cookie_utils import extract_cookies_to_jar
logger = logging.getLogger('fetcher')


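# Both HTTP client subclasses below expose size()/free_size() so the fetch
# loop (queue_loop in Fetcher.run) can see how many connection slots are in
# use and stop pulling tasks once the pool is saturated.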
class MyCurlAsyncHTTPClient(CurlAsyncHTTPClient):

    def free_size(self):
        return len(self._free_list)

    def size(self):
        return len(self._curls) - self.free_size()


class MySimpleAsyncHTTPClient(SimpleAsyncHTTPClient):

    def free_size(self):
        return self.max_clients - self.size()

    def size(self):
        return len(self.active)

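# Documents the shape of the result dict the fetcher hands to the processor;
# the values are the expected types, not defaults.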
fetcher_output = {
    "status_code": int,
    "orig_url": str,
    "url": str,
    "headers": dict,
    "content": str,
    "cookies": dict,
}


class Fetcher(object):
    user_agent = "pyspider/%s (+http://pyspider.org/)" % pyspider.__version__
    default_options = {
        'method': 'GET',
        'headers': {
        },
        'use_gzip': True,
        'timeout': 120,
    }
    phantomjs_proxy = None
    robot_txt_age = 60*60  # 1h

    def __init__(self, inqueue, outqueue, poolsize=100, proxy=None, async=True):
        self.inqueue = inqueue
        self.outqueue = outqueue

        self.poolsize = poolsize
        self._running = False
        self._quit = False
        self.proxy = proxy
        self.async = async
        self.ioloop = tornado.ioloop.IOLoop()

        self.robots_txt_cache = {}

        # binding io_loop to http_client here
        if self.async:
            self.http_client = MyCurlAsyncHTTPClient(max_clients=self.poolsize,
                                                     io_loop=self.ioloop)
        else:
            self.http_client = tornado.httpclient.HTTPClient(MyCurlAsyncHTTPClient, max_clients=self.poolsize)

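        # rolling fetch statistics; judging by the '5m'/'1h' keys, (30, 10)
        # covers 5 minutes in 30-second buckets and (60, 60) covers 1 hour
        # in 60-second buckets (see counter.TimebaseAverageWindowCounter)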
        self._cnt = {
            '5m': counter.CounterManager(
                lambda: counter.TimebaseAverageWindowCounter(30, 10)),
            '1h': counter.CounterManager(
                lambda: counter.TimebaseAverageWindowCounter(60, 60)),
        }

    def send_result(self, type, task, result):
        '''Send fetch result to processor'''
        if self.outqueue:
            try:
                self.outqueue.put((task, result))
            except Exception as e:
                logger.exception(e)

    def fetch(self, task, callback=None):
        if self.async:
            return self.async_fetch(task, callback)
        else:
            return self.async_fetch(task, callback).result()

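    # In async mode fetch() hands back the coroutine's Future; in sync mode
    # the blocking HTTPClient resolves each yield immediately, so the Future
    # is already finished when .result() is called.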
    @gen.coroutine
    def async_fetch(self, task, callback=None):
        '''Do one fetch'''
        url = task.get('url', 'data:,')
        if callback is None:
            callback = self.send_result

        type = 'None'
        start_time = time.time()
        try:
            if url.startswith('data:'):
                type = 'data'
                result = yield gen.maybe_future(self.data_fetch(url, task))
            elif task.get('fetch', {}).get('fetch_type') in ('js', 'phantomjs'):
                type = 'phantomjs'
                result = yield self.phantomjs_fetch(url, task)
            else:
                type = 'http'
                result = yield self.http_fetch(url, task)
        except Exception as e:
            logger.exception(e)
            result = self.handle_error(type, url, task, start_time, e)

        callback(type, task, result)
        self.on_result(type, task, result)
        raise gen.Return(result)

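    # sync_fetch is called from another thread (the xmlrpc server): it
    # schedules the fetch on the ioloop thread and blocks on a Condition
    # until the callback delivers the result.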
    def sync_fetch(self, task):
        '''Synchronous fetch, usually used in the xmlrpc thread'''
        if not self._running:
            return self.ioloop.run_sync(functools.partial(self.async_fetch, task, lambda t, _, r: True))

        wait_result = threading.Condition()
        _result = {}

        def callback(type, task, result):
            wait_result.acquire()
            _result['type'] = type
            _result['task'] = task
            _result['result'] = result
            wait_result.notify()
            wait_result.release()

        wait_result.acquire()
        self.ioloop.add_callback(self.fetch, task, callback)
        while 'result' not in _result:
            wait_result.wait()
        wait_result.release()
        return _result['result']

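    # data: URLs are decoded locally with no network traffic; e.g. a
    # (hypothetical) task {'url': 'data:,hello'} yields content 'hello'
    # with status 200.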
    def data_fetch(self, url, task):
        '''A fake fetcher for dataurl'''
        self.on_fetch('data', task)
        result = {}
        result['orig_url'] = url
        result['content'] = dataurl.decode(url)
        result['headers'] = {}
        result['status_code'] = 200
        result['url'] = url
        result['cookies'] = {}
        result['time'] = 0
        result['save'] = task.get('fetch', {}).get('save')
        if len(result['content']) < 70:
            logger.info("[200] %s:%s %s 0s", task.get('project'), task.get('taskid'), url)
        else:
            logger.info(
                "[200] %s:%s data:,%s...[content:%d] 0s",
                task.get('project'), task.get('taskid'),
                result['content'][:70],
                len(result['content'])
            )

        return result

    def handle_error(self, type, url, task, start_time, error):
        result = {
            'status_code': getattr(error, 'code', 599),
            'error': utils.text(error),
            'content': "",
            'time': time.time() - start_time,
            'orig_url': url,
            'url': url,
        }
        logger.error("[%d] %s:%s %s, %r %.2fs",
                     result['status_code'], task.get('project'), task.get('taskid'),
                     url, error, result['time'])
        return result

    allowed_options = ['method', 'data', 'timeout', 'cookies', 'use_gzip', 'validate_cert']

    def pack_tornado_request_parameters(self, url, task):
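        '''Build the keyword arguments for tornado.httpclient.HTTPRequest
        from default_options, the task's fetch options, and the configured
        proxy / etag / last-modified settings.'''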
        fetch = copy.deepcopy(self.default_options)
        fetch['url'] = url
        fetch['headers'] = tornado.httputil.HTTPHeaders(fetch['headers'])
        fetch['headers']['User-Agent'] = self.user_agent
        task_fetch = task.get('fetch', {})
        for each in self.allowed_options:
            if each in task_fetch:
                fetch[each] = task_fetch[each]
        fetch['headers'].update(task_fetch.get('headers', {}))

        if task.get('track'):
            track_headers = tornado.httputil.HTTPHeaders(
                task.get('track', {}).get('fetch', {}).get('headers') or {})
            track_ok = task.get('track', {}).get('process', {}).get('ok', False)
        else:
            track_headers = {}
            track_ok = False
        # proxy
        proxy_string = None
        if isinstance(task_fetch.get('proxy'), six.string_types):
            proxy_string = task_fetch['proxy']
        elif self.proxy and task_fetch.get('proxy', True):
            proxy_string = self.proxy
        if proxy_string:
            if '://' not in proxy_string:
                proxy_string = 'http://' + proxy_string
            proxy_splited = urlsplit(proxy_string)
            if proxy_splited.username:
                fetch['proxy_username'] = proxy_splited.username
                if six.PY2:
                    fetch['proxy_username'] = fetch['proxy_username'].encode('utf8')
            if proxy_splited.password:
                fetch['proxy_password'] = proxy_splited.password
                if six.PY2:
                    fetch['proxy_password'] = fetch['proxy_password'].encode('utf8')
            fetch['proxy_host'] = proxy_splited.hostname
            if six.PY2:
                fetch['proxy_host'] = fetch['proxy_host'].encode('utf8')
            fetch['proxy_port'] = proxy_splited.port or 8080

        # etag
        if task_fetch.get('etag', True):
            _t = None
            if isinstance(task_fetch.get('etag'), six.string_types):
                _t = task_fetch.get('etag')
            elif track_ok:
                _t = track_headers.get('etag')
            if _t and 'If-None-Match' not in fetch['headers']:
                fetch['headers']['If-None-Match'] = _t
        # last modified ('last_modifed' spelling kept for backward compatibility)
        if task_fetch.get('last_modified', task_fetch.get('last_modifed', True)):
            last_modified = task_fetch.get('last_modified', task_fetch.get('last_modifed', True))
            _t = None
            if isinstance(last_modified, six.string_types):
                _t = last_modified
            elif track_ok:
                _t = track_headers.get('last-modified')
            if _t and 'If-Modified-Since' not in fetch['headers']:
                fetch['headers']['If-Modified-Since'] = _t
        # timeout
        if 'timeout' in fetch:
            fetch['connect_timeout'] = fetch['request_timeout'] = fetch['timeout']
            del fetch['timeout']
        # data rename to body
        if 'data' in fetch:
            fetch['body'] = fetch['data']
            del fetch['data']

        return fetch

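    # robots.txt results are cached per netloc for robot_txt_age seconds;
    # RobotFileParser.parse() stamps mtime(), which can_fetch and
    # clear_robot_txt_cache below use to expire entries.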
    @gen.coroutine
    def can_fetch(self, user_agent, url):
        parsed = urlsplit(url)
        domain = parsed.netloc
        if domain in self.robots_txt_cache:
            robot_txt = self.robots_txt_cache[domain]
            if time.time() - robot_txt.mtime() > self.robot_txt_age:
                robot_txt = None
        else:
            robot_txt = None

        if robot_txt is None:
            robot_txt = RobotFileParser()
            try:
                response = yield gen.maybe_future(self.http_client.fetch(
                    urljoin(url, '/robots.txt'), connect_timeout=10, request_timeout=30))
                content = response.body
            except tornado.httpclient.HTTPError as e:
                logger.error('load robots.txt from %s error: %r', domain, e)
                content = b''

            try:
                content = content.decode('utf8', 'ignore')
            except UnicodeDecodeError:
                content = ''

            robot_txt.parse(content.splitlines())
            self.robots_txt_cache[domain] = robot_txt

        raise gen.Return(robot_txt.can_fetch(user_agent, url))

    def clear_robot_txt_cache(self):
        now = time.time()
        # iterate over a copy: deleting from a dict while iterating it
        # raises RuntimeError on Python 3
        for domain, robot_txt in list(self.robots_txt_cache.items()):
            if now - robot_txt.mtime() > self.robot_txt_age:
                del self.robots_txt_cache[domain]

    @gen.coroutine
    def http_fetch(self, url, task):
        '''HTTP fetcher'''
        start_time = time.time()
        self.on_fetch('http', task)
        handle_error = lambda x: self.handle_error('http', url, task, start_time, x)

        # setup request parameters
        fetch = self.pack_tornado_request_parameters(url, task)
        task_fetch = task.get('fetch', {})

        session = cookies.RequestsCookieJar()
        # fix for tornado request obj
        if 'Cookie' in fetch['headers']:
            c = http_cookies.SimpleCookie()
            try:
                c.load(fetch['headers']['Cookie'])
            except AttributeError:
                c.load(utils.utf8(fetch['headers']['Cookie']))
            for key in c:
                session.set(key, c[key])
            del fetch['headers']['Cookie']
        if 'cookies' in fetch:
            session.update(fetch['cookies'])
            del fetch['cookies']

        max_redirects = task_fetch.get('max_redirects', 5)
        # we will handle redirects by hand to capture cookies
        fetch['follow_redirects'] = False

        # making requests
        while True:
            # robots.txt
            if task_fetch.get('robots_txt', False):
                can_fetch = yield self.can_fetch(fetch['headers']['User-Agent'], fetch['url'])
                if not can_fetch:
                    error = tornado.httpclient.HTTPError(403, 'Disallowed by robots.txt')
                    raise gen.Return(handle_error(error))

            try:
                request = tornado.httpclient.HTTPRequest(**fetch)
                # if cookie already in header, get_cookie_header wouldn't work
                old_cookie_header = request.headers.get('Cookie')
                if old_cookie_header:
                    del request.headers['Cookie']
                cookie_header = cookies.get_cookie_header(session, request)
                if cookie_header:
                    request.headers['Cookie'] = cookie_header
                elif old_cookie_header:
                    request.headers['Cookie'] = old_cookie_header
            except Exception as e:
                logger.exception(fetch)
                raise gen.Return(handle_error(e))

            try:
                response = yield gen.maybe_future(self.http_client.fetch(request))
            except tornado.httpclient.HTTPError as e:
                if e.response:
                    response = e.response
                else:
                    raise gen.Return(handle_error(e))

            extract_cookies_to_jar(session, response.request, response.headers)
            if (response.code in (301, 302, 303, 307)
                    and response.headers.get('Location')
                    and task_fetch.get('allow_redirects', True)):
                if max_redirects <= 0:
                    error = tornado.httpclient.HTTPError(
                        599, 'Maximum (%d) redirects followed' % task_fetch.get('max_redirects', 5),
                        response)
                    raise gen.Return(handle_error(error))
                if response.code in (302, 303):
                    fetch['method'] = 'GET'
                    if 'body' in fetch:
                        del fetch['body']
                fetch['url'] = quote_chinese(urljoin(fetch['url'], response.headers['Location']))
                fetch['request_timeout'] -= time.time() - start_time
                if fetch['request_timeout'] < 0:
                    fetch['request_timeout'] = 0.1
                fetch['connect_timeout'] = fetch['request_timeout']
                max_redirects -= 1
                continue

            result = {}
            result['orig_url'] = url
            result['content'] = response.body or ''
            result['headers'] = dict(response.headers)
            result['status_code'] = response.code
            result['url'] = response.effective_url or url
            result['time'] = time.time() - start_time
            result['cookies'] = session.get_dict()
            result['save'] = task_fetch.get('save')
            if response.error:
                result['error'] = utils.text(response.error)
            if 200 <= response.code < 300:
                logger.info("[%d] %s:%s %s %.2fs", response.code,
                            task.get('project'), task.get('taskid'),
                            url, result['time'])
            else:
                logger.warning("[%d] %s:%s %s %.2fs", response.code,
                               task.get('project'), task.get('taskid'),
                               url, result['time'])

            raise gen.Return(result)

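    # phantomjs_fetch POSTs the packed fetch parameters as JSON to the
    # phantomjs proxy service and expects a JSON-encoded result dict back.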
    @gen.coroutine
    def phantomjs_fetch(self, url, task):
        '''Fetch with phantomjs proxy'''
        start_time = time.time()
        self.on_fetch('phantomjs', task)
        handle_error = lambda x: self.handle_error('phantomjs', url, task, start_time, x)

        # check phantomjs proxy is enabled
        if not self.phantomjs_proxy:
            result = {
                "orig_url": url,
                "content": "phantomjs is not enabled.",
                "headers": {},
                "status_code": 501,
                "url": url,
                "time": time.time() - start_time,
                "cookies": {},
                "save": task.get('fetch', {}).get('save')
            }
            logger.warning("[501] %s:%s %s 0s", task.get('project'), task.get('taskid'), url)
            raise gen.Return(result)

        # setup request parameters
        fetch = self.pack_tornado_request_parameters(url, task)
        task_fetch = task.get('fetch', {})
        for each in task_fetch:
            if each not in fetch:
                fetch[each] = task_fetch[each]

        # robots.txt
        if task_fetch.get('robots_txt', False):
            user_agent = fetch['headers']['User-Agent']
            can_fetch = yield self.can_fetch(user_agent, url)
            if not can_fetch:
                error = tornado.httpclient.HTTPError(403, 'Disallowed by robots.txt')
                raise gen.Return(handle_error(error))

        request_conf = {
            'follow_redirects': False
        }
        request_conf['connect_timeout'] = fetch.get('connect_timeout', 120)
        request_conf['request_timeout'] = fetch.get('request_timeout', 120)

        session = cookies.RequestsCookieJar()
        request = tornado.httpclient.HTTPRequest(url=fetch['url'])
        if fetch.get('cookies'):
            session.update(fetch['cookies'])
            if 'Cookie' in request.headers:
                del request.headers['Cookie']
            fetch['headers']['Cookie'] = cookies.get_cookie_header(session, request)

        # making requests
        fetch['headers'] = dict(fetch['headers'])
        try:
            request = tornado.httpclient.HTTPRequest(
                url="%s" % self.phantomjs_proxy, method="POST",
                body=json.dumps(fetch), **request_conf)
        except Exception as e:
            raise gen.Return(handle_error(e))

        try:
            response = yield gen.maybe_future(self.http_client.fetch(request))
        except tornado.httpclient.HTTPError as e:
            if e.response:
                response = e.response
            else:
                raise gen.Return(handle_error(e))

        if not response.body:
            raise gen.Return(handle_error(Exception('no response from phantomjs')))

        result = {}
        try:
            result = json.loads(utils.text(response.body))
        except Exception as e:
            if response.error:
                result['error'] = utils.text(response.error)
            raise gen.Return(handle_error(e))

        if result.get('status_code', 200):
            logger.info("[%d] %s:%s %s %.2fs", result['status_code'],
                        task.get('project'), task.get('taskid'), url, result['time'])
        else:
            logger.error("[%d] %s:%s %s, %r %.2fs", result['status_code'],
                         task.get('project'), task.get('taskid'),
                         url, result['content'], result['time'])

        raise gen.Return(result)

    def run(self):
        '''Run loop'''
        logger.info("fetcher starting...")

        def queue_loop():
            if not self.outqueue or not self.inqueue:
                return
            while not self._quit:
                try:
                    if self.outqueue.full():
                        break
                    if self.http_client.free_size() <= 0:
                        break
                    task = self.inqueue.get_nowait()
                    # FIXME: decode_unicode_obj should be used after the data
                    # is selected from the database; it is done here for
                    # performance
                    task = utils.decode_unicode_obj(task)
                    self.fetch(task)
                except queue.Empty:
                    break
                except KeyboardInterrupt:
                    break
                except Exception as e:
                    logger.exception(e)
                    break

        tornado.ioloop.PeriodicCallback(queue_loop, 100, io_loop=self.ioloop).start()
        tornado.ioloop.PeriodicCallback(self.clear_robot_txt_cache, 10000, io_loop=self.ioloop).start()
        self._running = True

        try:
            self.ioloop.start()
        except KeyboardInterrupt:
            pass

        logger.info("fetcher exiting...")

    def quit(self):
        '''Quit fetcher'''
        self._running = False
        self._quit = True
        self.ioloop.add_callback(self.ioloop.stop)
        if hasattr(self, 'xmlrpc_server'):
            self.xmlrpc_ioloop.add_callback(self.xmlrpc_server.stop)
            self.xmlrpc_ioloop.add_callback(self.xmlrpc_ioloop.stop)

    def size(self):
        return self.http_client.size()

    def xmlrpc_run(self, port=24444, bind='127.0.0.1', logRequests=False):
        '''Run xmlrpc server'''
        import umsgpack
        from pyspider.libs.wsgi_xmlrpc import WSGIXMLRPCApplication
        try:
            from xmlrpc.client import Binary
        except ImportError:
            from xmlrpclib import Binary

        application = WSGIXMLRPCApplication()

        application.register_function(self.quit, '_quit')
        application.register_function(self.size)

        def sync_fetch(task):
            result = self.sync_fetch(task)
            result = Binary(umsgpack.packb(result))
            return result
        application.register_function(sync_fetch, 'fetch')

        def dump_counter(_time, _type):
            return self._cnt[_time].to_dict(_type)
        application.register_function(dump_counter, 'counter')

        import tornado.wsgi
        import tornado.ioloop
        import tornado.httpserver

        container = tornado.wsgi.WSGIContainer(application)
        self.xmlrpc_ioloop = tornado.ioloop.IOLoop()
        self.xmlrpc_server = tornado.httpserver.HTTPServer(container, io_loop=self.xmlrpc_ioloop)
        self.xmlrpc_server.listen(port=port, address=bind)
        self.xmlrpc_ioloop.start()

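    # Subclass hooks: on_fetch does nothing by default, while on_result
    # feeds the 5m/1h counters that xmlrpc_run exposes as 'counter'.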
    def on_fetch(self, type, task):
        '''Called before task fetch'''
        pass

    def on_result(self, type, task, result):
        '''Called after task fetched'''
        status_code = result.get('status_code', 599)
        if status_code != 599:
            # bucket by status class, e.g. 2xx -> 200; // keeps the key an
            # int on Python 3
            status_code = (int(status_code) // 100 * 100)
        self._cnt['5m'].event((task.get('project'), status_code), +1)
        self._cnt['1h'].event((task.get('project'), status_code), +1)

        if type in ('http', 'phantomjs') and result.get('time'):
            content_len = len(result.get('content', ''))
            self._cnt['5m'].event((task.get('project'), 'speed'),
                                  float(content_len) / result.get('time'))
            self._cnt['1h'].event((task.get('project'), 'speed'),
                                  float(content_len) / result.get('time'))
            self._cnt['5m'].event((task.get('project'), 'time'), result.get('time'))
            self._cnt['1h'].event((task.get('project'), 'time'), result.get('time'))
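
# A minimal usage sketch (hypothetical wiring; pyspider normally builds the
# queues and options in its run.py entry point):
#
#   from six.moves import queue
#   inqueue, outqueue = queue.Queue(), queue.Queue()
#   fetcher = Fetcher(inqueue, outqueue)
#   inqueue.put({'taskid': 'demo', 'project': 'demo', 'url': 'data:,hello'})
#   fetcher.run()  # blocks; results appear on outqueue as (task, result)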