Completed
Push — master ( 91d01a...a7f33c )
by Roy
01:20
created

test_put()   A

Complexity

Conditions 2

Size

Total Lines 11

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 2
dl 0
loc 11
rs 9.4285
1
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
# vim: set et sw=4 ts=4 sts=4 ff=unix fenc=utf8:
# Author: Binux<[email protected]>
#         http://binux.me
# Created on 2014-12-08 22:23:10

import time
import logging
logger = logging.getLogger('bench')

from six.moves import queue as Queue
from pyspider.scheduler import ThreadBaseScheduler as Scheduler
from pyspider.fetcher.tornado_fetcher import Fetcher
from pyspider.processor import Processor
from pyspider.result import ResultWorker
from pyspider.libs.utils import md5string
def bench_test_taskdb(taskdb):
    """Benchmark insert / update / get throughput of a taskdb backend.

    Runs three phases (insert, update, random get) first with 1000 tasks,
    then with 10000 more, against a throwaway project named
    ``__bench_test__``, logging the cost of each phase.  The bench project
    is dropped afterwards even if a phase fails.

    :param taskdb: a taskdb instance providing ``insert``, ``update``,
        ``get_task`` and ``drop``.
    """
    project_name = '__bench_test__'
    task = {
        "fetch": {
            "fetch_type": "js",
            "headers": {
                "User-Agent": "BaiDuSpider"
            }
        },
        "process": {
            "callback": "detail_page"
        },
        "project": project_name,
        "taskid": "553300d2582154413b4982c00c34a2d5",
        "url": "http://www.sciencedirect.com/science/article/pii/S1674200109000704"
    }

    # A realistic 'track' payload recorded after a fetch+process cycle.
    track = {
        "fetch": {
            "content": None,
            "encoding": "unicode",
            "error": None,
            "headers": {
                "last-modified": "Wed, 04 Mar 2015 09:24:33 GMT"
            },
            "ok": True,
            "redirect_url": None,
            "status_code": 200,
            "time": 5.543
        },
        "process": {
            "exception": None,
            "follows": 4,
            "logs": "",
            "ok": True,
            "result": "{'url': u'",
            "time": 0.07105398178100586
        }
    }

    def _report_cost(n, start_time):
        # Shared timing report for all three phases (was triplicated).
        cost_time = time.time() - start_time
        logger.info("cost %.2fs, %.2f/s %.2fms",
                    cost_time, n * 1.0 / cost_time, cost_time / n * 1000)

    def test_insert(n, start=0):
        logger.info("taskdb insert %d", n)
        start_time = time.time()
        for i in range(n):
            task['url'] = 'http://bench.pyspider.org/?l=%d' % (i + start)
            task['taskid'] = md5string(task['url'])
            task['track'] = {}
            taskdb.insert(task['project'], task['taskid'], task)
        _report_cost(n, start_time)

    def test_update(n, start=0):
        # lazy %-args (was eager "%d" % n, inconsistent with test_insert)
        logger.info("taskdb update %d", n)
        start_time = time.time()
        for i in range(n):
            task['url'] = 'http://bench.pyspider.org/?l=%d' % (i + start)
            task['taskid'] = md5string(task['url'])
            task['track'] = track
            taskdb.update(task['project'], task['taskid'], task)
        _report_cost(n, start_time)

    request_task_fields = [
        'taskid',
        'project',
        'url',
        'status',
        'fetch',
        'process',
        'track',
        'lastcrawltime'
    ]

    def test_get(n, start=0, random=True, fields=request_task_fields):
        # NOTE: 'fields' defaults to a shared list; it is only read here,
        # never mutated, so the mutable default is safe in this scope.
        logger.info("taskdb get %d %s", n, "randomly" if random else "")
        range_n = list(range(n))
        if random:
            # local import: the 'random' parameter shadows the module name
            from random import shuffle
            shuffle(range_n)
        start_time = time.time()
        for i in range_n:
            task['url'] = 'http://bench.pyspider.org/?l=%d' % (i + start)
            task['taskid'] = md5string(task['url'])
            task['track'] = track
            taskdb.get_task(task['project'], task['taskid'], fields=fields)
        _report_cost(n, start_time)

    try:
        test_insert(1000)
        test_update(1000)
        test_get(1000)
        test_insert(10000, 1000)
        test_update(10000, 1000)
        test_get(10000, 1000)
    except Exception as e:
        logger.exception(e)
    finally:
        # always remove the bench project so repeated runs start clean
        taskdb.drop(project_name)
def bench_test_message_queue(queue):
    """Benchmark put/get throughput of a message queue implementation.

    Puts then gets 1000 tasks, then 10000, logging the cost of each phase.
    The queue is purged/drained afterwards even if a phase fails.

    :param queue: a queue object with ``put(item, block, timeout)`` and
        ``get(block, timeout)``; RabbitMQ-style queues exposing ``channel``
        are purged via ``queue_purge`` during cleanup.
    """
    task = {
        "fetch": {
            "fetch_type": "js",
            "headers": {
                "User-Agent": "BaiDuSpider"
            }
        },
        "process": {
            "callback": "detail_page"
        },
        "project": "__bench_test__",
        "taskid": "553300d2582154413b4982c00c34a2d5",
        "url": "http://www.sciencedirect.com/science/article/pii/S1674200109000704"
    }

    def _report_cost(n, start_time):
        # Shared timing report for both phases (was duplicated).
        cost_time = time.time() - start_time
        logger.info("cost %.2fs, %.2f/s %.2fms",
                    cost_time, n * 1.0 / cost_time, cost_time / n * 1000)

    def test_put(n):
        logger.info("message queue put %d", n)
        start_time = time.time()
        for i in range(n):
            task['url'] = 'http://bench.pyspider.org/?l=%d' % i
            task['taskid'] = md5string(task['url'])
            queue.put(task, block=True, timeout=1)
        _report_cost(n, start_time)

    def test_get(n):
        logger.info("message queue get %d", n)
        start_time = time.time()
        for i in range(n):
            try:
                queue.get(True, 1)
            except Queue.Empty:
                # should never happen: test_put(n) always precedes get(n)
                logger.error('message queue empty while get %d', i)
                raise
        _report_cost(n, start_time)

    try:
        test_put(1000)
        test_get(1000)
        test_put(10000)
        test_get(10000)
    except Exception as e:
        logger.exception(e)
    finally:
        if hasattr(queue, 'channel'):
            # RabbitMQ-backed queue: purge server-side
            queue.channel.queue_purge(queue.name)

        # clear message queue
        try:
            while queue.get(False):
                continue
        except Queue.Empty:
            pass
class BenchMixin(object):
    """Report to logger for bench test"""

    def _bench_init(self):
        # throughput counters; last_report == 0 forces a report on the
        # first call to _bench_report
        self.done_cnt = 0
        self.start_time = time.time()
        self.last_cnt = 0
        self.last_report = 0

    def _bench_report(self, name, prefix=0, rjust=0):
        """Count one finished item; log a pages/min line at most once per second."""
        self.done_cnt += 1
        now = time.time()
        if now - self.last_report < 1:
            return
        # rate since the previous report, scaled to pages per minute
        rps = float(self.done_cnt - self.last_cnt) / (now - self.last_report)
        line = ("%s %s pages (at %d pages/min)" % (
            name, self.done_cnt, rps * 60.0)).rjust(rjust)
        if prefix:
            line = " " * prefix + line
        logger.info(line)
        self.last_cnt = self.done_cnt
        self.last_report = now
class BenchScheduler(Scheduler, BenchMixin):
    """Scheduler that reports crawl throughput during a bench run."""

    def __init__(self, *args, **kwargs):
        super(BenchScheduler, self).__init__(*args, **kwargs)
        self._bench_init()
        # start the bench project immediately once the scheduler is up
        self.trigger_on_start('__bench_test__')

    def on_task_status(self, task):
        # one status update == one crawled page for reporting purposes
        self._bench_report('Crawled')
        return super(BenchScheduler, self).on_task_status(task)
class BenchFetcher(Fetcher, BenchMixin):
    """Fetcher that reports fetch throughput during a bench run."""

    def __init__(self, *args, **kwargs):
        super(BenchFetcher, self).__init__(*args, **kwargs)
        self._bench_init()

    def on_result(self, type, task, result):
        # rjust=75 aligns the fetcher column in the combined bench log
        self._bench_report("Fetched", 0, 75)
        return super(BenchFetcher, self).on_result(type, task, result)
class BenchProcessor(Processor, BenchMixin):
    """Processor that reports processing throughput during a bench run."""

    def __init__(self, *args, **kwargs):
        super(BenchProcessor, self).__init__(*args, **kwargs)
        self._bench_init()

    def on_task(self, task, response):
        # prefix=75 indents the processor column in the combined bench log
        self._bench_report("Processed", 75)
        return super(BenchProcessor, self).on_task(task, response)
class BenchResultWorker(ResultWorker, BenchMixin):
    """Result worker that reports save throughput during a bench run."""

    def __init__(self, *args, **kwargs):
        super(BenchResultWorker, self).__init__(*args, **kwargs)
        self._bench_init()

    def on_result(self, task, result):
        # rjust=150 aligns the result-worker column in the combined bench log
        self._bench_report("Saved", 0, 150)
        # return the superclass value for consistency with the other
        # Bench* wrappers (previously the result was silently dropped)
        return super(BenchResultWorker, self).on_result(task, result)
bench_script = '''
253
from pyspider.libs.base_handler import *
254
255
class Handler(BaseHandler):
256
    def on_start(self):
257
        self.crawl('http://127.0.0.1:5000/bench',
258
                   params={'total': %(total)d, 'show': %(show)d},
259
                   callback=self.index_page)
260
261
    def index_page(self, response):
262
        for each in response.doc('a[href^="http://"]').items():
263
            self.crawl(each.attr.href, callback=self.index_page)
264
        return response.url
265
'''
266