FileSystem.get_folder_path()   F
last analyzed

↳ Parent: FileSystem

Complexity

Conditions 11

Duplication

Lines 0
Ratio 0 %

Size

Total Lines 45

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 11
c 1
b 0
f 0
dl 0
loc 45
rs 3.1764

How to fix   Complexity   

Complexity

Complex classes like FileSystem.get_folder_path() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""
2
General file system methods.
3
4
.. moduleauthor:: Jaisen Mathai <[email protected]>
5
"""
6
from __future__ import print_function
7
from builtins import object
8
9
import os
10
import re
11
import shutil
12
import time
13
14
from elodie import compatability
15
from elodie import geolocation
16
from elodie import log
17
from elodie.config import load_config
18
from elodie.localstorage import Db
19
from elodie.media.base import Base, get_all_subclasses
20
21
22
class FileSystem(object):
23
    """A class for interacting with the file system."""
24
25
    def __init__(self):
26
        # The default folder path is along the lines of 2015-01-Jan/Chicago
27
        self.default_folder_path_definition = [
28
            ('date', '%Y-%m-%b'), ('location', '%city')
29
        ]
30
        self.cached_folder_path_definition = None
31
32
    def create_directory(self, directory_path):
        """Create a directory, and any missing parents, if it does not exist.

        The directory is created first and the outcome inspected afterwards
        rather than checking for existence up front, so a directory created
        by someone else in between is not misreported as a failure.

        :param str directory_path: A fully qualified path of the directory
            to create.
        :returns: bool -- True when a directory exists at `directory_path`
            once the call finishes, False otherwise (for example no
            permission, or a regular file already occupies the path).
        """
        try:
            os.makedirs(directory_path)
        except OSError:
            # OSError covers "already exists" as well as cases like no
            # permission, so success is decided by what is on disk now.
            return os.path.isdir(directory_path)

        return True
50
51
    def delete_directory_if_empty(self, directory_path):
        """Remove a directory, but only when it has no contents.

        No listing of the directory is done beforehand; the removal is
        simply attempted and an OSError is taken to mean it did not happen.

        :param str directory_path: A fully qualified path of the directory
            to delete.
        :returns: bool -- True if the directory was removed, False otherwise.
        """
        try:
            os.rmdir(directory_path)
        except OSError:
            return False

        return True
67
68
    def get_all_files(self, path, extensions=None):
        """Walk a directory tree and yield the files with a wanted extension.

        :param str path: Directory at which the recursive listing starts.
        :param tuple(str) extensions: File extensions to include (whitelist),
            without the leading dot. Each file's extension is lower-cased
            before it is looked up here.
        :returns: generator of file paths
        """
        # Without an explicit whitelist, fall back to every extension
        # declared by the subclasses of Base.
        if not extensions:
            extensions = set()
            for media_class in get_all_subclasses(Base):
                extensions.update(media_class.extensions)

        for root, _subdirectories, names in os.walk(path):
            for name in names:
                suffix = os.path.splitext(name)[1]
                # Drop the leading dot and ignore case before the lookup.
                if suffix[1:].lower() in extensions:
                    yield os.path.join(root, name)
87
88
    def get_current_directory(self):
        """Return the working directory of the running process.

        :returns: str
        """
        working_directory = os.getcwd()
        return working_directory
94
95
    def get_file_name(self, media):
        """Generate file name for a photo or video using its metadata.

        We use an ISO8601-like format for the file name prefix. Instead of
        colons as the separator for hours, minutes and seconds we use a hyphen.
        https://en.wikipedia.org/wiki/ISO_8601#General_principles

        The result has the shape `<date>_<time>-<base name>[-<title>].<ext>`
        and is lower-cased.

        :param media: A Photo or Video instance
        :type media: :class:`~elodie.media.photo.Photo` or
            :class:`~elodie.media.video.Video`
        :returns: str or None for non-photo or non-videos
        """
        if not media.is_valid():
            return None

        metadata = media.get_metadata()
        if metadata is None:
            return None

        # Prefer the stored original name. It may be None, or missing
        # entirely, because it was not always written back into EXIF; a
        # missing key is treated the same as None.
        original_name = metadata.get('original_name')
        if original_name is not None:
            base_name = os.path.splitext(original_name)[0]
        else:
            # Remove the date prefix we add to the name. This helps when
            # re-running the program on files which were already processed.
            base_name = re.sub(
                r'^\d{4}-\d{2}-\d{2}_\d{2}-\d{2}-\d{2}-',
                '',
                metadata['base_name']
            )
            # A name consisting of nothing but the prefix is kept as is.
            if not base_name:
                base_name = metadata['base_name']

        # If the file has a title we use that in the file name
        #   (i.e. my-favorite-photo-img_1234.jpg)
        title = metadata.get('title')
        if title:
            title_sanitized = re.sub(r'\W+', '-', title.strip())
            # Remove a suffix added by an earlier run before appending it
            # again, so the title never appears twice.
            base_name = base_name.replace('-%s' % title_sanitized, '')
            base_name = '%s-%s' % (base_name, title_sanitized)

        file_name = '%s-%s.%s' % (
            time.strftime('%Y-%m-%d_%H-%M-%S', metadata['date_taken']),
            base_name,
            metadata['extension']
        )
        return file_name.lower()
150
151
    def get_folder_path_definition(self):
152
        # If we've done this already then return it immediately without
153
        # incurring any extra work
154
        if self.cached_folder_path_definition is not None:
155
            return self.cached_folder_path_definition
156
157
        config = load_config()
158
159
        # If Directory is in the config we assume full_path and its
160
        #  corresponding values (date, location) are also present
161
        if('Directory' not in config):
162
            return self.default_folder_path_definition
163
164
        config_directory = config['Directory']
165
166
        path_parts = re.search(
167
                         '\%([^/]+)\/\%([^/]+)',
168
                         config_directory['full_path']
169
                     )
170
171
        if not path_parts or len(path_parts.groups()) != 2:
172
            return self.default_folder_path_definition
173
174
        path_part_groups = path_parts.groups()
175
        self.cached_folder_path_definition = [
176
            (path_part_groups[0], config_directory[path_part_groups[0]]),
177
            (path_part_groups[1], config_directory[path_part_groups[1]]),
178
        ]
179
        return self.cached_folder_path_definition
180
181
    def get_folder_path(self, metadata):
182
        """Get folder path by various parameters.
183
184
        :param metadata dict: Metadata dictionary.
185
        :returns: str
186
        """
187
        path_parts = self.get_folder_path_definition()
188
        path = []
189
        for path_part in path_parts:
190
            part, mask = path_part
191
            if part == 'date':
192
                path.append(time.strftime(mask, metadata['date_taken']))
193
            elif part == 'location':
194
                if(
195
                    metadata['latitude'] is not None and
196
                    metadata['longitude'] is not None
197
                ):
198
                    place_name = geolocation.place_name(
199
                        metadata['latitude'],
200
                        metadata['longitude']
201
                    )
202
                    if(place_name is not None):
203
                        location_parts = re.findall('(%[^%]+)', mask)
204
                        parsed_folder_name = self.parse_mask_for_location(
205
                            mask,
206
                            location_parts,
207
                            place_name,
208
                        )
209
                        path.append(parsed_folder_name)
210
211
        # For now we always make the leaf folder an album if it's in the EXIF.
212
        # This is to preserve backwards compatability until we figure out how
213
        # to include %album in the config.ini syntax.
214
        if(metadata['album'] is not None):
215
            if(len(path) == 1):
216
                path.append(metadata['album'])
217
            elif(len(path) == 2):
218
                path[1] = metadata['album']
219
220
        # if we don't have a 2nd level directory we use 'Unknown Location'
221
        if(len(path) < 2):
222
            path.append('Unknown Location')
223
224
        # return '/'.join(path[::-1])
225
        return os.path.join(*path)
226
227
    def parse_mask_for_location(self, mask, location_parts, place_name):
        """Interpolate actual place names into a location mask.

        Every chunk in `location_parts` starts with a `%keyword`
        placeholder, optionally followed by literal text. A placeholder
        whose keyword is present in `place_name` is replaced by that value;
        otherwise the whole chunk, trailing text included, is removed from
        the result. If no keyword matched and nothing is left, the
        `default` entry of `place_name` is used.

        Examples:

        mask=%city
        location_parts=['%city']
        place_name={'city': u'Sunnyvale'}
        output=Sunnyvale

        mask=%city-%state
        location_parts=['%city-', '%state']
        place_name={'city': u'Sunnyvale', 'state': u'California'}
        output=Sunnyvale-California

        mask=%country
        location_parts=['%country']
        place_name={'default': u'Sunnyvale', 'city': u'Sunnyvale'}
        output=Sunnyvale

        :param str mask: The location mask in the form of %city-%state, etc
        :param list location_parts: The chunks of the mask, one per
            placeholder, like ['%city-', '%state']
        :param dict place_name: A dictionary of place keywords and names like
            {'default': u'California', 'state': u'California'}
        :returns: str
        """
        matched_any = False
        result = mask
        for chunk in location_parts:
            # For chunk = '%country-random' the three groups are
            #   whole = '%country-random'
            #   placeholder = '%country'
            #   keyword = 'country'
            # A chunk without a lower-case keyword is a bad mask in
            # config.ini and is not handled here.
            whole, placeholder, keyword = re.search(
                '((%([a-z]+))[^%]*)',
                chunk
            ).groups()

            if keyword in place_name:
                matched_any = True
                result = result.replace(placeholder, place_name[keyword])
            else:
                result = result.replace(whole, '')

        if result == '' and not matched_any:
            result = place_name['default']

        return result
286
287
    def process_file(self, _file, destination, media, **kwargs):
288
        move = False
289
        if('move' in kwargs):
290
            move = kwargs['move']
291
292
        allow_duplicate = False
293
        if('allowDuplicate' in kwargs):
294
            allow_duplicate = kwargs['allowDuplicate']
295
296
        if(not media.is_valid()):
297
            print('%s is not a valid media file. Skipping...' % _file)
298
            return
299
300
        media.set_original_name()
301
        metadata = media.get_metadata()
302
303
        directory_name = self.get_folder_path(metadata)
304
305
        dest_directory = os.path.join(destination, directory_name)
306
        file_name = self.get_file_name(media)
307
        dest_path = os.path.join(dest_directory, file_name)
308
309
        db = Db()
310
        checksum = db.checksum(_file)
311
        if(checksum is None):
312
            log.info('Could not get checksum for %s. Skipping...' % _file)
313
            return
314
315
        # If duplicates are not allowed then we check if we've seen this file
316
        #  before via checksum. We also check that the file exists at the
317
        #   location we believe it to be.
318
        # If we find a checksum match but the file doesn't exist where we
319
        #  believe it to be then we write a debug log and proceed to import.
320
        checksum_file = db.get_hash(checksum)
321
        if(allow_duplicate is False and checksum_file is not None):
322
            if(os.path.isfile(checksum_file)):
323
                log.info('%s already exists at %s. Skipping...' % (
324
                    _file,
325
                    checksum_file
326
                ))
327
                return
328
            else:
329
                log.info('%s matched checksum but file not found at %s. Importing again...' % (  # noqa
330
                    _file,
331
                    checksum_file
332
                ))
333
334
        self.create_directory(dest_directory)
335
336
        if(move is True):
337
            stat = os.stat(_file)
338
            shutil.move(_file, dest_path)
339
            os.utime(dest_path, (stat.st_atime, stat.st_mtime))
340
        else:
341
            # Do not use copy2(), will have an issue when copying to a
342
            # network/mounted drive using copy and manual
343
            # set_date_from_filename gets the job done
344
            # shutil.copy seems slow, changing to streaming according to
345
            # http://stackoverflow.com/questions/22078621/python-how-to-copy-files-fast  # noqa
346
            compatability._copyfile(_file, dest_path)
347
            self.set_utime(media)
348
349
        db.add_hash(checksum, dest_path)
350
        db.update_hash_db()
351
352
        return dest_path
353
354
    def set_utime(self, media):
355
        """ Set the modification time on the file base on the file name.
356
        """
357
358
        # Initialize date taken to what's returned from the metadata function.
359
        # If the folder and file name follow a time format of
360
        #   YYYY-MM-DD_HH-MM-SS-IMG_0001.JPG then we override the date_taken
361
        file_path = media.get_file_path()
362
        metadata = media.get_metadata()
363
        date_taken = metadata['date_taken']
364
        base_name = metadata['base_name']
365
        year_month_day_match = re.search(
366
            '^(\d{4})-(\d{2})-(\d{2})_(\d{2})-(\d{2})-(\d{2})',
367
            base_name
368
        )
369
        if(year_month_day_match is not None):
370
            (year, month, day, hour, minute, second) = year_month_day_match.groups()  # noqa
371
            date_taken = time.strptime(
372
                '{}-{}-{} {}:{}:{}'.format(year, month, day, hour, minute, second),  # noqa
373
                '%Y-%m-%d %H:%M:%S'
374
            )
375
376
            os.utime(file_path, (time.time(), time.mktime(date_taken)))
377
        else:
378
            # We don't make any assumptions about time zones and
379
            # assume local time zone.
380
            date_taken_in_seconds = time.mktime(date_taken)
381
            os.utime(file_path, (time.time(), (date_taken_in_seconds)))
382