Completed
Push — master ( f8c889...431373 ) by Roy
11:19, queued 14s

pyspider.fetcher.Fetcher.data_fetch() (grade B)

Complexity:   Conditions 2
Size:         Total lines 25
Duplication:  Lines 0, Ratio 0 %

Metric   Value
cc       2        (cyclomatic complexity)
dl       0        (duplicated lines)
loc      25       (lines of code)
rs       8.8571
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
# vim: set et sw=4 ts=4 sts=4 ff=unix fenc=utf8:
# Author: Binux<[email protected]>
#         http://binux.me
# Created on 2012-12-17 11:07:19

from __future__ import unicode_literals

import six
import copy
import time
import json
import logging
import functools
import threading
import tornado.ioloop
import tornado.httputil
import tornado.httpclient
import pyspider

from six.moves import queue, http_cookies
from six.moves.urllib.robotparser import RobotFileParser
from requests import cookies
from six.moves.urllib.parse import urljoin, urlsplit
from tornado import gen
from tornado.curl_httpclient import CurlAsyncHTTPClient
from tornado.simple_httpclient import SimpleAsyncHTTPClient
from pyspider.libs import utils, dataurl, counter
from .cookie_utils import extract_cookies_to_jar
logger = logging.getLogger('fetcher')


class MyCurlAsyncHTTPClient(CurlAsyncHTTPClient):

    def free_size(self):
        # idle curl handles left in the pool
        return len(self._free_list)

    def size(self):
        # curl handles currently in use
        return len(self._curls) - self.free_size()


class MySimpleAsyncHTTPClient(SimpleAsyncHTTPClient):

    def free_size(self):
        return self.max_clients - self.size()

    def size(self):
        # requests currently in flight
        return len(self.active)

fetcher_output = {
    "status_code": int,
    "orig_url": str,
    "url": str,
    "headers": dict,
    "content": str,
    "cookies": dict,
}
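
# For illustration, a concrete result matching this schema (hypothetical
# values; data_fetch() and http_fetch() below produce dicts of this shape):
#
#     {
#         "status_code": 200,
#         "orig_url": "http://example.com/",
#         "url": "http://example.com/",
#         "headers": {"Content-Type": "text/html"},
#         "content": "<html>...</html>",
#         "cookies": {"session": "abc"},
#     }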


class Fetcher(object):
    user_agent = "pyspider/%s (+http://pyspider.org/)" % pyspider.__version__
    default_options = {
        'method': 'GET',
        'headers': {
        },
        'use_gzip': True,
        'timeout': 120,
    }
    phantomjs_proxy = None
    robot_txt_age = 60 * 60  # 1h

    # `async` became a reserved word in Python 3.7, so the flag is named
    # async_mode here
    def __init__(self, inqueue, outqueue, poolsize=100, proxy=None, async_mode=True):
        self.inqueue = inqueue
        self.outqueue = outqueue

        self.poolsize = poolsize
        self._running = False
        self._quit = False
        self.proxy = proxy
        self.async_mode = async_mode
        self.ioloop = tornado.ioloop.IOLoop()

        self.robots_txt_cache = {}

        # binding io_loop to http_client here
        if self.async_mode:
            self.http_client = MyCurlAsyncHTTPClient(max_clients=self.poolsize,
                                                     io_loop=self.ioloop)
        else:
            self.http_client = tornado.httpclient.HTTPClient(MyCurlAsyncHTTPClient, max_clients=self.poolsize)

        self._cnt = {
            '5m': counter.CounterManager(
                lambda: counter.TimebaseAverageWindowCounter(30, 10)),
            '1h': counter.CounterManager(
                lambda: counter.TimebaseAverageWindowCounter(60, 60)),
        }
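        # Window math (an assumption about TimebaseAverageWindowCounter(
        # win_size, win_count): win_count buckets of win_size seconds each):
        # 30 s * 10 = 5 min and 60 s * 60 = 1 h, matching the '5m'/'1h' keys.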

    def send_result(self, type, task, result):
        '''Send fetch result to processor'''
        if self.outqueue:
            try:
                self.outqueue.put((task, result))
            except Exception as e:
                logger.exception(e)

    def fetch(self, task, callback=None):
        if self.async_mode:
            return self.async_fetch(task, callback)
        else:
            return self.async_fetch(task, callback).result()

    @gen.coroutine
    def async_fetch(self, task, callback=None):
        '''Do one fetch'''
        url = task.get('url', 'data:,')
        if callback is None:
            callback = self.send_result

        try:
            if url.startswith('data:'):
                ret = yield gen.maybe_future(self.data_fetch(url, task, callback))
            elif task.get('fetch', {}).get('fetch_type') in ('js', 'phantomjs'):
                ret = yield self.phantomjs_fetch(url, task, callback)
            else:
                ret = yield self.http_fetch(url, task, callback)
        except Exception as e:
            logger.exception(e)
            raise e

        raise gen.Return(ret)

    def sync_fetch(self, task):
        '''Synchronous fetch, usually used in the xmlrpc thread'''
        if not self._running:
            return self.ioloop.run_sync(functools.partial(self.async_fetch, task, lambda t, _, r: True))

        wait_result = threading.Condition()
        _result = {}

        def callback(type, task, result):
            wait_result.acquire()
            _result['type'] = type
            _result['task'] = task
            _result['result'] = result
            wait_result.notify()
            wait_result.release()

        wait_result.acquire()
        self.fetch(task, callback=callback)
        while 'result' not in _result:
            wait_result.wait()
        wait_result.release()
        return _result['result']

    def data_fetch(self, url, task, callback):
        '''A fake fetcher for dataurl'''
        self.on_fetch('data', task)
        result = {}
        result['orig_url'] = url
        result['content'] = dataurl.decode(url)
        result['headers'] = {}
        result['status_code'] = 200
        result['url'] = url
        result['cookies'] = {}
        result['time'] = 0
        result['save'] = task.get('fetch', {}).get('save')
        if len(result['content']) < 70:
            logger.info("[200] %s:%s %s 0s", task.get('project'), task.get('taskid'), url)
        else:
            logger.info(
                "[200] %s:%s data:,%s...[content:%d] 0s",
                task.get('project'), task.get('taskid'),
                result['content'][:70],
                len(result['content'])
            )

        callback('data', task, result)
        self.on_result('data', task, result)
        return task, result
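
    # Minimal usage sketch (hypothetical, not part of the module): with no
    # queues attached and the loop not yet running, sync_fetch() drives one
    # fetch through run_sync(), and a data: URL ends up in data_fetch() above.
    #
    #     fetcher = Fetcher(inqueue=None, outqueue=None)
    #     task, result = fetcher.sync_fetch(
    #         {'taskid': 't1', 'project': 'demo', 'url': 'data:,hello'})
    #     # result['status_code'] == 200, result['content'] == 'hello'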

    def handle_error(self, type, url, task, start_time, callback, error):
        result = {
            'status_code': getattr(error, 'code', 599),
            'error': utils.text(error),
            'content': "",
            'time': time.time() - start_time,
            'orig_url': url,
            'url': url,
        }
        logger.error("[%d] %s:%s %s, %r %.2fs",
                     result['status_code'], task.get('project'), task.get('taskid'),
                     url, error, result['time'])
        callback(type, task, result)
        self.on_result(type, task, result)
        return task, result

    allowed_options = ['method', 'data', 'timeout', 'cookies', 'use_gzip', 'validate_cert']

    def pack_tornado_request_parameters(self, url, task):
        fetch = copy.deepcopy(self.default_options)
        fetch['url'] = url
        fetch['headers'] = tornado.httputil.HTTPHeaders(fetch['headers'])
        fetch['headers']['User-Agent'] = self.user_agent
        task_fetch = task.get('fetch', {})
        for each in self.allowed_options:
            if each in task_fetch:
                fetch[each] = task_fetch[each]
        fetch['headers'].update(task_fetch.get('headers', {}))

        if task.get('track'):
            track_headers = tornado.httputil.HTTPHeaders(
                task.get('track', {}).get('fetch', {}).get('headers') or {})
            track_ok = task.get('track', {}).get('process', {}).get('ok', False)
        else:
            track_headers = {}
            track_ok = False
        # proxy
        proxy_string = None
        if isinstance(task_fetch.get('proxy'), six.string_types):
            proxy_string = task_fetch['proxy']
        elif self.proxy and task_fetch.get('proxy', True):
            proxy_string = self.proxy
        if proxy_string:
            if '://' not in proxy_string:
                proxy_string = 'http://' + proxy_string
            proxy_splited = urlsplit(proxy_string)
            if proxy_splited.username:
                fetch['proxy_username'] = proxy_splited.username
                if six.PY2:
                    fetch['proxy_username'] = fetch['proxy_username'].encode('utf8')
            if proxy_splited.password:
                fetch['proxy_password'] = proxy_splited.password
                if six.PY2:
                    fetch['proxy_password'] = fetch['proxy_password'].encode('utf8')
            # encode the hostname once, and only on Python 2
            fetch['proxy_host'] = proxy_splited.hostname
            if six.PY2:
                fetch['proxy_host'] = fetch['proxy_host'].encode('utf8')
            fetch['proxy_port'] = proxy_splited.port or 8080

        # etag
        if task_fetch.get('etag', True):
            _t = None
            if isinstance(task_fetch.get('etag'), six.string_types):
                _t = task_fetch.get('etag')
            elif track_ok:
                _t = track_headers.get('etag')
            if _t and 'If-None-Match' not in fetch['headers']:
                fetch['headers']['If-None-Match'] = _t
        # last modified
        if task_fetch.get('last_modified', True):
            _t = None
            if isinstance(task_fetch.get('last_modified'), six.string_types):
                _t = task_fetch.get('last_modified')
            elif track_ok:
                _t = track_headers.get('last-modified')
            if _t and 'If-Modified-Since' not in fetch['headers']:
                fetch['headers']['If-Modified-Since'] = _t
        # timeout
        if 'timeout' in fetch:
            fetch['connect_timeout'] = fetch['request_timeout'] = fetch['timeout']
            del fetch['timeout']
        # data rename to body
        if 'data' in fetch:
            fetch['body'] = fetch['data']
            del fetch['data']

        return fetch
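
    # For illustration, a hypothetical task and the request dict built here
    # (assuming no proxy is configured on the fetcher):
    #
    #     task = {'url': 'http://example.com/', 'fetch': {
    #         'method': 'POST', 'data': 'a=1', 'timeout': 20,
    #         'headers': {'X-Demo': '1'}}}
    #     fetch = fetcher.pack_tornado_request_parameters(task['url'], task)
    #     # fetch['method'] == 'POST'; fetch['body'] == 'a=1'
    #     # fetch['connect_timeout'] == fetch['request_timeout'] == 20
    #     # fetch['headers'] carries User-Agent and X-Demo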

    @gen.coroutine
    def can_fetch(self, user_agent, url):
        parsed = urlsplit(url)
        domain = parsed.netloc
        if domain in self.robots_txt_cache:
            robot_txt = self.robots_txt_cache[domain]
            if time.time() - robot_txt.mtime() > self.robot_txt_age:
                robot_txt = None
        else:
            robot_txt = None

        if robot_txt is None:
            robot_txt = RobotFileParser()
            try:
                response = yield gen.maybe_future(self.http_client.fetch(
                    urljoin(url, '/robots.txt'), connect_timeout=10, request_timeout=30))
                content = response.body
            except tornado.httpclient.HTTPError as e:
                logger.error('load robots.txt from %s error: %r', domain, e)
                content = b''

            try:
                content = content.decode('utf8', 'ignore')
            except UnicodeDecodeError:
                content = ''

            robot_txt.parse(content.splitlines())
            self.robots_txt_cache[domain] = robot_txt

        raise gen.Return(robot_txt.can_fetch(user_agent, url))

    def clear_robot_txt_cache(self):
        now = time.time()
        # iterate over a copy: deleting from the dict while iterating it
        # raises RuntimeError on Python 3
        for domain, robot_txt in list(self.robots_txt_cache.items()):
            if now - robot_txt.mtime() > self.robot_txt_age:
                del self.robots_txt_cache[domain]

    @gen.coroutine
    def http_fetch(self, url, task, callback):
        '''HTTP fetcher'''
        start_time = time.time()

        self.on_fetch('http', task)
        handle_error = lambda x: self.handle_error('http', url, task, start_time, callback, x)

        # setup request parameters
        fetch = self.pack_tornado_request_parameters(url, task)
        task_fetch = task.get('fetch', {})

        session = cookies.RequestsCookieJar()
        # fix for tornado request obj
        if 'Cookie' in fetch['headers']:
            c = http_cookies.SimpleCookie()
            try:
                c.load(fetch['headers']['Cookie'])
            except AttributeError:
                c.load(utils.utf8(fetch['headers']['Cookie']))
            for key in c:
                session.set(key, c[key])
            del fetch['headers']['Cookie']
        if 'cookies' in fetch:
            session.update(fetch['cookies'])
            del fetch['cookies']

        max_redirects = task_fetch.get('max_redirects', 5)
        # we will handle redirects by hand to capture cookies
        fetch['follow_redirects'] = False

        # making requests
        while True:
            # robots.txt
            if task_fetch.get('robots_txt', False):
                can_fetch = yield self.can_fetch(fetch['headers']['User-Agent'], fetch['url'])
                if not can_fetch:
                    error = tornado.httpclient.HTTPError(403, 'Disallowed by robots.txt')
                    raise gen.Return(handle_error(error))

            try:
                request = tornado.httpclient.HTTPRequest(**fetch)
                cookie_header = cookies.get_cookie_header(session, request)
                if cookie_header:
                    request.headers['Cookie'] = cookie_header
            except Exception as e:
                logger.exception(fetch)
                raise gen.Return(handle_error(e))

            try:
                response = yield gen.maybe_future(self.http_client.fetch(request))
            except tornado.httpclient.HTTPError as e:
                if e.response:
                    response = e.response
                else:
                    raise gen.Return(handle_error(e))

            extract_cookies_to_jar(session, response.request, response.headers)
            if (response.code in (301, 302, 303, 307)
                    and response.headers.get('Location')
                    and task_fetch.get('allow_redirects', True)):
                if max_redirects <= 0:
                    error = tornado.httpclient.HTTPError(
                        599, 'Maximum (%d) redirects followed' % task_fetch.get('max_redirects', 5),
                        response)
                    raise gen.Return(handle_error(error))
                if response.code in (302, 303):
                    fetch['method'] = 'GET'
                    if 'body' in fetch:
                        del fetch['body']
                fetch['url'] = urljoin(fetch['url'], response.headers['Location'])
                fetch['request_timeout'] -= time.time() - start_time
                if fetch['request_timeout'] < 0:
                    fetch['request_timeout'] = 0.1
                fetch['connect_timeout'] = fetch['request_timeout']
                max_redirects -= 1
                continue

            result = {}
            result['orig_url'] = url
            result['content'] = response.body or ''
            result['headers'] = dict(response.headers)
            result['status_code'] = response.code
            result['url'] = response.effective_url or url
            result['cookies'] = session.get_dict()
            result['time'] = time.time() - start_time
            result['save'] = task_fetch.get('save')
            if response.error:
                result['error'] = utils.text(response.error)
            if 200 <= response.code < 300:
                logger.info("[%d] %s:%s %s %.2fs", response.code,
                            task.get('project'), task.get('taskid'),
                            url, result['time'])
            else:
                logger.warning("[%d] %s:%s %s %.2fs", response.code,
                               task.get('project'), task.get('taskid'),
                               url, result['time'])

            callback('http', task, result)
            self.on_result('http', task, result)
            raise gen.Return((task, result))
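
    # A hypothetical task exercising the knobs read along this path (here and
    # in pack_tornado_request_parameters(); every key appears in the code
    # above, values are examples only):
    #
    #     task = {
    #         'taskid': 'example', 'project': 'demo',
    #         'url': 'http://example.com/',
    #         'fetch': {
    #             'method': 'GET',
    #             'headers': {'Accept': 'text/html'},
    #             'cookies': {'session': 'abc'},
    #             'max_redirects': 3,
    #             'allow_redirects': True,
    #             'robots_txt': True,
    #             'save': {'depth': 1},
    #         },
    #     }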

    @gen.coroutine
    def phantomjs_fetch(self, url, task, callback):
        '''Fetch with phantomjs proxy'''
        start_time = time.time()

        self.on_fetch('phantomjs', task)
        handle_error = lambda x: self.handle_error('phantomjs', url, task, start_time, callback, x)

        # check if phantomjs proxy is enabled
        if not self.phantomjs_proxy:
            result = {
                "orig_url": url,
                "content": "phantomjs is not enabled.",
                "headers": {},
                "status_code": 501,
                "url": url,
                "cookies": {},
                "time": 0,
                "save": task.get('fetch', {}).get('save')
            }
            logger.warning("[501] %s:%s %s 0s", task.get('project'), task.get('taskid'), url)
            callback('http', task, result)
            self.on_result('http', task, result)
            raise gen.Return((task, result))

        # setup request parameters
        fetch = self.pack_tornado_request_parameters(url, task)
        task_fetch = task.get('fetch', {})
        for each in task_fetch:
            if each not in fetch:
                fetch[each] = task_fetch[each]

        # robots.txt
        if task_fetch.get('robots_txt', False):
            user_agent = fetch['headers']['User-Agent']
            can_fetch = yield self.can_fetch(user_agent, url)
            if not can_fetch:
                error = tornado.httpclient.HTTPError(403, 'Disallowed by robots.txt')
                raise gen.Return(handle_error(error))

        request_conf = {
            'follow_redirects': False
        }
        if 'timeout' in task_fetch:
            request_conf['connect_timeout'] = task_fetch['timeout']
            request_conf['request_timeout'] = task_fetch['timeout'] + 1

        session = cookies.RequestsCookieJar()
        request = tornado.httpclient.HTTPRequest(url=fetch['url'])
        if fetch.get('cookies'):
            session.update(fetch['cookies'])
            if 'Cookie' in request.headers:
                del request.headers['Cookie']
            fetch['headers']['Cookie'] = cookies.get_cookie_header(session, request)

        # making requests
        fetch['headers'] = dict(fetch['headers'])
        try:
            request = tornado.httpclient.HTTPRequest(
                url="%s" % self.phantomjs_proxy, method="POST",
                body=json.dumps(fetch), **request_conf)
        except Exception as e:
            raise gen.Return(handle_error(e))

        try:
            response = yield gen.maybe_future(self.http_client.fetch(request))
        except tornado.httpclient.HTTPError as e:
            if e.response:
                response = e.response
            else:
                raise gen.Return(handle_error(e))

        if not response.body:
            raise gen.Return(handle_error(Exception('no response from phantomjs')))

        try:
            result = json.loads(utils.text(response.body))
        except Exception as e:
            # `result` is unbound when json.loads fails, so log the transport
            # error instead of assigning into result
            if response.error:
                logger.error('phantomjs response error: %r', response.error)
            raise gen.Return(handle_error(e))

        if result.get('status_code', 200):
            logger.info("[%d] %s:%s %s %.2fs", result['status_code'],
                        task.get('project'), task.get('taskid'), url, result['time'])
        else:
            logger.error("[%d] %s:%s %s, %r %.2fs", result['status_code'],
                         task.get('project'), task.get('taskid'),
                         url, result['content'], result['time'])

        callback('phantomjs', task, result)
        self.on_result('phantomjs', task, result)
        raise gen.Return((task, result))
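
    # Dispatch note: async_fetch() above routes a task here when its fetch
    # options set fetch_type, e.g. (hypothetical task):
    #
    #     task = {'url': 'http://example.com/',
    #             'fetch': {'fetch_type': 'js', 'timeout': 60}}
    #
    # The packed parameters are POSTed as JSON to self.phantomjs_proxy, which
    # is expected to answer with a JSON object shaped like fetcher_output.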

    def run(self):
        '''Run loop'''
        logger.info("fetcher starting...")

        def queue_loop():
            if not self.outqueue or not self.inqueue:
                return
            while not self._quit:
                try:
                    if self.outqueue.full():
                        break
                    if self.http_client.free_size() <= 0:
                        break
                    task = self.inqueue.get_nowait()
                    # FIXME: decode_unicode_obj should be used after data is
                    # selected from the database; it is done here for performance
                    task = utils.decode_unicode_obj(task)
                    self.fetch(task)
                except queue.Empty:
                    break
                except KeyboardInterrupt:
                    break
                except Exception as e:
                    logger.exception(e)
                    break

        tornado.ioloop.PeriodicCallback(queue_loop, 100, io_loop=self.ioloop).start()
        tornado.ioloop.PeriodicCallback(self.clear_robot_txt_cache, 10000, io_loop=self.ioloop).start()
        self._running = True

        try:
            self.ioloop.start()
        except KeyboardInterrupt:
            pass

        logger.info("fetcher exiting...")

    def quit(self):
        '''Quit fetcher'''
        self._running = False
        self._quit = True
        self.ioloop.stop()

    def size(self):
        return self.http_client.size()

    def xmlrpc_run(self, port=24444, bind='127.0.0.1', logRequests=False):
        '''Run xmlrpc server'''
        import umsgpack
        try:
            from xmlrpc.server import SimpleXMLRPCServer
            from xmlrpc.client import Binary
        except ImportError:
            from SimpleXMLRPCServer import SimpleXMLRPCServer
            from xmlrpclib import Binary

        server = SimpleXMLRPCServer((bind, port), allow_none=True, logRequests=logRequests)
        server.register_introspection_functions()
        server.register_multicall_functions()

        server.register_function(self.quit, '_quit')
        server.register_function(self.size)

        def sync_fetch(task):
            result = self.sync_fetch(task)
            result = Binary(umsgpack.packb(result))
            return result
        server.register_function(sync_fetch, 'fetch')

        def dump_counter(_time, _type):
            return self._cnt[_time].to_dict(_type)
        server.register_function(dump_counter, 'counter')

        server.timeout = 0.5
        while not self._quit:
            server.handle_request()
        server.server_close()

    def on_fetch(self, type, task):
        '''Called before task fetch'''
        pass

    def on_result(self, type, task, result):
        '''Called after task fetched'''
        status_code = result.get('status_code', 599)
        if status_code != 599:
            # bucket into hundreds with integer division so Python 3 does not
            # produce a float (e.g. 404 -> 400)
            status_code = int(status_code) // 100 * 100
        self._cnt['5m'].event((task.get('project'), status_code), +1)
        self._cnt['1h'].event((task.get('project'), status_code), +1)

        if type == 'http' and result.get('time'):
            content_len = len(result.get('content', ''))
            self._cnt['5m'].event((task.get('project'), 'speed'),
                                  float(content_len) / result.get('time'))
            self._cnt['1h'].event((task.get('project'), 'speed'),
                                  float(content_len) / result.get('time'))
            self._cnt['5m'].event((task.get('project'), 'time'), result.get('time'))
            self._cnt['1h'].event((task.get('project'), 'time'), result.get('time'))
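
# Wiring sketch (hypothetical; the queue objects depend on the deployment):
# a scheduler feeds inqueue, run() pumps tasks into the HTTP client pool, and
# results are put on outqueue for the processor.
#
#     from six.moves import queue as std_queue
#     fetcher = Fetcher(inqueue=std_queue.Queue(), outqueue=std_queue.Queue())
#     # fetcher.run()         # blocking IOLoop, usually in its own thread
#     # fetcher.xmlrpc_run()  # optional RPC interface, default port 24444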