BenchResultWorker   A
last analyzed

Complexity

Total Complexity 2

Size/Duplication

Total Lines 8
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
dl 0
loc 8
rs 10
c 0
b 0
f 0
wmc 2

2 Methods

Rating   Name   Duplication   Size   Complexity  
A on_result() 0 3 1
A __init__() 0 3 1
1
#!/usr/bin/env python
2
# -*- encoding: utf-8 -*-
3
# vim: set et sw=4 ts=4 sts=4 ff=unix fenc=utf8:
4
# Author: Binux<[email protected]>
5
#         http://binux.me
6
# Created on 2014-12-08 22:23:10
7
# rate: 10000000000
8
# burst: 10000000000
9
10
import time
11
import logging
12
logger = logging.getLogger('bench')
13
14
from six.moves import queue as Queue
15
from pyspider.scheduler import ThreadBaseScheduler as Scheduler
16
from pyspider.fetcher.tornado_fetcher import Fetcher
17
from pyspider.processor import Processor
18
from pyspider.result import ResultWorker
19
from pyspider.libs.utils import md5string
20
21
22
def bench_test_taskdb(taskdb):
    """Benchmark insert/update/get throughput of a taskdb backend.

    Runs timed batches (1000, then 10000 more) of inserts, updates and
    random gets against *taskdb* under the project '__bench_test__',
    logging ops/sec per phase, and always drops the bench project.
    """
    project_name = '__bench_test__'
    # Template task; 'url', 'taskid' and 'track' are rewritten per iteration.
    task = {
        "fetch": {
            "fetch_type": "js",
            "headers": {
                "User-Agent": "BaiDuSpider"
            }
        },
        "process": {
            "callback": "detail_page"
        },
        "project": project_name,
        "taskid": "553300d2582154413b4982c00c34a2d5",
        "url": "http://www.sciencedirect.com/science/article/pii/S1674200109000704"
    }

    # Representative crawl-track payload stored on update and get.
    track = {
        "fetch": {
            "content": None,
            "encoding": "unicode",
            "error": None,
            "headers": {
                "last-modified": "Wed, 04 Mar 2015 09:24:33 GMT"
            },
            "ok": True,
            "redirect_url": None,
            "status_code": 200,
            "time": 5.543
        },
        "process": {
            "exception": None,
            "follows": 4,
            "logs": "",
            "ok": True,
            "result": "{'url': u'",
            "time": 0.07105398178100586
        }
    }

    def test_insert(n, start=0):
        # Insert n fresh tasks with sequential urls starting at *start*.
        logger.info("taskdb insert %d", n)
        start_time = time.time()
        for i in range(n):
            task['url'] = 'http://bench.pyspider.org/?l=%d' % (i + start)
            task['taskid'] = md5string(task['url'])
            task['track'] = {}
            taskdb.insert(task['project'], task['taskid'], task)
        end_time = time.time()
        cost_time = end_time - start_time
        logger.info("cost %.2fs, %.2f/s %.2fms",
                    cost_time, n * 1.0 / cost_time, cost_time / n * 1000)

    def test_update(n, start=0):
        # Re-write the same n tasks with a full track record.
        # Fixed: lazy %-args instead of eager "%" formatting, so the
        # message is only built when the level is enabled (and matches
        # the style of the other phases).
        logger.info("taskdb update %d", n)
        start_time = time.time()
        for i in range(n):
            task['url'] = 'http://bench.pyspider.org/?l=%d' % (i + start)
            task['taskid'] = md5string(task['url'])
            task['track'] = track
            taskdb.update(task['project'], task['taskid'], task)
        end_time = time.time()
        cost_time = end_time - start_time
        logger.info("cost %.2fs, %.2f/s %.2fms",
                    cost_time, n * 1.0 / cost_time, cost_time / n * 1000)

    request_task_fields = [
        'taskid',
        'project',
        'url',
        'status',
        'fetch',
        'process',
        'track',
        'lastcrawltime'
    ]

    def test_get(n, start=0, random=True, fields=None):
        # Fetch n tasks (in shuffled order when random=True), selecting
        # *fields*. Fixed: no mutable (list) default argument; None means
        # the standard request_task_fields.
        if fields is None:
            fields = request_task_fields
        logger.info("taskdb get %d %s", n, "randomly" if random else "")
        range_n = list(range(n))
        if random:
            from random import shuffle
            shuffle(range_n)
        start_time = time.time()
        for i in range_n:
            task['url'] = 'http://bench.pyspider.org/?l=%d' % (i + start)
            task['taskid'] = md5string(task['url'])
            task['track'] = track
            taskdb.get_task(task['project'], task['taskid'], fields=fields)
        end_time = time.time()
        cost_time = end_time - start_time
        logger.info("cost %.2fs, %.2f/s %.2fms",
                    cost_time, n * 1.0 / cost_time, cost_time / n * 1000)

    try:
        test_insert(1000)
        test_update(1000)
        test_get(1000)
        test_insert(10000, 1000)
        test_update(10000, 1000)
        test_get(10000, 1000)
    except Exception as e:
        logger.exception(e)
    finally:
        # Always remove the bench project so repeated runs start clean.
        taskdb.drop(project_name)
127
128
129
def bench_test_message_queue(queue):
    """Benchmark put/get throughput of a message-queue backend.

    Pushes and pops batches of 1000 and 10000 bench tasks through
    *queue*, logging the rate of each phase, then drains the queue.
    """
    # Template task; 'url' and 'taskid' are rewritten per iteration.
    task = {
        "fetch": {
            "fetch_type": "js",
            "headers": {
                "User-Agent": "BaiDuSpider"
            }
        },
        "process": {
            "callback": "detail_page"
        },
        "project": "__bench_test__",
        "taskid": "553300d2582154413b4982c00c34a2d5",
        "url": "http://www.sciencedirect.com/science/article/pii/S1674200109000704"
    }

    def test_put(n):
        # Time n blocking puts of distinct tasks.
        logger.info("message queue put %d", n)
        begin = time.time()
        for i in range(n):
            task['url'] = 'http://bench.pyspider.org/?l=%d' % i
            task['taskid'] = md5string(task['url'])
            queue.put(task, block=True, timeout=1)
        spent = time.time() - begin
        logger.info("cost %.2fs, %.2f/s %.2fms",
                    spent, n * 1.0 / spent, spent / n * 1000)

    def test_get(n):
        # Time n blocking gets; a 1s timeout means the queue ran dry.
        logger.info("message queue get %d", n)
        begin = time.time()
        for i in range(n):
            try:
                queue.get(True, 1)
            except Queue.Empty:
                logger.error('message queue empty while get %d', i)
                raise
        spent = time.time() - begin
        logger.info("cost %.2fs, %.2f/s %.2fms",
                    spent, n * 1.0 / spent, spent / n * 1000)

    try:
        for batch in (1000, 10000):
            test_put(batch)
            test_get(batch)
    except Exception as e:
        logger.exception(e)
    finally:
        if hasattr(queue, 'channel'):
            queue.channel.queue_purge(queue.name)

        # clear message queue
        try:
            while queue.get(False):
                continue
        except Queue.Empty:
            pass
188
189
190
class BenchMixin(object):
    """Report to logger for bench test"""

    def _bench_init(self):
        # Reset counters/timestamps for a fresh benchmark run.
        self.done_cnt = 0
        self.start_time = time.time()
        self.last_cnt = 0
        self.last_report = 0

    def _bench_report(self, name, prefix=0, rjust=0):
        """Count one completed item and log the rate at most once per second."""
        self.done_cnt += 1
        now = time.time()
        elapsed = now - self.last_report
        if elapsed < 1:
            return
        rps = (self.done_cnt - self.last_cnt) / float(elapsed)
        message = "%s %s pages (at %d pages/min)" % (name, self.done_cnt, rps * 60.0)
        pad = " " * prefix if prefix else ""
        logger.info(pad + message.rjust(rjust))
        self.last_cnt = self.done_cnt
        self.last_report = now
211
212
213
class BenchScheduler(Scheduler, BenchMixin):
    """Scheduler that reports crawl throughput during bench runs."""

    def __init__(self, *args, **kwargs):
        super(BenchScheduler, self).__init__(*args, **kwargs)
        self._bench_init()

    def on_task_status(self, task):
        # Every status report counts as one crawled page.
        self._bench_report('Crawled')
        return super(BenchScheduler, self).on_task_status(task)
221
222
223
class BenchFetcher(Fetcher, BenchMixin):
    """Fetcher that reports fetch throughput during bench runs."""

    def __init__(self, *args, **kwargs):
        super(BenchFetcher, self).__init__(*args, **kwargs)
        self._bench_init()

    def on_result(self, type, task, result):
        # Right-justify to column 75 so the phases line up in the log.
        self._bench_report("Fetched", 0, 75)
        return super(BenchFetcher, self).on_result(type, task, result)
231
232
233
class BenchProcessor(Processor, BenchMixin):
    """Processor that reports processing throughput during bench runs."""

    def __init__(self, *args, **kwargs):
        super(BenchProcessor, self).__init__(*args, **kwargs)
        self._bench_init()

    def on_task(self, task, response):
        # Indent by 75 so the phases line up in the log.
        self._bench_report("Processed", 75)
        return super(BenchProcessor, self).on_task(task, response)
241
242
243
class BenchResultWorker(ResultWorker, BenchMixin):
    """Result worker that reports save throughput during bench runs."""

    def __init__(self, *args, **kwargs):
        super(BenchResultWorker, self).__init__(*args, **kwargs)
        self._bench_init()

    def on_result(self, task, result):
        # Right-justify to column 150 so the phases line up in the log.
        self._bench_report("Saved", 0, 150)
        # Fixed: propagate the super call's return value, consistent with
        # BenchFetcher.on_result and BenchProcessor.on_task (previously
        # the result was silently dropped).
        return super(BenchResultWorker, self).on_result(task, result)
251
252
253
from pyspider.libs.base_handler import BaseHandler
254
255
256
class Handler(BaseHandler):
    """Bench crawl handler: fans out over links served by the bench server."""

    def on_start(self, response):
        # Batch size and page size come from the saved start parameters.
        total = response.save.get('total', 10000)
        show = response.save.get('show', 20)
        self.crawl('http://127.0.0.1:5000/bench',
                   params={'total': total, 'show': show},
                   callback=self.index_page)

    def index_page(self, response):
        # Follow every absolute http link back into this same callback.
        for link in response.doc('a[href^="http://"]').items():
            self.crawl(link.attr.href, callback=self.index_page)
        return response.url
266