Completed
Push — master (09da17...fe2300)
by Roy
13s
created

Fetcher.xmlrpc_run()   B

Complexity: Conditions 4
Size: Total Lines 34
Duplication: Lines 0, Ratio 0 %
Importance: Changes 0

Metric   Value
cc       4
c        0
b        0
f        0
dl       0
loc      34
rs       8.5806

1 Method

Rating   Name                     Duplication   Size   Complexity
A        Fetcher.dump_counter()   0             2      1
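
The method under review, Fetcher.xmlrpc_run(), exposes the fetcher over XML-RPC: it registers 'fetch' (a sync_fetch result packed with umsgpack and wrapped in an XML-RPC Binary), 'counter' (the nested dump_counter listed in the table above), 'size', and '_quit', and serves them on 127.0.0.1:24444 by default. A minimal client sketch (not part of this repository), assuming a fetcher is already running with those defaults; the task fields and the '5m'/'avg' counter arguments are illustrative:

# Hypothetical client for the XML-RPC interface exposed by Fetcher.xmlrpc_run()
import umsgpack
try:
    from xmlrpc.client import ServerProxy   # Python 3
except ImportError:
    from xmlrpclib import ServerProxy       # Python 2

server = ServerProxy('http://127.0.0.1:24444')

# 'fetch' wraps Fetcher.sync_fetch(); the result comes back as an XML-RPC
# Binary whose payload is a umsgpack-packed dict (status_code, url, content, ...)
task = {
    'taskid': 'example',             # illustrative task fields
    'project': 'demo',
    'url': 'http://example.com/',
    'fetch': {'method': 'GET'},
}
result = umsgpack.unpackb(server.fetch(task).data)
print(result['status_code'], result['url'])

# 'counter' wraps the nested dump_counter(_time, _type); the '5m'/'avg'
# arguments are illustrative values for its two parameters
print(server.counter('5m', 'avg'))

The full source of the analysed module follows.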
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
# vim: set et sw=4 ts=4 sts=4 ff=unix fenc=utf8:
# Author: Binux<[email protected]>
#         http://binux.me
# Created on 2012-12-17 11:07:19

from __future__ import unicode_literals

import os
import sys
import six
import copy
import time
import json
import logging
import traceback
import functools
import threading
import tornado.ioloop
import tornado.httputil
import tornado.httpclient
import pyspider

from six.moves import queue, http_cookies
from six.moves.urllib.robotparser import RobotFileParser
from requests import cookies
from six.moves.urllib.parse import urljoin, urlsplit
from tornado import gen
from tornado.curl_httpclient import CurlAsyncHTTPClient
from tornado.simple_httpclient import SimpleAsyncHTTPClient

from pyspider.libs import utils, dataurl, counter
from pyspider.libs.url import quote_chinese
from .cookie_utils import extract_cookies_to_jar
logger = logging.getLogger('fetcher')


class MyCurlAsyncHTTPClient(CurlAsyncHTTPClient):

    def free_size(self):
        return len(self._free_list)

    def size(self):
        return len(self._curls) - self.free_size()


class MySimpleAsyncHTTPClient(SimpleAsyncHTTPClient):

    def free_size(self):
        return self.max_clients - self.size()

    def size(self):
        return len(self.active)

fetcher_output = {
    "status_code": int,
    "orig_url": str,
    "url": str,
    "headers": dict,
    "content": str,
    "cookies": dict,
}


class Fetcher(object):
    user_agent = "pyspider/%s (+http://pyspider.org/)" % pyspider.__version__
    default_options = {
        'method': 'GET',
        'headers': {
        },
        'use_gzip': True,
        'timeout': 120,
        'connect_timeout': 20,
    }
    phantomjs_proxy = None
    splash_endpoint = None
    splash_lua_source = open(os.path.join(os.path.dirname(__file__), "splash_fetcher.lua")).read()
    robot_txt_age = 60*60  # 1h

    def __init__(self, inqueue, outqueue, poolsize=100, proxy=None, async=True):
        self.inqueue = inqueue
        self.outqueue = outqueue

        self.poolsize = poolsize
        self._running = False
        self._quit = False
        self.proxy = proxy
        self.async = async
        self.ioloop = tornado.ioloop.IOLoop()

        self.robots_txt_cache = {}

        # binding io_loop to http_client here
        if self.async:
            self.http_client = MyCurlAsyncHTTPClient(max_clients=self.poolsize,
                                                     io_loop=self.ioloop)
        else:
            self.http_client = tornado.httpclient.HTTPClient(MyCurlAsyncHTTPClient, max_clients=self.poolsize)

        self._cnt = {
            '5m': counter.CounterManager(
                lambda: counter.TimebaseAverageWindowCounter(30, 10)),
            '1h': counter.CounterManager(
                lambda: counter.TimebaseAverageWindowCounter(60, 60)),
        }

    def send_result(self, type, task, result):
        '''Send fetch result to processor'''
        if self.outqueue:
            try:
                self.outqueue.put((task, result))
            except Exception as e:
                logger.exception(e)

    def fetch(self, task, callback=None):
        if self.async:
            return self.async_fetch(task, callback)
        else:
            return self.async_fetch(task, callback).result()

    @gen.coroutine
    def async_fetch(self, task, callback=None):
        '''Do one fetch'''
        url = task.get('url', 'data:,')
        if callback is None:
            callback = self.send_result

        type = 'None'
        start_time = time.time()
        try:
            if url.startswith('data:'):
                type = 'data'
                result = yield gen.maybe_future(self.data_fetch(url, task))
            elif task.get('fetch', {}).get('fetch_type') in ('js', 'phantomjs'):
                type = 'phantomjs'
                result = yield self.phantomjs_fetch(url, task)
            elif task.get('fetch', {}).get('fetch_type') in ('splash', ):
                type = 'splash'
                result = yield self.splash_fetch(url, task)
            else:
                type = 'http'
                result = yield self.http_fetch(url, task)
        except Exception as e:
            logger.exception(e)
            result = self.handle_error(type, url, task, start_time, e)

        callback(type, task, result)
        self.on_result(type, task, result)
        raise gen.Return(result)

    def sync_fetch(self, task):
        '''Synchronous fetch, usually used in the xmlrpc thread'''
        if not self._running:
            return self.ioloop.run_sync(functools.partial(self.async_fetch, task, lambda t, _, r: True))

        wait_result = threading.Condition()
        _result = {}

        def callback(type, task, result):
            wait_result.acquire()
            _result['type'] = type
            _result['task'] = task
            _result['result'] = result
            wait_result.notify()
            wait_result.release()

        wait_result.acquire()
        self.ioloop.add_callback(self.fetch, task, callback)
        while 'result' not in _result:
            wait_result.wait()
        wait_result.release()
        return _result['result']

    def data_fetch(self, url, task):
        '''A fake fetcher for dataurl'''
        self.on_fetch('data', task)
        result = {}
        result['orig_url'] = url
        result['content'] = dataurl.decode(url)
        result['headers'] = {}
        result['status_code'] = 200
        result['url'] = url
        result['cookies'] = {}
        result['time'] = 0
        result['save'] = task.get('fetch', {}).get('save')
        if len(result['content']) < 70:
            logger.info("[200] %s:%s %s 0s", task.get('project'), task.get('taskid'), url)
        else:
            logger.info(
                "[200] %s:%s data:,%s...[content:%d] 0s",
                task.get('project'), task.get('taskid'),
                result['content'][:70],
                len(result['content'])
            )

        return result

    def handle_error(self, type, url, task, start_time, error):
        result = {
            'status_code': getattr(error, 'code', 599),
            'error': utils.text(error),
            'traceback': traceback.format_exc() if sys.exc_info()[0] else None,
            'content': "",
            'time': time.time() - start_time,
            'orig_url': url,
            'url': url,
            "save": task.get('fetch', {}).get('save')
        }
        logger.error("[%d] %s:%s %s, %r %.2fs",
                     result['status_code'], task.get('project'), task.get('taskid'),
                     url, error, result['time'])
        return result

    allowed_options = ['method', 'data', 'timeout', 'cookies', 'use_gzip', 'validate_cert']

    def pack_tornado_request_parameters(self, url, task):
        fetch = copy.deepcopy(self.default_options)
        fetch['url'] = url
        fetch['headers'] = tornado.httputil.HTTPHeaders(fetch['headers'])
        fetch['headers']['User-Agent'] = self.user_agent
        task_fetch = task.get('fetch', {})
        for each in self.allowed_options:
            if each in task_fetch:
                fetch[each] = task_fetch[each]
        fetch['headers'].update(task_fetch.get('headers', {}))

        if task.get('track'):
            track_headers = tornado.httputil.HTTPHeaders(
                task.get('track', {}).get('fetch', {}).get('headers') or {})
            track_ok = task.get('track', {}).get('process', {}).get('ok', False)
        else:
            track_headers = {}
            track_ok = False
        # proxy
        proxy_string = None
        if isinstance(task_fetch.get('proxy'), six.string_types):
            proxy_string = task_fetch['proxy']
        elif self.proxy and task_fetch.get('proxy', True):
            proxy_string = self.proxy
        if proxy_string:
            if '://' not in proxy_string:
                proxy_string = 'http://' + proxy_string
            proxy_splited = urlsplit(proxy_string)
            fetch['proxy_host'] = proxy_splited.hostname
            if proxy_splited.username:
                fetch['proxy_username'] = proxy_splited.username
            if proxy_splited.password:
                fetch['proxy_password'] = proxy_splited.password
            if six.PY2:
                for key in ('proxy_host', 'proxy_username', 'proxy_password'):
                    if key in fetch:
                        fetch[key] = fetch[key].encode('utf8')
            fetch['proxy_port'] = proxy_splited.port or 8080

        # etag
        if task_fetch.get('etag', True):
            _t = None
            if isinstance(task_fetch.get('etag'), six.string_types):
                _t = task_fetch.get('etag')
            elif track_ok:
                _t = track_headers.get('etag')
            if _t and 'If-None-Match' not in fetch['headers']:
                fetch['headers']['If-None-Match'] = _t
        # last modified
        if task_fetch.get('last_modified', task_fetch.get('last_modifed', True)):
            last_modified = task_fetch.get('last_modified', task_fetch.get('last_modifed', True))
            _t = None
            if isinstance(last_modified, six.string_types):
                _t = last_modified
            elif track_ok:
                _t = track_headers.get('last-modified')
            if _t and 'If-Modified-Since' not in fetch['headers']:
                fetch['headers']['If-Modified-Since'] = _t
        # timeout
        if 'timeout' in fetch:
            fetch['request_timeout'] = fetch['timeout']
            del fetch['timeout']
        # data rename to body
        if 'data' in fetch:
            fetch['body'] = fetch['data']
            del fetch['data']

        return fetch

    @gen.coroutine
    def can_fetch(self, user_agent, url):
        parsed = urlsplit(url)
        domain = parsed.netloc
        if domain in self.robots_txt_cache:
            robot_txt = self.robots_txt_cache[domain]
            if time.time() - robot_txt.mtime() > self.robot_txt_age:
                robot_txt = None
        else:
            robot_txt = None

        if robot_txt is None:
            robot_txt = RobotFileParser()
            try:
                response = yield gen.maybe_future(self.http_client.fetch(
                    urljoin(url, '/robots.txt'), connect_timeout=10, request_timeout=30))
                content = response.body
            except tornado.httpclient.HTTPError as e:
                logger.error('load robots.txt from %s error: %r', domain, e)
                content = ''

            try:
                content = content.decode('utf8', 'ignore')
            except UnicodeDecodeError:
                content = ''

            robot_txt.parse(content.splitlines())
            self.robots_txt_cache[domain] = robot_txt

        raise gen.Return(robot_txt.can_fetch(user_agent, url))

    def clear_robot_txt_cache(self):
        now = time.time()
        for domain, robot_txt in self.robots_txt_cache.items():
            if now - robot_txt.mtime() > self.robot_txt_age:
                del self.robots_txt_cache[domain]

    @gen.coroutine
    def http_fetch(self, url, task):
        '''HTTP fetcher'''
        start_time = time.time()
        self.on_fetch('http', task)
        handle_error = lambda x: self.handle_error('http', url, task, start_time, x)

        # setup request parameters
        fetch = self.pack_tornado_request_parameters(url, task)
        task_fetch = task.get('fetch', {})

        session = cookies.RequestsCookieJar()
        # fix for tornado request obj
        if 'Cookie' in fetch['headers']:
            c = http_cookies.SimpleCookie()
            try:
                c.load(fetch['headers']['Cookie'])
            except AttributeError:
                c.load(utils.utf8(fetch['headers']['Cookie']))
            for key in c:
                session.set(key, c[key])
            del fetch['headers']['Cookie']
        if 'cookies' in fetch:
            session.update(fetch['cookies'])
            del fetch['cookies']

        max_redirects = task_fetch.get('max_redirects', 5)
        # we will handle redirects by hand to capture cookies
        fetch['follow_redirects'] = False

        # making requests
        while True:
            # robots.txt
            if task_fetch.get('robots_txt', False):
                can_fetch = yield self.can_fetch(fetch['headers']['User-Agent'], fetch['url'])
                if not can_fetch:
                    error = tornado.httpclient.HTTPError(403, 'Disallowed by robots.txt')
                    raise gen.Return(handle_error(error))

            try:
                request = tornado.httpclient.HTTPRequest(**fetch)
                # if cookie already in header, get_cookie_header wouldn't work
                old_cookie_header = request.headers.get('Cookie')
                if old_cookie_header:
                    del request.headers['Cookie']
                cookie_header = cookies.get_cookie_header(session, request)
                if cookie_header:
                    request.headers['Cookie'] = cookie_header
                elif old_cookie_header:
                    request.headers['Cookie'] = old_cookie_header
            except Exception as e:
                logger.exception(fetch)
                raise gen.Return(handle_error(e))

            try:
                response = yield gen.maybe_future(self.http_client.fetch(request))
            except tornado.httpclient.HTTPError as e:
                if e.response:
                    response = e.response
                else:
                    raise gen.Return(handle_error(e))

            extract_cookies_to_jar(session, response.request, response.headers)
            if (response.code in (301, 302, 303, 307)
                    and response.headers.get('Location')
                    and task_fetch.get('allow_redirects', True)):
                if max_redirects <= 0:
                    error = tornado.httpclient.HTTPError(
                        599, 'Maximum (%d) redirects followed' % task_fetch.get('max_redirects', 5),
                        response)
                    raise gen.Return(handle_error(error))
                if response.code in (302, 303):
                    fetch['method'] = 'GET'
                    if 'body' in fetch:
                        del fetch['body']
                fetch['url'] = quote_chinese(urljoin(fetch['url'], response.headers['Location']))
                fetch['request_timeout'] -= time.time() - start_time
                if fetch['request_timeout'] < 0:
                    fetch['request_timeout'] = 0.1
                max_redirects -= 1
                continue

            result = {}
            result['orig_url'] = url
            result['content'] = response.body or ''
            result['headers'] = dict(response.headers)
            result['status_code'] = response.code
            result['url'] = response.effective_url or url
            result['time'] = time.time() - start_time
            result['cookies'] = session.get_dict()
            result['save'] = task_fetch.get('save')
            if response.error:
                result['error'] = utils.text(response.error)
            if 200 <= response.code < 300:
                logger.info("[%d] %s:%s %s %.2fs", response.code,
                            task.get('project'), task.get('taskid'),
                            url, result['time'])
            else:
                logger.warning("[%d] %s:%s %s %.2fs", response.code,
                               task.get('project'), task.get('taskid'),
                               url, result['time'])

            raise gen.Return(result)

    @gen.coroutine
    def phantomjs_fetch(self, url, task):
        '''Fetch with phantomjs proxy'''
        start_time = time.time()
        self.on_fetch('phantomjs', task)
        handle_error = lambda x: self.handle_error('phantomjs', url, task, start_time, x)

        # check phantomjs proxy is enabled
        if not self.phantomjs_proxy:
            result = {
                "orig_url": url,
                "content": "phantomjs is not enabled.",
                "headers": {},
                "status_code": 501,
                "url": url,
                "time": time.time() - start_time,
                "cookies": {},
                "save": task.get('fetch', {}).get('save')
            }
            logger.warning("[501] %s:%s %s 0s", task.get('project'), task.get('taskid'), url)
            raise gen.Return(result)

        # setup request parameters
        fetch = self.pack_tornado_request_parameters(url, task)
        task_fetch = task.get('fetch', {})
        for each in task_fetch:
            if each not in fetch:
                fetch[each] = task_fetch[each]

        # robots.txt
        if task_fetch.get('robots_txt', False):
            user_agent = fetch['headers']['User-Agent']
            can_fetch = yield self.can_fetch(user_agent, url)
            if not can_fetch:
                error = tornado.httpclient.HTTPError(403, 'Disallowed by robots.txt')
                raise gen.Return(handle_error(error))

        request_conf = {
            'follow_redirects': False
        }
        request_conf['connect_timeout'] = fetch.get('connect_timeout', 20)
        request_conf['request_timeout'] = fetch.get('request_timeout', 120) + 1

        session = cookies.RequestsCookieJar()
        request = tornado.httpclient.HTTPRequest(url=fetch['url'])
        if fetch.get('cookies'):
            session.update(fetch['cookies'])
            if 'Cookie' in request.headers:
                del request.headers['Cookie']
            fetch['headers']['Cookie'] = cookies.get_cookie_header(session, request)

        # making requests
        fetch['headers'] = dict(fetch['headers'])
        try:
            request = tornado.httpclient.HTTPRequest(
                url=self.phantomjs_proxy, method="POST",
                body=json.dumps(fetch), **request_conf)
        except Exception as e:
            raise gen.Return(handle_error(e))

        try:
            response = yield gen.maybe_future(self.http_client.fetch(request))
        except tornado.httpclient.HTTPError as e:
            if e.response:
                response = e.response
            else:
                raise gen.Return(handle_error(e))

        if not response.body:
            raise gen.Return(handle_error(Exception('no response from phantomjs: %r' % response)))

        result = {}
        try:
            result = json.loads(utils.text(response.body))
            assert 'status_code' in result, result
        except Exception as e:
            if response.error:
                result['error'] = utils.text(response.error)
            raise gen.Return(handle_error(e))

        if result.get('status_code', 200):
            logger.info("[%d] %s:%s %s %.2fs", result['status_code'],
                        task.get('project'), task.get('taskid'), url, result['time'])
        else:
            logger.error("[%d] %s:%s %s, %r %.2fs", result['status_code'],
                         task.get('project'), task.get('taskid'),
                         url, result['content'], result['time'])

        raise gen.Return(result)

    @gen.coroutine
    def splash_fetch(self, url, task):
        '''Fetch with splash'''
        start_time = time.time()
        self.on_fetch('splash', task)
        handle_error = lambda x: self.handle_error('splash', url, task, start_time, x)

        # check splash endpoint is enabled
        if not self.splash_endpoint:
            result = {
                "orig_url": url,
                "content": "splash is not enabled.",
                "headers": {},
                "status_code": 501,
                "url": url,
                "time": time.time() - start_time,
                "cookies": {},
                "save": task.get('fetch', {}).get('save')
            }
            logger.warning("[501] %s:%s %s 0s", task.get('project'), task.get('taskid'), url)
            raise gen.Return(result)

        # setup request parameters
        fetch = self.pack_tornado_request_parameters(url, task)
        task_fetch = task.get('fetch', {})
        for each in task_fetch:
            if each not in fetch:
                fetch[each] = task_fetch[each]

        # robots.txt
        if task_fetch.get('robots_txt', False):
            user_agent = fetch['headers']['User-Agent']
            can_fetch = yield self.can_fetch(user_agent, url)
            if not can_fetch:
                error = tornado.httpclient.HTTPError(403, 'Disallowed by robots.txt')
                raise gen.Return(handle_error(error))

        request_conf = {
            'follow_redirects': False,
            'headers': {
                'Content-Type': 'application/json',
            }
        }
        request_conf['connect_timeout'] = fetch.get('connect_timeout', 20)
        request_conf['request_timeout'] = fetch.get('request_timeout', 120) + 1

        session = cookies.RequestsCookieJar()
        request = tornado.httpclient.HTTPRequest(url=fetch['url'])
        if fetch.get('cookies'):
            session.update(fetch['cookies'])
            if 'Cookie' in request.headers:
                del request.headers['Cookie']
            fetch['headers']['Cookie'] = cookies.get_cookie_header(session, request)

        # making requests
        fetch['lua_source'] = self.splash_lua_source
        fetch['headers'] = dict(fetch['headers'])
        try:
            request = tornado.httpclient.HTTPRequest(
                url=self.splash_endpoint, method="POST",
                body=json.dumps(fetch), **request_conf)
        except Exception as e:
            raise gen.Return(handle_error(e))

        try:
            response = yield gen.maybe_future(self.http_client.fetch(request))
        except tornado.httpclient.HTTPError as e:
            if e.response:
                response = e.response
            else:
                raise gen.Return(handle_error(e))

        if not response.body:
            raise gen.Return(handle_error(Exception('no response from splash')))

        result = {}
        try:
            result = json.loads(utils.text(response.body))
            assert 'status_code' in result, result
        except ValueError as e:
            logger.error("result is not json: %r", response.body[:500])
            raise gen.Return(handle_error(e))
        except Exception as e:
            if response.error:
                result['error'] = utils.text(response.error)
            raise gen.Return(handle_error(e))

        if result.get('status_code', 200):
            logger.info("[%d] %s:%s %s %.2fs", result['status_code'],
                        task.get('project'), task.get('taskid'), url, result['time'])
        else:
            logger.error("[%d] %s:%s %s, %r %.2fs", result['status_code'],
                         task.get('project'), task.get('taskid'),
                         url, result['content'], result['time'])

        raise gen.Return(result)

    def run(self):
        '''Run loop'''
        logger.info("fetcher starting...")

        def queue_loop():
            if not self.outqueue or not self.inqueue:
                return
            while not self._quit:
                try:
                    if self.outqueue.full():
                        break
                    if self.http_client.free_size() <= 0:
                        break
                    task = self.inqueue.get_nowait()
                    # FIXME: decode_unicode_obj should be used after the data is
                    # selected from the database; it is done here for performance
                    task = utils.decode_unicode_obj(task)
                    self.fetch(task)
                except queue.Empty:
                    break
                except KeyboardInterrupt:
                    break
                except Exception as e:
                    logger.exception(e)
                    break

        tornado.ioloop.PeriodicCallback(queue_loop, 100, io_loop=self.ioloop).start()
        tornado.ioloop.PeriodicCallback(self.clear_robot_txt_cache, 10000, io_loop=self.ioloop).start()
        self._running = True

        try:
            self.ioloop.start()
        except KeyboardInterrupt:
            pass

        logger.info("fetcher exiting...")

    def quit(self):
        '''Quit fetcher'''
        self._running = False
        self._quit = True
        self.ioloop.add_callback(self.ioloop.stop)
        if hasattr(self, 'xmlrpc_server'):
            self.xmlrpc_ioloop.add_callback(self.xmlrpc_server.stop)
            self.xmlrpc_ioloop.add_callback(self.xmlrpc_ioloop.stop)

    def size(self):
        return self.http_client.size()

    def xmlrpc_run(self, port=24444, bind='127.0.0.1', logRequests=False):
        '''Run xmlrpc server'''
        import umsgpack
        from pyspider.libs.wsgi_xmlrpc import WSGIXMLRPCApplication
        try:
            from xmlrpc.client import Binary
        except ImportError:
            from xmlrpclib import Binary

        application = WSGIXMLRPCApplication()

        application.register_function(self.quit, '_quit')
        application.register_function(self.size)

        def sync_fetch(task):
            result = self.sync_fetch(task)
            result = Binary(umsgpack.packb(result))
            return result
        application.register_function(sync_fetch, 'fetch')

        def dump_counter(_time, _type):
            return self._cnt[_time].to_dict(_type)
        application.register_function(dump_counter, 'counter')

        import tornado.wsgi
        import tornado.ioloop
        import tornado.httpserver

        container = tornado.wsgi.WSGIContainer(application)
        self.xmlrpc_ioloop = tornado.ioloop.IOLoop()
        self.xmlrpc_server = tornado.httpserver.HTTPServer(container, io_loop=self.xmlrpc_ioloop)
        self.xmlrpc_server.listen(port=port, address=bind)
        logger.info('fetcher.xmlrpc listening on %s:%s', bind, port)
        self.xmlrpc_ioloop.start()

    def on_fetch(self, type, task):
        '''Called before task fetch'''
        pass

    def on_result(self, type, task, result):
        '''Called after task fetched'''
        status_code = result.get('status_code', 599)
        if status_code != 599:
            status_code = (int(status_code) // 100 * 100)
        self._cnt['5m'].event((task.get('project'), status_code), +1)
        self._cnt['1h'].event((task.get('project'), status_code), +1)

        if type in ('http', 'phantomjs') and result.get('time'):
            content_len = len(result.get('content', ''))
            self._cnt['5m'].event((task.get('project'), 'speed'),
                                  float(content_len) / result.get('time'))
            self._cnt['1h'].event((task.get('project'), 'speed'),
                                  float(content_len) / result.get('time'))
            self._cnt['5m'].event((task.get('project'), 'time'), result.get('time'))
            self._cnt['1h'].event((task.get('project'), 'time'), result.get('time'))
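
For completeness, a minimal wiring sketch, assuming the Fetcher above sits between a scheduler that feeds inqueue and a processor that drains outqueue; the import path, queue sizes, and thread layout here are illustrative rather than taken from this push:

# Hypothetical wiring of the Fetcher; not part of the analysed file.
import threading
from six.moves import queue

from pyspider.fetcher.tornado_fetcher import Fetcher  # import path assumed

inqueue = queue.Queue(maxsize=100)    # tasks from the scheduler
outqueue = queue.Queue(maxsize=100)   # (task, result) tuples for the processor

fetcher = Fetcher(inqueue, outqueue, poolsize=10)

# run() blocks on the fetcher's own IOLoop, so keep it in a background thread
# and let xmlrpc_run() (which blocks on a second IOLoop) own the main thread.
fetch_thread = threading.Thread(target=fetcher.run)
fetch_thread.daemon = True
fetch_thread.start()

try:
    fetcher.xmlrpc_run(port=24444, bind='127.0.0.1')  # serves fetch / counter / size / _quit
finally:
    fetcher.quit()  # asks both IOLoops to stop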