Total Complexity | 65 |
Total Lines | 337 |
Duplicated Lines | 67.66 % |
Changes | 0 |
Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.
Common duplication problems and their corresponding solutions are:
Complex classes like dirutility.walk.walk often do many different things. To break such a class down, we need to identify a cohesive component within it. A common approach to finding such a component is to look for fields/methods that share the same prefixes or suffixes.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
1 | import os |
||
2 | import platform |
||
3 | import shutil |
||
4 | from datetime import datetime |
||
5 | from functools import reduce |
||
6 | from hashlib import md5 |
||
7 | from math import inf |
||
8 | from multiprocessing import cpu_count |
||
9 | from multiprocessing.pool import Pool |
||
10 | from operator import itemgetter |
||
11 | from pathlib import Path |
||
12 | |||
13 | from looptools import Timer |
||
14 | |||
15 | from dirutility.walk.filter import PathFilters |
||
16 | from dirutility.walk.multiprocess import Sprinter |
||
17 | from dirutility.walk.sequential import Crawler |
||
18 | |||
19 | |||
class Printer:
    """Optional console printer gated by two independent switches."""

    def __init__(self, console_output, console_stream):
        """
        Store the flags that decide whether messages are printed.

        :param console_output: Bool, enables regular (non-stream) messages
        :param console_stream: Bool, enables stream messages
        """
        self.console_output = console_output
        self.console_stream = console_stream

    def printer(self, message, stream=False):
        """
        Print a tab-prefixed message when the matching flag is enabled.

        :param message: Text to print
        :param stream: Bool, when true the message is gated by console_stream,
            otherwise it is gated by console_output
        """
        # Pick the switch that governs this kind of message
        enabled = self.console_stream if stream else self.console_output
        if enabled:
            print('\t' + message)
||
34 | |||
35 | |||
def pool_process(func, iterable, process_name='Pool processing', cpus=cpu_count()):
    """
    Apply a function to each element in an iterable and return a result list.

    The worker pool is used as a context manager so that its processes are
    released even when ``func`` raises inside ``pool.map``.

    :param func: A function that returns a value
    :param iterable: A list or set of elements to be passed to the func as the singular parameter
    :param process_name: Name of the process, for printing purposes only
    :param cpus: Number of CPUs (the default is evaluated once, when the function is defined)
    :return: Result list
    """
    with Timer('\t{0} ({1}) completed in'.format(process_name, str(func))):
        # Leaving the ``with`` block terminates the pool; map() has already
        # collected every result by then, so no work is lost.
        with Pool(cpus) as pool:
            return pool.map(func, iterable)
||
51 | |||
52 | |||
def md5_hash(file_path, chunk_size=65536):
    """
    Open a file path and hash the contents.

    The file is read in chunks of ``chunk_size`` bytes so that large files
    are never loaded into memory in one piece.

    :param file_path: Path of the file to hash
    :param chunk_size: Number of bytes read per iteration
    :return: Hexadecimal MD5 digest of the file contents
    """
    digest = md5()
    with open(file_path, 'rb') as fp:
        # read() returns b'' at end of file, which stops the iterator
        for chunk in iter(lambda: fp.read(chunk_size), b''):
            digest.update(chunk)
    return digest.hexdigest()
||
57 | |||
58 | |||
def md5_tuple(file_path):
    """Pair a file path with the MD5 hash of its contents."""
    digest = md5_hash(file_path)
    return file_path, digest
||
62 | |||
63 | |||
def pool_hash(path_list):
    """Hash every path in path_list through pool processing."""
    return pool_process(func=md5_tuple, iterable=path_list, process_name='MD5 hashing')
||
67 | |||
68 | |||
def remover(file_path):
    """
    Delete a file or directory path only if it exists.

    Symbolic links are unlinked rather than followed: a link that points at
    a directory is removed without touching the directory, and a dangling
    link is removed as well.

    :param file_path: Path to delete
    :return: True when something was deleted, False when nothing was found at the path
    """
    # Check for links first: shutil.rmtree refuses to operate on a symlink,
    # and isfile()/isdir() both report False for a dangling link.
    if os.path.islink(file_path) or os.path.isfile(file_path):
        os.remove(file_path)
        return True
    if os.path.isdir(file_path):
        shutil.rmtree(file_path)
        return True
    return False
||
79 | |||
80 | |||
def creation_date(path_to_file, return_datetime=True):
    """
    Retrieve a file's creation date.

    The creation time is used where the platform exposes it; otherwise the
    time of the last content modification is returned instead.

    See http://stackoverflow.com/a/39501288/1709587 for explanation.

    :param path_to_file: File path
    :param return_datetime: Bool, returns value in Datetime format
    :return: Creation date
    """
    if platform.system() == 'Windows':
        timestamp = os.path.getctime(path_to_file)
    else:
        info = os.stat(path_to_file)
        # st_birthtime is absent on platforms such as Linux, where there is
        # no easy way to get creation dates; fall back to the mtime there.
        timestamp = getattr(info, 'st_birthtime', info.st_mtime)

    return datetime.fromtimestamp(timestamp) if return_datetime else timestamp
||
109 | |||
110 | |||
def creation_date_tuple(file_path):
    """Pair a file path with its creation date as a (file_path, creation_date) tuple."""
    created = creation_date(file_path)
    return file_path, created
||
114 | |||
115 | |||
def pool_creation_date(path_list):
    """Look up the creation date of every path in path_list through pool processing."""
    return pool_process(func=creation_date_tuple, iterable=path_list, process_name='File creation dates')
||
119 | |||
120 | |||
class DirPaths:
    """Collect file and/or folder paths found beneath one or more root directories."""

    def __init__(self,
                 directory,
                 full_paths=False,
                 topdown=True,
                 to_include=None,
                 to_exclude=None,
                 min_level=0,
                 max_level=inf,
                 filters=None,
                 non_empty_folders=False,
                 parallelize=False,
                 pool_size=cpu_count(),
                 console_output=False,
                 console_stream=False,
                 hash_files=False):
        """
        This class generates a list of either files and or folders within a root directory.

        The walk method generates a directory list of files by walking the file tree top down or bottom up. The
        files and folders method generate a list of files or folders in the top level of the tree.

        :param directory: Starting directory file path, or an iterable of starting directory paths
        :param full_paths: Bool, when true full paths are concatenated to file paths list
        :param topdown: Bool, when true walk method walks tree from the top down. When false tree is walked bottom up
        :param to_include: None by default. List of filters acceptable to find within file path string return
        :param to_exclude: None by default. List of filters NOT acceptable to return
        :param min_level: 0 by default. Minimum directory level to save paths from
        :param max_level: Infinity by default. Maximum directory level to save paths from
        :param filters: None by default. Passed through to PathFilters unchanged
        :param non_empty_folders: False by default. Passed through to PathFilters unchanged
        :param parallelize: Bool, when true pool processing is enabled within walk method
        :param pool_size: Number of CPUs for pool processing, default is number of processors
        :param console_output: Bool, when true console output is printed
        :param console_stream: Bool, when true loops print live results
        :param hash_files: Bool, when true walk() method return a dictionary file_paths and hashes
        """
        self.timer = Timer()
        self.full_paths = full_paths
        self.topdown = topdown

        # Exclude .DS_Store by default, set to_exclude to False to include .DS_Store
        to_exclude = ['.DS_Store'] if to_exclude is None else to_exclude
        if any([to_include, to_exclude, filters]) or min_level != 0 or max_level != inf:
            self.filters = PathFilters(to_include, to_exclude, min_level, max_level, filters, non_empty_folders)
        else:
            self.filters = False

        self.console_output = console_output
        self.console_stream = console_stream
        self._hash_files = hash_files

        self._printer = Printer(console_output, console_stream).printer
        self._printer('DIRPATHS')

        # pool_size is only stored when parallelization is requested
        if parallelize:
            self.pool_size = pool_size
        self.parallelize = parallelize
        self.filepaths = []

        # Accept either one directory or several of them
        self.directory = self._normalize_directories(directory)

    @staticmethod
    def _normalize_directories(directory):
        """Return a list of directory strings built from one path or an iterable of paths."""
        # A single str/Path must be tested for explicitly: str() succeeds on
        # any object, so it cannot be used to tell one path from a list.
        if isinstance(directory, (str, Path)):
            return [str(directory)]
        try:
            return [str(entry) for entry in directory]
        except TypeError:
            # Not iterable: treat the object as one directory
            return [str(directory)]

    def __iter__(self):
        # Iterate over a copy so the stored list can change during iteration
        return iter(list(self.filepaths))

    def __str__(self):
        return str(self.filepaths)

    def __len__(self):
        return len(self.filepaths)

    def _get_filepaths(self):
        """Report the number of collected paths and return them, hashed when hash_files was requested."""
        self._printer(str(self.__len__()) + " file paths have been parsed in " + str(self.timer.end))
        if self._hash_files:
            return pool_hash(self.filepaths)
        return self.filepaths

    def _collect_top_level(self, keep):
        """Append visible top-level entries accepted by ``keep`` to the stored paths."""
        # NOTE(review): entries are appended to any paths collected by an
        # earlier call instead of replacing them; confirm this is intended.
        for directory in self.directory:
            for name in os.listdir(directory):
                full_path = os.path.join(directory, name)
                # Entries whose name starts with a dot are skipped
                if keep(full_path) and not name.startswith('.'):
                    self.filepaths.append(full_path)

    def creation_dates(self, sort=True):
        """
        Return a list of (file_path, creation_date) tuples created from list of walked paths.

        :param sort: Bool, sorts file_paths on created_date from newest to oldest.
        :return: List of (file_path, created_date) tuples.
        """
        dated_paths = pool_creation_date(self.filepaths)
        if sort:
            dated_paths.sort(key=itemgetter(1), reverse=True)
        return dated_paths

    def walk(self):
        """
        Default file path retrieval function.
        sprinter() - Generates file path list using pool processing and Queues
        crawler() - Generates file path list using os.walk() in sequence
        """
        if self.parallelize:
            self.filepaths = Sprinter(self.directory, self.filters, self.full_paths, self.pool_size,
                                      self._printer).sprinter()
        else:
            self.filepaths = Crawler(self.directory, self.filters, self.full_paths, self.topdown,
                                     self._printer).crawler()
        return self._get_filepaths()

    def files(self):
        """Return list of files in root directory"""
        self._printer('\tFiles Walk')
        self._collect_top_level(os.path.isfile)
        return self._get_filepaths()

    def folders(self):
        """Return list of folders in root directory"""
        self._collect_top_level(os.path.isdir)
        return self._get_filepaths()
||
252 | |||
253 | |||
class DirTree:
    """Nested-dictionary view of the contents of a root directory."""

    def __init__(self, root, branches=None):
        """
        Generate a tree dictionary of the contents of a root directory.

        :param root: Starting directory
        :param branches: Optional sequence of per-level filter mappings. The entry at index N is
            applied to the folder name at depth N (the root folder is depth 0) and is indexed with
            'folders'; the value found there may be empty (no filtering) or a mapping.
            NOTE(review): the 'exclude' / 'include' keys read from that mapping are inferred from
            the variable names in _filter; confirm against callers.
        """
        self.tree_dict = {}
        self.directory = Path(root)
        # Offset of the root folder's own name within its path string
        self.start = str(self.directory).rfind(os.sep) + 1
        self.branches = branches
        self.get()

    def __iter__(self):
        return iter(self.tree_dict.items())

    def __str__(self):
        return str(self.tree_dict)

    @property
    def dict(self):
        """The generated tree dictionary."""
        return self.tree_dict

    def _filter(self, folders, folder_or_file):
        """
        Decide whether a path, given as its list of folder names, passes the branch filters.

        :param folders: Folder names from the root folder down to the current folder
        :param folder_or_file: Key used to select the filter set of each branch (get() passes 'folders')
        :return: False when any level is excluded or is missing from a non-empty include list
        """
        # zip() stops at the shorter sequence, so levels deeper than the
        # configured branches are left unfiltered instead of raising IndexError.
        for folder, branch in zip(folders, self.branches):
            filters = branch[folder_or_file]
            if not filters:
                continue
            exclude = filters.get('exclude')
            include = filters.get('include')

            if exclude and folder in exclude:
                return False
            if include and folder not in include:
                return False
        return True

    def get(self):
        """
        Generate path, dirs, files tuple for each path in directory. Executes filters if branches are not None

        :return: The tree dictionary; folders map to dictionaries and files map to None
        """
        for path, _dirs, files in os.walk(self.directory):
            folders = path[self.start:].split(os.sep)
            if self.branches and not self._filter(folders, 'folders'):
                continue
            # Walk down the existing tree to the parent of the current folder.
            # Inside a method, ``dict`` is the builtin, not the property above.
            parent = reduce(dict.get, folders[:-1], self.tree_dict)
            parent[folders[-1]] = dict.fromkeys(files)
        return self.tree_dict
||
308 | |||
309 | |||
def gui():
    """Collect options from the WalkGUI, walk the chosen directory and optionally export the paths."""
    # Imported inside the function, so dirutility.gui is only loaded when gui() runs
    from dirutility.gui import WalkGUI
    gui = WalkGUI()
    params = gui.parsing()
    # 'parse' holds the walk options read below
    parse = params['parse']

    paths = DirPaths(parse['directory'],
                     console_stream=parse['console_stream'],
                     parallelize=parse['parallelize'],
                     max_level=parse['max_level'],
                     non_empty_folders=parse['non_empty_folders']).walk()

    # Export only when save options are present
    if params['save']:
        from databasetools import CSVExport, DictTools
        save = params['save']
        # Both exports use the base name of the walked directory as the file name
        if save['csv']:
            CSVExport(list(paths),
                      cols=['files'],
                      file_path=save['directory'],
                      file_name=os.path.basename(parse['directory']))
        if save['json']:
            DictTools(save['directory'], os.path.basename(parse['directory'])).save(list(paths))
    # NOTE(review): the listing lost its indentation; this print is assumed to sit at function level - confirm
    print('Done!')
||
333 | |||
334 | |||
# Launch the GUI when this module is executed as a script
if __name__ == "__main__":
    gui()
||
337 |