Total Complexity | 41 |
Total Lines | 277 |
Duplicated Lines | 18.41 % |
Changes | 0 |
Duplicate code is one of the most pungent code smells. A commonly used rule of thumb is to restructure code once it is duplicated in three or more places.
Common duplication problems, and corresponding solutions are:
Complex classes like TestRun often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
1 | #!/usr/bin/env python |
||
25 | class TestRun(unittest.TestCase): |
||
26 | |||
27 | @classmethod |
||
28 | def setUpClass(self): |
||
29 | shutil.rmtree('./data/tests', ignore_errors=True) |
||
30 | os.makedirs('./data/tests') |
||
31 | |||
32 | import tests.data_test_webpage |
||
33 | import httpbin |
||
34 | self.httpbin_thread = utils.run_in_subprocess(httpbin.app.run, port=14887, passthrough_errors=False) |
||
35 | self.httpbin = 'http://127.0.0.1:14887' |
||
36 | |||
37 | @classmethod |
||
38 | def tearDownClass(self): |
||
39 | self.httpbin_thread.terminate() |
||
40 | self.httpbin_thread.join() |
||
41 | |||
42 | assert not utils.check_port_open(5000) |
||
43 | assert not utils.check_port_open(23333) |
||
44 | assert not utils.check_port_open(24444) |
||
45 | assert not utils.check_port_open(25555) |
||
46 | assert not utils.check_port_open(14887) |
||
47 | |||
48 | shutil.rmtree('./data/tests', ignore_errors=True) |
||
49 | |||
50 | def test_10_cli(self): |
||
51 | ctx = run.cli.make_context('test', [], None, obj=dict(testing_mode=True)) |
||
52 | ctx = run.cli.invoke(ctx) |
||
53 | self.assertEqual(ctx.obj.debug, False) |
||
54 | for db in ('taskdb', 'projectdb', 'resultdb'): |
||
55 | self.assertIsNotNone(getattr(ctx.obj, db)) |
||
56 | for name in ('newtask_queue', 'status_queue', 'scheduler2fetcher', |
||
57 | 'fetcher2processor', 'processor2result'): |
||
58 | self.assertIsNotNone(getattr(ctx.obj, name)) |
||
59 | self.assertEqual(len(ctx.obj.instances), 0) |
||
60 | |||
61 | def test_20_cli_config(self): |
||
62 | with open('./data/tests/config.json', 'w') as fp: |
||
63 | json.dump({ |
||
64 | 'debug': True, |
||
65 | 'taskdb': 'mysql+taskdb://localhost:23456/taskdb', |
||
66 | 'amqp-url': 'amqp://guest:guest@localhost:23456/%%2F' |
||
67 | }, fp) |
||
68 | ctx = run.cli.make_context('test', |
||
69 | ['--config', './data/tests/config.json'], |
||
70 | None, obj=dict(testing_mode=True)) |
||
71 | ctx = run.cli.invoke(ctx) |
||
72 | self.assertEqual(ctx.obj.debug, True) |
||
73 | |||
74 | import mysql.connector |
||
75 | with self.assertRaises(mysql.connector.Error): |
||
76 | ctx.obj.taskdb |
||
77 | |||
78 | with self.assertRaises(Exception): |
||
79 | ctx.obj.newtask_queue |
||
80 | |||
81 | def test_30_cli_command_line(self): |
||
82 | ctx = run.cli.make_context( |
||
83 | 'test', |
||
84 | ['--projectdb', 'mongodb+projectdb://localhost:23456/projectdb'], |
||
85 | None, |
||
86 | obj=dict(testing_mode=True) |
||
87 | ) |
||
88 | ctx = run.cli.invoke(ctx) |
||
89 | |||
90 | from pymongo.errors import ConnectionFailure |
||
91 | with self.assertRaises(ConnectionFailure): |
||
92 | ctx.obj.projectdb |
||
93 | |||
94 | def test_40_cli_env(self): |
||
95 | try: |
||
96 | os.environ['RESULTDB'] = 'sqlite+resultdb://' |
||
97 | ctx = run.cli.make_context('test', [], None, |
||
98 | obj=dict(testing_mode=True)) |
||
99 | ctx = run.cli.invoke(ctx) |
||
100 | |||
101 | from pyspider.database.sqlite import resultdb |
||
102 | self.assertIsInstance(ctx.obj.resultdb, resultdb.ResultDB) |
||
103 | finally: |
||
104 | del os.environ['RESULTDB'] |
||
105 | |||
106 | View Code Duplication | @unittest.skipIf(os.environ.get('IGNORE_RABBITMQ') or os.environ.get('IGNORE_ALL'), 'no rabbitmq server for test.') |
|
|
|||
107 | def test_50_docker_rabbitmq(self): |
||
108 | try: |
||
109 | os.environ['RABBITMQ_NAME'] = 'rabbitmq' |
||
110 | os.environ['RABBITMQ_PORT_5672_TCP_ADDR'] = 'localhost' |
||
111 | os.environ['RABBITMQ_PORT_5672_TCP_PORT'] = '5672' |
||
112 | ctx = run.cli.make_context('test', [], None, |
||
113 | obj=dict(testing_mode=True)) |
||
114 | ctx = run.cli.invoke(ctx) |
||
115 | queue = ctx.obj.newtask_queue |
||
116 | queue.put('abc') |
||
117 | queue.delete() |
||
118 | except Exception as e: |
||
119 | self.assertIsNone(e) |
||
120 | finally: |
||
121 | del os.environ['RABBITMQ_NAME'] |
||
122 | del os.environ['RABBITMQ_PORT_5672_TCP_ADDR'] |
||
123 | del os.environ['RABBITMQ_PORT_5672_TCP_PORT'] |
||
124 | |||
125 | View Code Duplication | @unittest.skipIf(os.environ.get('IGNORE_MONGODB') or os.environ.get('IGNORE_ALL'), 'no mongodb server for test.') |
|
126 | def test_60_docker_mongodb(self): |
||
127 | try: |
||
128 | os.environ['MONGODB_NAME'] = 'mongodb' |
||
129 | os.environ['MONGODB_PORT_27017_TCP_ADDR'] = 'localhost' |
||
130 | os.environ['MONGODB_PORT_27017_TCP_PORT'] = '27017' |
||
131 | ctx = run.cli.make_context('test', [], None, |
||
132 | obj=dict(testing_mode=True)) |
||
133 | ctx = run.cli.invoke(ctx) |
||
134 | ctx.obj.resultdb |
||
135 | except Exception as e: |
||
136 | self.assertIsNone(e) |
||
137 | finally: |
||
138 | del os.environ['MONGODB_NAME'] |
||
139 | del os.environ['MONGODB_PORT_27017_TCP_ADDR'] |
||
140 | del os.environ['MONGODB_PORT_27017_TCP_PORT'] |
||
141 | |||
142 | View Code Duplication | @unittest.skip('noly available in docker') |
|
143 | @unittest.skipIf(os.environ.get('IGNORE_MYSQL') or os.environ.get('IGNORE_ALL'), 'no mysql server for test.') |
||
144 | def test_70_docker_mysql(self): |
||
145 | try: |
||
146 | os.environ['MYSQL_NAME'] = 'mysql' |
||
147 | os.environ['MYSQL_PORT_3306_TCP_ADDR'] = 'localhost' |
||
148 | os.environ['MYSQL_PORT_3306_TCP_PORT'] = '3306' |
||
149 | ctx = run.cli.make_context('test', [], None, |
||
150 | obj=dict(testing_mode=True)) |
||
151 | ctx = run.cli.invoke(ctx) |
||
152 | ctx.obj.resultdb |
||
153 | except Exception as e: |
||
154 | self.assertIsNone(e) |
||
155 | finally: |
||
156 | del os.environ['MYSQL_NAME'] |
||
157 | del os.environ['MYSQL_PORT_3306_TCP_ADDR'] |
||
158 | del os.environ['MYSQL_PORT_3306_TCP_PORT'] |
||
159 | |||
160 | def test_80_docker_phantomjs(self): |
||
161 | try: |
||
162 | os.environ['PHANTOMJS_NAME'] = 'phantomjs' |
||
163 | os.environ['PHANTOMJS_PORT_25555_TCP'] = 'tpc://binux:25678' |
||
164 | ctx = run.cli.make_context('test', [], None, |
||
165 | obj=dict(testing_mode=True)) |
||
166 | ctx = run.cli.invoke(ctx) |
||
167 | self.assertEqual(ctx.obj.phantomjs_proxy, 'binux:25678') |
||
168 | except Exception as e: |
||
169 | self.assertIsNone(e) |
||
170 | finally: |
||
171 | del os.environ['PHANTOMJS_NAME'] |
||
172 | del os.environ['PHANTOMJS_PORT_25555_TCP'] |
||
173 | |||
174 | def test_90_docker_scheduler(self): |
||
175 | try: |
||
176 | os.environ['SCHEDULER_NAME'] = 'scheduler' |
||
177 | os.environ['SCHEDULER_PORT_23333_TCP'] = 'tpc://binux:25678' |
||
178 | ctx = run.cli.make_context('test', [], None, |
||
179 | obj=dict(testing_mode=True)) |
||
180 | ctx = run.cli.invoke(ctx) |
||
181 | webui = run.cli.get_command(ctx, 'webui') |
||
182 | webui_ctx = webui.make_context('webui', [], ctx) |
||
183 | app = webui.invoke(webui_ctx) |
||
184 | rpc = app.config['scheduler_rpc'] |
||
185 | self.assertEqual(rpc._ServerProxy__host, 'binux:25678') |
||
186 | except Exception as e: |
||
187 | self.assertIsNone(e) |
||
188 | finally: |
||
189 | del os.environ['SCHEDULER_NAME'] |
||
190 | del os.environ['SCHEDULER_PORT_23333_TCP'] |
||
191 | |||
192 | def test_a100_all(self): |
||
193 | import subprocess |
||
194 | #cmd = [sys.executable] |
||
195 | cmd = ['coverage', 'run'] |
||
196 | p = subprocess.Popen(cmd+[ |
||
197 | inspect.getsourcefile(run), |
||
198 | '--taskdb', 'sqlite+taskdb:///data/tests/all_test_task.db', |
||
199 | '--resultdb', 'sqlite+resultdb:///data/tests/all_test_result.db', |
||
200 | '--projectdb', 'local+projectdb://'+inspect.getsourcefile(data_sample_handler), |
||
201 | 'all', |
||
202 | ], close_fds=True, preexec_fn=os.setsid) |
||
203 | |||
204 | |||
205 | try: |
||
206 | limit = 30 |
||
207 | while limit >= 0: |
||
208 | time.sleep(3) |
||
209 | # click run |
||
210 | try: |
||
211 | requests.post('http://localhost:5000/run', data={ |
||
212 | 'project': 'data_sample_handler', |
||
213 | }) |
||
214 | except requests.exceptions.ConnectionError: |
||
215 | limit -= 1 |
||
216 | continue |
||
217 | break |
||
218 | |||
219 | limit = 30 |
||
220 | data = requests.get('http://localhost:5000/counter') |
||
221 | self.assertEqual(data.status_code, 200) |
||
222 | while data.json().get('data_sample_handler', {}).get('5m', {}).get('success', 0) < 5: |
||
223 | time.sleep(1) |
||
224 | data = requests.get('http://localhost:5000/counter') |
||
225 | limit -= 1 |
||
226 | if limit <= 0: |
||
227 | break |
||
228 | |||
229 | self.assertGreater(limit, 0) |
||
230 | rv = requests.get('http://localhost:5000/results?project=data_sample_handler') |
||
231 | self.assertIn('<th>url</th>', rv.text) |
||
232 | self.assertIn('class=url', rv.text) |
||
233 | except: |
||
234 | raise |
||
235 | finally: |
||
236 | time.sleep(1) |
||
237 | os.killpg(p.pid, signal.SIGTERM) |
||
238 | p.wait() |
||
239 | |||
    def test_a110_one(self):
        """Drive `run.py one -i <handler>` interactively through a pty.

        The parent writes shell commands to the child's pty and asserts on the
        echoed output; order of the write/read pairs below is significant.
        """
        # forkpty: child gets a fresh pty as stdin/stdout; fd is the master end.
        pid, fd = os.forkpty()
        #cmd = [sys.executable]
        cmd = ['coverage', 'run']
        cmd += [
            inspect.getsourcefile(run),
            'one',
            '-i',
            inspect.getsourcefile(data_sample_handler)
        ]

        if pid == 0:
            # child
            os.execvp(cmd[0], cmd)
        else:
            # parent
            def wait_text(timeout=1):
                # Drain the pty until it stays quiet for `timeout` seconds,
                # echoing everything read and returning it as one string.
                import select
                text = []
                while True:
                    rl, wl, xl = select.select([fd], [], [], timeout)
                    if not rl:
                        break
                    try:
                        t = os.read(fd, 1024)
                    except OSError:
                        # pty closed by the child
                        break
                    if not t:
                        break
                    t = utils.text(t)
                    text.append(t)
                    print(t, end='')
                return ''.join(text)

            # Startup banner: the on_start task is created and the shell opens.
            text = wait_text(3)
            self.assertIn('new task data_sample_handler:on_start', text)
            self.assertIn('pyspider shell', text)

            os.write(fd, utils.utf8('run()\n'))
            text = wait_text()
            self.assertIn('task done data_sample_handler:on_start', text)

            # Crawling a page also triggers a robots.txt fetch.
            os.write(fd, utils.utf8('crawl("%s/pyspider/test.html")\n' % self.httpbin))
            text = wait_text()
            self.assertIn('/robots.txt', text)

            os.write(fd, utils.utf8('crawl("%s/links/10/0")\n' % self.httpbin))
            text = wait_text()
            # First attempt can come back empty/late; retry once with /links/10/1.
            if '"title": "Links"' not in text:
                os.write(fd, utils.utf8('crawl("%s/links/10/1")\n' % self.httpbin))
                text = wait_text()
            self.assertIn('"title": "Links"', text)

            # A 404 response must be scheduled for retry.
            os.write(fd, utils.utf8('crawl("%s/404")\n' % self.httpbin))
            text = wait_text()
            self.assertIn('task retry', text)

            os.write(fd, b'quit_pyspider()\n')
            text = wait_text()
            self.assertIn('scheduler exiting...', text)
            os.close(fd)
            os.kill(pid, signal.SIGINT)
||
302 | |||
350 |