Total Complexity | 47 |
Total Lines | 168 |
Duplicated Lines | 83.33 % |
Changes | 0 |
Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.
Common duplication problems and their corresponding solutions are:
Complex classes like dirutility.open.clazz often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to finding such a component is to look for fields/methods that share the same prefixes or suffixes.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
1 | from __future__ import annotations |
||
2 | |||
3 | import abc |
||
4 | import os |
||
5 | import tarfile |
||
6 | from typing import Union, Dict, TextIO, List |
||
7 | from zipfile import PyZipFile |
||
8 | |||
9 | from dirutility.error import InvalidFileNameError, InvalidDirStructureError |
||
10 | |||
11 | |||
class DirStructure:
    """In-memory description of a directory tree that can be written to disk.

    Every keyword given to the constructor is an entry name. The value decides
    what :meth:`create` writes for that entry:

    * ``None`` -- an empty file;
    * ``str`` -- a file containing exactly that text;
    * ``dict`` -- a sub-directory, wrapped in a nested ``DirStructure``;
    * anything else -- treated as an iterable of strings (for example an open
      text file) whose items are written to a file in order.
    """

    def __init__(self, **paths: Union[dict, str, TextIO, None]):
        """Store the entries, converting nested dicts to ``DirStructure``."""
        self._content = {}
        for pathname, _c in paths.items():
            if isinstance(_c, dict):
                self._content[pathname] = DirStructure(**_c)
            else:
                self._content[pathname] = _c

    @property
    def content(self):
        """Return a shallow copy of the name -> content mapping."""
        return self._content.copy()

    @property
    def dirsname(self) -> List[str]:
        """Return the top-level entry names in sorted order."""
        return sorted(self._content)

    def _is_filename_valid(self, filename):
        """Return True when *filename* is a string with non-blank content."""
        return isinstance(filename, str) and bool(filename.strip())

    def validate_filename(self, filename):
        """Raise ``InvalidFileNameError`` when *filename* is not acceptable."""
        if not self._is_filename_valid(filename):
            raise InvalidFileNameError(filename)

    def create(self, prefix_dir: str):
        """Write every entry of this structure below *prefix_dir*.

        Entries are validated and created one at a time, so an invalid name
        aborts the run after earlier entries have already been written.

        :param prefix_dir: existing directory that receives the entries.
        :raises InvalidFileNameError: if an entry name is empty or blank.
        """
        for filename, _c in self._content.items():
            self.validate_filename(filename)
            path = '{}/{}'.format(prefix_dir, filename)
            if isinstance(_c, DirStructure):
                os.mkdir(path)
                _c.create(path)
            elif isinstance(_c, dict):
                # __init__ wraps dicts, so this only triggers if _content was
                # modified directly; kept so such a dict yields an empty dir.
                os.mkdir(path)
            else:
                # All remaining kinds are regular files. The branches are
                # mutually exclusive so a None entry stops at "empty file"
                # instead of also being iterated as if it were a line source.
                with open(path, 'w') as fd:
                    if _c is None:
                        pass
                    elif isinstance(_c, str):
                        fd.write(_c)
                    else:
                        fd.writelines(_c)
||
61 | |||
62 | |||
class DirBase(metaclass=abc.ABCMeta):
    """Pair a directory path with an optional ``DirStructure`` describing it.

    The class uses the ``ABCMeta`` metaclass but declares no abstract methods
    itself. Paths are handled as plain ``'/'``-separated strings.
    """

    def __init__(self, dirpath: str, structure: Union[DirStructure, None] = None):
        """Remember *dirpath* and, if given, the structure that belongs to it."""
        self._dirpath = dirpath
        self._structure = structure

    @property
    def dirname(self) -> str:
        """Return the last ``'/'``-separated component of the path.

        A path without any ``'/'`` is returned unchanged.
        """
        # [-1] instead of [1]: rsplit yields a single item when there is no
        # separator, and indexing [1] would raise IndexError.
        return self.dirpath.rsplit('/', 1)[-1]

    @property
    def dirpath(self) -> str:
        """Return the path exactly as passed to the constructor."""
        return self._dirpath

    @property
    def dirparent(self) -> str:
        """Return everything before the last ``'/'`` of the path.

        A path without any ``'/'`` is returned unchanged.
        """
        return self.dirpath.rsplit('/', 1)[0]

    def dirs(self, relative=True) -> List[str]:
        """List the top-level entry names of the attached structure.

        The names are whatever the structure's ``dirsname`` reports, so they
        cover files as well as directories.

        :param relative: when true return bare names, otherwise prefix each
            name with ``dirpath`` and a ``'/'``.
        :return: the names, or an empty list when no structure is attached.
        """
        if self._structure is None:
            return []
        names = self._structure.dirsname
        if relative:
            return names
        return ['{}/{}'.format(self._dirpath, _name) for _name in names]

    def create_structure(self, **paths: Union[dict, str, TextIO, None]):
        """Build a ``DirStructure`` from *paths* and write it under ``dirpath``.

        The new structure replaces any structure previously attached.
        """
        self._structure = DirStructure(**paths)
        self._structure.create(self._dirpath)
||
90 | |||
91 | |||
class TempDir(DirBase):
    """Concrete ``DirBase`` that adds no behaviour of its own.

    ``TarFile`` and ``ZipFile`` accept an instance of this class as the
    directory whose content they archive.
    """
||
94 | |||
95 | |||
class TarFile(object):
    """Create a tar archive from the top-level entries of a ``TempDir``.

    The *mode* string has the form ``'<open mode>:<compression>'``, for
    example ``'w:gz'``. The compression part doubles as the suffix the archive
    file name must end with.
    """

    def __init__(self, filepath: str, tempdir: TempDir, mode: str = 'w:gz'):
        """Parse *mode*, derive the file names and validate *filepath*.

        :param filepath: destination path of the archive.
        :param tempdir: directory object whose entries get archived.
        :param mode: ``'<open mode>'`` or ``'<open mode>:<compression>'``.
        :raises InvalidFileNameError: if a compression type is given and
            *filepath* does not end with it.
        """
        self._mode = mode.strip()
        _ar = self._mode.split(':')
        compress = _ar[-1].strip() if len(_ar) > 1 else ''
        # None means "no compression part in the mode" (e.g. plain 'w').
        self._compress_type = compress or None
        self._open_mode = _ar[0]
        self._filepath = filepath.strip()
        self._filename = self._filepath.rsplit('/', 1)[-1]
        # Only the last suffix is dropped: 'a.tar.gz' -> 'a.tar'.
        self._filename_without_suffix = self._filename.rsplit('.', 1)[0]
        self._validate_filename(self._filepath)
        self._tempdir = tempdir

    def _is_filename_valid(self, filename):
        """Return True when *filename* ends with the compression type.

        Without a compression type there is no suffix to check, so every
        name is accepted instead of passing ``None`` to ``str.endswith``.
        """
        if self._compress_type is None:
            return True
        return filename.endswith(self._compress_type)

    def _validate_filename(self, filename):
        """Raise ``InvalidFileNameError`` when *filename* fails the check."""
        if not self._is_filename_valid(filename):
            raise InvalidFileNameError(filename)

    @property
    def filepath(self) -> str:
        """Return the stripped destination path of the archive."""
        return self._filepath

    def filename(self, with_suffix=True) -> str:
        """Return the archive file name, optionally without its last suffix."""
        if with_suffix:
            return self._filename
        return self._filename_without_suffix

    @property
    def tempdir(self) -> TempDir:
        """Return the directory object passed to the constructor."""
        return self._tempdir

    def create_archive(self, withdir: bool = False):
        """Write the archive, adding each top-level entry recursively.

        Every entry is stored under its name relative to the temp directory.

        :param withdir: not used by this implementation; the parameter is
            shared with ``ZipFile.create_archive``.
        """
        with tarfile.open(self._filepath, mode=self._mode) as tar:
            absolute = self._tempdir.dirs(relative=False)
            relative = self._tempdir.dirs()
            for dirpath, dirname in zip(absolute, relative):
                tar.add(dirpath, arcname=dirname, recursive=True)
||
141 | |||
142 | |||
class ZipFile(TarFile):
    """Create a zip archive from the whole content of a ``TempDir``.

    Mode parsing, file-name handling and validation are inherited from
    ``TarFile``; with the default mode the archive path must end in ``zip``.
    """

    def __init__(self, filepath: str, tempdir: TempDir, mode: str = 'w:zip'):
        """Initialise through ``TarFile`` with a zip-specific default mode."""
        super(ZipFile, self).__init__(filepath, tempdir, mode)

    def create_archive(self, withdir: bool = False):
        """Walk the temp directory and write its content to the zip archive.

        A directory that contains files contributes those files; a directory
        without files is written as a directory entry so it is not lost.

        :param withdir: when true, every archive name is prefixed with the
            archive file name minus its last suffix.
        """
        base = self._tempdir.dirpath
        top = self.filename(with_suffix=False)
        # The context manager closes the archive, which is what writes the
        # zip central directory; without it the file may stay incomplete.
        with PyZipFile(self._filepath, mode=self._open_mode) as zip_file:
            for root, _dirs, files in os.walk(base):
                sources = [os.path.join(root, name) for name in files] or [root]
                for source in sources:
                    # relpath removes only the leading base directory, whereas
                    # str.replace would also strip later occurrences of it.
                    inner = os.path.relpath(source, base)
                    arcname = '{}/{}'.format(top, inner) if withdir else inner
                    zip_file.write(source, arcname=arcname)
||
168 |