| Total Complexity | 44 | 
| Total Lines | 381 | 
| Duplicated Lines | 8.4 % | 
| Changes | 0 | ||
Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.
Common duplication problems, and corresponding solutions are:
Complex classes like cli often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
| 1 | #!/usr/bin/python3  | 
            ||
| 2 | |||
| 3 | # -*- coding: utf-8 -*-  | 
            ||
| 4 | |||
| 5 | import atexit  | 
            ||
| 6 | import json  | 
            ||
| 7 | import os  | 
            ||
| 8 | from os.path import abspath, dirname, join  | 
            ||
| 9 | import pytest  | 
            ||
| 10 | import re  | 
            ||
| 11 | import shutil  | 
            ||
| 12 | import subprocess  | 
            ||
| 13 | import sys  | 
            ||
| 14 | import tempfile  | 
            ||
| 15 | import unittest  | 
            ||
| 16 | |||
| 17 | sys.path.insert(0, join(dirname(abspath(__file__)), '../..'))  | 
            ||
| 18 | sys.path.insert(0, join(dirname(abspath(__file__)), '../../inji'))  | 
            ||
| 19 | |||
| 20 | import inji  | 
            ||
| 21 | inji = abspath(join(sys.path[0], 'inji'))  | 
            ||
| 22 | |||
| 23 | def run(cmd, *args, **kwargs):  | 
            ||
| 24 | os.environ['PYTHONUNBUFFERED'] = "1"  | 
            ||
| 25 | proc = subprocess.Popen( [cmd, *args],  | 
            ||
| 26 | stdout = subprocess.PIPE,  | 
            ||
| 27 | stderr = subprocess.PIPE,  | 
            ||
| 28 | )  | 
            ||
| 29 | stdout, stderr = proc.communicate(**kwargs)  | 
            ||
| 30 |   return proc.returncode, stdout.decode('utf-8'), stderr.decode('utf-8') | 
            ||
| 31 | |||
| 32 | def check_output(*args, **kwargs):  | 
            ||
| 33 |   return subprocess.check_output( [*args], **kwargs ).decode('utf-8') | 
            ||
| 34 | |||
| 35 | def file_from_text(*args, **kwargs):  | 
            ||
| 36 | """ Write args to a tempfile and return the filename """  | 
            ||
| 37 | |||
| 38 |   fqdir=kwargs.get('dir', tempfile.tempdir) | 
            ||
| 39 | |||
| 40 |   if kwargs.get('name') is None: | 
            ||
| 41 | _, filename = tempfile.mkstemp( text=True, dir=fqdir)  | 
            ||
| 42 | else:  | 
            ||
| 43 |     filename = join(fqdir, kwargs.get('name')) | 
            ||
| 44 | |||
| 45 | atexit.register(os.remove, filename)  | 
            ||
| 46 | with open(filename, "a+") as f:  | 
            ||
| 47 |     f.write('\n'.join(args)) | 
            ||
| 48 | |||
| 49 | return abspath(filename)  | 
            ||
| 50 | |||
| 51 | class TestFixtureHelloWorld(unittest.TestCase):  | 
            ||
| 52 | |||
| 53 | def test_hello_world(self):  | 
            ||
| 54 |     code, out, err = run('/bin/echo', 'hello', 'world') | 
            ||
| 55 | assert 'hello' in out  | 
            ||
| 56 | assert 'world' in out  | 
            ||
| 57 | assert '' in err  | 
            ||
| 58 | assert code == 0  | 
            ||
| 59 | |||
| 60 | def test_check_output(self):  | 
            ||
| 61 | out = check_output( 'sed', 's/foo/bar/', input=b'foo' )  | 
            ||
| 62 | assert "bar" in out  | 
            ||
| 63 | |||
| 64 | class TestInjiCmd(unittest.TestCase):  | 
            ||
| 65 | |||
| 66 | def test_help(self):  | 
            ||
| 67 | """Test help message is emitted"""  | 
            ||
| 68 | code, out, err = run(inji, '-h')  | 
            ||
| 69 | assert code == 0  | 
            ||
| 70 | assert 'usage:' in out  | 
            ||
| 71 | assert '' == err  | 
            ||
| 72 | |||
| 73 | def test_stdin(self):  | 
            ||
| 74 | """Templates should be read in from STDIN (-) by default"""  | 
            ||
| 75 | assert "Hola world!\n" == \  | 
            ||
| 76 | check_output( inji,  | 
            ||
| 77 |                     input=b"{% set foo='world!' %}Hola {{ foo }}" | 
            ||
| 78 | )  | 
            ||
| 79 | |||
| 80 | def test_stdin_empty_input(self):  | 
            ||
| 81 | """Empty template string should return a newline"""  | 
            ||
| 82 | assert '\n' == check_output( inji, input=b"" )  | 
            ||
| 83 | |||
| 84 | def test_json_config_args(self):  | 
            ||
| 85 | """Config passed as JSON string"""  | 
            ||
| 86 | assert "Hola world!\n" == \  | 
            ||
| 87 | check_output(  | 
            ||
| 88 |         inji, '-j', '{"foo": "world!"}', | 
            ||
| 89 |           input=b"Hola {{ foo }}" | 
            ||
| 90 | )  | 
            ||
| 91 | |||
| 92 | View Code Duplication | def test_invalid_json_config_args(self):  | 
            |
| 93 | """Empty json config args should cause an error"""  | 
            ||
| 94 | class InvalidJSONConfigException(Exception): pass  | 
            ||
| 95 |     input_cases = [ '', '}{', '{@}', '{"foo": tru}' ] # invalid JSON inputs | 
            ||
| 96 | for cfg in input_cases:  | 
            ||
| 97 | with pytest.raises(InvalidJSONConfigException) as e_info:  | 
            ||
| 98 | try:  | 
            ||
| 99 | check_output( inji, '-j', cfg,  | 
            ||
| 100 | stderr=subprocess.STDOUT,  | 
            ||
| 101 | )  | 
            ||
| 102 | except subprocess.CalledProcessError as exc:  | 
            ||
| 103 |           msg = 'exit_code:{} output:{}'.format(exc.returncode, exc.output) | 
            ||
| 104 | raise InvalidJSONConfigException(msg) from exc  | 
            ||
| 105 | e = str(e_info)  | 
            ||
| 106 |       assert re.search("Error parsing JSON config:", e) | 
            ||
| 107 | assert "exit_code:2 " in e  | 
            ||
| 108 | |||
| 109 | def test_kv_config_args(self):  | 
            ||
| 110 | """ Config passed as KV strings """  | 
            ||
| 111 | assert check_output(  | 
            ||
| 112 | inji,  | 
            ||
| 113 | '-k', 'foo=bar',  | 
            ||
| 114 | '-k', 'foo=world!', # valid, last declaration wins  | 
            ||
| 115 |           input=b"Hola {{ foo }}" | 
            ||
| 116 | ) == "Hola world!\n"  | 
            ||
| 117 | assert check_output(  | 
            ||
| 118 | inji, '-k', 'foo=', # valid, sets foo to be empty  | 
            ||
| 119 |           input=b"Hola {{ foo }}" | 
            ||
| 120 | ) == "Hola \n"  | 
            ||
| 121 | |||
| 122 | View Code Duplication | def test_invalid_kv_config_args(self):  | 
            |
| 123 | """Invalid KV config args should cause an error"""  | 
            ||
| 124 | class InvalidKVConfigException(Exception): pass  | 
            ||
| 125 | input_cases = [ '', '=', '=baz' ] # invalid KV inputs  | 
            ||
| 126 | for cfg in input_cases:  | 
            ||
| 127 | with pytest.raises(InvalidKVConfigException) as e_info:  | 
            ||
| 128 | try:  | 
            ||
| 129 | check_output( inji, '-d', cfg,  | 
            ||
| 130 | stderr=subprocess.STDOUT,  | 
            ||
| 131 | )  | 
            ||
| 132 | except subprocess.CalledProcessError as exc:  | 
            ||
| 133 |           msg = 'exit_code:{} output:{}'.format(exc.returncode, exc.output) | 
            ||
| 134 | raise InvalidKVConfigException(msg) from exc  | 
            ||
| 135 | e = str(e_info)  | 
            ||
| 136 |       assert re.search("Invalid key found parsing", e) | 
            ||
| 137 | assert "exit_code:2 " in e  | 
            ||
| 138 | |||
| 139 | def test_12factor_config_sourcing(self):  | 
            ||
| 140 | """Test config sourcing precendence should adhere to 12-factor app expectations"""  | 
            ||
| 141 | tmpdir = tempfile.mkdtemp(prefix='param-')  | 
            ||
| 142 | atexit.register(shutil.rmtree, tmpdir)  | 
            ||
| 143 | |||
| 144 | for item in ['dev', 'prod', 'stage']:  | 
            ||
| 145 | cdir = os.path.join(tmpdir, item)  | 
            ||
| 146 | os.mkdir(cdir)  | 
            ||
| 147 | for file in 'a.yml', 'b.yaml', 'c.yaml':  | 
            ||
| 148 | fqn = os.path.join(cdir, file)  | 
            ||
| 149 |         file_from_text(f"zoo: {file}\nitem: {item}", dir=cdir, name=fqn) | 
            ||
| 150 | |||
| 151 | cfg_file = file_from_text(  | 
            ||
| 152 | f"zar: bella\nzoo: cfg",  | 
            ||
| 153 | dir=tmpdir, name='inji.yml' )  | 
            ||
| 154 | |||
| 155 | param_file = file_from_text(  | 
            ||
| 156 | f"zar: zella\nzoo: zorg",  | 
            ||
| 157 | dir=tmpdir, name='vars.yaml' )  | 
            ||
| 158 | |||
| 159 | # needed to source lowest precedence config from ./inji.y?ml  | 
            ||
| 160 | OLDPWD=os.getcwd()  | 
            ||
| 161 | os.chdir(tmpdir)  | 
            ||
| 162 | |||
| 163 | # test we are able to recursively source params  | 
            ||
| 164 | # but also source from the p5 config file relative to PWD  | 
            ||
| 165 |     assert re.search('Hola \w+ from bella', | 
            ||
| 166 | check_output(  | 
            ||
| 167 | inji,  | 
            ||
| 168 |           '-o', f"{tmpdir}/dev", | 
            ||
| 169 |           input=b"Hola {{ item }} from {{ zar }}" | 
            ||
| 170 | )  | 
            ||
| 171 | )  | 
            ||
| 172 | |||
| 173 | # dev/c.yaml should be last file sourced  | 
            ||
| 174 | assert "Hola c.yaml\n" == \  | 
            ||
| 175 | check_output(  | 
            ||
| 176 |         inji, '-o', f"{tmpdir}/dev", | 
            ||
| 177 |           input=b"Hola {{ zoo }}" | 
            ||
| 178 | )  | 
            ||
| 179 | |||
| 180 | # prod/ should be the last overlay sourced  | 
            ||
| 181 | assert "Hola prod\n" == \  | 
            ||
| 182 | check_output(  | 
            ||
| 183 | inji,  | 
            ||
| 184 |           '-o', f"{tmpdir}/stage", | 
            ||
| 185 |           '-o', f"{tmpdir}/dev", | 
            ||
| 186 |           '-o', f"{tmpdir}/prod", | 
            ||
| 187 |           input=b"Hola {{ item }}" | 
            ||
| 188 | )  | 
            ||
| 189 | |||
| 190 | # precedence 4  | 
            ||
| 191 | # named config file trumps overlays, arg position is irrelevant  | 
            ||
| 192 | assert "Hola zella from zorg\n" == \  | 
            ||
| 193 | check_output(  | 
            ||
| 194 | inji,  | 
            ||
| 195 |           '-o', f"{tmpdir}/stage", | 
            ||
| 196 |           '-o', f"{tmpdir}/prod", | 
            ||
| 197 | '-v', param_file,  | 
            ||
| 198 |           '-o', f"{tmpdir}/dev", | 
            ||
| 199 |           input=b"Hola {{ zar }} from {{ zoo }}" | 
            ||
| 200 | )  | 
            ||
| 201 | |||
| 202 | # precedence 3  | 
            ||
| 203 | # env vars trump named config files  | 
            ||
| 204 | os.environ['zoo']='env'  | 
            ||
| 205 | assert "Hola zella from env\n" == \  | 
            ||
| 206 | check_output(  | 
            ||
| 207 | inji,  | 
            ||
| 208 |           '-o', f"{tmpdir}/stage", | 
            ||
| 209 |           '-o', f"{tmpdir}/prod", | 
            ||
| 210 | '-v', param_file,  | 
            ||
| 211 |           '-o', f"{tmpdir}/dev", | 
            ||
| 212 |           input=b"Hola {{ zar }} from {{ zoo }}" | 
            ||
| 213 | )  | 
            ||
| 214 | |||
| 215 | # precedence 2  | 
            ||
| 216 | # cli params passed in as JSON take ultimate precendence  | 
            ||
| 217 | assert "Hola zella from world!\n" == \  | 
            ||
| 218 | check_output(  | 
            ||
| 219 | inji,  | 
            ||
| 220 |           '-o', f"{tmpdir}/stage", | 
            ||
| 221 |           '-c', '{"zoo": "world!"}', | 
            ||
| 222 |           '-o', f"{tmpdir}/prod", | 
            ||
| 223 | '-v', param_file,  | 
            ||
| 224 |           '-o', f"{tmpdir}/dev", | 
            ||
| 225 |           input=b"Hola {{ zar }} from {{ zoo }}" | 
            ||
| 226 | )  | 
            ||
| 227 | |||
| 228 | # precedence 1  | 
            ||
| 229 | # except when params are defined in the templates themselves, off course!  | 
            ||
| 230 | assert "Hola quux from mars\n" == \  | 
            ||
| 231 | check_output(  | 
            ||
| 232 | inji,  | 
            ||
| 233 |           '-o', f"{tmpdir}/stage", | 
            ||
| 234 |           '-c', '{"zoo": "mars"}', | 
            ||
| 235 |           '-o', f"{tmpdir}/prod", | 
            ||
| 236 | '-v', param_file,  | 
            ||
| 237 |           '-o', f"{tmpdir}/dev", | 
            ||
| 238 |           input=b"{% set zar='quux' %}Hola {{ zar }} from {{ zoo }}" | 
            ||
| 239 | )  | 
            ||
| 240 | |||
| 241 |     os.environ.pop('zoo') | 
            ||
| 242 | os.chdir(OLDPWD)  | 
            ||
| 243 | |||
| 244 | def test_strict_undefined_var(self):  | 
            ||
| 245 | """Undefined variables should cause a failure"""  | 
            ||
| 246 | class BarUndefinedException(Exception): pass  | 
            ||
| 247 | with pytest.raises(BarUndefinedException) as e_info:  | 
            ||
| 248 | try:  | 
            ||
| 249 | check_output( inji,  | 
            ||
| 250 |                       input=b"{% set foo='world!' %}Hola {{ bar }}", | 
            ||
| 251 | stderr=subprocess.STDOUT,  | 
            ||
| 252 | )  | 
            ||
| 253 | except subprocess.CalledProcessError as exc:  | 
            ||
| 254 |         msg = 'exit_code:{} output:{}'.format(exc.returncode, exc.output) | 
            ||
| 255 | raise BarUndefinedException(msg) from exc  | 
            ||
| 256 | e = str(e_info)  | 
            ||
| 257 | assert "exit_code:1 " in e  | 
            ||
| 258 |     assert re.search('jinja2.exceptions.UndefinedError.+bar.+is undefined', e) | 
            ||
| 259 | |||
| 260 | def test_keep_undefined_var(self):  | 
            ||
| 261 | """Undefined variables in keep mode should be kept"""  | 
            ||
| 262 |     assert '[Hola {{ foo }}]\n' == \ | 
            ||
| 263 | check_output( inji, '-s', 'keep',  | 
            ||
| 264 |                       input=b"[Hola {{ foo }}]" | 
            ||
| 265 | )  | 
            ||
| 266 | |||
| 267 | def test_empty_undefined_var(self):  | 
            ||
| 268 | """Undefined variables in empty mode should leave spaces behind placeholders"""  | 
            ||
| 269 | assert '[Hola ]\n' == \  | 
            ||
| 270 | check_output( inji, '-s', 'empty',  | 
            ||
| 271 |                       input=b"[Hola {{ foo }}]" | 
            ||
| 272 | )  | 
            ||
| 273 | |||
| 274 | def test_template_render_with_envvars(self):  | 
            ||
| 275 | """Environment variables should be referenceable as parameters"""  | 
            ||
| 276 |     template = file_from_text("Hola {{ foo }}") | 
            ||
| 277 | os.environ['foo'] = 'world!'  | 
            ||
| 278 | assert 'Hola world!\n' == \  | 
            ||
| 279 | check_output( inji, '-t', template )  | 
            ||
| 280 |     os.environ.pop('foo') | 
            ||
| 281 | |||
| 282 | def test_template_render_with_internal_vars(self):  | 
            ||
| 283 | """  | 
            ||
| 284 | Jinja template should render correctly  | 
            ||
| 285 | referencing those variables set in the templates themselves  | 
            ||
| 286 | """  | 
            ||
| 287 |     template = file_from_text("{% set foo='world!' %}Hola {{ foo }}") | 
            ||
| 288 | assert 'Hola world!\n' == \  | 
            ||
| 289 | check_output( inji, '-t', template )  | 
            ||
| 290 | |||
| 291 | def test_template_missing(self):  | 
            ||
| 292 | """ Missing template files should cause an error"""  | 
            ||
| 293 | class TemplateFileMissingException(Exception): pass  | 
            ||
| 294 | with pytest.raises(TemplateFileMissingException) as e_info:  | 
            ||
| 295 | try:  | 
            ||
| 296 | check_output( inji, '-t', 'nonexistent-template.j2',  | 
            ||
| 297 | stderr=subprocess.STDOUT,  | 
            ||
| 298 | )  | 
            ||
| 299 | except subprocess.CalledProcessError as exc:  | 
            ||
| 300 |         msg = 'exit_code:{} output:{}'.format(exc.returncode, exc.output) | 
            ||
| 301 | raise TemplateFileMissingException(msg) from exc  | 
            ||
| 302 | e = str(e_info)  | 
            ||
| 303 | assert "exit_code:2 " in e  | 
            ||
| 304 |     assert re.search('nonexistent-template.j2.. does not exist', e) | 
            ||
| 305 | |||
| 306 | def test_template_directory(self):  | 
            ||
| 307 | """ Using a directory as a template source should cause an error"""  | 
            ||
| 308 | class TemplateAsDirectoryException(Exception): pass  | 
            ||
| 309 | with pytest.raises(TemplateAsDirectoryException) as e_info:  | 
            ||
| 310 | try:  | 
            ||
| 311 | check_output( inji, '-t', '/',  | 
            ||
| 312 | stderr=subprocess.STDOUT,  | 
            ||
| 313 | )  | 
            ||
| 314 | except subprocess.CalledProcessError as exc:  | 
            ||
| 315 |         msg = 'exit_code:{} output:{}'.format(exc.returncode, exc.output) | 
            ||
| 316 | raise TemplateAsDirectoryException(msg) from exc  | 
            ||
| 317 | e = str(e_info)  | 
            ||
| 318 | assert "exit_code:2 " in e  | 
            ||
| 319 |     assert re.search('/.. is not a file', e) | 
            ||
| 320 | |||
| 321 | def test_template_render_with_varsfile(self):  | 
            ||
| 322 | """Params from params files should be rendered on the template output"""  | 
            ||
| 323 |     template = file_from_text("Hola {{ foo }}") | 
            ||
| 324 |     varsfile = file_from_text("foo: world!") | 
            ||
| 325 | assert 'Hola world!\n' == \  | 
            ||
| 326 | check_output( inji, '-t', template, '-v', varsfile )  | 
            ||
| 327 | |||
| 328 | def test_template_render_with_multiple_varsfiles(self):  | 
            ||
| 329 | """Params from multiple files should be merged before rendering"""  | 
            ||
| 330 |     template = file_from_text("Hola {{ foo }}, Hello {{ bar }}, t{{ t }}") | 
            ||
| 331 |     varsfile1 = file_from_text("foo: world!\nt: quux") | 
            ||
| 332 |     varsfile2 = file_from_text("bar: metaverse\nt: moocow") | 
            ||
| 333 | assert 'Hola world!, Hello metaverse, tmoocow\n' == \  | 
            ||
| 334 | check_output(  | 
            ||
| 335 | inji, '-t', template,  | 
            ||
| 336 | '-v', varsfile1,  | 
            ||
| 337 | '-v', varsfile2  | 
            ||
| 338 | )  | 
            ||
| 339 | |||
| 340 | def test_error_with_empty_varsfile(self):  | 
            ||
| 341 | """ An empty vars file is an error, we ought to fail early """  | 
            ||
| 342 | """  | 
            ||
| 343 | There may be a case for allowing this to just be a warning and so  | 
            ||
| 344 | we may change this behaviour in the future. For now, it definitely  | 
            ||
| 345 | is something we should fail-early on.  | 
            ||
| 346 | """  | 
            ||
| 347 | class EmptyVarsFileException(Exception): pass  | 
            ||
| 348 | with pytest.raises(EmptyVarsFileException) as e_info:  | 
            ||
| 349 | try:  | 
            ||
| 350 |         template = file_from_text("Hola {{ foo }}, Hello {{ bar }}") | 
            ||
| 351 |         varsfile = file_from_text('') | 
            ||
| 352 | check_output( inji, '-t', template, '-v', varsfile,  | 
            ||
| 353 | stderr=subprocess.STDOUT,  | 
            ||
| 354 | )  | 
            ||
| 355 | except subprocess.CalledProcessError as exc:  | 
            ||
| 356 |         msg = 'exit_code:{} output:{}'.format(exc.returncode, exc.output) | 
            ||
| 357 | raise EmptyVarsFileException(msg) from exc  | 
            ||
| 358 | e = str(e_info)  | 
            ||
| 359 |     assert re.search('TypeError: .* contains no data', e) | 
            ||
| 360 | assert "exit_code:1 " in e  | 
            ||
| 361 | |||
| 362 | def test_error_with_malformed_varsfile(self):  | 
            ||
| 363 | """ An invalid varsfile is a fail-early error """  | 
            ||
| 364 | class MalformedVarsFileException(Exception): pass  | 
            ||
| 365 | with pytest.raises(MalformedVarsFileException) as e_info:  | 
            ||
| 366 | try:  | 
            ||
| 367 |         varsfile = file_from_text('@') | 
            ||
| 368 | check_output( inji, '-v', varsfile,  | 
            ||
| 369 |                       input=b"Hola {{ foo }}, Hello {{ bar }}", | 
            ||
| 370 | stderr=subprocess.STDOUT  | 
            ||
| 371 | )  | 
            ||
| 372 | except subprocess.CalledProcessError as exc:  | 
            ||
| 373 |         msg = 'exit_code:{} output:{}'.format(exc.returncode, exc.output) | 
            ||
| 374 | raise MalformedVarsFileException(msg) from exc  | 
            ||
| 375 | e = str(e_info)  | 
            ||
| 376 |     assert re.search('cannot start any token', e) | 
            ||
| 377 | assert "exit_code:1 " in e  | 
            ||
| 378 | |||
| 379 | if __name__ == '__main__':  | 
            ||
| 380 | TestInjiCmd().test_empty_json_config_args()  | 
            ||
| 381 | |||
| 382 |