elodie.filesystem   F

Complexity

Total Complexity 85

Size/Duplication

Total Lines 609
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 302
dl 0
loc 609
rs 2
c 0
b 0
f 0
wmc 85

14 Methods

Rating   Name   Duplication   Size   Complexity  
D FileSystem.get_dynamic_path() 0 54 12
F FileSystem.get_file_name() 0 105 15
C FileSystem.get_folder_path_definition() 0 57 9
A FileSystem.get_current_directory() 0 6 1
B FileSystem.parse_mask_for_location() 0 59 5
A FileSystem.__init__() 0 21 1
A FileSystem.delete_directory_if_empty() 0 16 2
C FileSystem.get_file_name_definition() 0 59 9
A FileSystem.get_folder_path() 0 25 5
A FileSystem.create_directory() 0 18 3
B FileSystem.get_all_files() 0 19 6
A FileSystem.process_checksum() 0 26 5
C FileSystem.process_file() 0 80 10
A FileSystem.set_utime_from_metadata() 0 26 2

How to fix complexity

Complex classes like elodie.filesystem often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to finding such a component is to look for fields or methods that share the same prefix or suffix.
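As a rough illustration of the prefix heuristic, the sketch below groups the method names from the table above by their first two underscore-separated words. The helper `group_by_prefix` and its `depth` parameter are hypothetical and only serve this example; they are not part of elodie or of the analysis tool.

```python
from collections import defaultdict

# Method names copied from the report's method table.
METHODS = [
    "get_dynamic_path", "get_file_name", "get_folder_path_definition",
    "get_current_directory", "parse_mask_for_location", "__init__",
    "delete_directory_if_empty", "get_file_name_definition",
    "get_folder_path", "create_directory", "get_all_files",
    "process_checksum", "process_file", "set_utime_from_metadata",
]


def group_by_prefix(names, depth=2):
    """Group names by their first `depth` underscore-separated words.

    Hypothetical helper for illustration only.
    """
    groups = defaultdict(list)
    for name in names:
        # Drop empty fragments so dunder names such as __init__ still work.
        words = [w for w in name.split("_") if w]
        groups["_".join(words[:depth])].append(name)
    # Only prefixes shared by more than one method hint at a component.
    return {p: sorted(ns) for p, ns in groups.items() if len(ns) > 1}


candidates = group_by_prefix(METHODS)
for prefix, names in sorted(candidates.items()):
    print(prefix, names)
```

With the default depth this surfaces the `get_file_*` and `get_folder_*` pairs, which suggests that file naming and folder path resolution are two candidates for extraction. A shared prefix is only a hint and the grouping still needs a manual check.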

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
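A minimal sketch of Extract Class, assuming the location mask handling is chosen as the component to pull out. `LocationMask` and its `render` method are hypothetical names introduced for this example, and the `FileSystem` shown here is a trimmed stand-in rather than the real class. The rendering logic mirrors `parse_mask_for_location` in the listing below, with one added guard for chunks that do not match the placeholder pattern.

```python
import re


class LocationMask(object):
    """Hypothetical extracted class that renders a mask like '%city-%state'."""

    def __init__(self, mask):
        self.mask = mask
        # Split the mask into chunks that each start with a placeholder.
        self.parts = re.findall(r'(%[^%]+)', mask)

    def render(self, place_name):
        found = False
        rendered = self.mask
        for chunk in self.parts:
            match = re.search(r'((%([a-z]+))[^%]*)', chunk)
            if match is None:
                # Not a lowercase placeholder; leave the chunk untouched.
                continue
            component_full, component, key = match.groups()
            if key in place_name:
                found = True
                rendered = rendered.replace(component, place_name[key])
            else:
                # Drop the placeholder together with its trailing separator.
                rendered = rendered.replace(component_full, '')
        if not found and rendered == '':
            rendered = place_name['default']
        return rendered


class FileSystem(object):
    """Trimmed stand-in for the real class; only the delegation is shown."""

    def parse_mask_for_location(self, mask, location_parts, place_name):
        # location_parts is kept for signature compatibility but unused here.
        return LocationMask(mask).render(place_name)


fs = FileSystem()
print(fs.parse_mask_for_location(
    '%city-%state', None, {'city': 'Sunnyvale', 'state': 'California'}
))
```

Keeping the old method as a thin delegate means existing callers do not change while the parsing logic becomes testable on its own.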

"""
General file system methods.

.. moduleauthor:: Jaisen Mathai <[email protected]>
"""
from __future__ import print_function
from builtins import object

import os
import re
import shutil
import time

from elodie import compatability
from elodie import geolocation
from elodie import log
from elodie.config import load_config
from elodie.localstorage import Db
from elodie.media.base import Base, get_all_subclasses


class FileSystem(object):
    """A class for interacting with the file system."""

    def __init__(self):
        # The default file name is along the lines of 2017-06-17_01-04-14-dsc_1234-some-title.jpg
        self.default_file_name_definition = {
            'date': '%Y-%m-%d_%H-%M-%S',
            'name': '%date-%original_name-%title.%extension',
        }
        # The default folder path is along the lines of 2015-01-Jan/Chicago
        self.default_folder_path_definition = {
            'date': '%Y-%m-%b',
            'location': '%city',
            'full_path': '%date/%album|%location|"{}"'.format(
                            geolocation.__DEFAULT_LOCATION__
                         ),
        }
        self.cached_file_name_definition = None
        self.cached_folder_path_definition = None
        # Python3 treats the regex \s differently than Python2.
        # It captures some additional characters like the unicode checkmark \u2713.
        # See build failures in Python3 here.
        #  https://travis-ci.org/jmathai/elodie/builds/483012902
        self.whitespace_regex = '[ \t\n\r\f\v]+'

    def create_directory(self, directory_path):
        """Create a directory if it does not already exist.

        :param str directory_path: A fully qualified path of the
            directory to create.
        :returns: bool
        """
        try:
            if os.path.exists(directory_path):
                return True
            else:
                os.makedirs(directory_path)
                return True
        except OSError:
            # OSError is thrown for cases like no permission
            pass

        return False

    def delete_directory_if_empty(self, directory_path):
        """Delete a directory only if it's empty.

        Instead of checking first using `len([name for name in
        os.listdir(directory_path)]) == 0`, we catch the OSError exception.

        :param str directory_path: A fully qualified path of the directory
            to delete.
        :returns: bool
        """
        try:
            os.rmdir(directory_path)
            return True
        except OSError:
            pass

        return False

    def get_all_files(self, path, extensions=None):
        """Recursively get all files which match a path and extension.

        :param str path: Path to start recursive file listing
        :param tuple(str) extensions: File extensions to include (whitelist)
        :returns: generator
        """
        # If extensions is None then we get all supported extensions
        if not extensions:
            extensions = set()
            subclasses = get_all_subclasses(Base)
            for cls in subclasses:
                extensions.update(cls.extensions)

        for dirname, dirnames, filenames in os.walk(path):
            for filename in filenames:
                # If file extension is in `extensions` then yield the full path
                if os.path.splitext(filename)[1][1:].lower() in extensions:
                    yield os.path.join(dirname, filename)

    def get_current_directory(self):
        """Get the current working directory.

        :returns: str
        """
        return os.getcwd()

    def get_file_name(self, media):
        """Generate file name for a photo or video using its metadata.

        Originally we hardcoded the file name to include an ISO date format.
        We use an ISO8601-like format for the file name prefix. Instead of
        colons as the separator for hours, minutes and seconds we use a hyphen.
        https://en.wikipedia.org/wiki/ISO_8601#General_principles

        PR #225 made the file name customizable and fixed issues #107 #110 #111.
        https://github.com/jmathai/elodie/pull/225

        :param media: A Photo or Video instance
        :type media: :class:`~elodie.media.photo.Photo` or
            :class:`~elodie.media.video.Video`
        :returns: str or None for non-photo or non-videos
        """
        if(not media.is_valid()):
            return None

        metadata = media.get_metadata()
        if(metadata is None):
            return None

        # Get the name template and definition.
        # Name template is in the form %date-%original_name-%title.%extension
        # Definition is in the form
        #  [
        #    [('date', '%Y-%m-%d_%H-%M-%S')],
        #    [('original_name', '')], [('title', '')], // contains a fallback
        #    [('extension', '')]
        #  ]
        name_template, definition = self.get_file_name_definition()

        name = name_template
        for parts in definition:
            # Skip empty definitions so that `part` is always bound below.
            if not parts:
                continue

            this_value = None
            for this_part in parts:
                part, mask = this_part
                if part in ('date', 'day', 'month', 'year'):
                    this_value = time.strftime(mask, metadata['date_taken'])
                    break
                elif part in ('location', 'city', 'state', 'country'):
                    place_name = geolocation.place_name(
                        metadata['latitude'],
                        metadata['longitude']
                    )

                    location_parts = re.findall('(%[^%]+)', mask)
                    this_value = self.parse_mask_for_location(
                        mask,
                        location_parts,
                        place_name,
                    )
                    break
                elif part in ('album', 'extension', 'title'):
                    if metadata[part]:
                        this_value = re.sub(self.whitespace_regex, '-', metadata[part].strip())
                        break
                # Compare with == because `part in ('original_name')` is a
                #  substring test against a plain string, not a tuple lookup.
                elif part == 'original_name':
                    # First we check if we have metadata['original_name'].
                    # We have to do this for backwards compatibility because
                    #   we originally did not store this back into EXIF.
                    if metadata[part]:
                        this_value = os.path.splitext(metadata['original_name'])[0]
                    else:
                        # We didn't always store original_name so this is
                        #  for backwards compatibility.
                        # We want to remove the hardcoded date prefix we used
                        #  to add to the name.
                        # This helps when re-running the program on files
                        #  which were already processed.
                        this_value = re.sub(
                            r'^\d{4}-\d{2}-\d{2}_\d{2}-\d{2}-\d{2}-',
                            '',
                            metadata['base_name']
                        )
                        if(len(this_value) == 0):
                            this_value = metadata['base_name']

                    # Lastly we want to sanitize the name
                    this_value = re.sub(self.whitespace_regex, '-', this_value.strip())
                elif part.startswith('"') and part.endswith('"'):
                    this_value = part[1:-1]
                    break

            # Here we replace the placeholder with its corresponding value.
            # Check if this_value was not set so that the placeholder
            #  can be removed completely.
            # For example, %title- will be replaced with ''
            # Else replace the placeholder (i.e. %title) with the value.
            if this_value is None:
                name = re.sub(
                    #'[^a-z_]+%{}'.format(part),
                    # Scrutinizer flagged `part` here as not defined for all
                    #  execution paths (it is unbound when `parts` is empty).
                    '[^a-zA-Z0-9_]+%{}'.format(part),
                    '',
                    name,
                )
            else:
                name = re.sub(
                    '%{}'.format(part),
                    this_value,
                    name,
                )

        return name.lower()

    def get_file_name_definition(self):
        """Returns the file name template and its list of part definitions.

        Each element in the list represents one placeholder of the name
        and is itself a list of (part, mask) tuples.
        Return values take the following form.
        (
            '%date-%original_name-%title.%extension',
            [
                [('date', '%Y-%m-%d_%H-%M-%S')],
                [('original_name', '')],
                [('title', '')],
                [('extension', '')]
            ]
        )

        :returns: tuple
        """
        # If we've done this already then return it immediately without
        # incurring any extra work
        if self.cached_file_name_definition is not None:
            return self.cached_file_name_definition

        config = load_config()

        # If File is in the config we assume name and its
        #  corresponding values are also present
        config_file = self.default_file_name_definition
        if('File' in config):
            config_file = config['File']

        # Find all subpatterns of name that map to the components of the file's
        #  name.
        #  I.e. %date-%original_name-%title.%extension => ['date', 'original_name', 'title', 'extension'] #noqa
        path_parts = re.findall(
                         r'(\%[a-z_]+)',
                         config_file['name']
                     )

        if not path_parts:
            return (config_file['name'], self.default_file_name_definition)

        self.cached_file_name_definition = []
        for part in path_parts:
            if part in config_file:
                part = part[1:]
                self.cached_file_name_definition.append(
                    [(part, config_file[part])]
                )
            else:
                this_part = []
                for p in part.split('|'):
                    p = p[1:]
                    this_part.append(
                        (p, config_file[p] if p in config_file else '')
                    )
                self.cached_file_name_definition.append(this_part)

        self.cached_file_name_definition = (config_file['name'], self.cached_file_name_definition)
        return self.cached_file_name_definition

    def get_folder_path_definition(self):
        """Returns a list of folder definitions.

        Each element in the list represents a folder and is itself a list
        of (part, mask) tuples. Fallback folders are supported and appear
        as additional tuples in the same inner list.
        Return values take the following form.
        [
            [('date', '%Y-%m-%d')],
            [
                ('location', '%city'),
                ('album', ''),
                ('"Unknown Location"', '')
            ]
        ]

        :returns: list
        """
        # If we've done this already then return it immediately without
        # incurring any extra work
        if self.cached_folder_path_definition is not None:
            return self.cached_folder_path_definition

        config = load_config()

        # If Directory is in the config we assume full_path and its
        #  corresponding values (date, location) are also present
        config_directory = self.default_folder_path_definition
        if('Directory' in config):
            config_directory = config['Directory']

        # Find all subpatterns of full_path that map to directories.
        #  I.e. %foo/%bar => ['foo', 'bar']
        #  I.e. %foo/%bar|%example|"something" => ['foo', 'bar|example|"something"']
        path_parts = re.findall(
                         r'(\%[^/]+)',
                         config_directory['full_path']
                     )

        if not path_parts:
            return self.default_folder_path_definition

        self.cached_folder_path_definition = []
        for part in path_parts:
            part = part.replace('%', '')
            if part in config_directory:
                self.cached_folder_path_definition.append(
                    [(part, config_directory[part])]
                )
            else:
                this_part = []
                for p in part.split('|'):
                    this_part.append(
                        (p, config_directory[p] if p in config_directory else '')
                    )
                self.cached_folder_path_definition.append(this_part)

        return self.cached_folder_path_definition

    def get_folder_path(self, metadata, path_parts=None):
        """Given a media's metadata this function returns the folder path as a string.

        :param dict metadata: Metadata dictionary.
        :param list path_parts: Optional folder definitions. Defaults to the
            result of get_folder_path_definition().
        :returns: str
        """
        if path_parts is None:
            path_parts = self.get_folder_path_definition()
        path = []
        for path_part in path_parts:
            # We support fallback values so that
            #  %album|%city|"Unknown Location" results in
            #  My Album - when an album exists
            #  Sunnyvale - when no album exists but a city exists
            #  Unknown Location - when neither an album nor location exist
            for this_part in path_part:
                part, mask = this_part
                this_path = self.get_dynamic_path(part, mask, metadata)
                if this_path:
                    path.append(this_path.strip())
                    # We break as soon as we have a value to append
                    # Else we continue for fallbacks
                    break
        return os.path.join(*path)

    def get_dynamic_path(self, part, mask, metadata):
        """Parse a specific folder's name given a mask and metadata.

        :param part: Name of the part as defined in the path (i.e. date from %date)
        :param mask: Mask representing the template for the path (i.e. %city %state)
        :param metadata: Metadata dictionary.
        :returns: str
        """

        # Each part has its own custom logic and we evaluate a single part and return
        #  the evaluated string.
        # Single names are compared with == because `part in ('custom')` is a
        #  substring test against a plain string, not a tuple lookup.
        if part == 'custom':
            custom_parts = re.findall('(%[a-z_]+)', mask)
            folder = mask
            for i in custom_parts:
                folder = folder.replace(
                    i,
                    self.get_dynamic_path(i[1:], i, metadata)
                )
            return folder
        elif part == 'date':
            config = load_config()
            # If Directory is in the config we assume full_path and its
            #  corresponding values (date, location) are also present
            config_directory = self.default_folder_path_definition
            if('Directory' in config):
                config_directory = config['Directory']
            date_mask = ''
            if 'date' in config_directory:
                date_mask = config_directory['date']
            return time.strftime(date_mask, metadata['date_taken'])
        elif part in ('day', 'month', 'year'):
            return time.strftime(mask, metadata['date_taken'])
        elif part in ('location', 'city', 'state', 'country'):
            place_name = geolocation.place_name(
                metadata['latitude'],
                metadata['longitude']
            )

            location_parts = re.findall('(%[^%]+)', mask)
            parsed_folder_name = self.parse_mask_for_location(
                mask,
                location_parts,
                place_name,
            )
            return parsed_folder_name
        elif part in ('album', 'camera_make', 'camera_model'):
            if metadata[part]:
                return metadata[part]
        elif part.startswith('"') and part.endswith('"'):
            # Fallback string
            return part[1:-1]

        return ''

    def parse_mask_for_location(self, mask, location_parts, place_name):
        """Takes a mask for a location and interpolates the actual place names.

        Given these parameters here are the outputs.

        mask=%city
        location_parts=['%city']
        place_name={'city': u'Sunnyvale'}
        output=Sunnyvale

        mask=%city-%state
        location_parts=['%city-', '%state']
        place_name={'city': u'Sunnyvale', 'state': u'California'}
        output=Sunnyvale-California

        mask=%country
        location_parts=['%country']
        place_name={'default': u'Sunnyvale', 'city': u'Sunnyvale'}
        output=Sunnyvale

        :param str mask: The location mask in the form of %city-%state, etc
        :param list location_parts: A list of strings as returned by
            re.findall('(%[^%]+)', mask), i.e. ['%city-', '%state']
        :param dict place_name: A dictionary of place keywords and names like
            {'default': u'California', 'state': u'California'}
        :returns: str
        """
        found = False
        folder_name = mask
        for loc_part in location_parts:
            # We assume the search returns a tuple of length 3.
            # If not then it's a bad mask in config.ini.
            # loc_part = '%country-random'
            # component_full = '%country-random'
            # component = '%country'
            # key = 'country'
            component_full, component, key = re.search(
                r'((%([a-z]+))[^%]*)',
                loc_part
            ).groups()

            if(key in place_name):
                found = True
                replace_target = component
                replace_with = place_name[key]
            else:
                replace_target = component_full
                replace_with = ''

            folder_name = folder_name.replace(
                replace_target,
                replace_with,
            )

        if(not found and folder_name == ''):
            folder_name = place_name['default']

        return folder_name

    def process_checksum(self, _file, allow_duplicate):
        db = Db()
        checksum = db.checksum(_file)
        if(checksum is None):
            log.info('Could not get checksum for %s.' % _file)
            return None

        # If duplicates are not allowed then we check if we've seen this file
        #  before via checksum. We also check that the file exists at the
        #   location we believe it to be.
        # If we find a checksum match but the file doesn't exist where we
        #  believe it to be then we write a debug log and proceed to import.
        checksum_file = db.get_hash(checksum)
        if(allow_duplicate is False and checksum_file is not None):
            if(os.path.isfile(checksum_file)):
                log.info('%s already at %s.' % (
                    _file,
                    checksum_file
                ))
                return None
            else:
                log.info('%s matched checksum but file not found at %s.' % (  # noqa
                    _file,
                    checksum_file
                ))
        return checksum

    def process_file(self, _file, destination, media, **kwargs):

        move = False
        if('move' in kwargs):
            move = kwargs['move']

        allow_duplicate = False
        if('allowDuplicate' in kwargs):
            allow_duplicate = kwargs['allowDuplicate']

        stat_info_original = os.stat(_file)

        if(not media.is_valid()):
            print('%s is not a valid media file. Skipping...' % _file)
            return

        checksum = self.process_checksum(_file, allow_duplicate)
        if(checksum is None):
            log.info('Original checksum returned None for %s. Skipping...' %
                     _file)
            return

        media.set_original_name()
        metadata = media.get_metadata()

        directory_name = self.get_folder_path(metadata)

        dest_directory = os.path.join(destination, directory_name)
        file_name = self.get_file_name(media)
        dest_path = os.path.join(dest_directory, file_name)

        # If source and destination are identical then
        #  we should not write the file. gh-210
        if(_file == dest_path):
            print('Final source and destination path should not be identical')
            return

        self.create_directory(dest_directory)

        # exiftool renames the original file by appending '_original' to the
        # file name. A new file is written with new tags with the initial file
        # name. See exiftool man page for more details.
        exif_original_file = _file + '_original'

        # Check if the source file was processed by exiftool and an _original
        # file was created.
        exif_original_file_exists = False
        if(os.path.exists(exif_original_file)):
            exif_original_file_exists = True

        if(move is True):
            stat = os.stat(_file)
            # Move the processed file into the destination directory
            shutil.move(_file, dest_path)

            if(exif_original_file_exists is True):
                # We can remove it as we don't need the initial file.
                os.remove(exif_original_file)
            os.utime(dest_path, (stat.st_atime, stat.st_mtime))
        else:
            if(exif_original_file_exists is True):
                # Move the newly processed file with any updated tags to the
                # destination directory
                shutil.move(_file, dest_path)
                # Move the exif _original back to the initial source file
                shutil.move(exif_original_file, _file)
            else:
                compatability._copyfile(_file, dest_path)

            # Set the utime based on what the original file contained
            #  before we made any changes.
            # Then set the utime on the destination file based on metadata.
            os.utime(_file, (stat_info_original.st_atime, stat_info_original.st_mtime))
            self.set_utime_from_metadata(media.get_metadata(), dest_path)

        db = Db()
        db.add_hash(checksum, dest_path)
        db.update_hash_db()

        return dest_path

    def set_utime_from_metadata(self, metadata, file_path):
        """Set the modification time on the file.

        The time comes from the metadata's date_taken unless the base name
        starts with a timestamp, in which case the file name takes precedence.
        """

        # Initialize date taken to what's returned from the metadata function.
        # If the folder and file name follow a time format of
        #   YYYY-MM-DD_HH-MM-SS-IMG_0001.JPG then we override the date_taken
        date_taken = metadata['date_taken']
        base_name = metadata['base_name']
        year_month_day_match = re.search(
            r'^(\d{4})-(\d{2})-(\d{2})_(\d{2})-(\d{2})-(\d{2})',
            base_name
        )
        if(year_month_day_match is not None):
            (year, month, day, hour, minute, second) = year_month_day_match.groups()  # noqa
            date_taken = time.strptime(
                '{}-{}-{} {}:{}:{}'.format(year, month, day, hour, minute, second),  # noqa
                '%Y-%m-%d %H:%M:%S'
            )

            os.utime(file_path, (time.time(), time.mktime(date_taken)))
        else:
            # We don't make any assumptions about time zones and
            # assume local time zone.
            date_taken_in_seconds = time.mktime(date_taken)
            os.utime(file_path, (time.time(), (date_taken_in_seconds)))