TestRun.test_10_cli()   A
last analyzed

Complexity

Conditions 3

Size

Total Lines 10

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 3
dl 0
loc 10
rs 9.4285
c 0
b 0
f 0
1
#!/usr/bin/env python
2
# -*- encoding: utf-8 -*-
3
# vim: set et sw=4 ts=4 sts=4 ff=unix fenc=utf8:
4
# Author: Binux<[email protected]>
5
#         http://binux.me
6
# Created on 2014-11-21 22:32:35
7
8
from __future__ import print_function
9
10
import os
11
import sys
12
import six
13
import time
14
import json
15
import signal
16
import shutil
17
import inspect
18
import requests
19
import unittest2 as unittest
20
21
from pyspider import run
22
from pyspider.libs import utils
23
from tests import data_sample_handler
24
25
class TestRun(unittest.TestCase):
26
27
    @classmethod
    def setUpClass(cls):
        """Recreate ./data/tests and start a local httpbin app for the class.

        The httpbin app is run through ``utils.run_in_subprocess`` on port
        14887; its handle and base URL are stored on the class.
        """
        workdir = './data/tests'
        shutil.rmtree(workdir, ignore_errors=True)
        os.makedirs(workdir)

        # NOTE(review): tests.data_test_webpage is imported but not referenced
        # here -- presumably for its import-time side effects; confirm.
        import tests.data_test_webpage
        import httpbin

        cls.httpbin_thread = utils.run_in_subprocess(
            httpbin.app.run,
            port=14887,
            passthrough_errors=False,
        )
        cls.httpbin = 'http://127.0.0.1:14887'
36
37
    @classmethod
    def tearDownClass(cls):
        """Stop the httpbin subprocess, check ports are closed, remove test data."""
        server = cls.httpbin_thread
        server.terminate()
        server.join()

        # None of these ports may still be open once the class has finished.
        for port in (5000, 23333, 24444, 25555, 14887):
            assert not utils.check_port_open(port)

        shutil.rmtree('./data/tests', ignore_errors=True)
49
50
    def test_10_cli(self):
        """Invoke the CLI with no arguments and check the resulting context object.

        Expects debug to be False, every database and queue attribute to be
        non-None, and no instances to be registered.
        """
        cli_ctx = run.cli.invoke(
            run.cli.make_context('test', [], None, obj=dict(testing_mode=True))
        )
        self.assertEqual(cli_ctx.obj.debug, False)

        databases = ('taskdb', 'projectdb', 'resultdb')
        queues = ('newtask_queue', 'status_queue', 'scheduler2fetcher',
                  'fetcher2processor', 'processor2result')
        # Databases are checked first, then queues, one attribute at a time.
        for attr in databases + queues:
            self.assertIsNotNone(getattr(cli_ctx.obj, attr))

        self.assertEqual(len(cli_ctx.obj.instances), 0)
60
61
    def test_20_cli_config(self):
        """Write a JSON config file, load it via --config and check it is used.

        Accessing ``taskdb`` must raise ``mysql.connector.Error`` and accessing
        ``newtask_queue`` must raise some exception.
        """
        config_path = './data/tests/config.json'
        settings = {
            'debug': True,
            'taskdb': 'mysql+taskdb://localhost:23456/taskdb',
            'amqp-url': 'amqp://guest:guest@localhost:23456/%%2F'
        }
        with open(config_path, 'w') as config_file:
            json.dump(settings, config_file)

        cli_ctx = run.cli.invoke(run.cli.make_context(
            'test',
            ['--config', config_path],
            None,
            obj=dict(testing_mode=True),
        ))
        self.assertEqual(cli_ctx.obj.debug, True)

        import mysql.connector
        # NOTE(review): presumably nothing listens on localhost:23456, so both
        # lookups below fail to connect -- confirm.
        with self.assertRaises(mysql.connector.Error):
            cli_ctx.obj.taskdb

        with self.assertRaises(Exception):
            cli_ctx.obj.newtask_queue
80
81
    def test_30_cli_command_line(self):
        """Pass --projectdb on the command line and expect a mongo connection error."""
        argv = ['--projectdb', 'mongodb+projectdb://localhost:23456/projectdb']
        cli_ctx = run.cli.make_context('test', argv, None,
                                       obj=dict(testing_mode=True))
        cli_ctx = run.cli.invoke(cli_ctx)

        from pymongo.errors import ConnectionFailure
        # Accessing the attribute is what must raise ConnectionFailure.
        with self.assertRaises(ConnectionFailure):
            cli_ctx.obj.projectdb
93
94
    def test_40_cli_env(self):
        """Set RESULTDB in the environment and check a sqlite ResultDB is created."""
        try:
            os.environ['RESULTDB'] = 'sqlite+resultdb://'
            cli_ctx = run.cli.invoke(run.cli.make_context(
                'test', [], None, obj=dict(testing_mode=True)))

            from pyspider.database.sqlite import resultdb
            expected_type = resultdb.ResultDB
            self.assertIsInstance(cli_ctx.obj.resultdb, expected_type)
        finally:
            # Always remove the variable so later tests do not see it.
            del os.environ['RESULTDB']
105
106 View Code Duplication
    @unittest.skipIf(os.environ.get('IGNORE_RABBITMQ') or os.environ.get('IGNORE_ALL'), 'no rabbitmq server for test.')
    def test_50_docker_rabbitmq(self):
        """Set RABBITMQ_* link variables and use the resulting newtask_queue.

        The test passes when the queue can be created, written to and deleted
        without raising. Any exception propagates unchanged, so the report
        shows the real error and traceback instead of an ``assertIsNone``
        failure.
        """
        link_env = {
            'RABBITMQ_NAME': 'rabbitmq',
            'RABBITMQ_PORT_5672_TCP_ADDR': 'localhost',
            'RABBITMQ_PORT_5672_TCP_PORT': '5672',
        }
        try:
            os.environ.update(link_env)
            ctx = run.cli.make_context('test', [], None,
                                       obj=dict(testing_mode=True))
            ctx = run.cli.invoke(ctx)
            queue = ctx.obj.newtask_queue
            queue.put('abc')
            queue.delete()
        finally:
            # pop() with a default cannot raise, so cleanup never masks the
            # original failure.
            for name in link_env:
                os.environ.pop(name, None)
124
125 View Code Duplication
    @unittest.skipIf(os.environ.get('IGNORE_MONGODB') or os.environ.get('IGNORE_ALL'), 'no mongodb server for test.')
    def test_60_docker_mongodb(self):
        """Set MONGODB_* link variables and check resultdb can be accessed.

        Any exception propagates unchanged, so the report shows the real error
        and traceback instead of an ``assertIsNone`` failure.
        """
        link_env = {
            'MONGODB_NAME': 'mongodb',
            'MONGODB_PORT_27017_TCP_ADDR': 'localhost',
            'MONGODB_PORT_27017_TCP_PORT': '27017',
        }
        try:
            os.environ.update(link_env)
            ctx = run.cli.make_context('test', [], None,
                                       obj=dict(testing_mode=True))
            ctx = run.cli.invoke(ctx)
            # The attribute access itself is the operation under test.
            ctx.obj.resultdb
        finally:
            # pop() with a default cannot raise, so cleanup never masks the
            # original failure.
            for name in link_env:
                os.environ.pop(name, None)
141
142 View Code Duplication
    @unittest.skip('noly available in docker')
    @unittest.skipIf(os.environ.get('IGNORE_MYSQL') or os.environ.get('IGNORE_ALL'), 'no mysql server for test.')
    def test_70_docker_mysql(self):
        """Set MYSQL_* link variables and check resultdb can be accessed.

        Currently always skipped by the unconditional ``unittest.skip``
        decorator. Any exception propagates unchanged, so the report shows the
        real error and traceback instead of an ``assertIsNone`` failure.
        """
        link_env = {
            'MYSQL_NAME': 'mysql',
            'MYSQL_PORT_3306_TCP_ADDR': 'localhost',
            'MYSQL_PORT_3306_TCP_PORT': '3306',
        }
        try:
            os.environ.update(link_env)
            ctx = run.cli.make_context('test', [], None,
                                       obj=dict(testing_mode=True))
            ctx = run.cli.invoke(ctx)
            # The attribute access itself is the operation under test.
            ctx.obj.resultdb
        finally:
            # pop() with a default cannot raise, so cleanup never masks the
            # original failure.
            for name in link_env:
                os.environ.pop(name, None)
159
160
    def test_80_docker_phantomjs(self):
        """Set PHANTOMJS_* link variables and check the derived phantomjs_proxy.

        Expects ``ctx.obj.phantomjs_proxy`` to equal ``'binux:25678'``.
        Exceptions, including the ``assertEqual`` failure, propagate unchanged
        instead of being re-wrapped by ``assertIsNone``.
        """
        link_env = {
            'PHANTOMJS_NAME': 'phantomjs',
            'PHANTOMJS_PORT_25555_TCP': 'tpc://binux:25678',
        }
        try:
            os.environ.update(link_env)
            ctx = run.cli.make_context('test', [], None,
                                       obj=dict(testing_mode=True))
            ctx = run.cli.invoke(ctx)
            self.assertEqual(ctx.obj.phantomjs_proxy, 'binux:25678')
        finally:
            # pop() with a default cannot raise, so cleanup never masks the
            # original failure.
            for name in link_env:
                os.environ.pop(name, None)
173
174
    def test_90_docker_scheduler(self):
        """Set SCHEDULER_* link variables and check the webui's scheduler RPC host.

        Invokes the ``webui`` sub-command and expects the host of
        ``app.config['scheduler_rpc']`` to equal ``'binux:25678'``.
        Exceptions, including the ``assertEqual`` failure, propagate unchanged
        instead of being re-wrapped by ``assertIsNone``.
        """
        link_env = {
            'SCHEDULER_NAME': 'scheduler',
            'SCHEDULER_PORT_23333_TCP': 'tpc://binux:25678',
        }
        try:
            os.environ.update(link_env)
            ctx = run.cli.make_context('test', [], None,
                                       obj=dict(testing_mode=True))
            ctx = run.cli.invoke(ctx)
            webui = run.cli.get_command(ctx, 'webui')
            webui_ctx = webui.make_context('webui', [], ctx)
            app = webui.invoke(webui_ctx)
            rpc = app.config['scheduler_rpc']
            # NOTE(review): _ServerProxy__host looks like the name-mangled
            # private host attribute of an XML-RPC ServerProxy -- confirm.
            self.assertEqual(rpc._ServerProxy__host, 'binux:25678')
        finally:
            # pop() with a default cannot raise, so cleanup never masks the
            # original failure.
            for name in link_env:
                os.environ.pop(name, None)
191
192
    def test_a100_all(self):
        """Run the ``all`` command in a child process and drive it over HTTP.

        Posts to /run until the server accepts the connection, polls /counter
        until at least five successes are reported for data_sample_handler,
        then checks the /results page. The child's process group always gets
        SIGTERM at the end.
        """
        import subprocess

        # Launched through `coverage run` rather than sys.executable.
        argv = ['coverage', 'run']
        argv = argv + [
            inspect.getsourcefile(run),
            '--taskdb', 'sqlite+taskdb:///data/tests/all_test_task.db',
            '--resultdb', 'sqlite+resultdb:///data/tests/all_test_result.db',
            '--projectdb', 'local+projectdb://'+inspect.getsourcefile(data_sample_handler),
            'all',
        ]
        # os.setsid puts the child in its own session so that killpg() below
        # signals the whole group.
        proc = subprocess.Popen(argv, close_fds=True, preexec_fn=os.setsid)

        def success_count(response):
            # Number of successes reported in the '5m' bucket for the project.
            stats = response.json().get('data_sample_handler', {})
            return stats.get('5m', {}).get('success', 0)

        try:
            # click run: up to 31 attempts, 3 seconds apart, retrying only
            # while the connection is refused.
            for _attempt in range(31):
                time.sleep(3)
                try:
                    requests.post('http://localhost:5000/run', data={
                        'project': 'data_sample_handler',
                    })
                except requests.exceptions.ConnectionError:
                    continue
                break

            remaining = 30
            counter = requests.get('http://localhost:5000/counter')
            self.assertEqual(counter.status_code, 200)
            while remaining > 0 and success_count(counter) < 5:
                time.sleep(1)
                counter = requests.get('http://localhost:5000/counter')
                remaining -= 1

            # remaining hits zero only when the polling budget ran out.
            self.assertGreater(remaining, 0)
            results = requests.get('http://localhost:5000/results?project=data_sample_handler')
            self.assertIn('<th>url</th>', results.text)
            self.assertIn('class=url', results.text)
        finally:
            time.sleep(1)
            os.killpg(proc.pid, signal.SIGTERM)
            proc.wait()
239
240
    def test_a110_one(self):
        """Run the ``one -i`` command under a pty and drive its interactive shell.

        The parent writes shell commands to the pty and asserts on the text
        the child prints back. The pty fd is always closed and the child is
        always sent SIGINT, even when an assertion fails, and the child is
        reaped for a bounded time so it does not linger as a zombie.
        """
        import select

        # Launched through `coverage run` rather than sys.executable.
        cmd = ['coverage', 'run']
        cmd += [
            inspect.getsourcefile(run),
            'one',
            '-i',
            inspect.getsourcefile(data_sample_handler)
        ]

        pid, fd = os.forkpty()
        if pid == 0:
            # child
            try:
                os.execvp(cmd[0], cmd)
            finally:
                # Only reached when exec failed: leave immediately so the
                # forked copy of the test runner does not keep running tests.
                os._exit(127)

        # parent
        def wait_text(timeout=1):
            """Collect child output until `timeout` seconds pass with no data."""
            text = []
            while True:
                rl, wl, xl = select.select([fd], [], [], timeout)
                if not rl:
                    break
                try:
                    t = os.read(fd, 1024)
                except OSError:
                    break
                if not t:
                    break
                t = utils.text(t)
                text.append(t)
                print(t, end='')
            return ''.join(text)

        try:
            text = wait_text(3)
            self.assertIn('new task data_sample_handler:on_start', text)
            self.assertIn('pyspider shell', text)

            os.write(fd, utils.utf8('run()\n'))
            text = wait_text()
            self.assertIn('task done data_sample_handler:on_start', text)

            os.write(fd, utils.utf8('crawl("%s/pyspider/test.html")\n' % self.httpbin))
            text = wait_text()
            self.assertIn('/robots.txt', text)

            os.write(fd, utils.utf8('crawl("%s/links/10/0")\n' % self.httpbin))
            text = wait_text()
            if '"title": "Links"' not in text:
                # Retry once with a different link page before failing.
                os.write(fd, utils.utf8('crawl("%s/links/10/1")\n' % self.httpbin))
                text = wait_text()
                self.assertIn('"title": "Links"', text)

            os.write(fd, utils.utf8('crawl("%s/404")\n' % self.httpbin))
            text = wait_text()
            self.assertIn('task retry', text)

            os.write(fd, b'quit_pyspider()\n')
            text = wait_text()
            self.assertIn('scheduler exiting...', text)
        finally:
            os.close(fd)
            os.kill(pid, signal.SIGINT)
            # Reap the child, but never block forever: poll for up to 10 s.
            deadline = time.time() + 10
            while time.time() < deadline:
                try:
                    reaped, _status = os.waitpid(pid, os.WNOHANG)
                except OSError:
                    break
                if reaped:
                    break
                time.sleep(0.1)
302
303
class TestSendMessage(unittest.TestCase):
304
305
    @classmethod
    def setUpClass(cls):
        """Recreate ./data/tests and start a scheduler backed by sqlite databases.

        The root CLI context is stored on the class as ``ctx``. The
        scheduler's ``xmlrpc_run`` and ``run`` are each started through
        ``utils.run_in_thread``.
        """
        workdir = './data/tests'
        shutil.rmtree(workdir, ignore_errors=True)
        os.makedirs(workdir)

        db_args = [
            '--taskdb', 'sqlite+taskdb:///data/tests/task.db',
            '--projectdb', 'sqlite+projectdb:///data/tests/projectdb.db',
            '--resultdb', 'sqlite+resultdb:///data/tests/resultdb.db',
        ]
        root_ctx = run.cli.make_context('test', db_args, None,
                                        obj=dict(testing_mode=True))
        cls.ctx = run.cli.invoke(root_ctx)

        scheduler_ctx = run.scheduler.make_context('scheduler', [], cls.ctx)
        scheduler = run.scheduler.invoke(scheduler_ctx)
        cls.xmlrpc_thread = utils.run_in_thread(scheduler.xmlrpc_run)
        cls.scheduler_thread = utils.run_in_thread(scheduler.run)

        # Short pause after starting the threads, before any test runs.
        time.sleep(1)
323
324 View Code Duplication
    @classmethod
    def tearDownClass(cls):
        """Quit all registered instances, join the threads and remove test data."""
        for instance in cls.ctx.obj.instances:
            instance.quit()
        for worker in (cls.xmlrpc_thread, cls.scheduler_thread):
            worker.join()
        time.sleep(1)

        # None of these ports may still be open once the class has finished.
        for port in (5000, 23333, 24444, 25555):
            assert not utils.check_port_open(port)

        shutil.rmtree('./data/tests', ignore_errors=True)
338
339
    def test_10_send_message(self):
        """Send a message via the CLI and find the matching task on scheduler2fetcher."""
        argv = ['test_project', 'test_message']
        message_ctx = run.send_message.make_context('send_message', argv, self.ctx)
        self.assertTrue(run.send_message.invoke(message_ctx))

        def next_task():
            # Each read waits at most one second for a task.
            return self.ctx.obj.scheduler2fetcher.get(timeout=1)

        # Skip unrelated tasks until the on_message task shows up.
        task = next_task()
        while task['url'] != 'data:,on_message':
            task = next_task()
        self.assertEqual(task['process']['callback'], '_on_message')
349
350