Completed: Push to master ( aa4b33...0ce354 ) by Roy, created at 01:07

pyspider.fetcher.Fetcher.sync_fetch()    Rating: A

Complexity
    Conditions      1

Size
    Total Lines     22

Duplication
    Lines           0
    Ratio           0 %

Metric    Value
cc        1
dl        0
loc       22
rs        9.2

1 Method

Rating    Name                                    Duplication    Size    Complexity
A         pyspider.fetcher.Fetcher.callback()     0              7       1
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
# vim: set et sw=4 ts=4 sts=4 ff=unix fenc=utf8:
# Author: Binux<[email protected]>
#         http://binux.me
# Created on 2012-12-17 11:07:19

from __future__ import unicode_literals

import six
import copy
import time
import json
import logging
import functools
import threading
import tornado.ioloop
import tornado.httputil
import tornado.httpclient
import pyspider

from six.moves import queue, http_cookies
from six.moves.urllib.robotparser import RobotFileParser
from requests import cookies
from six.moves.urllib.parse import urljoin, urlsplit
from tornado import gen
from tornado.curl_httpclient import CurlAsyncHTTPClient
from tornado.simple_httpclient import SimpleAsyncHTTPClient
from pyspider.libs import utils, dataurl, counter
from .cookie_utils import extract_cookies_to_jar
logger = logging.getLogger('fetcher')

class MyCurlAsyncHTTPClient(CurlAsyncHTTPClient):

    def free_size(self):
        return len(self._free_list)

    def size(self):
        return len(self._curls) - self.free_size()


class MySimpleAsyncHTTPClient(SimpleAsyncHTTPClient):

    def free_size(self):
        return self.max_clients - self.size()

    def size(self):
        return len(self.active)

fetcher_output = {
    "status_code": int,
    "orig_url": str,
    "url": str,
    "headers": dict,
    "content": str,
    "cookies": dict,
}

class Fetcher(object):
    user_agent = "pyspider/%s (+http://pyspider.org/)" % pyspider.__version__
    default_options = {
        'method': 'GET',
        'headers': {
        },
        'use_gzip': True,
        'timeout': 120,
    }
    phantomjs_proxy = None
    robot_txt_age = 60*60  # 1h

    def __init__(self, inqueue, outqueue, poolsize=100, proxy=None, async=True):
        self.inqueue = inqueue
        self.outqueue = outqueue

        self.poolsize = poolsize
        self._running = False
        self._quit = False
        self.proxy = proxy
        self.async = async
        self.ioloop = tornado.ioloop.IOLoop()

        self.robots_txt_cache = {}

        # binding io_loop to http_client here
        if self.async:
            self.http_client = MyCurlAsyncHTTPClient(max_clients=self.poolsize,
                                                     io_loop=self.ioloop)
        else:
            self.http_client = tornado.httpclient.HTTPClient(MyCurlAsyncHTTPClient, max_clients=self.poolsize)

        self._cnt = {
            '5m': counter.CounterManager(
                lambda: counter.TimebaseAverageWindowCounter(30, 10)),
            '1h': counter.CounterManager(
                lambda: counter.TimebaseAverageWindowCounter(60, 60)),
        }

    def send_result(self, type, task, result):
        '''Send fetch result to processor'''
        if self.outqueue:
            try:
                self.outqueue.put((task, result))
            except Exception as e:
                logger.exception(e)

    def fetch(self, task, callback=None):
        if self.async:
            return self.async_fetch(task, callback)
        else:
            return self.async_fetch(task, callback).result()

    @gen.coroutine
    def async_fetch(self, task, callback=None):
        '''Do one fetch'''
        url = task.get('url', 'data:,')
        if callback is None:
            callback = self.send_result

        try:
            if url.startswith('data:'):
                ret = yield gen.maybe_future(self.data_fetch(url, task, callback))
            elif task.get('fetch', {}).get('fetch_type') in ('js', 'phantomjs'):
                ret = yield self.phantomjs_fetch(url, task, callback)
            else:
                ret = yield self.http_fetch(url, task, callback)
        except Exception as e:
            logger.exception(e)
            raise e

        raise gen.Return(ret)

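For illustration, here are two hypothetical task dicts as async_fetch() above would route them; the field names come from this file, the values are invented:

task_http = {
    'taskid': 't1', 'project': 'demo',
    'url': 'http://example.com/page',
    'fetch': {'robots_txt': True, 'max_redirects': 3},  # handled by http_fetch()
}
task_js = {
    'taskid': 't2', 'project': 'demo',
    'url': 'http://example.com/app',
    'fetch': {'fetch_type': 'js'},                      # routed to phantomjs_fetch()
}
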
    def sync_fetch(self, task):
        '''Synchronous fetch, usually used in the xmlrpc thread'''
        if not self._running:
            return self.ioloop.run_sync(functools.partial(self.async_fetch, task, lambda t, _, r: True))

        wait_result = threading.Condition()
        _result = {}

        def callback(type, task, result):
            wait_result.acquire()
            _result['type'] = type
            _result['task'] = task
            _result['result'] = result
            wait_result.notify()
            wait_result.release()

        wait_result.acquire()
        self.ioloop.add_callback(self.fetch, task, callback)
        while 'result' not in _result:
            wait_result.wait()
        wait_result.release()
        return _result['result']

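A minimal usage sketch for sync_fetch(), assuming run() is already spinning the ioloop in another thread (the situation xmlrpc_run() relies on); the queues, the task, and the `from pyspider.fetcher import Fetcher` re-export are assumptions for illustration:

import time
import threading
from six.moves import queue
from pyspider.fetcher import Fetcher  # assumed re-export of this class

inqueue, outqueue = queue.Queue(), queue.Queue()
fetcher = Fetcher(inqueue, outqueue)
io_thread = threading.Thread(target=fetcher.run)
io_thread.daemon = True
io_thread.start()
time.sleep(1)  # give run() a moment to mark the fetcher as running

# blocks the calling thread until the ioloop thread has run the fetch
# and the callback above has filled _result
result = fetcher.sync_fetch({'taskid': 'demo', 'project': 'demo', 'url': 'data:,hello'})
print(result['status_code'])
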
    def data_fetch(self, url, task, callback):
        '''A fake fetcher for dataurl'''
        self.on_fetch('data', task)
        result = {}
        result['orig_url'] = url
        result['content'] = dataurl.decode(url)
        result['headers'] = {}
        result['status_code'] = 200
        result['url'] = url
        result['cookies'] = {}
        result['time'] = 0
        result['save'] = task.get('fetch', {}).get('save')
        if len(result['content']) < 70:
            logger.info("[200] %s:%s %s 0s", task.get('project'), task.get('taskid'), url)
        else:
            logger.info(
                "[200] %s:%s data:,%s...[content:%d] 0s",
                task.get('project'), task.get('taskid'),
                result['content'][:70],
                len(result['content'])
            )

        callback('data', task, result)
        self.on_result('data', task, result)
        return task, result

    def handle_error(self, type, url, task, start_time, callback, error):
        result = {
            'status_code': getattr(error, 'code', 599),
            'error': utils.text(error),
            'content': "",
            'time': time.time() - start_time,
            'orig_url': url,
            'url': url,
        }
        logger.error("[%d] %s:%s %s, %r %.2fs",
                     result['status_code'], task.get('project'), task.get('taskid'),
                     url, error, result['time'])
        callback(type, task, result)
        self.on_result(type, task, result)
        return task, result

    allowed_options = ['method', 'data', 'timeout', 'cookies', 'use_gzip', 'validate_cert']

    def pack_tornado_request_parameters(self, url, task):
        fetch = copy.deepcopy(self.default_options)
        fetch['url'] = url
        fetch['headers'] = tornado.httputil.HTTPHeaders(fetch['headers'])
        fetch['headers']['User-Agent'] = self.user_agent
        task_fetch = task.get('fetch', {})
        for each in self.allowed_options:
            if each in task_fetch:
                fetch[each] = task_fetch[each]
        fetch['headers'].update(task_fetch.get('headers', {}))

        if task.get('track'):
            track_headers = tornado.httputil.HTTPHeaders(
                task.get('track', {}).get('fetch', {}).get('headers') or {})
            track_ok = task.get('track', {}).get('process', {}).get('ok', False)
        else:
            track_headers = {}
            track_ok = False
        # proxy
        proxy_string = None
        if isinstance(task_fetch.get('proxy'), six.string_types):
            proxy_string = task_fetch['proxy']
        elif self.proxy and task_fetch.get('proxy', True):
            proxy_string = self.proxy
        if proxy_string:
            if '://' not in proxy_string:
                proxy_string = 'http://' + proxy_string
            proxy_splited = urlsplit(proxy_string)
            if proxy_splited.username:
                fetch['proxy_username'] = proxy_splited.username
                if six.PY2:
                    fetch['proxy_username'] = fetch['proxy_username'].encode('utf8')
            if proxy_splited.password:
                fetch['proxy_password'] = proxy_splited.password
                if six.PY2:
                    fetch['proxy_password'] = fetch['proxy_password'].encode('utf8')
            fetch['proxy_host'] = proxy_splited.hostname
            if six.PY2:
                fetch['proxy_host'] = fetch['proxy_host'].encode('utf8')
            fetch['proxy_port'] = proxy_splited.port or 8080

        # etag
        if task_fetch.get('etag', True):
            _t = None
            if isinstance(task_fetch.get('etag'), six.string_types):
                _t = task_fetch.get('etag')
            elif track_ok:
                _t = track_headers.get('etag')
            if _t and 'If-None-Match' not in fetch['headers']:
                fetch['headers']['If-None-Match'] = _t
        # last modified
        if task_fetch.get('last_modified', True):
            _t = None
            if isinstance(task_fetch.get('last_modified'), six.string_types):
                _t = task_fetch.get('last_modified')
            elif track_ok:
                _t = track_headers.get('last-modified')
            if _t and 'If-Modified-Since' not in fetch['headers']:
                fetch['headers']['If-Modified-Since'] = _t
        # timeout
        if 'timeout' in fetch:
            fetch['connect_timeout'] = fetch['request_timeout'] = fetch['timeout']
            del fetch['timeout']
        # data rename to body
        if 'data' in fetch:
            fetch['body'] = fetch['data']
            del fetch['data']

        return fetch

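To make the option mapping above concrete, a hedged sketch (invented values; the Fetcher import path is assumed, and constructing it needs pyspider's runtime dependencies such as pycurl and tornado):

from pyspider.fetcher import Fetcher  # assumed re-export of this class

fetcher = Fetcher(inqueue=None, outqueue=None, poolsize=1)
task = {
    'url': 'http://example.com/',
    'fetch': {
        'method': 'POST',
        'data': 'a=1',                       # renamed to 'body'
        'timeout': 30,                       # split into connect_timeout / request_timeout
        'etag': 'abc123',                    # becomes the If-None-Match header
        'proxy': 'user:[email protected]:3128',  # split into proxy_username/_password/_host/_port
    },
}
packed = fetcher.pack_tornado_request_parameters(task['url'], task)
# packed['body'] == 'a=1'
# packed['connect_timeout'] == packed['request_timeout'] == 30
# packed['headers']['If-None-Match'] == 'abc123'
# packed['proxy_host'] == 'proxy.local' (bytes on Python 2), packed['proxy_port'] == 3128
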
    @gen.coroutine
    def can_fetch(self, user_agent, url):
        parsed = urlsplit(url)
        domain = parsed.netloc
        if domain in self.robots_txt_cache:
            robot_txt = self.robots_txt_cache[domain]
            if time.time() - robot_txt.mtime() > self.robot_txt_age:
                robot_txt = None
        else:
            robot_txt = None

        if robot_txt is None:
            robot_txt = RobotFileParser()
            try:
                response = yield gen.maybe_future(self.http_client.fetch(
                    urljoin(url, '/robots.txt'), connect_timeout=10, request_timeout=30))
                content = response.body
            except tornado.httpclient.HTTPError as e:
                logger.error('load robots.txt from %s error: %r', domain, e)
                content = b''  # keep bytes so the decode below stays uniform

            try:
                content = content.decode('utf8', 'ignore')
            except UnicodeDecodeError:
                content = ''

            robot_txt.parse(content.splitlines())
            self.robots_txt_cache[domain] = robot_txt

        raise gen.Return(robot_txt.can_fetch(user_agent, url))

    def clear_robot_txt_cache(self):
        now = time.time()
        # copy the items so stale entries can be deleted while iterating
        for domain, robot_txt in list(self.robots_txt_cache.items()):
            if now - robot_txt.mtime() > self.robot_txt_age:
                del self.robots_txt_cache[domain]

    @gen.coroutine
    def http_fetch(self, url, task, callback):
        '''HTTP fetcher'''
        start_time = time.time()

        self.on_fetch('http', task)
        handle_error = lambda x: self.handle_error('http', url, task, start_time, callback, x)

        # setup request parameters
        fetch = self.pack_tornado_request_parameters(url, task)
        task_fetch = task.get('fetch', {})

        session = cookies.RequestsCookieJar()
        # fix for tornado request obj
        if 'Cookie' in fetch['headers']:
            c = http_cookies.SimpleCookie()
            try:
                c.load(fetch['headers']['Cookie'])
            except AttributeError:
                c.load(utils.utf8(fetch['headers']['Cookie']))
            for key in c:
                session.set(key, c[key])
            del fetch['headers']['Cookie']
        if 'cookies' in fetch:
            session.update(fetch['cookies'])
            del fetch['cookies']

        max_redirects = task_fetch.get('max_redirects', 5)
        # we will handle redirects by hand to capture cookies
        fetch['follow_redirects'] = False

        # making requests
        while True:
            # robots.txt
            if task_fetch.get('robots_txt', False):
                can_fetch = yield self.can_fetch(fetch['headers']['User-Agent'], fetch['url'])
                if not can_fetch:
                    error = tornado.httpclient.HTTPError(403, 'Disallowed by robots.txt')
                    raise gen.Return(handle_error(error))

            try:
                request = tornado.httpclient.HTTPRequest(**fetch)
                # if cookie already in header, get_cookie_header wouldn't work
                old_cookie_header = request.headers.get('Cookie')
                if old_cookie_header:
                    del request.headers['Cookie']
                cookie_header = cookies.get_cookie_header(session, request)
                if cookie_header:
                    request.headers['Cookie'] = cookie_header
                elif old_cookie_header:
                    request.headers['Cookie'] = old_cookie_header
            except Exception as e:
                logger.exception(fetch)
                raise gen.Return(handle_error(e))

            try:
                response = yield gen.maybe_future(self.http_client.fetch(request))
            except tornado.httpclient.HTTPError as e:
                if e.response:
                    response = e.response
                else:
                    raise gen.Return(handle_error(e))

            extract_cookies_to_jar(session, response.request, response.headers)
            if (response.code in (301, 302, 303, 307)
                    and response.headers.get('Location')
                    and task_fetch.get('allow_redirects', True)):
                if max_redirects <= 0:
                    error = tornado.httpclient.HTTPError(
                        599, 'Maximum (%d) redirects followed' % task_fetch.get('max_redirects', 5),
                        response)
                    raise gen.Return(handle_error(error))
                if response.code in (302, 303):
                    fetch['method'] = 'GET'
                    if 'body' in fetch:
                        del fetch['body']
                fetch['url'] = urljoin(fetch['url'], response.headers['Location'])
                fetch['request_timeout'] -= time.time() - start_time
                if fetch['request_timeout'] < 0:
                    fetch['request_timeout'] = 0.1
                fetch['connect_timeout'] = fetch['request_timeout']
                max_redirects -= 1
                continue

            result = {}
            result['orig_url'] = url
            result['content'] = response.body or ''
            result['headers'] = dict(response.headers)
            result['status_code'] = response.code
            result['url'] = response.effective_url or url
            result['cookies'] = session.get_dict()
            result['time'] = time.time() - start_time
            result['save'] = task_fetch.get('save')
            if response.error:
                result['error'] = utils.text(response.error)
            if 200 <= response.code < 300:
                logger.info("[%d] %s:%s %s %.2fs", response.code,
                            task.get('project'), task.get('taskid'),
                            url, result['time'])
            else:
                logger.warning("[%d] %s:%s %s %.2fs", response.code,
                               task.get('project'), task.get('taskid'),
                               url, result['time'])

            callback('http', task, result)
            self.on_result('http', task, result)
            raise gen.Return((task, result))

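For reference, this is roughly the shape of a successful result dict as assembled at the end of http_fetch() above; the keys come from the code, the values are invented:

result = {
    'orig_url': 'http://example.com/',
    'url': 'http://example.com/',         # effective URL after redirects
    'status_code': 200,
    'headers': {'Content-Type': 'text/html'},
    'content': '<html>...</html>',
    'cookies': {'sessionid': 'abc'},
    'time': 0.42,
    'save': None,
}
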
    @gen.coroutine
    def phantomjs_fetch(self, url, task, callback):
        '''Fetch with phantomjs proxy'''
        start_time = time.time()

        self.on_fetch('phantomjs', task)
        handle_error = lambda x: self.handle_error('phantomjs', url, task, start_time, callback, x)

        # check phantomjs proxy is enabled
        if not self.phantomjs_proxy:
            result = {
                "orig_url": url,
                "content": "phantomjs is not enabled.",
                "headers": {},
                "status_code": 501,
                "url": url,
                "cookies": {},
                "time": 0,
                "save": task.get('fetch', {}).get('save')
            }
            logger.warning("[501] %s:%s %s 0s", task.get('project'), task.get('taskid'), url)
            callback('http', task, result)
            self.on_result('http', task, result)
            raise gen.Return((task, result))

        # setup request parameters
        fetch = self.pack_tornado_request_parameters(url, task)
        task_fetch = task.get('fetch', {})
        for each in task_fetch:
            if each not in fetch:
                fetch[each] = task_fetch[each]

        # robots.txt
        if task_fetch.get('robots_txt', False):
            user_agent = fetch['headers']['User-Agent']
            can_fetch = yield self.can_fetch(user_agent, url)
            if not can_fetch:
                error = tornado.httpclient.HTTPError(403, 'Disallowed by robots.txt')
                raise gen.Return(handle_error(error))

        request_conf = {
            'follow_redirects': False
        }
        request_conf['connect_timeout'] = fetch.get('connect_timeout', 120)
        request_conf['request_timeout'] = fetch.get('request_timeout', 120)

        session = cookies.RequestsCookieJar()
        request = tornado.httpclient.HTTPRequest(url=fetch['url'])
        if fetch.get('cookies'):
            session.update(fetch['cookies'])
            if 'Cookie' in request.headers:
                del request.headers['Cookie']
            fetch['headers']['Cookie'] = cookies.get_cookie_header(session, request)

        # making requests
        fetch['headers'] = dict(fetch['headers'])
        try:
            request = tornado.httpclient.HTTPRequest(
                url="%s" % self.phantomjs_proxy, method="POST",
                body=json.dumps(fetch), **request_conf)
        except Exception as e:
            raise gen.Return(handle_error(e))

        try:
            response = yield gen.maybe_future(self.http_client.fetch(request))
        except tornado.httpclient.HTTPError as e:
            if e.response:
                response = e.response
            else:
                raise gen.Return(handle_error(e))

        if not response.body:
            raise gen.Return(handle_error(Exception('no response from phantomjs')))

        result = {}
        try:
            result = json.loads(utils.text(response.body))
        except Exception as e:
            if response.error:
                result['error'] = utils.text(response.error)
            raise gen.Return(handle_error(e))

        if result.get('status_code', 200):
            logger.info("[%d] %s:%s %s %.2fs", result['status_code'],
                        task.get('project'), task.get('taskid'), url, result['time'])
        else:
            logger.error("[%d] %s:%s %s, %r %.2fs", result['status_code'],
                         task.get('project'), task.get('taskid'),
                         url, result['content'], result['time'])

        callback('phantomjs', task, result)
        self.on_result('phantomjs', task, result)
        raise gen.Return((task, result))

    def run(self):
        '''Run loop'''
        logger.info("fetcher starting...")

        def queue_loop():
            if not self.outqueue or not self.inqueue:
                return
            while not self._quit:
                try:
                    if self.outqueue.full():
                        break
                    if self.http_client.free_size() <= 0:
                        break
                    task = self.inqueue.get_nowait()
                    # FIXME: decode_unicode_obj should be applied after data is
                    # selected from the database; it is done here for performance
                    task = utils.decode_unicode_obj(task)
                    self.fetch(task)
                except queue.Empty:
                    break
                except KeyboardInterrupt:
                    break
                except Exception as e:
                    logger.exception(e)
                    break

        tornado.ioloop.PeriodicCallback(queue_loop, 100, io_loop=self.ioloop).start()
        tornado.ioloop.PeriodicCallback(self.clear_robot_txt_cache, 10000, io_loop=self.ioloop).start()
        self._running = True

        try:
            self.ioloop.start()
        except KeyboardInterrupt:
            pass

        logger.info("fetcher exiting...")

    def quit(self):
        '''Quit fetcher'''
        self._running = False
        self._quit = True
        self.ioloop.add_callback(self.ioloop.stop)
        if hasattr(self, 'xmlrpc_server'):
            self.xmlrpc_ioloop.add_callback(self.xmlrpc_server.stop)
            self.xmlrpc_ioloop.add_callback(self.xmlrpc_ioloop.stop)

    def size(self):
        return self.http_client.size()

    def xmlrpc_run(self, port=24444, bind='127.0.0.1', logRequests=False):
        '''Run xmlrpc server'''
        import umsgpack
        from pyspider.libs.wsgi_xmlrpc import WSGIXMLRPCApplication
        try:
            from xmlrpc.client import Binary
        except ImportError:
            from xmlrpclib import Binary

        application = WSGIXMLRPCApplication()

        application.register_function(self.quit, '_quit')
        application.register_function(self.size)

        def sync_fetch(task):
            result = self.sync_fetch(task)
            result = Binary(umsgpack.packb(result))
            return result
        application.register_function(sync_fetch, 'fetch')

        def dump_counter(_time, _type):
            return self._cnt[_time].to_dict(_type)
        application.register_function(dump_counter, 'counter')

        import tornado.wsgi
        import tornado.ioloop
        import tornado.httpserver

        container = tornado.wsgi.WSGIContainer(application)
        self.xmlrpc_ioloop = tornado.ioloop.IOLoop()
        self.xmlrpc_server = tornado.httpserver.HTTPServer(container, io_loop=self.xmlrpc_ioloop)
        self.xmlrpc_server.listen(port=port, address=bind)
        self.xmlrpc_ioloop.start()

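A hedged client-side sketch for the xmlrpc interface above, assuming a fetcher is already running and serving on the default host/port from xmlrpc_run(); the fetch result comes back as an xmlrpc Binary holding umsgpack-packed data:

import umsgpack
from six.moves.xmlrpc_client import ServerProxy

proxy = ServerProxy('http://127.0.0.1:24444')
print(proxy.size())                        # requests currently held by the HTTP client pool
packed = proxy.fetch({'taskid': 't1', 'project': 'demo', 'url': 'data:,hi'})
result = umsgpack.unpackb(packed.data)     # Binary.data carries the packed bytes
print(result['status_code'])
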
    def on_fetch(self, type, task):
        '''Called before task fetch'''
        pass

    def on_result(self, type, task, result):
        '''Called after task fetched'''
        status_code = result.get('status_code', 599)
        if status_code != 599:
            status_code = int(status_code) // 100 * 100  # bucket into 2xx/3xx/4xx/5xx groups
        self._cnt['5m'].event((task.get('project'), status_code), +1)
        self._cnt['1h'].event((task.get('project'), status_code), +1)

        if type == 'http' and result.get('time'):
            content_len = len(result.get('content', ''))
            self._cnt['5m'].event((task.get('project'), 'speed'),
                                  float(content_len) / result.get('time'))
            self._cnt['1h'].event((task.get('project'), 'speed'),
                                  float(content_len) / result.get('time'))
            self._cnt['5m'].event((task.get('project'), 'time'), result.get('time'))
            self._cnt['1h'].event((task.get('project'), 'time'), result.get('time'))
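The status bucketing in on_result() above groups counter keys by hundreds, while 599 (a fetcher-level error) is kept as-is; for example:

# 200..299 -> 200, 404 -> 400; integer division keeps the bucket an int
assert int(206) // 100 * 100 == 200
assert int(404) // 100 * 100 == 400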