dirutility.walk.walk.Printer.__init__()   A
last analyzed

Complexity

Conditions 1

Size

Total Lines 4
Code Lines 3

Duplication

Lines 4
Ratio 100 %

Importance

Changes 0
Metric Value
cc 1
eloc 3
nop 3
dl 4
loc 4
rs 10
c 0
b 0
f 0
1
import os
2
import platform
3
import shutil
4
from datetime import datetime
5
from functools import reduce
6
from hashlib import md5
7
from math import inf
8
from multiprocessing import cpu_count
9
from multiprocessing.pool import Pool
10
from operator import itemgetter
11
from pathlib import Path
12
13
from looptools import Timer
14
15
from dirutility.walk.filter import PathFilters
16
from dirutility.walk.multiprocess import Sprinter
17
from dirutility.walk.sequential import Crawler
18
19
20 View Code Duplication
class Printer:
    """Optional console printing controlled by two independent flags."""

    def __init__(self, console_output, console_stream):
        """
        Store the flags that decide whether messages are printed.

        :param console_output: When truthy, regular (non-stream) messages are printed
        :param console_stream: When truthy, stream messages are printed
        """
        self.console_output = console_output
        self.console_stream = console_stream

    def printer(self, message, stream=False):
        """
        Print a tab-prefixed message if the flag matching its kind is enabled.

        :param message: String to print
        :param stream: When truthy the console_stream flag is consulted, otherwise console_output
        """
        enabled = self.console_stream if stream else self.console_output
        if enabled:
            print('\t' + message)
34
35
36
def pool_process(func, iterable, process_name='Pool processing', cpus=cpu_count()):
    """
    Apply a function to each element in an iterable and return a result list.

    The work is distributed over a multiprocessing Pool and the whole run is wrapped in a Timer
    labelled with the process name and the function.

    :param func: A function that returns a value
    :param iterable: A list or set of elements to be passed to the func as the singular parameter
    :param process_name: Name of the process, for printing purposes only
    :param cpus: Number of CPUs (worker processes); the default is evaluated once at import time
    :return: Result list, in the order produced by Pool.map
    """
    with Timer('\t{0} ({1}) completed in'.format(process_name, str(func))):
        # The context manager releases the worker processes even when func raises in a worker;
        # map() has already collected every result by the time the pool is torn down.
        with Pool(cpus) as pool:
            vals = pool.map(func, iterable)
    return vals
51
52
53
def md5_hash(file_path, chunk_size=65536):
    """
    Open a file path and hash the contents.

    The file is read in fixed-size chunks so that large files are never held in memory in full.

    :param file_path: Path of the file to hash
    :param chunk_size: Number of bytes read per iteration
    :return: Hexadecimal MD5 digest string
    """
    digest = md5()
    with open(file_path, 'rb') as fp:
        # iter() with a sentinel stops at the first empty read, i.e. end of file
        for chunk in iter(lambda: fp.read(chunk_size), b''):
            digest.update(chunk)
    return digest.hexdigest()
57
58
59
def md5_tuple(file_path):
    """Pair a file path with the MD5 hex digest of its contents as (file_path, hash)."""
    digest = md5_hash(file_path)
    return file_path, digest
62
63
64
def pool_hash(path_list):
    """Hash every path in path_list through pool_process, yielding (file_path, hash) tuples."""
    return pool_process(func=md5_tuple, iterable=path_list, process_name='MD5 hashing')
67
68
69
def remover(file_path):
    """
    Delete a file, symbolic link or directory path only if it exists.

    Symbolic links are unlinked themselves (their targets are left alone); this also covers
    dangling links and links to directories, which shutil.rmtree refuses to handle.

    :param file_path: Path to delete
    :return: True when something was removed, False when the path does not exist
    """
    # islink is checked first: isfile/isdir follow links and would misclassify them
    if os.path.islink(file_path) or os.path.isfile(file_path):
        os.remove(file_path)
        return True
    if os.path.isdir(file_path):
        shutil.rmtree(file_path)
        return True
    return False
79
80
81 View Code Duplication
def creation_date(path_to_file, return_datetime=True):
    """
    Retrieve a file's creation date.

    On Windows the value of os.path.getctime is used.  Elsewhere the stat result's st_birthtime
    is used when the attribute exists, otherwise the last-modified time stands in for it.

    See http://stackoverflow.com/a/39501288/1709587 for explanation.

    :param path_to_file: File path
    :param return_datetime: Bool, when true the timestamp is converted with datetime.fromtimestamp
    :return: Creation date as a datetime, or the raw timestamp when return_datetime is false
    """
    on_windows = platform.system() == 'Windows'
    if on_windows:
        timestamp = os.path.getctime(path_to_file)
    else:
        file_stat = os.stat(path_to_file)
        # Platforms without st_birthtime (probably Linux) offer no easy creation date,
        # so the time the content was last modified is used instead.
        timestamp = getattr(file_stat, 'st_birthtime', file_stat.st_mtime)

    return datetime.fromtimestamp(timestamp) if return_datetime else timestamp
109
110
111
def creation_date_tuple(file_path):
    """Pair a file path with its creation date as (file_path, creation_date)."""
    created = creation_date(file_path)
    return file_path, created
114
115
116
def pool_creation_date(path_list):
    """Look up the creation date of every path in path_list through pool_process."""
    return pool_process(func=creation_date_tuple, iterable=path_list, process_name='File creation dates')
119
120
121 View Code Duplication
class DirPaths:
    """Collect file and/or folder paths found beneath one or more root directories."""

    def __init__(self,
                 directory,
                 full_paths=False,
                 topdown=True,
                 to_include=None,
                 to_exclude=None,
                 min_level=0,
                 max_level=inf,
                 filters=None,
                 non_empty_folders=False,
                 parallelize=False,
                 pool_size=cpu_count(),
                 console_output=False,
                 console_stream=False,
                 hash_files=False):
        """
        This class generates a list of either files and or folders within a root directory.

        The walk method generates a directory list of files by walking the file tree top down or bottom up.  The
        files and folders methods generate a list of files or folders in the top level of the tree.

        :param directory: Starting directory file path, or an iterable of starting directory paths
        :param full_paths: Bool, when true full paths are concatenated to file paths list
        :param topdown: Bool, when true walk method walks tree from the top down. When false tree is walked bottom up
        :param to_include: None by default.  List of filters acceptable to find within file path string return
        :param to_exclude: None by default, which excludes '.DS_Store'.  List of filters NOT acceptable to return
        :param min_level: 0 by default.  Minimum directory level to save paths from
        :param max_level: Infinity by default.  Maximum directory level to save paths from
        :param filters: None by default.  Passed through to PathFilters unchanged
        :param non_empty_folders: False by default.  Passed through to PathFilters unchanged
        :param parallelize: Bool, when true pool processing is enabled within walk method
        :param pool_size: Number of CPUs for pool processing, default is number of processors
        :param console_output: Bool, when true console output is printed
        :param console_stream: Bool, when true loops print live results
        :param hash_files: Bool, when true walk() method returns (file_path, hash) tuples instead of plain paths
        """
        self.timer = Timer()
        self.full_paths = full_paths
        self.topdown = topdown

        # Exclude .DS_Store by default, set to_exclude to False to include .DS_Store
        to_exclude = ['.DS_Store'] if to_exclude is None else to_exclude
        # PathFilters is only built when at least one filtering option deviates from its default
        if any([to_include, to_exclude, filters]) or min_level != 0 or max_level != inf:
            self.filters = PathFilters(to_include, to_exclude, min_level, max_level, filters, non_empty_folders)
        else:
            self.filters = False

        self.console_output = console_output
        self.console_stream = console_stream
        self._hash_files = hash_files

        self._printer = Printer(console_output, console_stream).printer
        self._printer('DIRPATHS')

        # Always stored so that toggling parallelize after construction cannot hit a missing attribute
        self.pool_size = pool_size
        self.parallelize = parallelize
        self.filepaths = []

        # Accept a singular (1) directory or multiple directories
        self.directory = self._normalize_directories(directory)

    @staticmethod
    def _normalize_directories(directory):
        """
        Return directory as a list of path strings.

        :param directory: A single path (str, bytes or os.PathLike) or an iterable of paths
        :return: List with one string per directory
        """
        # str() succeeds on any object, so single paths must be recognised by type rather than
        # by waiting for a TypeError that never happens
        if isinstance(directory, (str, bytes, os.PathLike)):
            return [str(directory)]
        try:
            return [str(path) for path in directory]
        except TypeError:
            # Not iterable: stringify the single value
            return [str(directory)]

    def __iter__(self):
        # Iterate over a copy so callers are unaffected by later changes to self.filepaths
        return iter(list(self.filepaths))

    def __str__(self):
        return str(self.filepaths)

    def __len__(self):
        return len(self.filepaths)

    def _get_filepaths(self):
        """
        Report how many paths were collected and return them.

        :return: self.filepaths, or the result of pool_hash(self.filepaths) when hash_files was requested
        """
        # NOTE(review): self.timer.end comes from looptools.Timer; presumably the elapsed time - confirm
        self._printer(str(self.__len__()) + " file paths have been parsed in " + str(self.timer.end))
        if self._hash_files:
            return pool_hash(self.filepaths)
        return self.filepaths

    def _top_level(self, predicate):
        """Append every non dot-prefixed top-level entry whose full path satisfies predicate to self.filepaths."""
        for directory in self.directory:
            for name in os.listdir(directory):
                full_path = os.path.join(directory, name)
                if predicate(full_path) and not name.startswith('.'):
                    self.filepaths.append(full_path)

    def creation_dates(self, sort=True):
        """
        Return a list of (file_path, creation_date) tuples created from list of walked paths.

        :param sort: Bool, sorts file_paths on created_date from newest to oldest.
        :return: List of (file_path, created_date) tuples.
        """
        dated = pool_creation_date(self.filepaths)
        if sort:
            dated.sort(key=itemgetter(1), reverse=True)
        return dated

    def walk(self):
        """
        Default file path retrieval function.

        sprinter() - Generates file path list using pool processing and Queues
        crawler() - Generates file path list using os.walk() in sequence

        :return: Result of _get_filepaths() for the freshly walked paths
        """
        if self.parallelize:
            self.filepaths = Sprinter(self.directory, self.filters, self.full_paths, self.pool_size,
                                      self._printer).sprinter()
        else:
            self.filepaths = Crawler(self.directory, self.filters, self.full_paths, self.topdown,
                                     self._printer).crawler()
        return self._get_filepaths()

    def files(self):
        """Return list of files in root directory (appended to any paths already collected)"""
        self._printer('\tFiles Walk')
        self._top_level(os.path.isfile)
        return self._get_filepaths()

    def folders(self):
        """Return list of folders in root directory (appended to any paths already collected)"""
        self._top_level(os.path.isdir)
        return self._get_filepaths()
252
253
254 View Code Duplication
class DirTree:
    """Nested-dictionary representation of the contents of a root directory."""

    def __init__(self, root, branches=None):
        """
        Generate a tree dictionary of the contents of a root directory.

        The tree is built immediately by calling get().

        :param root: Starting directory
        :param branches: Optional sequence of per-level filters; branches[i] applies to the folder at depth i
            (the root folder is depth 0) and is indexed with 'folders'.  A truthy entry is read as a mapping
            with optional 'exclude' and 'include' collections of folder names.
        """
        self.tree_dict = {}
        self.directory = Path(root)
        # Offset at which the root folder's own name starts, used to make walked paths root-relative
        self.start = str(self.directory).rfind(os.sep) + 1
        self.branches = branches
        self.get()

    def __iter__(self):
        return iter(self.tree_dict.items())

    def __str__(self):
        return str(self.tree_dict)

    @property
    def dict(self):
        return self.tree_dict

    def _filter(self, folders, folder_or_file):
        """
        Check each folder name of a path against the filter configured for its depth.

        :param folders: Folder names of a path, starting with the root folder
        :param folder_or_file: Key selecting the filter set inside each branches entry (get() passes 'folders')
        :return: False as soon as one level rejects its folder name, True otherwise
        """
        # zip stops at the shorter sequence, so levels deeper than the configured branches are unfiltered
        for name, branch in zip(folders, self.branches):
            filters = branch[folder_or_file]
            if not filters:
                continue
            # NOTE(review): key names inferred from the variable names - confirm against callers
            exclude = filters.get('exclude')
            include = filters.get('include')

            if exclude and name in exclude:
                return False
            if include and name not in include:
                return False
        return True

    def get(self):
        """
        Generate path, dirs, files tuple for each path in directory.  Executes filters if branches are not None

        Each accepted folder becomes a dict keyed by its file names (values None) stored under the folder's
        name inside its parent's dict.

        :return: The tree dictionary
        """
        for path, dirs, files in os.walk(self.directory):
            folders = path[self.start:].split(os.sep)
            if self.branches and not self._filter(folders, 'folders'):
                # Prune in place so os.walk does not descend into a rejected folder; its children
                # would otherwise have no parent entry in the tree to attach to
                dirs[:] = []
                continue
            parent = reduce(dict.get, folders[:-1], self.tree_dict)
            parent[folders[-1]] = dict.fromkeys(files)
        return self.tree_dict
308
309
310 View Code Duplication
def gui():
    """
    Run a directory walk configured through the WalkGUI dialog and optionally save the result.

    The 'parse' section of the GUI parameters configures DirPaths; a truthy 'save' section selects
    CSV and/or JSON export of the walked paths.
    """
    from dirutility.gui import WalkGUI

    params = WalkGUI().parsing()
    parse = params['parse']

    walker = DirPaths(parse['directory'],
                      console_stream=parse['console_stream'],
                      parallelize=parse['parallelize'],
                      max_level=parse['max_level'],
                      non_empty_folders=parse['non_empty_folders'])
    paths = walker.walk()

    save = params['save']
    if save:
        # Imported lazily so the export dependency is only needed when saving was requested
        from databasetools import CSVExport, DictTools
        if save['csv']:
            CSVExport(list(paths),
                      cols=['files'],
                      file_path=save['directory'],
                      file_name=os.path.basename(parse['directory']))
        if save['json']:
            exporter = DictTools(save['directory'], os.path.basename(parse['directory']))
            exporter.save(list(paths))
    print('Done!')
333
334
335
# Launch the GUI only when this module is executed as a script
if __name__ == '__main__':
    gui()
337