Completed: Push to master (dc161a...9592b7) by Roy, 07:27 (queued 06:22)

pyspider.fetcher.Fetcher.dump_counter()   rated A

Complexity
    Conditions: 1
Size
    Total Lines: 2
Duplication
    Lines: 0
    Ratio: 0 %

Metric                        Value
cc (cyclomatic complexity)    1
dl (duplicated lines)         0
loc (lines of code)           2
rs                            10
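
The metrics above evidently describe the two-line dump_counter closure that Fetcher.xmlrpc_run() registers as the XML-RPC 'counter' endpoint (the analyzer appears to report nested functions under their enclosing class). With a single statement and no branching it scores a cyclomatic complexity (cc) of 1 and 2 lines of code (loc). For reference, as it appears in the listing below:

    def dump_counter(_time, _type):
        return self._cnt[_time].to_dict(_type)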
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
# vim: set et sw=4 ts=4 sts=4 ff=unix fenc=utf8:
# Author: Binux<[email protected]>
#         http://binux.me
# Created on 2012-12-17 11:07:19

from __future__ import unicode_literals

import six
import copy
import time
import json
import logging
import threading
import tornado.ioloop
import tornado.httputil
import tornado.httpclient
import pyspider

from six.moves import queue, http_cookies
from requests import cookies
from six.moves.urllib.parse import urljoin, urlsplit
from tornado.curl_httpclient import CurlAsyncHTTPClient
from tornado.simple_httpclient import SimpleAsyncHTTPClient
from pyspider.libs import utils, dataurl, counter
from .cookie_utils import extract_cookies_to_jar

logger = logging.getLogger('fetcher')

try:
    from ghost import Ghost, TimeoutError
except ImportError:
    Ghost = None
    TimeoutError = None


class MyCurlAsyncHTTPClient(CurlAsyncHTTPClient):

    def free_size(self):
        return len(self._free_list)

    def size(self):
        return len(self._curls) - self.free_size()


class MySimpleAsyncHTTPClient(SimpleAsyncHTTPClient):

    def free_size(self):
        return self.max_clients - self.size()

    def size(self):
        return len(self.active)

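# these thin subclasses exist only to expose pool occupancy: run()'s
# queue_loop below checks http_client.free_size() <= 0 as backpressure
# before pulling the next task off the inqueue
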
# expected shape of the result dict a fetcher hands to the processor
fetcher_output = {
    "status_code": int,
    "orig_url": str,
    "url": str,
    "headers": dict,
    "content": str,
    "cookies": dict,
}
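
# for concreteness, a successful data-URL fetch (see data_fetch below)
# yields a result like the following (illustrative values); data_fetch
# additionally sets 'time' and 'save':
#   {'status_code': 200, 'orig_url': 'data:,hello', 'url': 'data:,hello',
#    'headers': {}, 'content': 'hello', 'cookies': {}, 'time': 0, 'save': None}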


class Fetcher(object):
    user_agent = "pyspider/%s (+http://pyspider.org/)" % pyspider.__version__
    default_options = {
        'method': 'GET',
        'headers': {},
        'use_gzip': True,
        'timeout': 120,
    }
    phantomjs_proxy = None

    # note: the `async` parameter name became a reserved word in Python 3.7;
    # this code targets Python 2 / early Python 3
    def __init__(self, inqueue, outqueue, poolsize=100, proxy=None, async=True):
        self.inqueue = inqueue
        self.outqueue = outqueue

        self.poolsize = poolsize
        self._running = False
        self._quit = False
        self.proxy = proxy
        self.async = async
        self.ioloop = tornado.ioloop.IOLoop()
        if Ghost:
            self.ghost = Ghost()
        else:
            self.ghost = None

        # binding io_loop to http_client here
        if self.async:
            self.http_client = MyCurlAsyncHTTPClient(max_clients=self.poolsize,
                                                     io_loop=self.ioloop)
        else:
            self.http_client = tornado.httpclient.HTTPClient(
                MyCurlAsyncHTTPClient, max_clients=self.poolsize
            )

        self._cnt = {
            '5m': counter.CounterManager(
                lambda: counter.TimebaseAverageWindowCounter(30, 10)),
            '1h': counter.CounterManager(
                lambda: counter.TimebaseAverageWindowCounter(60, 60)),
        }

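    # '5m' and '1h' are rolling-average windows over roughly five minutes and
    # one hour of events; xmlrpc_run() exposes both through the 'counter'
    # XML-RPC endpoint (dump_counter) below
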
    def send_result(self, type, task, result):
        '''Send fetch result to processor'''
        if self.outqueue:
            try:
                self.outqueue.put((task, result))
            except Exception as e:
                logger.exception(e)

    def fetch(self, task, callback=None):
        '''Do one fetch'''
        url = task.get('url', 'data:,')
        if callback is None:
            callback = self.send_result
        if url.startswith('data:'):
            return self.data_fetch(url, task, callback)
        elif task.get('fetch', {}).get('fetch_type') in ('js', 'ghost'):
            return self.ghost_fetch(url, task, callback)
        elif task.get('fetch', {}).get('fetch_type') in ('phantomjs', ):
            return self.phantomjs_fetch(url, task, callback)
        else:
            return self.http_fetch(url, task, callback)

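    # an illustrative task that routes to ghost_fetch:
    #   {'taskid': 't1', 'project': 'demo', 'url': 'http://example.com/',
    #    'fetch': {'fetch_type': 'js'}}
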
    def sync_fetch(self, task):
        '''Synchronous fetch; blocks until the result is available'''
        wait_result = threading.Condition()
        _result = {}

        def callback(type, task, result):
            wait_result.acquire()
            _result['type'] = type
            _result['task'] = task
            _result['result'] = result
            wait_result.notify()
            wait_result.release()

        wait_result.acquire()
        self.fetch(task, callback=callback)
        while 'result' not in _result:
            wait_result.wait()
        wait_result.release()
        return _result['result']

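    # the Condition's default lock is re-entrant, so for data: URLs (where
    # fetch() invokes the callback synchronously) the callback can re-acquire
    # it on the same thread; network fetches assume the ioloop is running on
    # another thread (see run())
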
    def data_fetch(self, url, task, callback):
        '''A fake fetcher for dataurl'''
        self.on_fetch('data', task)
        result = {}
        result['orig_url'] = url
        result['content'] = dataurl.decode(url)
        result['headers'] = {}
        result['status_code'] = 200
        result['url'] = url
        result['cookies'] = {}
        result['time'] = 0
        result['save'] = task.get('fetch', {}).get('save')
        if len(result['content']) < 70:
            logger.info("[200] %s:%s %s 0s", task.get('project'), task.get('taskid'), url)
        else:
            logger.info(
                "[200] %s:%s data:,%s...[content:%d] 0s",
                task.get('project'), task.get('taskid'),
                result['content'][:70],
                len(result['content'])
            )

        callback('data', task, result)
        self.on_result('data', task, result)
        return task, result

    def handle_error(self, type, url, task, start_time, callback, error):
        result = {
            'status_code': getattr(error, 'code', 599),
            'error': utils.text(error),
            'content': "",
            'time': time.time() - start_time,
            'orig_url': url,
            'url': url,
        }
        logger.error("[%d] %s:%s %s, %r %.2fs",
                     result['status_code'], task.get('project'), task.get('taskid'),
                     url, error, result['time'])
        callback(type, task, result)
        self.on_result(type, task, result)
        return task, result

    allowed_options = ['method', 'data', 'timeout', 'cookies', 'use_gzip', 'validate_cert']

    def http_fetch(self, url, task, callback):
        '''HTTP fetcher'''
        start_time = time.time()

        self.on_fetch('http', task)
        fetch = copy.deepcopy(self.default_options)
        fetch['url'] = url
        fetch['headers'] = tornado.httputil.HTTPHeaders(fetch['headers'])
        fetch['headers']['User-Agent'] = self.user_agent
        task_fetch = task.get('fetch', {})
        for each in self.allowed_options:
            if each in task_fetch:
                fetch[each] = task_fetch[each]
        fetch['headers'].update(task_fetch.get('headers', {}))

        if task.get('track'):
            track_headers = tornado.httputil.HTTPHeaders(
                task.get('track', {}).get('fetch', {}).get('headers') or {})
            track_ok = task.get('track', {}).get('process', {}).get('ok', False)
        else:
            track_headers = {}
            track_ok = False
        # proxy: a task-level proxy string overrides the fetcher-wide default
        proxy_string = None
        if isinstance(task_fetch.get('proxy'), six.string_types):
            proxy_string = task_fetch['proxy']
        elif self.proxy and task_fetch.get('proxy', True):
            proxy_string = self.proxy
        if proxy_string:
            if '://' not in proxy_string:
                proxy_string = 'http://' + proxy_string
            proxy_splited = urlsplit(proxy_string)
            if proxy_splited.username:
                fetch['proxy_username'] = proxy_splited.username
                if six.PY2:
                    fetch['proxy_username'] = fetch['proxy_username'].encode('utf8')
            if proxy_splited.password:
                fetch['proxy_password'] = proxy_splited.password
                if six.PY2:
                    fetch['proxy_password'] = fetch['proxy_password'].encode('utf8')
            # follow the username/password pattern: encode only on Python 2
            fetch['proxy_host'] = proxy_splited.hostname
            if six.PY2:
                fetch['proxy_host'] = fetch['proxy_host'].encode('utf8')
            fetch['proxy_port'] = proxy_splited.port or 8080

        # etag
        if task_fetch.get('etag', True):
            _t = None
            if isinstance(task_fetch.get('etag'), six.string_types):
                _t = task_fetch.get('etag')
            elif track_ok:
                _t = track_headers.get('etag')
            if _t and 'If-None-Match' not in fetch['headers']:
                fetch['headers']['If-None-Match'] = _t
        # last modified
        if task_fetch.get('last_modified', True):
            _t = None
            if isinstance(task_fetch.get('last_modified'), six.string_types):
                _t = task_fetch.get('last_modified')
            elif track_ok:
                _t = track_headers.get('last-modified')
            if _t and 'If-Modified-Since' not in fetch['headers']:
                fetch['headers']['If-Modified-Since'] = _t

        session = cookies.RequestsCookieJar()

        # move any Cookie header into the session jar; it is re-added
        # per request in make_request below
        if 'Cookie' in fetch['headers']:
            c = http_cookies.SimpleCookie()
            try:
                c.load(fetch['headers']['Cookie'])
            except AttributeError:
                c.load(utils.utf8(fetch['headers']['Cookie']))
            for key in c:
                session.set(key, c[key])
            del fetch['headers']['Cookie']
        fetch['follow_redirects'] = False
        if 'timeout' in fetch:
            fetch['connect_timeout'] = fetch['request_timeout'] = fetch['timeout']
            del fetch['timeout']
        if 'data' in fetch:
            fetch['body'] = fetch['data']
            del fetch['data']
        if 'cookies' in fetch:
            session.update(fetch['cookies'])
            del fetch['cookies']

        store = {}
        store['max_redirects'] = task_fetch.get('max_redirects', 5)

        def handle_response(response):
            extract_cookies_to_jar(session, response.request, response.headers)
            if (response.code in (301, 302, 303, 307)
                    and response.headers.get('Location')
                    and task_fetch.get('allow_redirects', True)):
                if store['max_redirects'] <= 0:
                    error = tornado.httpclient.HTTPError(
                        599, 'Maximum (%d) redirects followed' % task_fetch.get('max_redirects', 5),
                        response)
                    return handle_error(error)
                if response.code in (302, 303):
                    fetch['method'] = 'GET'
                    if 'body' in fetch:
                        del fetch['body']
                fetch['url'] = urljoin(fetch['url'], response.headers['Location'])
                fetch['request_timeout'] -= time.time() - start_time
                if fetch['request_timeout'] < 0:
                    fetch['request_timeout'] = 0.1
                fetch['connect_timeout'] = fetch['request_timeout']
                store['max_redirects'] -= 1
                return make_request(fetch)

            result = {}
            result['orig_url'] = url
            result['content'] = response.body or ''
            result['headers'] = dict(response.headers)
            result['status_code'] = response.code
            result['url'] = response.effective_url or url
            result['cookies'] = session.get_dict()
            result['time'] = time.time() - start_time
            result['save'] = task_fetch.get('save')
            if response.error:
                result['error'] = utils.text(response.error)
            if 200 <= response.code < 300:
                logger.info("[%d] %s:%s %s %.2fs", response.code,
                            task.get('project'), task.get('taskid'),
                            url, result['time'])
            else:
                logger.warning("[%d] %s:%s %s %.2fs", response.code,
                               task.get('project'), task.get('taskid'),
                               url, result['time'])
            callback('http', task, result)
            self.on_result('http', task, result)
            return task, result

        handle_error = lambda x: self.handle_error('http',
                                                   url, task, start_time, callback, x)

        def make_request(fetch):
            try:
                request = tornado.httpclient.HTTPRequest(**fetch)
                cookie_header = cookies.get_cookie_header(session, request)
                if cookie_header:
                    request.headers['Cookie'] = cookie_header
                if self.async:
                    self.http_client.fetch(request, handle_response)
                else:
                    return handle_response(self.http_client.fetch(request))
            except tornado.httpclient.HTTPError as e:
                if e.response:
                    return handle_response(e.response)
                else:
                    return handle_error(e)
            except Exception as e:
                logger.exception(fetch)
                return handle_error(e)

        return make_request(fetch)

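    # note on http_fetch above: tornado's own redirect handling is disabled
    # (follow_redirects=False) so that each hop can harvest cookies into the
    # shared jar and the timeout / max_redirects budget carries across hops
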
    def ghost_fetch(self, url, task, callback):
        '''Fetch with ghost.py'''
        start_time = time.time()

        self.on_fetch('ghost', task)
        if not self.ghost:
            result = {
                "orig_url": url,
                "content": "ghost is not enabled.",
                "headers": {},
                "status_code": 501,
                "url": url,
                "cookies": {},
                "time": 0,
                "save": task.get('fetch', {}).get('save')
            }
            logger.warning("[501] %s:%s %s 0s", task.get('project'), task.get('taskid'), url)
            callback('http', task, result)
            self.on_result('http', task, result)
            return task, result

        fetch = copy.deepcopy(self.default_options)
        fetch['url'] = url
        fetch['headers'] = tornado.httputil.HTTPHeaders(fetch['headers'])
        fetch['headers']['User-Agent'] = self.user_agent
        task_fetch = task.get('fetch', {})
        for each in task_fetch:
            if each != 'headers':
                fetch[each] = task_fetch[each]
        fetch['headers'].update(task_fetch.get('headers', {}))

        ghost_config = {
            'user_agent': fetch['headers']['User-Agent'],
            # viewport_size is (width, height)
            'viewport_size': (fetch.get('js_viewport_width', 1024),
                              fetch.get('js_viewport_height', 768 * 3)),
            'wait_timeout': 0,
            'display': False,
            'ignore_ssl_errors': True,
            'download_images': fetch.get('load_images', False),
        }

        def handle_response(session):
            page = get_page_from_session(session)
            if not page:
                return handle_error('Unable to load requested page')

            result = {
                'orig_url': url,
                'status_code': page.http_status,
                'error': None,
                'content': session.content,
                'headers': page.headers,
                'url': page.url,
                'cookies': session.cookies,
                'time': time.time() - start_time,
                'js_script_result': session.js_script_result,
                'save': task_fetch.get('save'),
            }
            session.exit()

            if 200 <= result['status_code'] < 300:
                logger.info("[%d] %s:%s %s %.2fs", result['status_code'],
                            task.get('project'), task.get('taskid'),
                            url, result['time'])
            else:
                logger.warning("[%d] %s:%s %s %.2fs", result['status_code'],
                               task.get('project'), task.get('taskid'),
                               url, result['time'])
            callback('ghost', task, result)
            self.on_result('ghost', task, result)
            return task, result

        handle_error = lambda x: self.handle_error('ghost', url, task, start_time, callback, x)

        def check_output(session):
            if time.time() - start_time > fetch.get('timeout', 120) or session.loaded:
                if fetch.get('js_script', None) and fetch.get('js_run_at', 'document-end') != 'document-start' \
                        and not getattr(session, 'js_run', False):
                    session.js_script_result, resources = session.evaluate(fetch.get('js_script', None))
                    session.http_resources = resources
                    session.js_run = True
                    self.ioloop.call_later(1, check_output, session)
                    return
                return handle_response(session)
            self.ioloop.call_later(1, check_output, session)

        def get_page_from_session(session):
            resources = session.http_resources

            # use the session's main frame, not the fetcher itself
            url = session.main_frame.url().toString()
            url_without_hash = url.split("#")[0]

            for resource in resources:
                if url == resource.url or url_without_hash == resource.url:
                    return resource

        session = self.ghost.start(**ghost_config)

        try:
            # proxy
            proxy_string = None
            if isinstance(task_fetch.get('proxy'), six.string_types):
                proxy_string = task_fetch['proxy']
            elif self.proxy and task_fetch.get('proxy', True):
                proxy_string = self.proxy
            if proxy_string:
                if '://' not in proxy_string:
                    proxy_string = 'http://' + proxy_string
                proxy_splited = urlsplit(proxy_string)
                session.set_proxy(proxy_splited.scheme, host=proxy_splited.hostname,
                                  port=(proxy_splited.port or 8080),
                                  user=proxy_splited.username, password=proxy_splited.password)

            session.js_script_result = None
            session.open(fetch['url'], method=fetch['method'], headers=dict(fetch['headers']),
                         body=fetch.get('data', None), wait=False, user_agent=fetch['headers']['User-Agent'])

            # document-start
            if fetch.get('js_script', None) and fetch.get('js_run_at', 'document-end') == 'document-start':
                session.js_script_result, resources = session.evaluate(fetch.get('js_script', None))
                session.js_run = True

            if self.async:
                check_output(session)
            else:
                session.wait_for(lambda: session.loaded, 'Unable to load requested page', fetch.get('timeout', 120))
                if fetch.get('js_script', None) and fetch.get('js_run_at', 'document-end') != 'document-start':
                    session.js_script_result, resources = session.evaluate(fetch.get('js_script', None))
                    session.http_resources = resources
                    session.js_run = True
                time.sleep(1)
                session.wait_for(lambda: session.loaded, 'Unable to load requested page',
                                 fetch.get('timeout', 120) - (time.time() - start_time))
                return handle_response(session)
        except TimeoutError:
            return handle_response(session)
        except Exception as e:
            session.exit()
            return handle_error(e)

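    # in async mode ghost_fetch polls: check_output reschedules itself on the
    # ioloop every second until the page loads or the timeout expires, running
    # the optional js_script once before delivering the result
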
    def phantomjs_fetch(self, url, task, callback):
        '''Fetch with phantomjs proxy'''
        start_time = time.time()

        self.on_fetch('phantomjs', task)
        if not self.phantomjs_proxy:
            result = {
                "orig_url": url,
                "content": "phantomjs is not enabled.",
                "headers": {},
                "status_code": 501,
                "url": url,
                "cookies": {},
                "time": 0,
                "save": task.get('fetch', {}).get('save')
            }
            logger.warning("[501] %s:%s %s 0s", task.get('project'), task.get('taskid'), url)
            callback('http', task, result)
            self.on_result('http', task, result)
            return task, result

        request_conf = {
            'follow_redirects': False
        }

        fetch = copy.deepcopy(self.default_options)
        fetch['url'] = url
        fetch['headers'] = tornado.httputil.HTTPHeaders(fetch['headers'])
        fetch['headers']['User-Agent'] = self.user_agent
        task_fetch = task.get('fetch', {})
        for each in task_fetch:
            if each != 'headers':
                fetch[each] = task_fetch[each]
        fetch['headers'].update(task_fetch.get('headers', {}))

        if 'timeout' in fetch:
            request_conf['connect_timeout'] = fetch['timeout']
            request_conf['request_timeout'] = fetch['timeout'] + 1

        session = cookies.RequestsCookieJar()
        request = tornado.httpclient.HTTPRequest(url=fetch['url'])
        if fetch.get('cookies'):
            session.update(fetch['cookies'])
            if 'Cookie' in request.headers:
                del request.headers['Cookie']
            fetch['headers']['Cookie'] = cookies.get_cookie_header(session, request)

        def handle_response(response):
            if not response.body:
                return handle_error(Exception('no response from phantomjs'))

            try:
                result = json.loads(utils.text(response.body))
                if response.error:
                    result['error'] = utils.text(response.error)
            except Exception as e:
                return handle_error(e)

            if result.get('status_code', 200):
                logger.info("[%d] %s:%s %s %.2fs", result['status_code'],
                            task.get('project'), task.get('taskid'), url, result['time'])
            else:
                logger.error("[%d] %s:%s %s, %r %.2fs", result['status_code'],
                             task.get('project'), task.get('taskid'),
                             url, result['content'], result['time'])
            callback('phantomjs', task, result)
            self.on_result('phantomjs', task, result)
            return task, result

        handle_error = lambda x: self.handle_error('phantomjs',
                                                   url, task, start_time, callback, x)

        fetch['headers'] = dict(fetch['headers'])
        try:
            request = tornado.httpclient.HTTPRequest(
                url="%s" % self.phantomjs_proxy, method="POST",
                body=json.dumps(fetch), **request_conf)
            if self.async:
                self.http_client.fetch(request, handle_response)
            else:
                return handle_response(self.http_client.fetch(request))
        except tornado.httpclient.HTTPError as e:
            if e.response:
                return handle_response(e.response)
            else:
                return handle_error(e)
        except Exception as e:
            return handle_error(e)

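    # phantomjs_fetch delegates to an external proxy service: it POSTs the
    # fetch options as JSON to self.phantomjs_proxy and expects the result
    # dict (status_code, content, time, ...) back as a JSON body
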
    def run(self):
        '''Run loop'''
        logger.info("fetcher starting...")

        def queue_loop():
            if not self.outqueue or not self.inqueue:
                return
            while not self._quit:
                try:
                    if self.outqueue.full():
                        break
                    if self.http_client.free_size() <= 0:
                        break
                    task = self.inqueue.get_nowait()
                    # FIXME: decode_unicode_obj should be applied when data is
                    # selected from the database; it is done here for performance
                    task = utils.decode_unicode_obj(task)
                    self.fetch(task)
                except queue.Empty:
                    break
                except KeyboardInterrupt:
                    break
                except Exception as e:
                    logger.exception(e)
                    break

        tornado.ioloop.PeriodicCallback(queue_loop, 100, io_loop=self.ioloop).start()
        self._running = True

        try:
            self.ioloop.start()
        except KeyboardInterrupt:
            pass

        logger.info("fetcher exiting...")

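    # queue_loop is polled every 100 ms; each tick drains the inqueue until
    # the outqueue fills, the client pool is exhausted, or the inqueue empties
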
    def quit(self):
        '''Quit fetcher'''
        self._running = False
        self._quit = True
        self.ioloop.stop()

    def size(self):
        return self.http_client.size()

    def xmlrpc_run(self, port=24444, bind='127.0.0.1', logRequests=False):
        '''Run xmlrpc server'''
        import umsgpack
        try:
            from xmlrpc.server import SimpleXMLRPCServer
            from xmlrpc.client import Binary
        except ImportError:
            from SimpleXMLRPCServer import SimpleXMLRPCServer
            from xmlrpclib import Binary

        server = SimpleXMLRPCServer((bind, port), allow_none=True, logRequests=logRequests)
        server.register_introspection_functions()
        server.register_multicall_functions()

        server.register_function(self.quit, '_quit')
        server.register_function(self.size)

        def sync_fetch(task):
            result = self.sync_fetch(task)
            result = Binary(umsgpack.packb(result))
            return result
        server.register_function(sync_fetch, 'fetch')

        def dump_counter(_time, _type):
            return self._cnt[_time].to_dict(_type)
        server.register_function(dump_counter, 'counter')

        server.timeout = 0.5
        while not self._quit:
            server.handle_request()
        server.server_close()

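    # the 'fetch' endpoint msgpack-packs results and wraps them in an XML-RPC
    # Binary so non-ASCII page content survives transport; a client unpacks
    # accordingly, e.g. (illustrative):
    #   import umsgpack
    #   from six.moves.xmlrpc_client import ServerProxy
    #   proxy = ServerProxy('http://127.0.0.1:24444')
    #   result = umsgpack.unpackb(proxy.fetch(task).data)
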
    def on_fetch(self, type, task):
        '''Called before task fetch'''
        pass

    def on_result(self, type, task, result):
        '''Called after task fetched'''
        status_code = result.get('status_code', 599)
        if status_code != 599:
            # bucket into hundreds (200, 300, ...); floor division keeps the
            # result an int under Python 3
            status_code = int(status_code) // 100 * 100
        self._cnt['5m'].event((task.get('project'), status_code), +1)
        self._cnt['1h'].event((task.get('project'), status_code), +1)

        if type == 'http' and result.get('time'):
            content_len = len(result.get('content', ''))
            self._cnt['5m'].event((task.get('project'), 'speed'),
                                  float(content_len) / result.get('time'))
            self._cnt['1h'].event((task.get('project'), 'speed'),
                                  float(content_len) / result.get('time'))
            self._cnt['5m'].event((task.get('project'), 'time'), result.get('time'))
            self._cnt['1h'].event((task.get('project'), 'time'), result.get('time'))
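
To make the control flow concrete, here is a minimal usage sketch. It assumes the module path pyspider.fetcher.tornado_fetcher and a Python version old enough that `async` is not a reserved word; the queue names are illustrative.

    import threading
    from six.moves import queue

    from pyspider.fetcher.tornado_fetcher import Fetcher

    inqueue = queue.Queue()   # tasks from the scheduler
    outqueue = queue.Queue()  # (task, result) pairs for the processor
    fetcher = Fetcher(inqueue, outqueue, poolsize=10)

    # run the ioloop on a background thread so network fetches can complete
    thread = threading.Thread(target=fetcher.run)
    thread.daemon = True
    thread.start()

    # a data: URL routes to data_fetch and returns immediately
    result = fetcher.sync_fetch({
        'taskid': 'example',
        'project': 'demo',
        'url': 'data:,hello',
    })
    print(result['status_code'], result['content'])  # -> 200 hello

    fetcher.quit()  # stops the ioloop; run() then returns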