Total Complexity | 47 |
Total Lines | 162 |
Duplicated Lines | 86.42 % |
Changes | 0 |
Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.
Common duplication problems, and corresponding solutions are:
Complex classes like dirutility.open.clazz often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
1 | from __future__ import annotations |
||
2 | |||
3 | import abc |
||
4 | import os |
||
5 | import tarfile |
||
6 | from typing import Union, Dict, TextIO, List |
||
7 | from zipfile import PyZipFile |
||
8 | |||
9 | from dirutility.error import InvalidFileNameError, InvalidDirStructureError |
||
10 | |||
11 | |||
12 | View Code Duplication | class DirStructure: |
|
|
|||
13 | def __init__(self, **paths: Dict[str, Union[dict, str, TextIO, None]]): |
||
14 | self._content = {} |
||
15 | for pathname, _c in paths.items(): |
||
16 | if isinstance(_c, dict): |
||
17 | self._content[pathname] = DirStructure(**_c) |
||
18 | else: |
||
19 | self._content[pathname] = _c |
||
20 | |||
21 | @property |
||
22 | def content(self): |
||
23 | return self._content.copy() |
||
24 | |||
25 | @property |
||
26 | def dirsname(self) -> List[str]: |
||
27 | res = list(self._content.keys()) |
||
28 | res.sort() |
||
29 | return res |
||
30 | |||
31 | def _is_filename_valid(self, filename): |
||
32 | if isinstance(filename, str) and filename.strip(): |
||
33 | return True |
||
34 | else: |
||
35 | return False |
||
36 | |||
37 | def validate_filename(self, filename): |
||
38 | if not self._is_filename_valid(filename): |
||
39 | raise InvalidFileNameError(filename) |
||
40 | |||
41 | def create(self, prefix_dir: str): |
||
42 | for filename, _c in self._content.items(): |
||
43 | self.validate_filename(filename) |
||
44 | path = '{}/{}'.format(prefix_dir, filename) |
||
45 | if _c is None: |
||
46 | fd = open(path, 'w') |
||
47 | fd.close() |
||
48 | if isinstance(_c, str): |
||
49 | with open(path, 'w') as fd: |
||
50 | fd.write(_c) |
||
51 | elif isinstance(_c, dict): |
||
52 | os.mkdir(path) |
||
53 | elif isinstance(_c, DirStructure): |
||
54 | os.mkdir(path) |
||
55 | _c.create(path) |
||
56 | else: |
||
57 | with open(path, 'w') as fd: |
||
58 | for _l in _c: |
||
59 | fd.write(_l) |
||
60 | |||
61 | |||
62 | View Code Duplication | class DirBase(metaclass=abc.ABCMeta): |
|
63 | def __init__(self, dirpath: str, structure: Union[DirStructure, None] = None): |
||
64 | self._dirpath = dirpath |
||
65 | self._structure = structure |
||
66 | |||
67 | @property |
||
68 | def dirname(self) -> str: |
||
69 | return self.dirpath.rsplit('/', 1)[1] |
||
70 | |||
71 | @property |
||
72 | def dirpath(self) -> str: |
||
73 | return self._dirpath |
||
74 | |||
75 | @property |
||
76 | def dirparent(self) -> str: |
||
77 | return self.dirpath.rsplit('/', 1)[0] |
||
78 | |||
79 | def dirs(self, relative=True) -> List[str]: |
||
80 | if relative: |
||
81 | return self._structure.dirsname |
||
82 | else: |
||
83 | return ['{}/{}'.format(self._dirpath, _name) for _name in self._structure.dirsname] |
||
84 | |||
85 | def create_structure(self, **paths: Dict[str, Union[dict, str, TextIO, None]]): |
||
86 | self._structure = DirStructure(**paths) |
||
87 | self._structure.create(self._dirpath) |
||
88 | |||
89 | |||
90 | class TempDir(DirBase): |
||
91 | pass |
||
92 | |||
93 | |||
94 | View Code Duplication | class TarFile(object): |
|
95 | def __init__(self, filepath: str, tempdir: TempDir, mode: str = 'w:gz'): |
||
96 | |||
97 | self._mode = mode.strip() |
||
98 | _ar = self._mode.split(':') |
||
99 | if len(_ar) > 1 and _ar[-1].strip(): |
||
100 | self._compress_type = _ar[-1].strip() |
||
101 | else: |
||
102 | self._compress_type = None |
||
103 | self._open_mode = _ar[0] |
||
104 | self._filepath = filepath.strip() |
||
105 | self._filename = self._filepath.rsplit('/', 1)[-1] |
||
106 | self._filename_without_suffix = self._filename.rsplit('.', 1)[0] |
||
107 | self._validate_filename(self._filepath) |
||
108 | self._tempdir = tempdir |
||
109 | |||
110 | def _is_filename_valid(self, filename): |
||
111 | if filename.endswith(self._compress_type): |
||
112 | return True |
||
113 | else: |
||
114 | return False |
||
115 | |||
116 | def _validate_filename(self, filename): |
||
117 | if not self._is_filename_valid(filename): |
||
118 | raise InvalidFileNameError(filename) |
||
119 | |||
120 | @property |
||
121 | def filepath(self) -> str: |
||
122 | return self._filepath |
||
123 | |||
124 | def filename(self, with_suffix=True) -> str: |
||
125 | if with_suffix: |
||
126 | return self._filename |
||
127 | else: |
||
128 | return self._filename_without_suffix |
||
129 | |||
130 | @property |
||
131 | def tempdir(self) -> TempDir: |
||
132 | return self._tempdir |
||
133 | |||
134 | def create_archive(self, withdir: bool = False): |
||
135 | with tarfile.open(self._filepath, mode=self._mode) as tar: |
||
136 | for dirpath, dirname in zip(self._tempdir.dirs(relative=False), self._tempdir.dirs()): |
||
137 | tar.add(dirpath, arcname=dirname, recursive=True) |
||
138 | |||
139 | |||
140 | View Code Duplication | class ZipFile(TarFile): |
|
141 | def __init__(self, filepath: str, tempdir: TempDir, mode: str = 'w:zip'): |
||
142 | super(ZipFile, self).__init__(filepath, tempdir, mode) |
||
143 | |||
144 | def create_archive(self, withdir: bool = False): |
||
145 | zip_file = PyZipFile(self._filepath, mode=self._open_mode) |
||
146 | for root, dirs, files in os.walk(self._tempdir.dirpath): |
||
147 | if files: |
||
148 | for file in files: |
||
149 | _fp = os.path.join(root, file) |
||
150 | if withdir: |
||
151 | zip_file.write(_fp, arcname='{}/{}'.format( |
||
152 | self.filename(with_suffix=False), _fp.replace(self._tempdir.dirpath, ''))) |
||
153 | else: |
||
154 | zip_file.write(_fp, arcname=_fp.replace(self._tempdir.dirpath, '')) |
||
155 | |||
156 | else: |
||
157 | if withdir: |
||
158 | zip_file.write(root, arcname='{}/{}'.format( |
||
159 | self.filename(with_suffix=False), root.replace(self._tempdir.dirpath, ''))) |
||
160 | else: |
||
161 | zip_file.write(root, arcname=root.replace(self._tempdir.dirpath, '')) |
||
162 |