Completed: Push to master (0ce354...dffe31) by Roy, created 01:06

pyspider.fetcher.Fetcher.dump_counter() (grade A)

Complexity
  Conditions: 1

Size
  Total Lines: 2

Duplication
  Lines: 0
  Ratio: 0%

Metric  Value
cc      1
dl      0
loc     2
rs      10
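(cc is cyclomatic complexity and loc is lines of code; dl and rs presumably stand for duplicated lines and the 0-10 rating score behind the A grade.)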
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
# vim: set et sw=4 ts=4 sts=4 ff=unix fenc=utf8:
# Author: Binux<[email protected]>
#         http://binux.me
# Created on 2012-12-17 11:07:19

from __future__ import unicode_literals

import six
import copy
import time
import json
import logging
import functools
import threading
import tornado.ioloop
import tornado.httputil
import tornado.httpclient
import pyspider

from six.moves import queue, http_cookies
from six.moves.urllib.robotparser import RobotFileParser
from requests import cookies
from six.moves.urllib.parse import urljoin, urlsplit
from tornado import gen
from tornado.curl_httpclient import CurlAsyncHTTPClient
from tornado.simple_httpclient import SimpleAsyncHTTPClient

from pyspider.libs import utils, dataurl, counter
from pyspider.libs.url import quote_chinese
from .cookie_utils import extract_cookies_to_jar

logger = logging.getLogger('fetcher')

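# The subclasses below expose how many connections in the HTTP client pool are
# busy vs. free, so the fetcher's run loop can stop pulling tasks from the
# queue once the pool is saturated.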
class MyCurlAsyncHTTPClient(CurlAsyncHTTPClient):

    def free_size(self):
        return len(self._free_list)

    def size(self):
        return len(self._curls) - self.free_size()


class MySimpleAsyncHTTPClient(SimpleAsyncHTTPClient):

    def free_size(self):
        return self.max_clients - self.size()

    def size(self):
        return len(self.active)

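# Documents the expected shape of a fetch result (field name -> type).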
fetcher_output = {
    "status_code": int,
    "orig_url": str,
    "url": str,
    "headers": dict,
    "content": str,
    "cookies": dict,
}


class Fetcher(object):
    user_agent = "pyspider/%s (+http://pyspider.org/)" % pyspider.__version__
    default_options = {
        'method': 'GET',
        'headers': {},
        'use_gzip': True,
        'timeout': 120,
    }
    phantomjs_proxy = None
    robot_txt_age = 60 * 60  # 1h

    def __init__(self, inqueue, outqueue, poolsize=100, proxy=None, async_mode=True):
        # `async` was renamed to `async_mode`: `async` became a reserved
        # keyword in Python 3.7, where the original spelling is a SyntaxError.
        self.inqueue = inqueue
        self.outqueue = outqueue

        self.poolsize = poolsize
        self._running = False
        self._quit = False
        self.proxy = proxy
        self.async_mode = async_mode
        self.ioloop = tornado.ioloop.IOLoop()

        self.robots_txt_cache = {}

        # binding io_loop to http_client here
        if self.async_mode:
            self.http_client = MyCurlAsyncHTTPClient(max_clients=self.poolsize,
                                                     io_loop=self.ioloop)
        else:
            self.http_client = tornado.httpclient.HTTPClient(
                MyCurlAsyncHTTPClient, max_clients=self.poolsize)

        self._cnt = {
            '5m': counter.CounterManager(
                lambda: counter.TimebaseAverageWindowCounter(30, 10)),
            '1h': counter.CounterManager(
                lambda: counter.TimebaseAverageWindowCounter(60, 60)),
        }

    def send_result(self, type, task, result):
        '''Send fetch result to processor'''
        if self.outqueue:
            try:
                self.outqueue.put((task, result))
            except Exception as e:
                logger.exception(e)

    def fetch(self, task, callback=None):
        if self.async_mode:
            return self.async_fetch(task, callback)
        else:
            return self.async_fetch(task, callback).result()

    @gen.coroutine
    def async_fetch(self, task, callback=None):
        '''Do one fetch'''
        url = task.get('url', 'data:,')
        if callback is None:
            callback = self.send_result

        try:
            if url.startswith('data:'):
                ret = yield gen.maybe_future(self.data_fetch(url, task, callback))
            elif task.get('fetch', {}).get('fetch_type') in ('js', 'phantomjs'):
                ret = yield self.phantomjs_fetch(url, task, callback)
            else:
                ret = yield self.http_fetch(url, task, callback)
        except Exception as e:
            logger.exception(e)
            raise e

        raise gen.Return(ret)

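    # sync_fetch bridges threads: it schedules the fetch on the fetcher's
    # ioloop, then blocks the calling thread on a Condition until the
    # callback delivers the result.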
    def sync_fetch(self, task):
        '''Synchronous fetch, usually used in the xmlrpc thread'''
        if not self._running:
            return self.ioloop.run_sync(functools.partial(self.async_fetch, task, lambda t, _, r: True))

        wait_result = threading.Condition()
        _result = {}

        def callback(type, task, result):
            wait_result.acquire()
            _result['type'] = type
            _result['task'] = task
            _result['result'] = result
            wait_result.notify()
            wait_result.release()

        wait_result.acquire()
        self.ioloop.add_callback(self.fetch, task, callback)
        while 'result' not in _result:
            wait_result.wait()
        wait_result.release()
        return _result['result']

    def data_fetch(self, url, task, callback):
        '''A fake fetcher for dataurl'''
        self.on_fetch('data', task)
        result = {}
        result['orig_url'] = url
        result['content'] = dataurl.decode(url)
        result['headers'] = {}
        result['status_code'] = 200
        result['url'] = url
        result['cookies'] = {}
        result['time'] = 0
        result['save'] = task.get('fetch', {}).get('save')
        if len(result['content']) < 70:
            logger.info("[200] %s:%s %s 0s", task.get('project'), task.get('taskid'), url)
        else:
            logger.info(
                "[200] %s:%s data:,%s...[content:%d] 0s",
                task.get('project'), task.get('taskid'),
                result['content'][:70],
                len(result['content'])
            )

        callback('data', task, result)
        self.on_result('data', task, result)
        return task, result

    def handle_error(self, type, url, task, start_time, callback, error):
        result = {
            'status_code': getattr(error, 'code', 599),
            'error': utils.text(error),
            'content': "",
            'time': time.time() - start_time,
            'orig_url': url,
            'url': url,
        }
        logger.error("[%d] %s:%s %s, %r %.2fs",
                     result['status_code'], task.get('project'), task.get('taskid'),
                     url, error, result['time'])
        callback(type, task, result)
        self.on_result(type, task, result)
        return task, result

    allowed_options = ['method', 'data', 'timeout', 'cookies', 'use_gzip', 'validate_cert']

    def pack_tornado_request_parameters(self, url, task):
        fetch = copy.deepcopy(self.default_options)
        fetch['url'] = url
        fetch['headers'] = tornado.httputil.HTTPHeaders(fetch['headers'])
        fetch['headers']['User-Agent'] = self.user_agent
        task_fetch = task.get('fetch', {})
        for each in self.allowed_options:
            if each in task_fetch:
                fetch[each] = task_fetch[each]
        fetch['headers'].update(task_fetch.get('headers', {}))

        if task.get('track'):
            track_headers = tornado.httputil.HTTPHeaders(
                task.get('track', {}).get('fetch', {}).get('headers') or {})
            track_ok = task.get('track', {}).get('process', {}).get('ok', False)
        else:
            track_headers = {}
            track_ok = False
        # proxy
        proxy_string = None
        if isinstance(task_fetch.get('proxy'), six.string_types):
            proxy_string = task_fetch['proxy']
        elif self.proxy and task_fetch.get('proxy', True):
            proxy_string = self.proxy
        if proxy_string:
            if '://' not in proxy_string:
                proxy_string = 'http://' + proxy_string
            proxy_splited = urlsplit(proxy_string)
            if proxy_splited.username:
                fetch['proxy_username'] = proxy_splited.username
                if six.PY2:
                    fetch['proxy_username'] = fetch['proxy_username'].encode('utf8')
            if proxy_splited.password:
                fetch['proxy_password'] = proxy_splited.password
                if six.PY2:
                    fetch['proxy_password'] = fetch['proxy_password'].encode('utf8')
            # encode only on PY2, matching username/password above; the
            # original encoded unconditionally and then a second time on PY2
            fetch['proxy_host'] = proxy_splited.hostname
            if six.PY2:
                fetch['proxy_host'] = fetch['proxy_host'].encode('utf8')
            fetch['proxy_port'] = proxy_splited.port or 8080

        # etag
        if task_fetch.get('etag', True):
            _t = None
            if isinstance(task_fetch.get('etag'), six.string_types):
                _t = task_fetch.get('etag')
            elif track_ok:
                _t = track_headers.get('etag')
            if _t and 'If-None-Match' not in fetch['headers']:
                fetch['headers']['If-None-Match'] = _t
        # last modified (the original read the misspelled key 'last_modifed' here)
        if task_fetch.get('last_modified', True):
            _t = None
            if isinstance(task_fetch.get('last_modified'), six.string_types):
                _t = task_fetch.get('last_modified')
            elif track_ok:
                _t = track_headers.get('last-modified')
            if _t and 'If-Modified-Since' not in fetch['headers']:
                fetch['headers']['If-Modified-Since'] = _t
        # timeout
        if 'timeout' in fetch:
            fetch['connect_timeout'] = fetch['request_timeout'] = fetch['timeout']
            del fetch['timeout']
        # data rename to body
        if 'data' in fetch:
            fetch['body'] = fetch['data']
            del fetch['data']

        return fetch

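    # robots.txt handling: parsed robots.txt files are cached per domain and
    # re-fetched once they are older than robot_txt_age (one hour).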
    @gen.coroutine
    def can_fetch(self, user_agent, url):
        parsed = urlsplit(url)
        domain = parsed.netloc
        if domain in self.robots_txt_cache:
            robot_txt = self.robots_txt_cache[domain]
            if time.time() - robot_txt.mtime() > self.robot_txt_age:
                robot_txt = None
        else:
            robot_txt = None

        if robot_txt is None:
            robot_txt = RobotFileParser()
            try:
                response = yield gen.maybe_future(self.http_client.fetch(
                    urljoin(url, '/robots.txt'), connect_timeout=10, request_timeout=30))
                content = response.body
            except tornado.httpclient.HTTPError as e:
                logger.error('load robots.txt from %s error: %r', domain, e)
                content = b''  # bytes, so the decode() below also works on Python 3

            try:
                content = content.decode('utf8', 'ignore')
            except UnicodeDecodeError:
                content = ''

            robot_txt.parse(content.splitlines())
            self.robots_txt_cache[domain] = robot_txt

        raise gen.Return(robot_txt.can_fetch(user_agent, url))

    def clear_robot_txt_cache(self):
        now = time.time()
        # iterate over a copy: deleting from a dict while iterating it
        # raises RuntimeError on Python 3
        for domain, robot_txt in list(self.robots_txt_cache.items()):
            if now - robot_txt.mtime() > self.robot_txt_age:
                del self.robots_txt_cache[domain]

    @gen.coroutine
    def http_fetch(self, url, task, callback):
        '''HTTP fetcher'''
        start_time = time.time()

        self.on_fetch('http', task)
        handle_error = lambda x: self.handle_error('http', url, task, start_time, callback, x)

        # setup request parameters
        fetch = self.pack_tornado_request_parameters(url, task)
        task_fetch = task.get('fetch', {})

        session = cookies.RequestsCookieJar()
        # fix for tornado request obj
        if 'Cookie' in fetch['headers']:
            c = http_cookies.SimpleCookie()
            try:
                c.load(fetch['headers']['Cookie'])
            except AttributeError:
                c.load(utils.utf8(fetch['headers']['Cookie']))
            for key in c:
                session.set(key, c[key])
            del fetch['headers']['Cookie']
        if 'cookies' in fetch:
            session.update(fetch['cookies'])
            del fetch['cookies']

        max_redirects = task_fetch.get('max_redirects', 5)
        # we will handle redirects by hand to capture cookies
        fetch['follow_redirects'] = False

        # making requests
        while True:
            # robots.txt
            if task_fetch.get('robots_txt', False):
                can_fetch = yield self.can_fetch(fetch['headers']['User-Agent'], fetch['url'])
                if not can_fetch:
                    error = tornado.httpclient.HTTPError(403, 'Disallowed by robots.txt')
                    raise gen.Return(handle_error(error))

            try:
                request = tornado.httpclient.HTTPRequest(**fetch)
                # if cookie already in header, get_cookie_header wouldn't work
                old_cookie_header = request.headers.get('Cookie')
                if old_cookie_header:
                    del request.headers['Cookie']
                cookie_header = cookies.get_cookie_header(session, request)
                if cookie_header:
                    request.headers['Cookie'] = cookie_header
                elif old_cookie_header:
                    request.headers['Cookie'] = old_cookie_header
            except Exception as e:
                logger.exception(fetch)
                raise gen.Return(handle_error(e))

            try:
                response = yield gen.maybe_future(self.http_client.fetch(request))
            except tornado.httpclient.HTTPError as e:
                if e.response:
                    response = e.response
                else:
                    raise gen.Return(handle_error(e))

            extract_cookies_to_jar(session, response.request, response.headers)
            if (response.code in (301, 302, 303, 307)
                    and response.headers.get('Location')
                    and task_fetch.get('allow_redirects', True)):
                if max_redirects <= 0:
                    error = tornado.httpclient.HTTPError(
                        599, 'Maximum (%d) redirects followed' % task_fetch.get('max_redirects', 5),
                        response)
                    raise gen.Return(handle_error(error))
                if response.code in (302, 303):
                    fetch['method'] = 'GET'
                    if 'body' in fetch:
                        del fetch['body']
                fetch['url'] = quote_chinese(urljoin(fetch['url'], response.headers['Location']))
                fetch['request_timeout'] -= time.time() - start_time
                if fetch['request_timeout'] < 0:
                    fetch['request_timeout'] = 0.1
                fetch['connect_timeout'] = fetch['request_timeout']
                max_redirects -= 1
                continue

            result = {}
            result['orig_url'] = url
            result['content'] = response.body or ''
            result['headers'] = dict(response.headers)
            result['status_code'] = response.code
            result['url'] = response.effective_url or url
            result['cookies'] = session.get_dict()
            result['time'] = time.time() - start_time
            result['save'] = task_fetch.get('save')
            if response.error:
                result['error'] = utils.text(response.error)
            if 200 <= response.code < 300:
                logger.info("[%d] %s:%s %s %.2fs", response.code,
                            task.get('project'), task.get('taskid'),
                            url, result['time'])
            else:
                logger.warning("[%d] %s:%s %s %.2fs", response.code,
                               task.get('project'), task.get('taskid'),
                               url, result['time'])

            callback('http', task, result)
            self.on_result('http', task, result)
            raise gen.Return((task, result))

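    # The phantomjs proxy speaks a simple protocol: the packed fetch dict is
    # POSTed to it as JSON, and it replies with a JSON-encoded result dict.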
    @gen.coroutine
    def phantomjs_fetch(self, url, task, callback):
        '''Fetch with phantomjs proxy'''
        start_time = time.time()

        self.on_fetch('phantomjs', task)
        handle_error = lambda x: self.handle_error('phantomjs', url, task, start_time, callback, x)

        # check phantomjs proxy is enabled
        if not self.phantomjs_proxy:
            result = {
                "orig_url": url,
                "content": "phantomjs is not enabled.",
                "headers": {},
                "status_code": 501,
                "url": url,
                "cookies": {},
                "time": 0,
                "save": task.get('fetch', {}).get('save')
            }
            logger.warning("[501] %s:%s %s 0s", task.get('project'), task.get('taskid'), url)
            callback('http', task, result)
            self.on_result('http', task, result)
            raise gen.Return((task, result))

        # setup request parameters
        fetch = self.pack_tornado_request_parameters(url, task)
        task_fetch = task.get('fetch', {})
        for each in task_fetch:
            if each not in fetch:
                fetch[each] = task_fetch[each]

        # robots.txt
        if task_fetch.get('robots_txt', False):
            user_agent = fetch['headers']['User-Agent']
            can_fetch = yield self.can_fetch(user_agent, url)
            if not can_fetch:
                error = tornado.httpclient.HTTPError(403, 'Disallowed by robots.txt')
                raise gen.Return(handle_error(error))

        request_conf = {
            'follow_redirects': False
        }
        request_conf['connect_timeout'] = fetch.get('connect_timeout', 120)
        request_conf['request_timeout'] = fetch.get('request_timeout', 120)

        session = cookies.RequestsCookieJar()
        request = tornado.httpclient.HTTPRequest(url=fetch['url'])
        if fetch.get('cookies'):
            session.update(fetch['cookies'])
            if 'Cookie' in request.headers:
                del request.headers['Cookie']
            fetch['headers']['Cookie'] = cookies.get_cookie_header(session, request)

        # making requests
        fetch['headers'] = dict(fetch['headers'])
        try:
            request = tornado.httpclient.HTTPRequest(
                url="%s" % self.phantomjs_proxy, method="POST",
                body=json.dumps(fetch), **request_conf)
        except Exception as e:
            raise gen.Return(handle_error(e))

        try:
            response = yield gen.maybe_future(self.http_client.fetch(request))
        except tornado.httpclient.HTTPError as e:
            if e.response:
                response = e.response
            else:
                raise gen.Return(handle_error(e))

        if not response.body:
            raise gen.Return(handle_error(Exception('no response from phantomjs')))

        result = {}
        try:
            result = json.loads(utils.text(response.body))
        except Exception as e:
            if response.error:
                result['error'] = utils.text(response.error)
            raise gen.Return(handle_error(e))

        if result.get('status_code', 200):
            logger.info("[%d] %s:%s %s %.2fs", result['status_code'],
                        task.get('project'), task.get('taskid'), url, result['time'])
        else:
            logger.error("[%d] %s:%s %s, %r %.2fs", result['status_code'],
                         task.get('project'), task.get('taskid'),
                         url, result['content'], result['time'])

        callback('phantomjs', task, result)
        self.on_result('phantomjs', task, result)
        raise gen.Return((task, result))

    def run(self):
        '''Run loop'''
        logger.info("fetcher starting...")

        def queue_loop():
            if not self.outqueue or not self.inqueue:
                return
            while not self._quit:
                try:
                    if self.outqueue.full():
                        break
                    if self.http_client.free_size() <= 0:
                        break
                    task = self.inqueue.get_nowait()
                    # FIXME: decode_unicode_obj should be used after data is
                    # selected from the database; it is done here for performance
                    task = utils.decode_unicode_obj(task)
                    self.fetch(task)
                except queue.Empty:
                    break
                except KeyboardInterrupt:
                    break
                except Exception as e:
                    logger.exception(e)
                    break

        tornado.ioloop.PeriodicCallback(queue_loop, 100, io_loop=self.ioloop).start()
        tornado.ioloop.PeriodicCallback(self.clear_robot_txt_cache, 10000, io_loop=self.ioloop).start()
        self._running = True

        try:
            self.ioloop.start()
        except KeyboardInterrupt:
            pass

        logger.info("fetcher exiting...")

    def quit(self):
        '''Quit fetcher'''
        self._running = False
        self._quit = True
        self.ioloop.add_callback(self.ioloop.stop)
        if hasattr(self, 'xmlrpc_server'):
            self.xmlrpc_ioloop.add_callback(self.xmlrpc_server.stop)
            self.xmlrpc_ioloop.add_callback(self.xmlrpc_ioloop.stop)

    def size(self):
        return self.http_client.size()

    def xmlrpc_run(self, port=24444, bind='127.0.0.1', logRequests=False):
        '''Run xmlrpc server'''
        import umsgpack
        from pyspider.libs.wsgi_xmlrpc import WSGIXMLRPCApplication
        try:
            from xmlrpc.client import Binary
        except ImportError:
            from xmlrpclib import Binary

        application = WSGIXMLRPCApplication()

        application.register_function(self.quit, '_quit')
        application.register_function(self.size)

        def sync_fetch(task):
            result = self.sync_fetch(task)
            result = Binary(umsgpack.packb(result))
            return result
        application.register_function(sync_fetch, 'fetch')

        def dump_counter(_time, _type):
            return self._cnt[_time].to_dict(_type)
        application.register_function(dump_counter, 'counter')

        import tornado.wsgi
        import tornado.ioloop
        import tornado.httpserver

        container = tornado.wsgi.WSGIContainer(application)
        self.xmlrpc_ioloop = tornado.ioloop.IOLoop()
        self.xmlrpc_server = tornado.httpserver.HTTPServer(container, io_loop=self.xmlrpc_ioloop)
        self.xmlrpc_server.listen(port=port, address=bind)
        self.xmlrpc_ioloop.start()

    def on_fetch(self, type, task):
        '''Called before task fetch'''
        pass

    def on_result(self, type, task, result):
        '''Called after task fetched'''
        status_code = result.get('status_code', 599)
        if status_code != 599:
            # floor to the century bucket (404 -> 400, 302 -> 300, ...);
            # `//` keeps this integer division on Python 3
            status_code = int(status_code) // 100 * 100
        self._cnt['5m'].event((task.get('project'), status_code), +1)
        self._cnt['1h'].event((task.get('project'), status_code), +1)

        if type == 'http' and result.get('time'):
            content_len = len(result.get('content', ''))
            self._cnt['5m'].event((task.get('project'), 'speed'),
                                  float(content_len) / result.get('time'))
            self._cnt['1h'].event((task.get('project'), 'speed'),
                                  float(content_len) / result.get('time'))
            self._cnt['5m'].event((task.get('project'), 'time'), result.get('time'))
            self._cnt['1h'].event((task.get('project'), 'time'), result.get('time'))
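
For reference, a minimal client sketch showing how the endpoints registered in xmlrpc_run() above can be called. The address and port are the xmlrpc_run() defaults; '5m' is one of the keys of Fetcher._cnt, while the 'avg' counter style is an assumption about what CounterManager.to_dict() accepts.

# Hypothetical client for the fetcher's xmlrpc interface (assumes a fetcher
# started with xmlrpc_run() on the default 127.0.0.1:24444).
from six.moves.xmlrpc_client import ServerProxy

import umsgpack

rpc = ServerProxy('http://127.0.0.1:24444/')

# 'counter' is the name dump_counter() was registered under: '5m' selects
# the 5-minute CounterManager; 'avg' is an assumed counter style accepted
# by CounterManager.to_dict().
print(rpc.counter('5m', 'avg'))

# 'fetch' wraps sync_fetch(); the result comes back umsgpack-packed inside
# an xmlrpc Binary, so unpack its .data to recover the result dict.
packed = rpc.fetch({'url': 'data:,hello', 'project': 'demo', 'taskid': '1'})
print(umsgpack.unpackb(packed.data))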