dirutility.walk.multiprocess.Sprinter.__init__() — rating: A (last analyzed)

Complexity
    Conditions: 3

Size
    Total lines: 23
    Code lines: 17

Duplication
    Duplicated lines: 23
    Duplication ratio: 100 %

Importance
    Changes: 0

Metric   Value
cc       3
eloc     17
nop      6
dl       23
loc      23
rs       9.55
c        0
b        0
f        0
1
import os
2
from multiprocessing import Manager
3
from multiprocessing.pool import Pool
4
5
6 View Code Duplication
class Sprinter:
    """Walk one or more directory trees with a pool of worker processes.

    Files sitting directly in each root directory are listed in the calling
    process.  Every first-level sub-directory is pushed onto a manager-backed
    queue; worker processes drain that queue, record the files they find in a
    manager-backed list and push any child directories back onto the queue
    until it is fully processed.
    """

    def __init__(self, directory, filters, full_paths, pool_size, _printer):
        """Configure the walker.

        :param directory: Root directory path, or an iterable of root directory
            paths, to walk.
        :param filters: Filter object exposing ``validate``, ``get_level``,
            ``max_level`` and ``non_empty_folders``; any falsy value disables
            filtering.
        :param full_paths: When truthy, record paths joined with their root
            directory; otherwise record them relative to the root.
        :param pool_size: Number of worker processes; ``None`` means
            ``os.cpu_count()``.
        :param _printer: Callable used for progress messages.  Per-directory
            messages are passed ``stream=True``.  It travels to the worker
            processes together with the instance, so it must be picklable.
        """
        self.directory = directory
        self.filters = filters
        self.pool_size = pool_size
        self._printer = _printer

        if self.filters:
            self._printer('Filtering enabled')
            self.explore_path = self.explore_path_filter
        else:
            self._printer('Filtering disabled')
            self.explore_path = self.explore_path_encompass

        # A single manager server process hosts both shared objects.  It is a
        # local rather than an attribute because ``self`` is pickled and sent
        # to the pool workers; the proxies hold a reference to their manager,
        # which keeps the server alive for as long as they are.
        manager = Manager()
        self.filepaths = manager.list()
        self.unsearched = manager.Queue()

        if full_paths:
            self._printer('Absolute paths')
            self.add_path = self._add_filepath_absolute
        else:
            self._printer('Relative paths')
            self.add_path = self._add_filepath_relative

    def __iter__(self):
        """Iterate over the collected file paths."""
        return iter(self.filepaths)

    def __len__(self):
        """Return the number of collected file paths."""
        return len(self.filepaths)

    def _add_filepath_relative(self, paths):
        """Record each ``(directory, relative_path)`` pair by its relative path."""
        # One extend() is one round-trip to the manager instead of one per path.
        self.filepaths.extend([fullname for _, fullname in paths])

    def _add_filepath_absolute(self, paths):
        """Record each ``(directory, relative_path)`` pair joined into one path."""
        self.filepaths.extend([os.path.join(directory, fullname) for directory, fullname in paths])

    def _root_files(self, directory):
        """Return ``(directory, name)`` pairs for the files directly inside ``directory``.

        With filtering enabled a file must also pass ``filters.validate`` and
        sit at ``filters.max_level``.
        """
        names = [f for f in os.listdir(directory) if os.path.isfile(os.path.join(directory, f))]
        if self.filters:
            names = [f for f in names
                     if self.filters.validate(f) and self.filters.get_level(f) == self.filters.max_level]
        return [(directory, f) for f in names]

    def _get_root_files(self, directory):
        """Record the files within ``directory``, but only while nothing has been collected yet."""
        if len(self.filepaths) == 0:
            self.add_path(self._root_files(directory))

    def explore_path_filter(self, task_num, dirpath):
        """
        Explore path to discover unsearched directories and save filepaths,
        applying the configured filters.

        When ``filters.non_empty_folders`` is set, entries at ``filters.max_level``
        are not descended into: a directory there is recorded as a path itself
        if it directly contains at least one file, and anything else at that
        level is ignored.

        :param task_num: Processor ID
        :param dirpath: Tuple (base directory, path), path information pulled from unsearched Queue
        :return: Directories to add to unsearched Queue
        """
        base, path = dirpath
        directories = []
        nondirectories = []
        if self.filters.validate(path):
            self._printer("Task: " + str(task_num) + " >>> Explored path: " + path, stream=True)
            for filename in os.listdir(os.path.join(base, path)):
                fullname = os.path.join(path, filename)
                if not self.filters.validate(fullname):
                    continue
                location = os.path.join(base, fullname)
                if self.filters.non_empty_folders and self.filters.get_level(fullname) == self.filters.max_level:
                    # Keep the folder only if it directly holds a file; list it
                    # without walking it any deeper.
                    if os.path.isdir(location) and any(
                            os.path.isfile(os.path.join(location, entry)) for entry in os.listdir(location)):
                        nondirectories.append((base, fullname))
                elif os.path.isdir(location):
                    directories.append((base, fullname))
                else:
                    nondirectories.append((base, fullname))
            self.add_path(nondirectories)
        return directories

    def explore_path_encompass(self, task_num, dirpath):
        """
        Explore path to discover unsearched directories and save filepaths,
        without any filtering.

        :param task_num: Processor ID
        :param dirpath: Tuple (base directory, path), path information pulled from unsearched Queue
        :return: Directories to add to unsearched Queue
        """
        base, path = dirpath
        directories = []
        nondirectories = []
        self._printer("Task: " + str(task_num) + " >>> Explored path: " + path, stream=True)
        for filename in os.listdir(os.path.join(base, path)):
            fullname = os.path.join(path, filename)
            if os.path.isdir(os.path.join(base, fullname)):
                directories.append((base, fullname))
            else:
                nondirectories.append((base, fullname))
        self.add_path(nondirectories)
        return directories

    def _explore_or_skip(self, task_num, dirpath):
        """Run ``explore_path``; a directory that cannot be read yields no children."""
        try:
            return self.explore_path(task_num, dirpath)
        except OSError as err:
            # e.g. permission denied, or the directory vanished mid-walk.
            # Skip it (as os.walk does by default) rather than kill the worker.
            path = dirpath[1]
            self._printer("Task: " + str(task_num) + " >>> Skipped path: " + path + " (" + str(err) + ")",
                          stream=True)
            return []

    def parallel_worker(self, task_num):
        """
        Continuously pulls directories from the Queue; never returns on its own.
        Gets child directories from parent and adds them to Queue.
        Executes task_done() for every directory pulled, even when exploring it
        fails, so that ``unsearched.join()`` can complete.

        :param task_num: Processor ID
        """
        while True:
            dirpath = self.unsearched.get()
            try:
                # Children are queued before task_done() so the queue's
                # unfinished count cannot reach zero while work remains.
                for child in self._explore_or_skip(task_num, dirpath):
                    self.unsearched.put(child)
            finally:
                self.unsearched.task_done()

    def sprinter(self):
        """
        Called when parallelize is True.
        Generate the file names in a directory tree by adding directories to a
        Queue and exploring directories in the Queue with a process pool until
        every queued directory has been processed.
        Intended as a faster alternative to the crawler method for larger
        directory trees.

        :return: The manager-backed list of collected paths (``self.filepaths``).
        """
        self._printer('Multiprocess Walk')
        roots = self.directory
        if isinstance(roots, str):
            # A lone path would otherwise be iterated character by character.
            roots = [roots]

        # Root files are collected only if nothing has been gathered yet; decide
        # once so every root directory is treated the same way.
        collect_root_files = len(self.filepaths) == 0
        for directory in roots:
            if collect_root_files:
                self.add_path(self._root_files(directory))
            for path in next(os.walk(directory))[1]:
                self.unsearched.put((directory, path))

        workers = self.pool_size if self.pool_size is not None else (os.cpu_count() or 1)
        self._printer('Pool Processing STARTED')
        pool = Pool(workers)
        try:
            pool.map_async(self.parallel_worker, range(workers))
            pool.close()
            self.unsearched.join()
        finally:
            # parallel_worker loops forever, so the workers never exit on their
            # own; stop them once the queue is drained (or on interruption).
            pool.terminate()
            pool.join()
        self._printer('Pool Processing ENDED')
        return self.filepaths
153