Total Complexity | 44 |
Total Lines | 381 |
Duplicated Lines | 8.4 % |
Changes | 0 |
Duplicate code is one of the most pungent code smells. A common rule of thumb is to restructure code once it is duplicated in three or more places.
Common duplication problems and their corresponding solutions are:
Complex classes like cli often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
1 | #!/usr/bin/python3 |
||
2 | |||
3 | # -*- coding: utf-8 -*- |
||
4 | |||
5 | import atexit |
||
6 | import json |
||
7 | import os |
||
8 | from os.path import abspath, dirname, join |
||
9 | import pytest |
||
10 | import re |
||
11 | import shutil |
||
12 | import subprocess |
||
13 | import sys |
||
14 | import tempfile |
||
15 | import unittest |
||
16 | |||
17 | sys.path.insert(0, join(dirname(abspath(__file__)), '../..')) |
||
18 | sys.path.insert(0, join(dirname(abspath(__file__)), '../../inji')) |
||
19 | |||
20 | import inji |
||
21 | inji = abspath(join(sys.path[0], 'inji')) |
||
22 | |||
def run(cmd, *args, **kwargs):
    """Run ``cmd`` with ``args`` and capture everything it produces.

    Keyword arguments are forwarded to ``subprocess.run``. In particular
    ``input`` (bytes) is written to the child's stdin and ``timeout`` bounds
    how long the child may run.

    Returns a ``(returncode, stdout, stderr)`` tuple with both streams
    decoded as UTF-8.
    """
    # Set on the process-wide environment, so this child and any started
    # later inherit unbuffered Python output.
    os.environ['PYTHONUNBUFFERED'] = "1"

    # subprocess.run opens a stdin pipe when ``input`` is given, so the
    # data actually reaches the child.
    proc = subprocess.run(
        [cmd, *args],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        **kwargs
    )
    return proc.returncode, proc.stdout.decode('utf-8'), proc.stderr.decode('utf-8')
||
31 | |||
def check_output(*args, **kwargs):
    """Run the command given by ``args`` and return its stdout as text.

    Keyword arguments go straight to ``subprocess.check_output``; a non-zero
    exit therefore raises ``subprocess.CalledProcessError``. The captured
    bytes are decoded as UTF-8.
    """
    command = list(args)
    raw_output = subprocess.check_output(command, **kwargs)
    return raw_output.decode('utf-8')
||
34 | |||
def _remove_if_present(path):
    """Delete ``path``, tolerating the case where it is already gone."""
    try:
        os.remove(path)
    except FileNotFoundError:
        pass


def file_from_text(*args, **kwargs):
    """Write ``args`` (joined by newlines) to a file and return its absolute path.

    Keyword arguments:
      dir  -- directory to place the file in; defaults to the system
              temporary directory.
      name -- file name inside ``dir``; when omitted a unique temporary
              file is created instead.

    The text is appended, so a pre-existing ``name`` keeps its contents.
    The file is registered for removal when the interpreter exits.
    """
    # tempfile.tempdir is None until the temp dir is first resolved, so ask
    # for the resolved directory explicitly.
    fqdir = kwargs.get('dir') or tempfile.gettempdir()
    name = kwargs.get('name')

    if name is None:
        fd, filename = tempfile.mkstemp(text=True, dir=fqdir)
        # mkstemp returns an open descriptor; the file is reopened by path
        # below, so close it here rather than leak it.
        os.close(fd)
    else:
        filename = join(fqdir, name)

    # Resolve before registering the cleanup so a later chdir cannot make
    # the removal target a different location.
    filename = abspath(filename)
    atexit.register(_remove_if_present, filename)

    with open(filename, "a+") as f:
        f.write('\n'.join(args))

    return filename
||
50 | |||
class TestFixtureHelloWorld(unittest.TestCase):
    """Sanity checks for the run() and check_output() helpers of this module."""

    def test_hello_world(self):
        """run() should report exit code, stdout and stderr of a command"""
        code, out, err = run('/bin/echo', 'hello', 'world')
        assert 'hello' in out
        assert 'world' in out
        # An empty string is "in" every string, so compare for equality to
        # really assert that nothing was written to stderr.
        assert err == ''
        assert code == 0

    def test_check_output(self):
        """check_output() should feed ``input`` to the command's stdin"""
        out = check_output('sed', 's/foo/bar/', input=b'foo')
        assert "bar" in out
||
63 | |||
class TestInjiCmd(unittest.TestCase):
    """End-to-end tests that invoke the ``inji`` executable as a subprocess."""

    class _CommandFailed(Exception):
        """Carries the exit code and captured output of a failed command."""

    def _failure_report(self, *args, **kwargs):
        """Run inji expecting a non-zero exit and return a description of it.

        ``args`` are the command-line arguments for inji and ``kwargs`` are
        passed on to check_output (e.g. ``input``). stderr is redirected into
        stdout so everything the command printed is captured.

        The returned text is ``str()`` of the pytest exception info for an
        exception whose message is ``exit_code:<n> output:<captured output>``;
        the captured output is embedded un-decoded. The test fails if the
        command exits successfully.
        """
        with pytest.raises(self._CommandFailed) as e_info:
            try:
                check_output(inji, *args, stderr=subprocess.STDOUT, **kwargs)
            except subprocess.CalledProcessError as exc:
                msg = 'exit_code:{} output:{}'.format(exc.returncode, exc.output)
                raise self._CommandFailed(msg) from exc
        return str(e_info)

    def test_help(self):
        """Test help message is emitted"""
        code, out, err = run(inji, '-h')
        assert code == 0
        assert 'usage:' in out
        assert '' == err

    def test_stdin(self):
        """Templates should be read in from STDIN (-) by default"""
        assert "Hola world!\n" == \
            check_output(
                inji,
                input=b"{% set foo='world!' %}Hola {{ foo }}"
            )

    def test_stdin_empty_input(self):
        """Empty template string should return a newline"""
        assert '\n' == check_output(inji, input=b"")

    def test_json_config_args(self):
        """Config passed as JSON string"""
        assert "Hola world!\n" == \
            check_output(
                inji, '-j', '{"foo": "world!"}',
                input=b"Hola {{ foo }}"
            )

    def test_invalid_json_config_args(self):
        """Invalid JSON config args should cause an error"""
        input_cases = ['', '}{', '{@}', '{"foo": tru}']  # invalid JSON inputs
        for cfg in input_cases:
            e = self._failure_report('-j', cfg)
            assert re.search("Error parsing JSON config:", e)
            assert "exit_code:2 " in e

    def test_kv_config_args(self):
        """Config passed as KV strings"""
        assert check_output(
            inji,
            '-k', 'foo=bar',
            '-k', 'foo=world!',  # valid, last declaration wins
            input=b"Hola {{ foo }}"
        ) == "Hola world!\n"
        assert check_output(
            inji, '-k', 'foo=',  # valid, sets foo to be empty
            input=b"Hola {{ foo }}"
        ) == "Hola \n"

    def test_invalid_kv_config_args(self):
        """Invalid KV config args should cause an error"""
        input_cases = ['=', '=baz']  # invalid KV inputs
        for cfg in input_cases:
            e = self._failure_report('-d', cfg)
            assert re.search("Missing or Invalid key found parsing", e)
            assert "exit_code:2 " in e

    def test_12factor_config_sourcing(self):
        """Test config sourcing precedence should adhere to 12-factor app expectations"""
        tmpdir = tempfile.mkdtemp(prefix='param-')
        atexit.register(shutil.rmtree, tmpdir)

        # One overlay directory per environment, each holding three files
        # that all define ``zoo`` and ``item``.
        for item in ['dev', 'prod', 'stage']:
            cdir = os.path.join(tmpdir, item)
            os.mkdir(cdir)
            for filename in 'a.yml', 'b.yaml', 'c.yaml':
                fqn = os.path.join(cdir, filename)
                file_from_text(f"zoo: {filename}\nitem: {item}", dir=cdir, name=fqn)

        # Default config file, picked up from the working directory.
        file_from_text(
            "zar: bella\nzoo: cfg",
            dir=tmpdir, name='inji.yml')

        param_file = file_from_text(
            "zar: zella\nzoo: zorg",
            dir=tmpdir, name='vars.yaml')

        # needed to source lowest precedence config from ./inji.y?ml
        OLDPWD = os.getcwd()
        os.chdir(tmpdir)

        # Restore cwd and environment even when an assertion fails, so a
        # failure here cannot leak state into the remaining tests.
        try:
            # test we are able to recursively source params
            # but also source from the p5 config file relative to PWD
            assert re.search(
                r'Hola \w+ from bella',
                check_output(
                    inji,
                    '-o', f"{tmpdir}/dev",
                    input=b"Hola {{ item }} from {{ zar }}"
                )
            )

            # dev/c.yaml should be last file sourced
            assert "Hola c.yaml\n" == \
                check_output(
                    inji, '-o', f"{tmpdir}/dev",
                    input=b"Hola {{ zoo }}"
                )

            # prod/ should be the last overlay sourced
            assert "Hola prod\n" == \
                check_output(
                    inji,
                    '-o', f"{tmpdir}/stage",
                    '-o', f"{tmpdir}/dev",
                    '-o', f"{tmpdir}/prod",
                    input=b"Hola {{ item }}"
                )

            # precedence 4
            # named config file trumps overlays, arg position is irrelevant
            assert "Hola zella from zorg\n" == \
                check_output(
                    inji,
                    '-o', f"{tmpdir}/stage",
                    '-o', f"{tmpdir}/prod",
                    '-v', param_file,
                    '-o', f"{tmpdir}/dev",
                    input=b"Hola {{ zar }} from {{ zoo }}"
                )

            # precedence 3
            # env vars trump named config files
            os.environ['zoo'] = 'env'
            assert "Hola zella from env\n" == \
                check_output(
                    inji,
                    '-o', f"{tmpdir}/stage",
                    '-o', f"{tmpdir}/prod",
                    '-v', param_file,
                    '-o', f"{tmpdir}/dev",
                    input=b"Hola {{ zar }} from {{ zoo }}"
                )

            # precedence 2
            # cli params passed in as JSON take ultimate precedence
            assert "Hola zella from world!\n" == \
                check_output(
                    inji,
                    '-o', f"{tmpdir}/stage",
                    '-c', '{"zoo": "world!"}',
                    '-o', f"{tmpdir}/prod",
                    '-v', param_file,
                    '-o', f"{tmpdir}/dev",
                    input=b"Hola {{ zar }} from {{ zoo }}"
                )

            # precedence 1
            # except when params are defined in the templates themselves, of course!
            assert "Hola quux from mars\n" == \
                check_output(
                    inji,
                    '-o', f"{tmpdir}/stage",
                    '-c', '{"zoo": "mars"}',
                    '-o', f"{tmpdir}/prod",
                    '-v', param_file,
                    '-o', f"{tmpdir}/dev",
                    input=b"{% set zar='quux' %}Hola {{ zar }} from {{ zoo }}"
                )
        finally:
            # ``zoo`` is only set part-way through, hence the default.
            os.environ.pop('zoo', None)
            os.chdir(OLDPWD)

    def test_strict_undefined_var(self):
        """Undefined variables should cause a failure"""
        e = self._failure_report(
            input=b"{% set foo='world!' %}Hola {{ bar }}"
        )
        assert "exit_code:1 " in e
        assert re.search('jinja2.exceptions.UndefinedError.+bar.+is undefined', e)

    def test_keep_undefined_var(self):
        """Undefined variables in keep mode should be kept"""
        assert '[Hola {{ foo }}]\n' == \
            check_output(
                inji, '-s', 'keep',
                input=b"[Hola {{ foo }}]"
            )

    def test_empty_undefined_var(self):
        """Undefined variables in empty mode should leave spaces behind placeholders"""
        assert '[Hola ]\n' == \
            check_output(
                inji, '-s', 'empty',
                input=b"[Hola {{ foo }}]"
            )

    def test_template_render_with_envvars(self):
        """Environment variables should be referenceable as parameters"""
        template = file_from_text("Hola {{ foo }}")
        os.environ['foo'] = 'world!'
        try:
            assert 'Hola world!\n' == \
                check_output(inji, '-t', template)
        finally:
            # Do not leak the variable into other tests if the assert fails.
            os.environ.pop('foo', None)

    def test_template_render_with_internal_vars(self):
        """
        Jinja template should render correctly
        referencing those variables set in the templates themselves
        """
        template = file_from_text("{% set foo='world!' %}Hola {{ foo }}")
        assert 'Hola world!\n' == \
            check_output(inji, '-t', template)

    def test_template_missing(self):
        """Missing template files should cause an error"""
        e = self._failure_report('-t', 'nonexistent-template.j2')
        assert "exit_code:2 " in e
        assert re.search('nonexistent-template.j2.. does not exist', e)

    def test_template_directory(self):
        """Using a directory as a template source should cause an error"""
        e = self._failure_report('-t', '/')
        assert "exit_code:2 " in e
        assert re.search('/.. is not a file', e)

    def test_template_render_with_varsfile(self):
        """Params from params files should be rendered on the template output"""
        template = file_from_text("Hola {{ foo }}")
        varsfile = file_from_text("foo: world!")
        assert 'Hola world!\n' == \
            check_output(inji, '-t', template, '-v', varsfile)

    def test_template_render_with_multiple_varsfiles(self):
        """Params from multiple files should be merged before rendering"""
        template = file_from_text("Hola {{ foo }}, Hello {{ bar }}, t{{ t }}")
        varsfile1 = file_from_text("foo: world!\nt: quux")
        varsfile2 = file_from_text("bar: metaverse\nt: moocow")
        assert 'Hola world!, Hello metaverse, tmoocow\n' == \
            check_output(
                inji, '-t', template,
                '-v', varsfile1,
                '-v', varsfile2
            )

    def test_error_with_empty_varsfile(self):
        """An empty vars file is an error, we ought to fail early"""
        # There may be a case for allowing this to just be a warning and so
        # we may change this behaviour in the future. For now, it definitely
        # is something we should fail-early on.
        template = file_from_text("Hola {{ foo }}, Hello {{ bar }}")
        varsfile = file_from_text('')
        e = self._failure_report('-t', template, '-v', varsfile)
        assert re.search('TypeError: .* contains no data', e)
        assert "exit_code:1 " in e

    def test_error_with_malformed_varsfile(self):
        """An invalid varsfile is a fail-early error"""
        varsfile = file_from_text('@')
        e = self._failure_report(
            '-v', varsfile,
            input=b"Hola {{ foo }}, Hello {{ bar }}"
        )
        assert re.search('cannot start any token', e)
        assert "exit_code:1 " in e
||
378 | |||
if __name__ == '__main__':
    # Direct execution runs just the invalid-JSON-config test as a quick
    # smoke check; use a test runner for the full suite.
    TestInjiCmd().test_invalid_json_config_args()
||
381 | |||
382 |