Completed: Push — master (0eebd6...a019cb), created by Roy at 01:01

pyspider.fetcher.Fetcher.make_request() (Grade: B)

Complexity
    Conditions: 6

Size
    Total lines: 18

Duplication
    Duplicated lines: 0
    Ratio: 0 %

Metric   Value
cc       6
dl       0
loc      18
rs       8
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
# vim: set et sw=4 ts=4 sts=4 ff=unix fenc=utf8:
# Author: Binux<[email protected]>
#         http://binux.me
# Created on 2012-12-17 11:07:19

from __future__ import unicode_literals

import six
import copy
import time
import json
import logging
import functools
import threading
import tornado.ioloop
import tornado.httputil
import tornado.httpclient
import pyspider

from six.moves import queue, http_cookies
from requests import cookies
from six.moves.urllib.parse import urljoin, urlsplit
from tornado import gen
from tornado.curl_httpclient import CurlAsyncHTTPClient
from tornado.simple_httpclient import SimpleAsyncHTTPClient
from pyspider.libs import utils, dataurl, counter
from .cookie_utils import extract_cookies_to_jar

logger = logging.getLogger('fetcher')

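
# The two client subclasses below expose free_size()/size() so the fetcher
# can tell how many connection slots are idle or busy. Note that they read
# private tornado attributes (_free_list, _curls, active), so they are
# coupled to tornado's client internals.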
class MyCurlAsyncHTTPClient(CurlAsyncHTTPClient):

    def free_size(self):
        return len(self._free_list)

    def size(self):
        return len(self._curls) - self.free_size()


class MySimpleAsyncHTTPClient(SimpleAsyncHTTPClient):

    def free_size(self):
        return self.max_clients - self.size()

    def size(self):
        return len(self.active)

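
# fetcher_output documents the shape of a fetch result: keys map to the
# expected Python types. It is reference documentation rather than a
# default value (nothing in this module reads it).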
fetcher_output = {
    "status_code": int,
    "orig_url": str,
    "url": str,
    "headers": dict,
    "content": str,
    "cookies": dict,
}


class Fetcher(object):
    user_agent = "pyspider/%s (+http://pyspider.org/)" % pyspider.__version__
    default_options = {
        'method': 'GET',
        'headers': {
        },
        'use_gzip': True,
        'timeout': 120,
    }
    phantomjs_proxy = None

    def __init__(self, inqueue, outqueue, poolsize=100, proxy=None, async=True):
        self.inqueue = inqueue
        self.outqueue = outqueue

        self.poolsize = poolsize
        self._running = False
        self._quit = False
        self.proxy = proxy
        self.async = async
        self.ioloop = tornado.ioloop.IOLoop()

        # binding io_loop to http_client here
        self.http_client = MyCurlAsyncHTTPClient(max_clients=self.poolsize,
                                                 io_loop=self.ioloop)

        self._cnt = {
            '5m': counter.CounterManager(
                lambda: counter.TimebaseAverageWindowCounter(30, 10)),
            '1h': counter.CounterManager(
                lambda: counter.TimebaseAverageWindowCounter(60, 60)),
        }
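        # _cnt keeps windowed statistics; judging by the constructor
        # arguments, '5m' aggregates 30-second buckets over 5 minutes and
        # '1h' 60-second buckets over an hour, matching the labels.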

    def send_result(self, type, task, result):
        '''Send fetch result to processor'''
        if self.outqueue:
            try:
                self.outqueue.put((task, result))
            except Exception as e:
                logger.exception(e)

    def fetch(self, task, callback=None):
        if self.async:
            return self.async_fetch(task, callback)
        else:
            return self.ioloop.run_sync(functools.partial(self.async_fetch, task, callback))
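
    # async_fetch dispatches on the task: data: URLs are decoded locally,
    # fetch_type 'js'/'phantomjs' goes through the phantomjs proxy, and
    # everything else uses the plain HTTP fetcher.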
    def async_fetch(self, task, callback=None):
        '''Do one fetch'''
        url = task.get('url', 'data:,')
        if callback is None:
            callback = self.send_result
        if url.startswith('data:'):
            return gen.maybe_future(self.data_fetch(url, task, callback))
        elif task.get('fetch', {}).get('fetch_type') in ('js', 'phantomjs'):
            return gen.maybe_future(self.phantomjs_fetch(url, task, callback))
        else:
            return gen.maybe_future(self.http_fetch(url, task, callback))
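
    # sync_fetch is called from a thread other than the ioloop thread; it
    # schedules the fetch and blocks on a Condition until the callback,
    # run on the ioloop thread, has filled in _result.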
    def sync_fetch(self, task):
        '''Synchronous fetch, usually used in the xmlrpc thread'''
        if not self._running:
            return self.ioloop.run_sync(functools.partial(self.async_fetch, task, lambda t, _, r: True))

        wait_result = threading.Condition()
        _result = {}

        def callback(type, task, result):
            wait_result.acquire()
            _result['type'] = type
            _result['task'] = task
            _result['result'] = result
            wait_result.notify()
            wait_result.release()

        wait_result.acquire()
        self.fetch(task, callback=callback)
        while 'result' not in _result:
            wait_result.wait()
        wait_result.release()
        return _result['result']

    def data_fetch(self, url, task, callback):
        '''A fake fetcher for dataurl'''
        self.on_fetch('data', task)
        result = {}
        result['orig_url'] = url
        result['content'] = dataurl.decode(url)
        result['headers'] = {}
        result['status_code'] = 200
        result['url'] = url
        result['cookies'] = {}
        result['time'] = 0
        result['save'] = task.get('fetch', {}).get('save')
        if len(result['content']) < 70:
            logger.info("[200] %s:%s %s 0s", task.get('project'), task.get('taskid'), url)
        else:
            logger.info(
                "[200] %s:%s data:,%s...[content:%d] 0s",
                task.get('project'), task.get('taskid'),
                result['content'][:70],
                len(result['content'])
            )

        callback('data', task, result)
        self.on_result('data', task, result)
        return task, result
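
    # handle_error reports a failed fetch to the processor. Status 599 is
    # the catch-all code used for fetcher-side failures (connection errors,
    # timeouts, too many redirects).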
    def handle_error(self, type, url, task, start_time, callback, error):
        result = {
            'status_code': getattr(error, 'code', 599),
            'error': utils.text(error),
            'content': "",
            'time': time.time() - start_time,
            'orig_url': url,
            'url': url,
        }
        logger.error("[%d] %s:%s %s, %r %.2fs",
                     result['status_code'], task.get('project'), task.get('taskid'),
                     url, error, result['time'])
        callback(type, task, result)
        self.on_result(type, task, result)
        return task, result

    allowed_options = ['method', 'data', 'timeout', 'cookies', 'use_gzip', 'validate_cert']
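
    # pack_tornado_request_parameters merges default_options with the
    # task's 'fetch' dict (restricted to allowed_options), applies proxy
    # credentials and conditional-request headers (If-None-Match /
    # If-Modified-Since), and renames 'timeout'/'data' to the keyword
    # arguments tornado's HTTPRequest expects.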
    def pack_tornado_request_parameters(self, url, task):
        fetch = copy.deepcopy(self.default_options)
        fetch['url'] = url
        fetch['headers'] = tornado.httputil.HTTPHeaders(fetch['headers'])
        fetch['headers']['User-Agent'] = self.user_agent
        task_fetch = task.get('fetch', {})
        for each in self.allowed_options:
            if each in task_fetch:
                fetch[each] = task_fetch[each]
        fetch['headers'].update(task_fetch.get('headers', {}))

        if task.get('track'):
            track_headers = tornado.httputil.HTTPHeaders(
                task.get('track', {}).get('fetch', {}).get('headers') or {})
            track_ok = task.get('track', {}).get('process', {}).get('ok', False)
        else:
            track_headers = {}
            track_ok = False
        # proxy
        proxy_string = None
        if isinstance(task_fetch.get('proxy'), six.string_types):
            proxy_string = task_fetch['proxy']
        elif self.proxy and task_fetch.get('proxy', True):
            proxy_string = self.proxy
        if proxy_string:
            if '://' not in proxy_string:
                proxy_string = 'http://' + proxy_string
            proxy_splited = urlsplit(proxy_string)
            if proxy_splited.username:
                fetch['proxy_username'] = proxy_splited.username
                if six.PY2:
                    fetch['proxy_username'] = fetch['proxy_username'].encode('utf8')
            if proxy_splited.password:
                fetch['proxy_password'] = proxy_splited.password
                if six.PY2:
                    fetch['proxy_password'] = fetch['proxy_password'].encode('utf8')
            fetch['proxy_host'] = proxy_splited.hostname
            if six.PY2:
                fetch['proxy_host'] = fetch['proxy_host'].encode('utf8')
            fetch['proxy_port'] = proxy_splited.port or 8080

        # etag
        if task_fetch.get('etag', True):
            _t = None
            if isinstance(task_fetch.get('etag'), six.string_types):
                _t = task_fetch.get('etag')
            elif track_ok:
                _t = track_headers.get('etag')
            if _t and 'If-None-Match' not in fetch['headers']:
                fetch['headers']['If-None-Match'] = _t
        # last modified
        if task_fetch.get('last_modified', True):
            _t = None
            if isinstance(task_fetch.get('last_modified'), six.string_types):
                _t = task_fetch.get('last_modified')
            elif track_ok:
                _t = track_headers.get('last-modified')
            if _t and 'If-Modified-Since' not in fetch['headers']:
                fetch['headers']['If-Modified-Since'] = _t
        # timeout
        if 'timeout' in fetch:
            fetch['connect_timeout'] = fetch['request_timeout'] = fetch['timeout']
            del fetch['timeout']
        # data rename to body
        if 'data' in fetch:
            fetch['body'] = fetch['data']
            del fetch['data']

        return fetch
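
    # http_fetch bridges cookies between a requests cookie jar and the
    # tornado request, and follows redirects by hand (follow_redirects is
    # False) so Set-Cookie headers can be captured at every hop.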
    @gen.coroutine
    def http_fetch(self, url, task, callback):
        '''HTTP fetcher'''
        start_time = time.time()

        self.on_fetch('http', task)

        # setup request parameters
        fetch = self.pack_tornado_request_parameters(url, task)
        task_fetch = task.get('fetch', {})

        session = cookies.RequestsCookieJar()
        # fix for tornado request obj
        if 'Cookie' in fetch['headers']:
            c = http_cookies.SimpleCookie()
            try:
                c.load(fetch['headers']['Cookie'])
            except AttributeError:
                c.load(utils.utf8(fetch['headers']['Cookie']))
            for key in c:
                session.set(key, c[key])
            del fetch['headers']['Cookie']
        if 'cookies' in fetch:
            session.update(fetch['cookies'])
            del fetch['cookies']

        max_redirects = task_fetch.get('max_redirects', 5)
        # we will handle redirects by hand to capture cookies
        fetch['follow_redirects'] = False

        handle_error = lambda x: self.handle_error('http', url, task, start_time, callback, x)

        # making requests
        while True:
            try:
                request = tornado.httpclient.HTTPRequest(**fetch)
                cookie_header = cookies.get_cookie_header(session, request)
                if cookie_header:
                    request.headers['Cookie'] = cookie_header
            except Exception as e:
                logger.exception(fetch)
                raise gen.Return(handle_error(e))

            try:
                response = yield self.http_client.fetch(request)
            except tornado.httpclient.HTTPError as e:
                if e.response:
                    response = e.response
                else:
                    raise gen.Return(handle_error(e))

            extract_cookies_to_jar(session, response.request, response.headers)
            if (response.code in (301, 302, 303, 307)
                    and response.headers.get('Location')
                    and task_fetch.get('allow_redirects', True)):
                if max_redirects <= 0:
                    error = tornado.httpclient.HTTPError(
                        599, 'Maximum (%d) redirects followed' % task_fetch.get('max_redirects', 5),
                        response)
                    raise gen.Return(handle_error(error))
                if response.code in (302, 303):
                    fetch['method'] = 'GET'
                    if 'body' in fetch:
                        del fetch['body']
                fetch['url'] = urljoin(fetch['url'], response.headers['Location'])
                fetch['request_timeout'] -= time.time() - start_time
                if fetch['request_timeout'] < 0:
                    fetch['request_timeout'] = 0.1
                fetch['connect_timeout'] = fetch['request_timeout']
                max_redirects -= 1
                continue

            result = {}
            result['orig_url'] = url
            result['content'] = response.body or ''
            result['headers'] = dict(response.headers)
            result['status_code'] = response.code
            result['url'] = response.effective_url or url
            result['cookies'] = session.get_dict()
            result['time'] = time.time() - start_time
            result['save'] = task_fetch.get('save')
            if response.error:
                result['error'] = utils.text(response.error)
            if 200 <= response.code < 300:
                logger.info("[%d] %s:%s %s %.2fs", response.code,
                            task.get('project'), task.get('taskid'),
                            url, result['time'])
            else:
                logger.warning("[%d] %s:%s %s %.2fs", response.code,
                               task.get('project'), task.get('taskid'),
                               url, result['time'])

            callback('http', task, result)
            self.on_result('http', task, result)
            raise gen.Return((task, result))
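
    # phantomjs_fetch never touches the target page itself: it POSTs the
    # packed fetch parameters as JSON to the phantomjs proxy service and
    # expects a JSON-encoded result document back.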
    @gen.coroutine
    def phantomjs_fetch(self, url, task, callback):
        '''Fetch with phantomjs proxy'''
        start_time = time.time()

        self.on_fetch('phantomjs', task)

        # check if phantomjs proxy is enabled
        if not self.phantomjs_proxy:
            result = {
                "orig_url": url,
                "content": "phantomjs is not enabled.",
                "headers": {},
                "status_code": 501,
                "url": url,
                "cookies": {},
                "time": 0,
                "save": task.get('fetch', {}).get('save')
            }
            logger.warning("[501] %s:%s %s 0s", task.get('project'), task.get('taskid'), url)
            callback('http', task, result)
            self.on_result('http', task, result)
            raise gen.Return((task, result))

        # setup request parameters
        fetch = self.pack_tornado_request_parameters(url, task)
        task_fetch = task.get('fetch', {})
        for each in task_fetch:
            if each not in fetch:
                fetch[each] = task_fetch[each]

        request_conf = {
            'follow_redirects': False
        }
        if 'timeout' in task_fetch:
            request_conf['connect_timeout'] = task_fetch['timeout']
            request_conf['request_timeout'] = task_fetch['timeout'] + 1

        session = cookies.RequestsCookieJar()
        request = tornado.httpclient.HTTPRequest(url=fetch['url'])
        if fetch.get('cookies'):
            session.update(fetch['cookies'])
            if 'Cookie' in request.headers:
                del request.headers['Cookie']
            fetch['headers']['Cookie'] = cookies.get_cookie_header(session, request)

        handle_error = lambda x: self.handle_error('phantomjs', url, task, start_time, callback, x)

        # making requests
        try:
            request = tornado.httpclient.HTTPRequest(
                url="%s" % self.phantomjs_proxy, method="POST",
                body=json.dumps(fetch), **request_conf)
        except Exception as e:
            raise gen.Return(handle_error(e))

        try:
            response = yield self.http_client.fetch(request)
        except tornado.httpclient.HTTPError as e:
            if e.response:
                response = e.response
            else:
                raise gen.Return(handle_error(e))

        if not response.body:
            raise gen.Return(handle_error(Exception('no response from phantomjs')))

        try:
            result = json.loads(utils.text(response.body))
            if response.error:
                result['error'] = utils.text(response.error)
        except Exception as e:
            raise gen.Return(handle_error(e))

        if result.get('status_code', 200):
            logger.info("[%d] %s:%s %s %.2fs", result['status_code'],
                        task.get('project'), task.get('taskid'), url, result['time'])
        else:
            logger.error("[%d] %s:%s %s, %r %.2fs", result['status_code'],
                         task.get('project'), task.get('taskid'),
                         url, result['content'], result['time'])

        callback('phantomjs', task, result)
        self.on_result('phantomjs', task, result)
        raise gen.Return((task, result))
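
    # run() polls the inqueue every 100 ms, but only while the outqueue
    # has room and the HTTP client pool has free slots, which provides
    # backpressure in both directions.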
    def run(self):
        '''Run loop'''
        logger.info("fetcher starting...")

        def queue_loop():
            if not self.outqueue or not self.inqueue:
                return
            while not self._quit:
                try:
                    if self.outqueue.full():
                        break
                    if self.http_client.free_size() <= 0:
                        break
                    task = self.inqueue.get_nowait()
                    # FIXME: decode_unicode_obj should be used when data is
                    # selected from the database; it's done here for performance
                    task = utils.decode_unicode_obj(task)
                    self.fetch(task)
                except queue.Empty:
                    break
                except KeyboardInterrupt:
                    break
                except Exception as e:
                    logger.exception(e)
                    break

        tornado.ioloop.PeriodicCallback(queue_loop, 100, io_loop=self.ioloop).start()
        self._running = True

        try:
            self.ioloop.start()
        except KeyboardInterrupt:
            pass

        logger.info("fetcher exiting...")

    def quit(self):
        '''Quit fetcher'''
        self._running = False
        self._quit = True
        self.ioloop.stop()

    def size(self):
        return self.http_client.size()
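
    # xmlrpc_run exposes quit/size/fetch/counter over XML-RPC; fetch
    # results are msgpack-packed and wrapped in Binary since XML-RPC
    # cannot carry arbitrary bytes directly.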
    def xmlrpc_run(self, port=24444, bind='127.0.0.1', logRequests=False):
        '''Run xmlrpc server'''
        import umsgpack
        try:
            from xmlrpc.server import SimpleXMLRPCServer
            from xmlrpc.client import Binary
        except ImportError:
            from SimpleXMLRPCServer import SimpleXMLRPCServer
            from xmlrpclib import Binary

        server = SimpleXMLRPCServer((bind, port), allow_none=True, logRequests=logRequests)
        server.register_introspection_functions()
        server.register_multicall_functions()

        server.register_function(self.quit, '_quit')
        server.register_function(self.size)

        def sync_fetch(task):
            result = self.sync_fetch(task)
            result = Binary(umsgpack.packb(result))
            return result
        server.register_function(sync_fetch, 'fetch')

        def dump_counter(_time, _type):
            return self._cnt[_time].to_dict(_type)
        server.register_function(dump_counter, 'counter')

        server.timeout = 0.5
        while not self._quit:
            server.handle_request()
        server.server_close()

    def on_fetch(self, type, task):
        '''Called before a task is fetched'''
        pass

    def on_result(self, type, task, result):
        '''Called after a task is fetched'''
        status_code = result.get('status_code', 599)
        if status_code != 599:
            status_code = (int(status_code) // 100 * 100)
        self._cnt['5m'].event((task.get('project'), status_code), +1)
        self._cnt['1h'].event((task.get('project'), status_code), +1)

        if type == 'http' and result.get('time'):
            content_len = len(result.get('content', ''))
            self._cnt['5m'].event((task.get('project'), 'speed'),
                                  float(content_len) / result.get('time'))
            self._cnt['1h'].event((task.get('project'), 'speed'),
                                  float(content_len) / result.get('time'))
            self._cnt['5m'].event((task.get('project'), 'time'), result.get('time'))
            self._cnt['1h'].event((task.get('project'), 'time'), result.get('time'))
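
# ---------------------------------------------------------------------------
# Usage sketch (not part of the original module). A minimal, hedged example
# of wiring a Fetcher with in-process queues; the queue types and sizes are
# illustrative assumptions, not pyspider's actual deployment wiring:
#
#     from six.moves import queue
#
#     inqueue = queue.Queue()
#     outqueue = queue.Queue(maxsize=100)
#     fetcher = Fetcher(inqueue, outqueue, poolsize=10)
#     inqueue.put({'taskid': 'example', 'project': 'demo',
#                  'url': 'data:,hello'})
#     fetcher.run()   # blocks; call fetcher.quit() from another thread
#
# (Note: the 'async' parameter in __init__ collides with the reserved word
# in Python 3.7+, so this module as written targets older Pythons.)
# ---------------------------------------------------------------------------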