Completed
Push — master ( e31238...9be5ab )
by Jaisen
01:04
created

FileSystem   D

Complexity

Total Complexity 58

Size/Duplication

Total Lines 399
Duplicated Lines 0 %

Importance

Changes 7
Bugs 0 Features 1
Metric Value
c 7
b 0
f 1
dl 0
loc 399
rs 4.8387
wmc 58

11 Methods

Rating   Name   Duplication   Size   Complexity  
B get_all_files() 0 19 6
C get_file_name() 0 55 9
A create_directory() 0 18 3
A get_current_directory() 0 6 1
A __init__() 0 11 1
A delete_directory_if_empty() 0 16 2
D get_folder_path_definition() 0 63 10
F get_folder_path() 0 44 9
B parse_mask_for_location() 0 59 5
D process_file() 0 67 10
B set_utime() 0 28 2

How to fix   Complexity   

Complex Class

Complex classes like FileSystem often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

"""
General file system methods.

.. moduleauthor:: Jaisen Mathai <[email protected]>
"""
6
from __future__ import print_function
7
from builtins import object
8
9
import os
10
import re
11
import shutil
12
import time
13
14
from elodie import compatability
15
from elodie import geolocation
16
from elodie import log
17
from elodie.config import load_config
18
from elodie.localstorage import Db
19
from elodie.media.base import Base, get_all_subclasses
20
21
22
class FileSystem(object):
    """A class for interacting with the file system.

    Groups the helpers used to import media: creating and removing
    directories, listing candidate files, deriving the destination folder
    and file name from a media object's metadata, and copying or moving the
    file into place.
    """

    def __init__(self):
        # The default folder path is along the lines of 2015-01-Jan/Chicago
        self.default_folder_path_definition = {
            'date': '%Y-%m-%b',
            'location': '%city',
            'full_path': '%date/%album|%location|"{}"'.format(
                geolocation.__DEFAULT_LOCATION__
            ),
        }
        # Parsed folder definition, filled in lazily by
        # get_folder_path_definition().
        self.cached_folder_path_definition = None
        self.default_parts = ['album', 'city', 'state', 'country']

    def create_directory(self, directory_path):
        """Create a directory if it does not already exist.

        :param str directory_path: A fully qualified path of the directory
            to create. Missing parent directories are created as well.
        :returns: bool -- True if the path exists or was created, False if
            creating it raised an OSError (e.g. no permission).
        """
        if os.path.exists(directory_path):
            return True

        try:
            os.makedirs(directory_path)
        except OSError:
            # OSError is thrown for cases like no permission
            return False

        return True

    def delete_directory_if_empty(self, directory_path):
        """Delete a directory only if it's empty.

        Instead of checking first using `len([name for name in
        os.listdir(directory_path)]) == 0`, we catch the OSError exception
        that `os.rmdir` raises for a non-empty (or missing) directory.

        :param str directory_path: A fully qualified path of the directory
            to delete.
        :returns: bool -- True if the directory was removed, else False.
        """
        try:
            os.rmdir(directory_path)
        except OSError:
            return False

        return True

    def get_all_files(self, path, extensions=None):
        """Recursively get all files which match a path and extension.

        :param str path: Path to start recursive file listing
        :param tuple(str) extensions: File extensions to include (whitelist),
            lower case and without the leading dot. When empty or None the
            extensions of every subclass of `Base` are used.
        :returns: generator of file paths
        """
        # If extensions is None then we get all supported extensions
        if not extensions:
            extensions = set()
            for cls in get_all_subclasses(Base):
                extensions.update(cls.extensions)

        for dirname, _, filenames in os.walk(path):
            for filename in filenames:
                # Compare the extension without its dot, case-insensitively
                extension = os.path.splitext(filename)[1][1:].lower()
                if extension in extensions:
                    yield os.path.join(dirname, filename)

    def get_current_directory(self):
        """Get the current working directory.

        :returns: str
        """
        return os.getcwd()

    def get_file_name(self, media):
        """Generate file name for a photo or video using its metadata.

        We use an ISO8601-like format for the file name prefix. Instead of
        colons as the separator for hours, minutes and seconds we use a hyphen.
        https://en.wikipedia.org/wiki/ISO_8601#General_principles

        The result has the form `<date>-<base name>[-<title>].<extension>`
        and is lower cased.

        :param media: A Photo or Video instance
        :type media: :class:`~elodie.media.photo.Photo` or
            :class:`~elodie.media.video.Video`
        :returns: str or None for non-photo or non-videos
        """
        if not media.is_valid():
            return None

        metadata = media.get_metadata()
        if metadata is None:
            return None

        # First we check if we have metadata['original_name'].
        # We have to do this for backwards compatibility because
        #   we originally did not store this back into EXIF.
        if metadata.get('original_name'):
            base_name = os.path.splitext(metadata['original_name'])[0]
        else:
            # We want to remove the date prefix we add to the name.
            # This helps when re-running the program on files which were
            #   already processed.
            base_name = re.sub(
                r'^\d{4}-\d{2}-\d{2}_\d{2}-\d{2}-\d{2}-',
                '',
                metadata['base_name']
            )
            if not base_name:
                base_name = metadata['base_name']

        # If the file has a title we use that in the file name
        #   (i.e. my-favorite-photo-img_1234.jpg)
        title = metadata.get('title')
        title_sanitized = re.sub(r'\W+', '-', title.strip()) if title else ''
        # A whitespace-only title sanitizes to '' and must be ignored;
        #   otherwise replace('-', '') would strip every hyphen from the name.
        if title_sanitized:
            # Drop a previously appended title so it is not duplicated
            base_name = base_name.replace('-%s' % title_sanitized, '')
            base_name = '%s-%s' % (base_name, title_sanitized)

        file_name = '%s-%s.%s' % (
            time.strftime('%Y-%m-%d_%H-%M-%S', metadata['date_taken']),
            base_name,
            metadata['extension']
        )
        return file_name.lower()

    def _parse_folder_path_definition(self, config_directory):
        """Parse the `full_path` mask of a directory configuration.

        :param config_directory: Mapping holding `full_path` and,
            optionally, one mask per named part (e.g. `date`, `location`).
        :returns: list -- one list of `(part, mask)` tuples per folder level,
            in fallback order. Empty when `full_path` has no `%` parts.
        """
        # Find all subpatterns of full_path that map to directories.
        #  I.e. %foo/%bar => ['%foo', '%bar']
        #  I.e. %foo/%bar|%example|"something" =>
        #       ['%foo', '%bar|%example|"something"']
        path_parts = re.findall(r'(%[^/]+)', config_directory['full_path'])

        definition = []
        for path_part in path_parts:
            fallbacks = []
            for token in path_part.split('|'):
                # Only strip the '%' marker. Quoted literals must keep both
                #  quotes so get_folder_path can recognise them.
                name = token[1:] if token.startswith('%') else token
                mask = ''
                if name in config_directory:
                    mask = config_directory[name]
                fallbacks.append((name, mask))
            definition.append(fallbacks)

        return definition

    def get_folder_path_definition(self):
        """Returns a list of folder definitions.

        Each element in the list represents a folder and is itself a list
        of `(part, mask)` tuples; entries after the first are fallbacks.
        Return values take the following form.
        [
            [('date', '%Y-%m-%d')],
            [
                ('album', ''),
                ('location', '%city'),
                ('"Unknown Location"', '')
            ]
        ]

        The result is computed once and cached on the instance.

        :returns: list
        """
        # If we've done this already then return it immediately without
        # incurring any extra work
        if self.cached_folder_path_definition is not None:
            return self.cached_folder_path_definition

        config = load_config()

        # If Directory is in the config we assume full_path and its
        #  corresponding values (date, location) are also present
        config_directory = self.default_folder_path_definition
        if 'Directory' in config:
            config_directory = config['Directory']

        definition = self._parse_folder_path_definition(config_directory)
        if not definition:
            # The configured full_path had no usable parts; fall back to
            #  the parsed default so callers always receive a list.
            definition = self._parse_folder_path_definition(
                self.default_folder_path_definition
            )

        self.cached_folder_path_definition = definition
        return self.cached_folder_path_definition

    def get_folder_path(self, metadata):
        """Given a media's metadata this function returns the folder path as a string.

        :param dict metadata: Metadata dictionary. Reads `date_taken`,
            `latitude`, `longitude` and `album` depending on the parts used
            by the folder definition.
        :returns: str -- relative folder path, '' if no part produced a name.
        """
        path = []
        for path_part in self.get_folder_path_definition():
            # We support fallback values so that
            #  %album|%city|"Unknown Location" results in
            #  My Album - when an album exists
            #  Sunnyvale - when no album exists but a city exists
            #  Unknown Location - when neither an album nor location exist
            # The first fallback that yields a name wins for this level.
            for part, mask in path_part:
                if part in ('date', 'day', 'month', 'year'):
                    path.append(time.strftime(mask, metadata['date_taken']))
                    break
                elif part in ('location', 'city', 'state', 'country'):
                    place_name = geolocation.place_name(
                        metadata['latitude'],
                        metadata['longitude']
                    )
                    location_parts = re.findall(r'(%[^%]+)', mask)
                    path.append(self.parse_mask_for_location(
                        mask,
                        location_parts,
                        place_name,
                    ))
                    break
                elif part == 'album':
                    # An empty album falls through to the next fallback
                    if metadata.get('album'):
                        path.append(metadata['album'])
                        break
                elif part.startswith('"') and part.endswith('"'):
                    # Quoted literal: use the text between the quotes
                    path.append(part[1:-1])
                    break

        if not path:
            return ''

        return os.path.join(*path)

    def parse_mask_for_location(self, mask, location_parts, place_name):
        """Takes a mask for a location and interpolates the actual place names.

        Given these parameters here are the outputs.

        mask=%city
        location_parts=['%city']
        place_name={'city': u'Sunnyvale'}
        output=Sunnyvale

        mask=%city-%state
        location_parts=['%city-', '%state']
        place_name={'city': u'Sunnyvale', 'state': u'California'}
        output=Sunnyvale-California

        mask=%country
        location_parts=['%country']
        place_name={'default': u'Sunnyvale', 'city': u'Sunnyvale'}
        output=Sunnyvale

        :param str mask: The location mask in the form of %city-%state, etc
        :param list location_parts: The `%keyword` chunks of the mask, each
            including any text that trails the keyword, e.g.
            ['%city-', '%state']
        :param dict place_name: A dictionary of place keywords and names like
            {'default': u'California', 'state': u'California'}
        :returns: str
        :raises AttributeError: if a chunk does not start with `%` followed
            by lower case letters (a bad mask in config.ini).
        :raises KeyError: if nothing matched, the result is empty and
            `place_name` has no `default` entry.
        """
        found = False
        folder_name = mask
        for loc_part in location_parts:
            # For loc_part = '%country-random' the groups are
            #   component_full = '%country-random'
            #   component = '%country'
            #   key = 'country'
            component_full, component, key = re.search(
                r'((%([a-z]+))[^%]*)',
                loc_part
            ).groups()

            if key in place_name:
                # Known keyword: swap only the keyword, keep trailing text
                found = True
                folder_name = folder_name.replace(component, place_name[key])
            else:
                # Unknown keyword: drop it together with its trailing text
                folder_name = folder_name.replace(component_full, '')

        if not found and folder_name == '':
            folder_name = place_name['default']

        return folder_name

    def process_file(self, _file, destination, media, **kwargs):
        """Copy or move a media file into its computed destination.

        :param str _file: Path of the source file.
        :param str destination: Root directory the file is imported into.
        :param media: Media object for `_file`; must provide `is_valid`,
            `set_original_name` and `get_metadata`.
        :param bool move: Keyword argument. Move instead of copy when True.
        :param bool allowDuplicate: Keyword argument. When not True, a file
            whose checksum is already recorded and still on disk is skipped.
        :returns: str path of the imported file, or None if it was skipped.
        """
        move = kwargs.get('move', False)
        allow_duplicate = kwargs.get('allowDuplicate', False)

        if not media.is_valid():
            print('%s is not a valid media file. Skipping...' % _file)
            return

        media.set_original_name()
        metadata = media.get_metadata()

        directory_name = self.get_folder_path(metadata)

        dest_directory = os.path.join(destination, directory_name)
        file_name = self.get_file_name(media)
        dest_path = os.path.join(dest_directory, file_name)

        db = Db()
        checksum = db.checksum(_file)
        if checksum is None:
            log.info('Could not get checksum for %s. Skipping...' % _file)
            return

        # If duplicates are not allowed then we check if we've seen this file
        #  before via checksum. We also check that the file exists at the
        #   location we believe it to be.
        # If we find a checksum match but the file doesn't exist where we
        #  believe it to be then we write a debug log and proceed to import.
        checksum_file = db.get_hash(checksum)
        if allow_duplicate is False and checksum_file is not None:
            if os.path.isfile(checksum_file):
                log.info('%s already exists at %s. Skipping...' % (
                    _file,
                    checksum_file
                ))
                return

            log.info('%s matched checksum but file not found at %s. Importing again...' % (  # noqa
                _file,
                checksum_file
            ))

        # If source and destination are identical then
        #  we should not write the file. gh-210
        if _file == dest_path:
            print('Final source and destination path should not be identical')
            return

        self.create_directory(dest_directory)

        if move is True:
            # Carry the source's access/modification times over to the
            #  moved file.
            stat = os.stat(_file)
            shutil.move(_file, dest_path)
            os.utime(dest_path, (stat.st_atime, stat.st_mtime))
        else:
            compatability._copyfile(_file, dest_path)
            # NOTE(review): set_utime() stamps media.get_file_path(), which
            #  looks like the source rather than dest_path -- confirm that
            #  this is the intended target.
            self.set_utime(media)

        db.add_hash(checksum, dest_path)
        db.update_hash_db()

        return dest_path

    def set_utime(self, media):
        """Set the modification time on the file based on the file name.

        The access time is set to "now". The modification time comes from
        the `YYYY-MM-DD_HH-MM-SS` prefix of the metadata's `base_name` when
        it has one, otherwise from the metadata's `date_taken`.

        :param media: Media object providing `get_file_path` and
            `get_metadata`.
        """
        file_path = media.get_file_path()
        metadata = media.get_metadata()

        # Initialize date taken to what's returned from the metadata function.
        # If the file name follows a time format of
        #   YYYY-MM-DD_HH-MM-SS-IMG_0001.JPG then we override the date_taken
        date_taken = metadata['date_taken']
        name_match = re.search(
            r'^(\d{4})-(\d{2})-(\d{2})_(\d{2})-(\d{2})-(\d{2})',
            metadata['base_name']
        )
        if name_match is not None:
            try:
                date_taken = time.strptime(
                    '{}-{}-{} {}:{}:{}'.format(*name_match.groups()),
                    '%Y-%m-%d %H:%M:%S'
                )
            except ValueError:
                # The prefix looks like a date but is not a real one
                #  (e.g. month 13); keep the date from the metadata.
                pass

        # We don't make any assumptions about time zones and
        # assume local time zone.
        os.utime(file_path, (time.time(), time.mktime(date_taken)))
421