Completed
Push — master ( c89357...eeed85 ) by Roy, created 01:05

pyspider.fetcher.Fetcher.fetch()   A

Complexity: Conditions 2
Size: Total Lines 5
Duplication: Lines 0, Ratio 0 %

Metric    Value
cc        2
dl        0
loc       5
rs        9.4285
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
# vim: set et sw=4 ts=4 sts=4 ff=unix fenc=utf8:
# Author: Binux<[email protected]>
#         http://binux.me
# Created on 2012-12-17 11:07:19

from __future__ import unicode_literals

import six
import copy
import time
import json
import logging
import functools
import threading
import tornado.ioloop
import tornado.httputil
import tornado.httpclient
import pyspider

from six.moves import queue, http_cookies
from six.moves.urllib.robotparser import RobotFileParser
from requests import cookies
from six.moves.urllib.parse import urljoin, urlsplit
from tornado import gen
from tornado.curl_httpclient import CurlAsyncHTTPClient
from tornado.simple_httpclient import SimpleAsyncHTTPClient
from pyspider.libs import utils, dataurl, counter
from .cookie_utils import extract_cookies_to_jar

logger = logging.getLogger('fetcher')


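# Thin subclasses of tornado's HTTP clients that expose how many slots in the
# connection pool are free or busy; run()'s queue_loop uses free_size() to
# decide whether to pull more tasks off the inqueue.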
class MyCurlAsyncHTTPClient(CurlAsyncHTTPClient):

    def free_size(self):
        return len(self._free_list)

    def size(self):
        return len(self._curls) - self.free_size()


class MySimpleAsyncHTTPClient(SimpleAsyncHTTPClient):

    def free_size(self):
        return self.max_clients - self.size()

    def size(self):
        return len(self.active)

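# Expected shape of the result dict that each fetch handler passes to its
# callback (documentation only; nothing enforces it at runtime).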
fetcher_output = {
    "status_code": int,
    "orig_url": str,
    "url": str,
    "headers": dict,
    "content": str,
    "cookies": dict,
}


class Fetcher(object):
    user_agent = "pyspider/%s (+http://pyspider.org/)" % pyspider.__version__
    default_options = {
        'method': 'GET',
        'headers': {
        },
        'use_gzip': True,
        'timeout': 120,
    }
    phantomjs_proxy = None
    robot_txt_age = 60*60  # 1h

    def __init__(self, inqueue, outqueue, poolsize=100, proxy=None, async=True):
        self.inqueue = inqueue
        self.outqueue = outqueue

        self.poolsize = poolsize
        self._running = False
        self._quit = False
        self.proxy = proxy
        self.async = async
        self.ioloop = tornado.ioloop.IOLoop()

        self.robots_txt_cache = {}

        # binding io_loop to http_client here
        self.http_client = MyCurlAsyncHTTPClient(max_clients=self.poolsize,
                                                 io_loop=self.ioloop)

        self._cnt = {
            '5m': counter.CounterManager(
                lambda: counter.TimebaseAverageWindowCounter(30, 10)),
            '1h': counter.CounterManager(
                lambda: counter.TimebaseAverageWindowCounter(60, 60)),
        }

    def send_result(self, type, task, result):
        '''Send fetch result to processor'''
        if self.outqueue:
            try:
                self.outqueue.put((task, result))
            except Exception as e:
                logger.exception(e)

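    # fetch() is the dispatch point scored in the report above: in async mode it
    # hands the task to async_fetch() and returns a future; in sync mode it drives
    # async_fetch() to completion on the private ioloop via run_sync().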
    def fetch(self, task, callback=None):
        if self.async:
            return self.async_fetch(task, callback)
        else:
            return self.ioloop.run_sync(functools.partial(self.async_fetch, task, callback))

    def async_fetch(self, task, callback=None):
        '''Do one fetch'''
        url = task.get('url', 'data:,')
        if callback is None:
            callback = self.send_result
        if url.startswith('data:'):
            return gen.maybe_future(self.data_fetch(url, task, callback))
        elif task.get('fetch', {}).get('fetch_type') in ('js', 'phantomjs'):
            return gen.maybe_future(self.phantomjs_fetch(url, task, callback))
        else:
            return gen.maybe_future(self.http_fetch(url, task, callback))

    def sync_fetch(self, task):
        '''Synchronous fetch, usually used in the xmlrpc thread'''
        if not self._running:
            return self.ioloop.run_sync(functools.partial(self.async_fetch, task, lambda t, _, r: True))

        wait_result = threading.Condition()
        _result = {}

        def callback(type, task, result):
            wait_result.acquire()
            _result['type'] = type
            _result['task'] = task
            _result['result'] = result
            wait_result.notify()
            wait_result.release()

        wait_result.acquire()
        self.fetch(task, callback=callback)
        while 'result' not in _result:
            wait_result.wait()
        wait_result.release()
        return _result['result']

    def data_fetch(self, url, task, callback):
        '''A fake fetcher for dataurl'''
        self.on_fetch('data', task)
        result = {}
        result['orig_url'] = url
        result['content'] = dataurl.decode(url)
        result['headers'] = {}
        result['status_code'] = 200
        result['url'] = url
        result['cookies'] = {}
        result['time'] = 0
        result['save'] = task.get('fetch', {}).get('save')
        if len(result['content']) < 70:
            logger.info("[200] %s:%s %s 0s", task.get('project'), task.get('taskid'), url)
        else:
            logger.info(
                "[200] %s:%s data:,%s...[content:%d] 0s",
                task.get('project'), task.get('taskid'),
                result['content'][:70],
                len(result['content'])
            )

        callback('data', task, result)
        self.on_result('data', task, result)
        return task, result

    def handle_error(self, type, url, task, start_time, callback, error):
        result = {
            'status_code': getattr(error, 'code', 599),
            'error': utils.text(error),
            'content': "",
            'time': time.time() - start_time,
            'orig_url': url,
            'url': url,
        }
        logger.error("[%d] %s:%s %s, %r %.2fs",
                     result['status_code'], task.get('project'), task.get('taskid'),
                     url, error, result['time'])
        callback(type, task, result)
        self.on_result(type, task, result)
        return task, result

    allowed_options = ['method', 'data', 'timeout', 'cookies', 'use_gzip', 'validate_cert']

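    # Build the kwargs for tornado.httpclient.HTTPRequest: start from default_options,
    # overlay per-task options and headers, add proxy credentials, and turn the last
    # successful crawl's ETag / Last-Modified into conditional-request headers.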
    def pack_tornado_request_parameters(self, url, task):
        fetch = copy.deepcopy(self.default_options)
        fetch['url'] = url
        fetch['headers'] = tornado.httputil.HTTPHeaders(fetch['headers'])
        fetch['headers']['User-Agent'] = self.user_agent
        task_fetch = task.get('fetch', {})
        for each in self.allowed_options:
            if each in task_fetch:
                fetch[each] = task_fetch[each]
        fetch['headers'].update(task_fetch.get('headers', {}))

        if task.get('track'):
            track_headers = tornado.httputil.HTTPHeaders(
                task.get('track', {}).get('fetch', {}).get('headers') or {})
            track_ok = task.get('track', {}).get('process', {}).get('ok', False)
        else:
            track_headers = {}
            track_ok = False
        # proxy
        proxy_string = None
        if isinstance(task_fetch.get('proxy'), six.string_types):
            proxy_string = task_fetch['proxy']
        elif self.proxy and task_fetch.get('proxy', True):
            proxy_string = self.proxy
        if proxy_string:
            if '://' not in proxy_string:
                proxy_string = 'http://' + proxy_string
            proxy_splited = urlsplit(proxy_string)
            if proxy_splited.username:
                fetch['proxy_username'] = proxy_splited.username
                if six.PY2:
                    fetch['proxy_username'] = fetch['proxy_username'].encode('utf8')
            if proxy_splited.password:
                fetch['proxy_password'] = proxy_splited.password
                if six.PY2:
                    fetch['proxy_password'] = fetch['proxy_password'].encode('utf8')
            fetch['proxy_host'] = proxy_splited.hostname
            if six.PY2:
                fetch['proxy_host'] = fetch['proxy_host'].encode('utf8')
            fetch['proxy_port'] = proxy_splited.port or 8080

        # etag
        if task_fetch.get('etag', True):
            _t = None
            if isinstance(task_fetch.get('etag'), six.string_types):
                _t = task_fetch.get('etag')
            elif track_ok:
                _t = track_headers.get('etag')
            if _t and 'If-None-Match' not in fetch['headers']:
                fetch['headers']['If-None-Match'] = _t
        # last modified
        if task_fetch.get('last_modified', True):
            _t = None
            if isinstance(task_fetch.get('last_modified'), six.string_types):
                _t = task_fetch.get('last_modified')
            elif track_ok:
                _t = track_headers.get('last-modified')
            if _t and 'If-Modified-Since' not in fetch['headers']:
                fetch['headers']['If-Modified-Since'] = _t
        # timeout
        if 'timeout' in fetch:
            fetch['connect_timeout'] = fetch['request_timeout'] = fetch['timeout']
            del fetch['timeout']
        # data rename to body
        if 'data' in fetch:
            fetch['body'] = fetch['data']
            del fetch['data']

        return fetch

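    # robots.txt lookups are cached per domain for robot_txt_age seconds; stale
    # entries are refetched here and evicted by clear_robot_txt_cache() below.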
    @gen.coroutine
    def can_fetch(self, user_agent, url):
        parsed = urlsplit(url)
        domain = parsed.netloc
        if domain in self.robots_txt_cache:
            robot_txt = self.robots_txt_cache[domain]
            if time.time() - robot_txt.mtime() > self.robot_txt_age:
                robot_txt = None
        else:
            robot_txt = None

        if robot_txt is None:
            robot_txt = RobotFileParser()
            try:
                response = yield self.http_client.fetch(urljoin(url, '/robots.txt'),
                                                        connect_timeout=10, request_timeout=30)
                content = utils.text(response.body)
            except tornado.httpclient.HTTPError as e:
                logger.error('load robots.txt from %s error: %r', domain, e)
                content = ''

            robot_txt.parse(content.splitlines())
            robot_txt.modified()
            self.robots_txt_cache[domain] = robot_txt

        raise gen.Return(robot_txt.can_fetch(user_agent, url))

    def clear_robot_txt_cache(self):
        now = time.time()
        for domain, robot_txt in list(self.robots_txt_cache.items()):
            if now - robot_txt.mtime() > self.robot_txt_age:
                del self.robots_txt_cache[domain]

    @gen.coroutine
    def http_fetch(self, url, task, callback):
        '''HTTP fetcher'''
        start_time = time.time()

        self.on_fetch('http', task)
        handle_error = lambda x: self.handle_error('http', url, task, start_time, callback, x)

        # setup request parameters
        fetch = self.pack_tornado_request_parameters(url, task)
        task_fetch = task.get('fetch', {})

        session = cookies.RequestsCookieJar()
        # fix for tornado request obj
        if 'Cookie' in fetch['headers']:
            c = http_cookies.SimpleCookie()
            try:
                c.load(fetch['headers']['Cookie'])
            except AttributeError:
                c.load(utils.utf8(fetch['headers']['Cookie']))
            for key in c:
                session.set(key, c[key])
            del fetch['headers']['Cookie']
        if 'cookies' in fetch:
            session.update(fetch['cookies'])
            del fetch['cookies']

        max_redirects = task_fetch.get('max_redirects', 5)
        # we will handle redirects by hand to capture cookies
        fetch['follow_redirects'] = False

        # making requests
        while True:
            # robots.txt
            if task_fetch.get('robots_txt', False):
                can_fetch = yield self.can_fetch(fetch['headers']['User-Agent'], fetch['url'])
                if not can_fetch:
                    error = tornado.httpclient.HTTPError(403, 'Disallowed by robots.txt')
                    raise gen.Return(handle_error(error))

            try:
                request = tornado.httpclient.HTTPRequest(**fetch)
                cookie_header = cookies.get_cookie_header(session, request)
                if cookie_header:
                    request.headers['Cookie'] = cookie_header
            except Exception as e:
                logger.exception(fetch)
                raise gen.Return(handle_error(e))

            try:
                response = yield self.http_client.fetch(request)
            except tornado.httpclient.HTTPError as e:
                if e.response:
                    response = e.response
                else:
                    raise gen.Return(handle_error(e))

            extract_cookies_to_jar(session, response.request, response.headers)
            if (response.code in (301, 302, 303, 307)
                    and response.headers.get('Location')
                    and task_fetch.get('allow_redirects', True)):
                if max_redirects <= 0:
                    error = tornado.httpclient.HTTPError(
                        599, 'Maximum (%d) redirects followed' % task_fetch.get('max_redirects', 5),
                        response)
                    raise gen.Return(handle_error(error))
                if response.code in (302, 303):
                    fetch['method'] = 'GET'
                    if 'body' in fetch:
                        del fetch['body']
                fetch['url'] = urljoin(fetch['url'], response.headers['Location'])
                fetch['request_timeout'] -= time.time() - start_time
                if fetch['request_timeout'] < 0:
                    fetch['request_timeout'] = 0.1
                fetch['connect_timeout'] = fetch['request_timeout']
                max_redirects -= 1
                continue

            result = {}
            result['orig_url'] = url
            result['content'] = response.body or ''
            result['headers'] = dict(response.headers)
            result['status_code'] = response.code
            result['url'] = response.effective_url or url
            result['cookies'] = session.get_dict()
            result['time'] = time.time() - start_time
            result['save'] = task_fetch.get('save')
            if response.error:
                result['error'] = utils.text(response.error)
            if 200 <= response.code < 300:
                logger.info("[%d] %s:%s %s %.2fs", response.code,
                            task.get('project'), task.get('taskid'),
                            url, result['time'])
            else:
                logger.warning("[%d] %s:%s %s %.2fs", response.code,
                               task.get('project'), task.get('taskid'),
                               url, result['time'])

            callback('http', task, result)
            self.on_result('http', task, result)
            raise gen.Return((task, result))

    @gen.coroutine
    def phantomjs_fetch(self, url, task, callback):
        '''Fetch with phantomjs proxy'''
        start_time = time.time()

        self.on_fetch('phantomjs', task)
        handle_error = lambda x: self.handle_error('phantomjs', url, task, start_time, callback, x)

        # check if phantomjs proxy is enabled
        if not self.phantomjs_proxy:
            result = {
                "orig_url": url,
                "content": "phantomjs is not enabled.",
                "headers": {},
                "status_code": 501,
                "url": url,
                "cookies": {},
                "time": 0,
                "save": task.get('fetch', {}).get('save')
            }
            logger.warning("[501] %s:%s %s 0s", task.get('project'), task.get('taskid'), url)
            callback('http', task, result)
            self.on_result('http', task, result)
            raise gen.Return((task, result))

        # setup request parameters
        fetch = self.pack_tornado_request_parameters(url, task)
        task_fetch = task.get('fetch', {})
        for each in task_fetch:
            if each not in fetch:
                fetch[each] = task_fetch[each]

        # robots.txt
        if task_fetch.get('robots_txt', False):
            user_agent = fetch['headers']['User-Agent']
            can_fetch = yield self.can_fetch(user_agent, url)
            if not can_fetch:
                error = tornado.httpclient.HTTPError(403, 'Disallowed by robots.txt')
                raise gen.Return(handle_error(error))

        request_conf = {
            'follow_redirects': False
        }
        if 'timeout' in task_fetch:
            request_conf['connect_timeout'] = task_fetch['timeout']
            request_conf['request_timeout'] = task_fetch['timeout'] + 1

        session = cookies.RequestsCookieJar()
        request = tornado.httpclient.HTTPRequest(url=fetch['url'])
        if fetch.get('cookies'):
            session.update(fetch['cookies'])
            if 'Cookie' in request.headers:
                del request.headers['Cookie']
            fetch['headers']['Cookie'] = cookies.get_cookie_header(session, request)

        # making requests
        fetch['headers'] = dict(fetch['headers'])
        try:
            request = tornado.httpclient.HTTPRequest(
                url="%s" % self.phantomjs_proxy, method="POST",
                body=json.dumps(fetch), **request_conf)
        except Exception as e:
            raise gen.Return(handle_error(e))

        try:
            response = yield self.http_client.fetch(request)
        except tornado.httpclient.HTTPError as e:
            if e.response:
                response = e.response
            else:
                raise gen.Return(handle_error(e))

        if not response.body:
            raise gen.Return(handle_error(Exception('no response from phantomjs')))

        try:
            result = json.loads(utils.text(response.body))
            if response.error:
                result['error'] = utils.text(response.error)
        except Exception as e:
            raise gen.Return(handle_error(e))

        if result.get('status_code', 200):
            logger.info("[%d] %s:%s %s %.2fs", result['status_code'],
                        task.get('project'), task.get('taskid'), url, result['time'])
        else:
            logger.error("[%d] %s:%s %s, %r %.2fs", result['status_code'],
                         task.get('project'), task.get('taskid'),
                         url, result['content'], result['time'])

        callback('phantomjs', task, result)
        self.on_result('phantomjs', task, result)
        raise gen.Return((task, result))

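    # The run loop polls the inqueue on a 100 ms periodic callback, throttled by
    # outqueue fullness and by free slots in the HTTP client pool, and sweeps the
    # robots.txt cache every 10 s.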
    def run(self):
        '''Run loop'''
        logger.info("fetcher starting...")

        def queue_loop():
            if not self.outqueue or not self.inqueue:
                return
            while not self._quit:
                try:
                    if self.outqueue.full():
                        break
                    if self.http_client.free_size() <= 0:
                        break
                    task = self.inqueue.get_nowait()
                    # FIXME: decode_unicode_obj should be used after data is selected from
                    # the database; it is done here for performance
                    task = utils.decode_unicode_obj(task)
                    self.fetch(task)
                except queue.Empty:
                    break
                except KeyboardInterrupt:
                    break
                except Exception as e:
                    logger.exception(e)
                    break

        tornado.ioloop.PeriodicCallback(queue_loop, 100, io_loop=self.ioloop).start()
        tornado.ioloop.PeriodicCallback(self.clear_robot_txt_cache, 10000, io_loop=self.ioloop).start()
        self._running = True

        try:
            self.ioloop.start()
        except KeyboardInterrupt:
            pass

        logger.info("fetcher exiting...")

    def quit(self):
        '''Quit fetcher'''
        self._running = False
        self._quit = True
        self.ioloop.stop()

    def size(self):
        return self.http_client.size()

    def xmlrpc_run(self, port=24444, bind='127.0.0.1', logRequests=False):
        '''Run xmlrpc server'''
        import umsgpack
        try:
            from xmlrpc.server import SimpleXMLRPCServer
            from xmlrpc.client import Binary
        except ImportError:
            from SimpleXMLRPCServer import SimpleXMLRPCServer
            from xmlrpclib import Binary

        server = SimpleXMLRPCServer((bind, port), allow_none=True, logRequests=logRequests)
        server.register_introspection_functions()
        server.register_multicall_functions()

        server.register_function(self.quit, '_quit')
        server.register_function(self.size)

        def sync_fetch(task):
            result = self.sync_fetch(task)
            result = Binary(umsgpack.packb(result))
            return result
        server.register_function(sync_fetch, 'fetch')

        def dump_counter(_time, _type):
            return self._cnt[_time].to_dict(_type)
        server.register_function(dump_counter, 'counter')

        server.timeout = 0.5
        while not self._quit:
            server.handle_request()
        server.server_close()

    def on_fetch(self, type, task):
        '''Called before task fetch'''
        pass

    def on_result(self, type, task, result):
        '''Called after task fetched'''
        status_code = result.get('status_code', 599)
        if status_code != 599:
            status_code = (int(status_code) // 100 * 100)
        self._cnt['5m'].event((task.get('project'), status_code), +1)
        self._cnt['1h'].event((task.get('project'), status_code), +1)

        if type == 'http' and result.get('time'):
            content_len = len(result.get('content', ''))
            self._cnt['5m'].event((task.get('project'), 'speed'),
                                  float(content_len) / result.get('time'))
            self._cnt['1h'].event((task.get('project'), 'speed'),
                                  float(content_len) / result.get('time'))
            self._cnt['5m'].event((task.get('project'), 'time'), result.get('time'))
            self._cnt['1h'].event((task.get('project'), 'time'), result.get('time'))
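
The fetch() entry point graded above only dispatches: in async mode it returns the future from async_fetch(); with async disabled it drives the same coroutine to completion on the fetcher's private ioloop. A minimal sketch of a one-off synchronous call is below; the queue objects, task fields, and data: URL are illustrative assumptions, not part of the report, and it assumes pyspider and its dependencies (pycurl, six, tornado) are installed in the Python 2/3 environment this module targets.

# Minimal sketch: synchronous one-off fetch through Fetcher.fetch().
# The last positional argument disables async mode, so fetch() returns
# (task, result) directly instead of a future.
from six.moves import queue
from pyspider.fetcher import Fetcher

inqueue, outqueue = queue.Queue(), queue.Queue()
fetcher = Fetcher(inqueue, outqueue, 100, None, False)  # poolsize, proxy, async=False

task = {
    'project': 'demo',           # made-up task for illustration only
    'taskid': 'data-example',
    'url': 'data:,Hello%2C%20World!',  # handled by data_fetch(), no network needed
    'fetch': {},
}
task, result = fetcher.fetch(task)  # also pushes (task, result) onto outqueue via send_result()
print(result['status_code'], result['content'])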
586