
pyspider.fetcher.Fetcher.xmlrpc_run()   Rating: B

Metric                    Value
Conditions (cc)           4
Total Lines (loc)         33
Duplicated Lines (dl)     0
Duplication Ratio         0 %
rs                        8.5806

1 Method

Rating   Name                                       Duplication   Size   Complexity
A        pyspider.fetcher.Fetcher.dump_counter()    0             2      1
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
# vim: set et sw=4 ts=4 sts=4 ff=unix fenc=utf8:
# Author: Binux<[email protected]>
#         http://binux.me
# Created on 2012-12-17 11:07:19

from __future__ import unicode_literals

import six
import copy
import time
import json
import logging
import functools
import threading
import tornado.ioloop
import tornado.httputil
import tornado.httpclient
import pyspider

from six.moves import queue, http_cookies
from six.moves.urllib.robotparser import RobotFileParser
from requests import cookies
from six.moves.urllib.parse import urljoin, urlsplit
from tornado import gen
from tornado.curl_httpclient import CurlAsyncHTTPClient
from tornado.simple_httpclient import SimpleAsyncHTTPClient

from pyspider.libs import utils, dataurl, counter
from pyspider.libs.url import quote_chinese
from .cookie_utils import extract_cookies_to_jar

logger = logging.getLogger('fetcher')


class MyCurlAsyncHTTPClient(CurlAsyncHTTPClient):

    def free_size(self):
        return len(self._free_list)

    def size(self):
        return len(self._curls) - self.free_size()


class MySimpleAsyncHTTPClient(SimpleAsyncHTTPClient):

    def free_size(self):
        return self.max_clients - self.size()

    def size(self):
        return len(self.active)


fetcher_output = {
    "status_code": int,
    "orig_url": str,
    "url": str,
    "headers": dict,
    "content": str,
    "cookies": dict,
}
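
# Illustrative example (not part of the original source): a concrete result
# matching the fetcher_output schema above could look like the following
# (http_fetch() additionally fills in 'time', 'save' and, on failure, 'error').
#
#     {
#         "status_code": 200,
#         "orig_url": "http://example.com/",
#         "url": "http://example.com/",
#         "headers": {"Content-Type": "text/html"},
#         "content": "<html>...</html>",
#         "cookies": {"sessionid": "deadbeef"},
#     }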


class Fetcher(object):
    user_agent = "pyspider/%s (+http://pyspider.org/)" % pyspider.__version__
    default_options = {
        'method': 'GET',
        'headers': {
        },
        'use_gzip': True,
        'timeout': 120,
    }
    phantomjs_proxy = None
    robot_txt_age = 60 * 60  # 1h

    def __init__(self, inqueue, outqueue, poolsize=100, proxy=None, async_mode=True):
        # `async` is a reserved word as of Python 3.7, hence `async_mode`
        self.inqueue = inqueue
        self.outqueue = outqueue

        self.poolsize = poolsize
        self._running = False
        self._quit = False
        self.proxy = proxy
        self.async_mode = async_mode
        self.ioloop = tornado.ioloop.IOLoop()

        self.robots_txt_cache = {}

        # binding io_loop to http_client here
        if self.async_mode:
            self.http_client = MyCurlAsyncHTTPClient(max_clients=self.poolsize,
                                                     io_loop=self.ioloop)
        else:
            self.http_client = tornado.httpclient.HTTPClient(
                MyCurlAsyncHTTPClient, max_clients=self.poolsize)

        self._cnt = {
            '5m': counter.CounterManager(
                lambda: counter.TimebaseAverageWindowCounter(30, 10)),
            '1h': counter.CounterManager(
                lambda: counter.TimebaseAverageWindowCounter(60, 60)),
        }

    def send_result(self, type, task, result):
        '''Send fetch result to processor'''
        if self.outqueue:
            try:
                self.outqueue.put((task, result))
            except Exception as e:
                logger.exception(e)

    def fetch(self, task, callback=None):
        if self.async_mode:
            return self.async_fetch(task, callback)
        else:
            return self.async_fetch(task, callback).result()

    @gen.coroutine
    def async_fetch(self, task, callback=None):
        '''Do one fetch'''
        url = task.get('url', 'data:,')
        if callback is None:
            callback = self.send_result

        type = 'None'
        start_time = time.time()  # handle_error() needs the fetch start time
        try:
            if url.startswith('data:'):
                type = 'data'
                result = yield gen.maybe_future(self.data_fetch(url, task))
            elif task.get('fetch', {}).get('fetch_type') in ('js', 'phantomjs'):
                type = 'phantomjs'
                result = yield self.phantomjs_fetch(url, task)
            else:
                type = 'http'
                result = yield self.http_fetch(url, task)
        except Exception as e:
            logger.exception(e)
            result = self.handle_error(type, url, task, start_time, e)

        callback(type, task, result)
        self.on_result(type, task, result)
        raise gen.Return(result)

    def sync_fetch(self, task):
        '''Synchronous fetch, usually used in the xmlrpc thread'''
        if not self._running:
            return self.ioloop.run_sync(functools.partial(self.async_fetch, task, lambda t, _, r: True))

        wait_result = threading.Condition()
        _result = {}

        def callback(type, task, result):
            wait_result.acquire()
            _result['type'] = type
            _result['task'] = task
            _result['result'] = result
            wait_result.notify()
            wait_result.release()

        wait_result.acquire()
        self.ioloop.add_callback(self.fetch, task, callback)
        while 'result' not in _result:
            wait_result.wait()
        wait_result.release()
        return _result['result']
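
    # Usage sketch (illustrative, not part of the original source): sync_fetch()
    # blocks the calling thread until the ioloop thread finishes the fetch, so
    # call it from a thread other than the one running run(), e.g. the xmlrpc
    # thread:
    #
    #     result = fetcher.sync_fetch({
    #         'taskid': 'example',
    #         'project': 'demo',
    #         'url': 'http://example.com/',
    #     })
    #     print(result['status_code'])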

    def data_fetch(self, url, task):
        '''A fake fetcher for dataurl'''
        self.on_fetch('data', task)
        result = {}
        result['orig_url'] = url
        result['content'] = dataurl.decode(url)
        result['headers'] = {}
        result['status_code'] = 200
        result['url'] = url
        result['cookies'] = {}
        result['time'] = 0
        result['save'] = task.get('fetch', {}).get('save')
        if len(result['content']) < 70:
            logger.info("[200] %s:%s %s 0s", task.get('project'), task.get('taskid'), url)
        else:
            logger.info(
                "[200] %s:%s data:,%s...[content:%d] 0s",
                task.get('project'), task.get('taskid'),
                result['content'][:70],
                len(result['content'])
            )

        return result

    def handle_error(self, type, url, task, start_time, error):
        result = {
            'status_code': getattr(error, 'code', 599),
            'error': utils.text(error),
            'content': "",
            'time': time.time() - start_time,
            'orig_url': url,
            'url': url,
        }
        logger.error("[%d] %s:%s %s, %r %.2fs",
                     result['status_code'], task.get('project'), task.get('taskid'),
                     url, error, result['time'])
        return result

    allowed_options = ['method', 'data', 'timeout', 'cookies', 'use_gzip', 'validate_cert']

    def pack_tornado_request_parameters(self, url, task):
        fetch = copy.deepcopy(self.default_options)
        fetch['url'] = url
        fetch['headers'] = tornado.httputil.HTTPHeaders(fetch['headers'])
        fetch['headers']['User-Agent'] = self.user_agent
        task_fetch = task.get('fetch', {})
        for each in self.allowed_options:
            if each in task_fetch:
                fetch[each] = task_fetch[each]
        fetch['headers'].update(task_fetch.get('headers', {}))

        if task.get('track'):
            track_headers = tornado.httputil.HTTPHeaders(
                task.get('track', {}).get('fetch', {}).get('headers') or {})
            track_ok = task.get('track', {}).get('process', {}).get('ok', False)
        else:
            track_headers = {}
            track_ok = False
        # proxy
        proxy_string = None
        if isinstance(task_fetch.get('proxy'), six.string_types):
            proxy_string = task_fetch['proxy']
        elif self.proxy and task_fetch.get('proxy', True):
            proxy_string = self.proxy
        if proxy_string:
            if '://' not in proxy_string:
                proxy_string = 'http://' + proxy_string
            proxy_splited = urlsplit(proxy_string)
            if proxy_splited.username:
                fetch['proxy_username'] = proxy_splited.username
                if six.PY2:
                    fetch['proxy_username'] = fetch['proxy_username'].encode('utf8')
            if proxy_splited.password:
                fetch['proxy_password'] = proxy_splited.password
                if six.PY2:
                    fetch['proxy_password'] = fetch['proxy_password'].encode('utf8')
            fetch['proxy_host'] = proxy_splited.hostname
            if six.PY2:
                fetch['proxy_host'] = fetch['proxy_host'].encode('utf8')
            fetch['proxy_port'] = proxy_splited.port or 8080

        # etag
        if task_fetch.get('etag', True):
            _t = None
            if isinstance(task_fetch.get('etag'), six.string_types):
                _t = task_fetch.get('etag')
            elif track_ok:
                _t = track_headers.get('etag')
            if _t and 'If-None-Match' not in fetch['headers']:
                fetch['headers']['If-None-Match'] = _t
        # last modified
        if task_fetch.get('last_modified', True):
            _t = None
            if isinstance(task_fetch.get('last_modified'), six.string_types):
                _t = task_fetch.get('last_modified')
            elif track_ok:
                _t = track_headers.get('last-modified')
            if _t and 'If-Modified-Since' not in fetch['headers']:
                fetch['headers']['If-Modified-Since'] = _t
        # timeout
        if 'timeout' in fetch:
            fetch['connect_timeout'] = fetch['request_timeout'] = fetch['timeout']
            del fetch['timeout']
        # data rename to body
        if 'data' in fetch:
            fetch['body'] = fetch['data']
            del fetch['data']

        return fetch
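
    # Illustrative example (not part of the original source): the per-task
    # options consumed above are read from task['fetch'], e.g.:
    #
    #     task = {
    #         'url': 'http://example.com/',
    #         'fetch': {
    #             'method': 'POST',
    #             'data': 'a=1&b=2',      # renamed to the tornado 'body' above
    #             'timeout': 60,          # split into connect/request timeouts
    #             'proxy': 'user:pass@proxy.example:8080',
    #             'headers': {'X-Requested-With': 'XMLHttpRequest'},
    #         },
    #     }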

    @gen.coroutine
    def can_fetch(self, user_agent, url):
        parsed = urlsplit(url)
        domain = parsed.netloc
        if domain in self.robots_txt_cache:
            robot_txt = self.robots_txt_cache[domain]
            if time.time() - robot_txt.mtime() > self.robot_txt_age:
                robot_txt = None
        else:
            robot_txt = None

        if robot_txt is None:
            robot_txt = RobotFileParser()
            try:
                response = yield gen.maybe_future(self.http_client.fetch(
                    urljoin(url, '/robots.txt'), connect_timeout=10, request_timeout=30))
                content = response.body
            except tornado.httpclient.HTTPError as e:
                logger.error('load robots.txt from %s error: %r', domain, e)
                # response.body is bytes; keep the fallback bytes too so the
                # decode below works on both Python 2 and 3
                content = b''

            try:
                content = content.decode('utf8', 'ignore')
            except UnicodeDecodeError:
                content = ''

            robot_txt.parse(content.splitlines())
            self.robots_txt_cache[domain] = robot_txt

        raise gen.Return(robot_txt.can_fetch(user_agent, url))

    def clear_robot_txt_cache(self):
        now = time.time()
        # iterate over a copy: deleting while iterating a dict raises on Python 3
        for domain, robot_txt in list(self.robots_txt_cache.items()):
            if now - robot_txt.mtime() > self.robot_txt_age:
                del self.robots_txt_cache[domain]

    @gen.coroutine
    def http_fetch(self, url, task):
        '''HTTP fetcher'''
        start_time = time.time()
        self.on_fetch('http', task)
        handle_error = lambda x: self.handle_error('http', url, task, start_time, x)

        # setup request parameters
        fetch = self.pack_tornado_request_parameters(url, task)
        task_fetch = task.get('fetch', {})

        session = cookies.RequestsCookieJar()
        # fix for tornado request obj
        if 'Cookie' in fetch['headers']:
            c = http_cookies.SimpleCookie()
            try:
                c.load(fetch['headers']['Cookie'])
            except AttributeError:
                c.load(utils.utf8(fetch['headers']['Cookie']))
            for key in c:
                session.set(key, c[key])
            del fetch['headers']['Cookie']
        if 'cookies' in fetch:
            session.update(fetch['cookies'])
            del fetch['cookies']

        max_redirects = task_fetch.get('max_redirects', 5)
        # we will handle redirects by hand to capture cookies
        fetch['follow_redirects'] = False

        # making requests
        while True:
            # robots.txt
            if task_fetch.get('robots_txt', False):
                can_fetch = yield self.can_fetch(fetch['headers']['User-Agent'], fetch['url'])
                if not can_fetch:
                    error = tornado.httpclient.HTTPError(403, 'Disallowed by robots.txt')
                    raise gen.Return(handle_error(error))

            try:
                request = tornado.httpclient.HTTPRequest(**fetch)
                # if cookie already in header, get_cookie_header wouldn't work
                old_cookie_header = request.headers.get('Cookie')
                if old_cookie_header:
                    del request.headers['Cookie']
                cookie_header = cookies.get_cookie_header(session, request)
                if cookie_header:
                    request.headers['Cookie'] = cookie_header
                elif old_cookie_header:
                    request.headers['Cookie'] = old_cookie_header
            except Exception as e:
                logger.exception(fetch)
                raise gen.Return(handle_error(e))

            try:
                response = yield gen.maybe_future(self.http_client.fetch(request))
            except tornado.httpclient.HTTPError as e:
                if e.response:
                    response = e.response
                else:
                    raise gen.Return(handle_error(e))

            extract_cookies_to_jar(session, response.request, response.headers)
            if (response.code in (301, 302, 303, 307)
                    and response.headers.get('Location')
                    and task_fetch.get('allow_redirects', True)):
                if max_redirects <= 0:
                    error = tornado.httpclient.HTTPError(
                        599, 'Maximum (%d) redirects followed' % task_fetch.get('max_redirects', 5),
                        response)
                    raise gen.Return(handle_error(error))
                if response.code in (302, 303):
                    fetch['method'] = 'GET'
                    if 'body' in fetch:
                        del fetch['body']
                fetch['url'] = quote_chinese(urljoin(fetch['url'], response.headers['Location']))
                fetch['request_timeout'] -= time.time() - start_time
                if fetch['request_timeout'] < 0:
                    fetch['request_timeout'] = 0.1
                fetch['connect_timeout'] = fetch['request_timeout']
                max_redirects -= 1
                continue

            result = {}
            result['orig_url'] = url
            result['content'] = response.body or ''
            result['headers'] = dict(response.headers)
            result['status_code'] = response.code
            result['url'] = response.effective_url or url
            result['time'] = time.time() - start_time
            result['cookies'] = session.get_dict()
            result['save'] = task_fetch.get('save')
            if response.error:
                result['error'] = utils.text(response.error)
            if 200 <= response.code < 300:
                logger.info("[%d] %s:%s %s %.2fs", response.code,
                            task.get('project'), task.get('taskid'),
                            url, result['time'])
            else:
                logger.warning("[%d] %s:%s %s %.2fs", response.code,
                               task.get('project'), task.get('taskid'),
                               url, result['time'])

            raise gen.Return(result)

    @gen.coroutine
    def phantomjs_fetch(self, url, task):
        '''Fetch with phantomjs proxy'''
        start_time = time.time()
        self.on_fetch('phantomjs', task)
        handle_error = lambda x: self.handle_error('phantomjs', url, task, start_time, x)

        # check phantomjs proxy is enabled
        if not self.phantomjs_proxy:
            result = {
                "orig_url": url,
                "content": "phantomjs is not enabled.",
                "headers": {},
                "status_code": 501,
                "url": url,
                "time": time.time() - start_time,
                "cookies": {},
                "save": task.get('fetch', {}).get('save')
            }
            logger.warning("[501] %s:%s %s 0s", task.get('project'), task.get('taskid'), url)
            raise gen.Return(result)

        # setup request parameters
        fetch = self.pack_tornado_request_parameters(url, task)
        task_fetch = task.get('fetch', {})
        for each in task_fetch:
            if each not in fetch:
                fetch[each] = task_fetch[each]

        # robots.txt
        if task_fetch.get('robots_txt', False):
            user_agent = fetch['headers']['User-Agent']
            can_fetch = yield self.can_fetch(user_agent, url)
            if not can_fetch:
                error = tornado.httpclient.HTTPError(403, 'Disallowed by robots.txt')
                raise gen.Return(handle_error(error))

        request_conf = {
            'follow_redirects': False
        }
        request_conf['connect_timeout'] = fetch.get('connect_timeout', 120)
        request_conf['request_timeout'] = fetch.get('request_timeout', 120)

        session = cookies.RequestsCookieJar()
        request = tornado.httpclient.HTTPRequest(url=fetch['url'])
        if fetch.get('cookies'):
            session.update(fetch['cookies'])
            if 'Cookie' in request.headers:
                del request.headers['Cookie']
            fetch['headers']['Cookie'] = cookies.get_cookie_header(session, request)

        # making requests
        fetch['headers'] = dict(fetch['headers'])
        try:
            request = tornado.httpclient.HTTPRequest(
                url="%s" % self.phantomjs_proxy, method="POST",
                body=json.dumps(fetch), **request_conf)
        except Exception as e:
            raise gen.Return(handle_error(e))

        try:
            response = yield gen.maybe_future(self.http_client.fetch(request))
        except tornado.httpclient.HTTPError as e:
            if e.response:
                response = e.response
            else:
                raise gen.Return(handle_error(e))

        if not response.body:
            raise gen.Return(handle_error(Exception('no response from phantomjs')))

        result = {}
        try:
            result = json.loads(utils.text(response.body))
        except Exception as e:
            if response.error:
                result['error'] = utils.text(response.error)
            raise gen.Return(handle_error(e))

        if result.get('status_code', 200):
            logger.info("[%d] %s:%s %s %.2fs", result['status_code'],
                        task.get('project'), task.get('taskid'), url, result['time'])
        else:
            logger.error("[%d] %s:%s %s, %r %.2fs", result['status_code'],
                         task.get('project'), task.get('taskid'),
                         url, result['content'], result['time'])

        raise gen.Return(result)
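
    # Illustrative note (not part of the original source): a task is routed to
    # this fetcher by the fetch_type flag checked in async_fetch(), e.g.:
    #
    #     task['fetch'] = {'fetch_type': 'js'}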

    def run(self):
        '''Run loop'''
        logger.info("fetcher starting...")

        def queue_loop():
            if not self.outqueue or not self.inqueue:
                return
            while not self._quit:
                try:
                    if self.outqueue.full():
                        break
                    if self.http_client.free_size() <= 0:
                        break
                    task = self.inqueue.get_nowait()
                    # FIXME: decode_unicode_obj should be used after data is
                    # selected from the database; it is done here for performance
                    task = utils.decode_unicode_obj(task)
                    self.fetch(task)
                except queue.Empty:
                    break
                except KeyboardInterrupt:
                    break
                except Exception as e:
                    logger.exception(e)
                    break

        tornado.ioloop.PeriodicCallback(queue_loop, 100, io_loop=self.ioloop).start()
        tornado.ioloop.PeriodicCallback(self.clear_robot_txt_cache, 10000, io_loop=self.ioloop).start()
        self._running = True

        try:
            self.ioloop.start()
        except KeyboardInterrupt:
            pass

        logger.info("fetcher exiting...")

    def quit(self):
        '''Quit fetcher'''
        self._running = False
        self._quit = True
        self.ioloop.add_callback(self.ioloop.stop)
        if hasattr(self, 'xmlrpc_server'):
            self.xmlrpc_ioloop.add_callback(self.xmlrpc_server.stop)
            self.xmlrpc_ioloop.add_callback(self.xmlrpc_ioloop.stop)

    def size(self):
        return self.http_client.size()

    def xmlrpc_run(self, port=24444, bind='127.0.0.1', logRequests=False):
        '''Run xmlrpc server'''
        import umsgpack
        from pyspider.libs.wsgi_xmlrpc import WSGIXMLRPCApplication
        try:
            from xmlrpc.client import Binary
        except ImportError:
            from xmlrpclib import Binary

        application = WSGIXMLRPCApplication()

        application.register_function(self.quit, '_quit')
        application.register_function(self.size)

        def sync_fetch(task):
            result = self.sync_fetch(task)
            result = Binary(umsgpack.packb(result))
            return result
        application.register_function(sync_fetch, 'fetch')

        def dump_counter(_time, _type):
            return self._cnt[_time].to_dict(_type)
        application.register_function(dump_counter, 'counter')

        import tornado.wsgi
        import tornado.ioloop
        import tornado.httpserver

        container = tornado.wsgi.WSGIContainer(application)
        self.xmlrpc_ioloop = tornado.ioloop.IOLoop()
        self.xmlrpc_server = tornado.httpserver.HTTPServer(container, io_loop=self.xmlrpc_ioloop)
        self.xmlrpc_server.listen(port=port, address=bind)
        self.xmlrpc_ioloop.start()
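
    # Client-side sketch (illustrative, not part of the original source): the
    # endpoints registered above can be called with any XML-RPC client; 'fetch'
    # returns a umsgpack-packed Binary, so unpack its .data on the way out.
    #
    #     from six.moves.xmlrpc_client import ServerProxy
    #     import umsgpack
    #
    #     rpc = ServerProxy('http://127.0.0.1:24444')
    #     print(rpc.size())  # number of requests currently in flight
    #     packed = rpc.fetch({'taskid': 't', 'project': 'demo',
    #                         'url': 'http://example.com/'})
    #     result = umsgpack.unpackb(packed.data)
    #     print(result['status_code'])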

    def on_fetch(self, type, task):
        '''Called before task fetch'''
        pass

    def on_result(self, type, task, result):
        '''Called after task fetched'''
        status_code = result.get('status_code', 599)
        if status_code != 599:
            # bucket by hundreds, e.g. 404 -> 400 (floor division keeps an int)
            status_code = int(status_code) // 100 * 100
        self._cnt['5m'].event((task.get('project'), status_code), +1)
        self._cnt['1h'].event((task.get('project'), status_code), +1)

        if type in ('http', 'phantomjs') and result.get('time'):
            content_len = len(result.get('content', ''))
            self._cnt['5m'].event((task.get('project'), 'speed'),
                                  float(content_len) / result.get('time'))
            self._cnt['1h'].event((task.get('project'), 'speed'),
                                  float(content_len) / result.get('time'))
            self._cnt['5m'].event((task.get('project'), 'time'), result.get('time'))
            self._cnt['1h'].event((task.get('project'), 'time'), result.get('time'))
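

# Usage sketch (illustrative, not part of the original source): wiring a
# Fetcher between an input and an output queue.
#
#     from six.moves import queue
#
#     inqueue, outqueue = queue.Queue(), queue.Queue()
#     fetcher = Fetcher(inqueue, outqueue, poolsize=10)
#     inqueue.put({'taskid': 't', 'project': 'demo',
#                  'url': 'http://example.com/'})
#     fetcher.run()  # blocks; call fetcher.quit() from another thread to stop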
609