Completed
Push to master (9592b7...7ffc4b) by Roy, created 01:04

pyspider.fetcher.Fetcher.check_output()   B

Complexity
    Conditions      6

Size
    Total Lines     11

Duplication
    Lines           0
    Ratio           0 %

Metric    Value
cc        6
dl        0
loc       11
rs        8
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
# vim: set et sw=4 ts=4 sts=4 ff=unix fenc=utf8:
# Author: Binux<[email protected]>
#         http://binux.me
# Created on 2012-12-17 11:07:19

from __future__ import unicode_literals

import six
import copy
import time
import json
import logging
import threading
import tornado.ioloop
import tornado.httputil
import tornado.httpclient
import pyspider

from six.moves import queue, http_cookies
from requests import cookies
from six.moves.urllib.parse import urljoin, urlsplit
from tornado.curl_httpclient import CurlAsyncHTTPClient
from tornado.simple_httpclient import SimpleAsyncHTTPClient
from pyspider.libs import utils, dataurl, counter
from .cookie_utils import extract_cookies_to_jar

logger = logging.getLogger('fetcher')


class MyCurlAsyncHTTPClient(CurlAsyncHTTPClient):

    def free_size(self):
        return len(self._free_list)

    def size(self):
        return len(self._curls) - self.free_size()


class MySimpleAsyncHTTPClient(SimpleAsyncHTTPClient):

    def free_size(self):
        return self.max_clients - self.size()

    def size(self):
        return len(self.active)


fetcher_output = {
    "status_code": int,
    "orig_url": str,
    "url": str,
    "headers": dict,
    "content": str,
    "cookies": dict,
}
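
# An illustrative, hypothetical instance of the schema above (values are
# made up for documentation purposes only):
#
#     {
#         "status_code": 200,
#         "orig_url": "http://example.com/",
#         "url": "http://example.com/",
#         "headers": {"Content-Type": "text/html"},
#         "content": "<html>...</html>",
#         "cookies": {"session": "abc123"},
#     }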


class Fetcher(object):
    user_agent = "pyspider/%s (+http://pyspider.org/)" % pyspider.__version__
    default_options = {
        'method': 'GET',
        'headers': {
        },
        'use_gzip': True,
        'timeout': 120,
    }
    phantomjs_proxy = None

    # `async` became a reserved word in Python 3.7, hence `async_mode`
    def __init__(self, inqueue, outqueue, poolsize=100, proxy=None, async_mode=True):
        self.inqueue = inqueue
        self.outqueue = outqueue

        self.poolsize = poolsize
        self._running = False
        self._quit = False
        self.proxy = proxy
        self.async_mode = async_mode
        self.ioloop = tornado.ioloop.IOLoop()

        # bind the io_loop to the http_client here
        if self.async_mode:
            self.http_client = MyCurlAsyncHTTPClient(max_clients=self.poolsize,
                                                     io_loop=self.ioloop)
        else:
            self.http_client = tornado.httpclient.HTTPClient(
                MyCurlAsyncHTTPClient, max_clients=self.poolsize
            )

        self._cnt = {
            '5m': counter.CounterManager(
                lambda: counter.TimebaseAverageWindowCounter(30, 10)),
            '1h': counter.CounterManager(
                lambda: counter.TimebaseAverageWindowCounter(60, 60)),
        }

    def send_result(self, type, task, result):
        '''Send fetch result to processor'''
        if self.outqueue:
            try:
                self.outqueue.put((task, result))
            except Exception as e:
                logger.exception(e)

    def fetch(self, task, callback=None):
        '''Do one fetch'''
        url = task.get('url', 'data:,')
        if callback is None:
            callback = self.send_result
        if url.startswith('data:'):
            return self.data_fetch(url, task, callback)
        elif task.get('fetch', {}).get('fetch_type') in ('js', 'phantomjs'):
            return self.phantomjs_fetch(url, task, callback)
        else:
            return self.http_fetch(url, task, callback)

    def sync_fetch(self, task):
        '''Synchronous fetch: block until the fetch result is available'''
        wait_result = threading.Condition()
        _result = {}

        def callback(type, task, result):
            wait_result.acquire()
            _result['type'] = type
            _result['task'] = task
            _result['result'] = result
            wait_result.notify()
            wait_result.release()

        wait_result.acquire()
        self.fetch(task, callback=callback)
        while 'result' not in _result:
            wait_result.wait()
        wait_result.release()
        return _result['result']

    def data_fetch(self, url, task, callback):
        '''A fake fetcher for data: URLs'''
        self.on_fetch('data', task)
        result = {}
        result['orig_url'] = url
        result['content'] = dataurl.decode(url)
        result['headers'] = {}
        result['status_code'] = 200
        result['url'] = url
        result['cookies'] = {}
        result['time'] = 0
        result['save'] = task.get('fetch', {}).get('save')
        if len(result['content']) < 70:
            logger.info("[200] %s:%s %s 0s", task.get('project'), task.get('taskid'), url)
        else:
            logger.info(
                "[200] %s:%s data:,%s...[content:%d] 0s",
                task.get('project'), task.get('taskid'),
                result['content'][:70],
                len(result['content'])
            )

        callback('data', task, result)
        self.on_result('data', task, result)
        return task, result

    def handle_error(self, type, url, task, start_time, callback, error):
        result = {
            'status_code': getattr(error, 'code', 599),
            'error': utils.text(error),
            'content': "",
            'time': time.time() - start_time,
            'orig_url': url,
            'url': url,
        }
        logger.error("[%d] %s:%s %s, %r %.2fs",
                     result['status_code'], task.get('project'), task.get('taskid'),
                     url, error, result['time'])
        callback(type, task, result)
        self.on_result(type, task, result)
        return task, result

    allowed_options = ['method', 'data', 'timeout', 'cookies', 'use_gzip', 'validate_cert']

    def http_fetch(self, url, task, callback):
        '''HTTP fetcher'''
        start_time = time.time()

        self.on_fetch('http', task)
        fetch = copy.deepcopy(self.default_options)
        fetch['url'] = url
        fetch['headers'] = tornado.httputil.HTTPHeaders(fetch['headers'])
        fetch['headers']['User-Agent'] = self.user_agent
        task_fetch = task.get('fetch', {})
        for each in self.allowed_options:
            if each in task_fetch:
                fetch[each] = task_fetch[each]
        fetch['headers'].update(task_fetch.get('headers', {}))

        if task.get('track'):
            track_headers = tornado.httputil.HTTPHeaders(
                task.get('track', {}).get('fetch', {}).get('headers') or {})
            track_ok = task.get('track', {}).get('process', {}).get('ok', False)
        else:
            track_headers = {}
            track_ok = False
        # proxy
        proxy_string = None
        if isinstance(task_fetch.get('proxy'), six.string_types):
            proxy_string = task_fetch['proxy']
        elif self.proxy and task_fetch.get('proxy', True):
            proxy_string = self.proxy
        if proxy_string:
            if '://' not in proxy_string:
                proxy_string = 'http://' + proxy_string
            proxy_splited = urlsplit(proxy_string)
            if proxy_splited.username:
                fetch['proxy_username'] = proxy_splited.username
                if six.PY2:
                    fetch['proxy_username'] = fetch['proxy_username'].encode('utf8')
            if proxy_splited.password:
                fetch['proxy_password'] = proxy_splited.password
                if six.PY2:
                    fetch['proxy_password'] = fetch['proxy_password'].encode('utf8')
            # encode on Python 2 only, matching the username/password handling above
            fetch['proxy_host'] = proxy_splited.hostname
            if six.PY2:
                fetch['proxy_host'] = fetch['proxy_host'].encode('utf8')
            fetch['proxy_port'] = proxy_splited.port or 8080

        # etag
        if task_fetch.get('etag', True):
            _t = None
            if isinstance(task_fetch.get('etag'), six.string_types):
                _t = task_fetch.get('etag')
            elif track_ok:
                _t = track_headers.get('etag')
            if _t and 'If-None-Match' not in fetch['headers']:
                fetch['headers']['If-None-Match'] = _t
        # last modified
        if task_fetch.get('last_modified', True):
            _t = None
            if isinstance(task_fetch.get('last_modified'), six.string_types):
                _t = task_fetch.get('last_modified')
            elif track_ok:
                _t = track_headers.get('last-modified')
            if _t and 'If-Modified-Since' not in fetch['headers']:
                fetch['headers']['If-Modified-Since'] = _t

        session = cookies.RequestsCookieJar()

        # fix for tornado request obj
        if 'Cookie' in fetch['headers']:
            c = http_cookies.SimpleCookie()
            try:
                c.load(fetch['headers']['Cookie'])
            except AttributeError:
                c.load(utils.utf8(fetch['headers']['Cookie']))
            for key in c:
                session.set(key, c[key])
            del fetch['headers']['Cookie']
        fetch['follow_redirects'] = False
        if 'timeout' in fetch:
            fetch['connect_timeout'] = fetch['request_timeout'] = fetch['timeout']
            del fetch['timeout']
        if 'data' in fetch:
            fetch['body'] = fetch['data']
            del fetch['data']
        if 'cookies' in fetch:
            session.update(fetch['cookies'])
            del fetch['cookies']

        store = {}
        store['max_redirects'] = task_fetch.get('max_redirects', 5)

        def handle_response(response):
            extract_cookies_to_jar(session, response.request, response.headers)
            if (response.code in (301, 302, 303, 307)
                    and response.headers.get('Location')
                    and task_fetch.get('allow_redirects', True)):
                if store['max_redirects'] <= 0:
                    error = tornado.httpclient.HTTPError(
                        599, 'Maximum (%d) redirects followed' % task_fetch.get('max_redirects', 5),
                        response)
                    return handle_error(error)
                if response.code in (302, 303):
                    fetch['method'] = 'GET'
                    if 'body' in fetch:
                        del fetch['body']
                fetch['url'] = urljoin(fetch['url'], response.headers['Location'])
                fetch['request_timeout'] -= time.time() - start_time
                if fetch['request_timeout'] < 0:
                    fetch['request_timeout'] = 0.1
                fetch['connect_timeout'] = fetch['request_timeout']
                store['max_redirects'] -= 1
                return make_request(fetch)

            result = {}
            result['orig_url'] = url
            result['content'] = response.body or ''
            result['headers'] = dict(response.headers)
            result['status_code'] = response.code
            result['url'] = response.effective_url or url
            result['cookies'] = session.get_dict()
            result['time'] = time.time() - start_time
            result['save'] = task_fetch.get('save')
            if response.error:
                result['error'] = utils.text(response.error)
            if 200 <= response.code < 300:
                logger.info("[%d] %s:%s %s %.2fs", response.code,
                            task.get('project'), task.get('taskid'),
                            url, result['time'])
            else:
                logger.warning("[%d] %s:%s %s %.2fs", response.code,
                               task.get('project'), task.get('taskid'),
                               url, result['time'])
            callback('http', task, result)
            self.on_result('http', task, result)
            return task, result

        handle_error = lambda x: self.handle_error('http',
                                                   url, task, start_time, callback, x)

        def make_request(fetch):
            try:
                request = tornado.httpclient.HTTPRequest(**fetch)
                cookie_header = cookies.get_cookie_header(session, request)
                if cookie_header:
                    request.headers['Cookie'] = cookie_header
                if self.async_mode:
                    self.http_client.fetch(request, handle_response)
                else:
                    return handle_response(self.http_client.fetch(request))
            except tornado.httpclient.HTTPError as e:
                if e.response:
                    return handle_response(e.response)
                else:
                    return handle_error(e)
            except Exception as e:
                logger.exception(fetch)
                return handle_error(e)

        return make_request(fetch)

    def phantomjs_fetch(self, url, task, callback):
        '''Fetch with phantomjs proxy'''
        start_time = time.time()

        self.on_fetch('phantomjs', task)
        if not self.phantomjs_proxy:
            result = {
                "orig_url": url,
                "content": "phantomjs is not enabled.",
                "headers": {},
                "status_code": 501,
                "url": url,
                "cookies": {},
                "time": 0,
                "save": task.get('fetch', {}).get('save')
            }
            logger.warning("[501] %s:%s %s 0s", task.get('project'), task.get('taskid'), url)
            callback('http', task, result)
            self.on_result('http', task, result)
            return task, result

        request_conf = {
            'follow_redirects': False
        }

        fetch = copy.deepcopy(self.default_options)
        fetch['url'] = url
        fetch['headers'] = tornado.httputil.HTTPHeaders(fetch['headers'])
        fetch['headers']['User-Agent'] = self.user_agent
        task_fetch = task.get('fetch', {})
        for each in task_fetch:
            if each != 'headers':
                fetch[each] = task_fetch[each]
        fetch['headers'].update(task_fetch.get('headers', {}))

        if 'timeout' in fetch:
            request_conf['connect_timeout'] = fetch['timeout']
            request_conf['request_timeout'] = fetch['timeout'] + 1

        session = cookies.RequestsCookieJar()
        request = tornado.httpclient.HTTPRequest(url=fetch['url'])
        if fetch.get('cookies'):
            session.update(fetch['cookies'])
            if 'Cookie' in request.headers:
                del request.headers['Cookie']
            fetch['headers']['Cookie'] = cookies.get_cookie_header(session, request)

        def handle_response(response):
            if not response.body:
                return handle_error(Exception('no response from phantomjs'))

            try:
                result = json.loads(utils.text(response.body))
                if response.error:
                    result['error'] = utils.text(response.error)
            except Exception as e:
                return handle_error(e)

            if result.get('status_code', 200):
                logger.info("[%d] %s:%s %s %.2fs", result['status_code'],
                            task.get('project'), task.get('taskid'), url, result['time'])
            else:
                logger.error("[%d] %s:%s %s, %r %.2fs", result['status_code'],
                             task.get('project'), task.get('taskid'),
                             url, result['content'], result['time'])
            callback('phantomjs', task, result)
            self.on_result('phantomjs', task, result)
            return task, result

        handle_error = lambda x: self.handle_error('phantomjs',
                                                   url, task, start_time, callback, x)

        fetch['headers'] = dict(fetch['headers'])
        try:
            request = tornado.httpclient.HTTPRequest(
                url="%s" % self.phantomjs_proxy, method="POST",
                body=json.dumps(fetch), **request_conf)
            if self.async_mode:
                self.http_client.fetch(request, handle_response)
            else:
                return handle_response(self.http_client.fetch(request))
        except tornado.httpclient.HTTPError as e:
            if e.response:
                return handle_response(e.response)
            else:
                return handle_error(e)
        except Exception as e:
            return handle_error(e)

    def run(self):
        '''Run loop'''
        logger.info("fetcher starting...")

        def queue_loop():
            if not self.outqueue or not self.inqueue:
                return
            while not self._quit:
                try:
                    if self.outqueue.full():
                        break
                    if self.http_client.free_size() <= 0:
                        break
                    task = self.inqueue.get_nowait()
                    # FIXME: decode_unicode_obj should be applied when data is
                    # read from the database; it is done here for performance
                    task = utils.decode_unicode_obj(task)
                    self.fetch(task)
                except queue.Empty:
                    break
                except KeyboardInterrupt:
                    break
                except Exception as e:
                    logger.exception(e)
                    break

        tornado.ioloop.PeriodicCallback(queue_loop, 100, io_loop=self.ioloop).start()
        self._running = True

        try:
            self.ioloop.start()
        except KeyboardInterrupt:
            pass

        logger.info("fetcher exiting...")

    def quit(self):
        '''Quit fetcher'''
        self._running = False
        self._quit = True
        self.ioloop.stop()

    def size(self):
        return self.http_client.size()

    def xmlrpc_run(self, port=24444, bind='127.0.0.1', logRequests=False):
        '''Run xmlrpc server'''
        import umsgpack
        try:
            from xmlrpc.server import SimpleXMLRPCServer
            from xmlrpc.client import Binary
        except ImportError:
            from SimpleXMLRPCServer import SimpleXMLRPCServer
            from xmlrpclib import Binary

        server = SimpleXMLRPCServer((bind, port), allow_none=True, logRequests=logRequests)
        server.register_introspection_functions()
        server.register_multicall_functions()

        server.register_function(self.quit, '_quit')
        server.register_function(self.size)

        def sync_fetch(task):
            result = self.sync_fetch(task)
            result = Binary(umsgpack.packb(result))
            return result
        server.register_function(sync_fetch, 'fetch')

        def dump_counter(_time, _type):
            return self._cnt[_time].to_dict(_type)
        server.register_function(dump_counter, 'counter')

        server.timeout = 0.5
        while not self._quit:
            server.handle_request()
        server.server_close()

    def on_fetch(self, type, task):
        '''Called before task fetch'''
        pass

    def on_result(self, type, task, result):
        '''Called after task fetched'''
        status_code = result.get('status_code', 599)
        if status_code != 599:
            # bucket status codes by hundreds (e.g. 404 -> 400); floor
            # division keeps the result an int on Python 3 as well
            status_code = int(status_code) // 100 * 100
        self._cnt['5m'].event((task.get('project'), status_code), +1)
        self._cnt['1h'].event((task.get('project'), status_code), +1)

        if type == 'http' and result.get('time'):
            content_len = len(result.get('content', ''))
            self._cnt['5m'].event((task.get('project'), 'speed'),
                                  float(content_len) / result.get('time'))
            self._cnt['1h'].event((task.get('project'), 'speed'),
                                  float(content_len) / result.get('time'))
            self._cnt['5m'].event((task.get('project'), 'time'), result.get('time'))
            self._cnt['1h'].event((task.get('project'), 'time'), result.get('time'))
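
For reference, a minimal usage sketch of the Fetcher class above. It assumes plain in-process queues stand in for the scheduler and processor queues; the task fields shown (taskid, project, url, fetch) mirror the keys the code reads, but the concrete values are illustrative only:

    import threading
    from six.moves import queue

    inqueue = queue.Queue()
    outqueue = queue.Queue()
    fetcher = Fetcher(inqueue, outqueue, poolsize=10)

    # run the fetch loop in a background thread so the main thread can
    # feed tasks and read results
    loop_thread = threading.Thread(target=fetcher.run)
    loop_thread.daemon = True
    loop_thread.start()

    inqueue.put({
        'taskid': 'example-task',
        'project': 'demo',
        'url': 'http://example.com/',
        'fetch': {},
    })

    task, result = outqueue.get()  # blocks until send_result() delivers
    print(result['status_code'], result['url'])

    fetcher.quit()

In a real deployment the xmlrpc_run() method above exposes the same operations (fetch, size, counters, quit) over XML-RPC instead of direct method calls.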