elodie.filesystem.FileSystem.get_file_name()   F

Complexity

Conditions 18

Size

Total Lines 110
Code Lines 55

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric  Value
cc      18
eloc    55
nop     2
dl      0
loc     110
rs      1.2
c       0
b       0
f       0

How to fix: Long Method, Complexity

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.
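
As a minimal sketch of that advice, the step that the listing introduces with the comment "Lastly we want to sanitize the name" could be pulled out into a function named after the comment. The names `sanitize_name` and `build_name` are hypothetical and not part of elodie.

```python
import re

# Same character class the listing stores in self.whitespace_regex.
WHITESPACE_REGEX = '[ \t\n\r\f\v]+'


def sanitize_name(value):
    """Trim the value and collapse each run of whitespace into one hyphen."""
    return re.sub(WHITESPACE_REGEX, '-', value.strip())


def build_name(original_name, title):
    """Join the sanitized parts with hyphens, skipping empty parts."""
    parts = [sanitize_name(p) for p in (original_name, title) if p]
    return '-'.join(parts)
```

The caller now reads as a sequence of named steps, and the extracted function can be tested on its own.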

Commonly applied refactorings include Extract Method, the technique described in the previous paragraph.

Complexity

Complex units like elodie.filesystem.FileSystem.get_file_name() often do a lot of different things. To break down the class that contains such a method, we need to identify a cohesive component within that class. A common approach to finding such a component is to look for fields and methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
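
In this listing, `default_file_name_definition`, `cached_file_name_definition` and `get_file_name_definition()` share the suffix `file_name_definition`, which makes them one candidate group. The sketch below shows what an Extract Class step might look like under some assumptions: the class name `FileNameDefinition` is hypothetical, the `config` argument stands in for the result of `load_config()`, and the `|` fallback handling is left out for brevity.

```python
import re


class FileNameDefinition(object):
    """Hypothetical extracted class; not part of elodie.

    Groups the default definition, the cache and the parser that
    FileSystem keeps under the ``file_name_definition`` suffix.
    """

    def __init__(self, config=None):
        # `config` is an assumption that replaces load_config() so the
        # sketch runs without elodie installed.
        self.config = config or {}
        self.default = {
            'date': '%Y-%m-%d_%H-%M-%S',
            'name': '%date-%original_name-%title.%extension',
        }
        self.cached = None

    def get(self):
        """Return (name_template, definition), cached after first use."""
        if self.cached is not None:
            return self.cached

        config_file = self.config.get('File', self.default)
        template = config_file['name']
        definition = []
        for placeholder in re.findall(r'(%[a-z_]+)', template):
            part = placeholder[1:]  # drop the leading '%'
            definition.append([(part, config_file.get(part, ''))])

        self.cached = (template, definition)
        return self.cached
```

`FileSystem` would then hold one `FileNameDefinition` instance instead of two fields and a method, which is the kind of reduction the Extract Class refactoring aims for.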

"""
General file system methods.

.. moduleauthor:: Jaisen Mathai <[email protected]>
"""
from __future__ import print_function
from builtins import object

import os
import re
import shutil
import time

from elodie import compatability
from elodie import geolocation
from elodie import log
from elodie.config import load_config
from elodie.localstorage import Db
from elodie.media.base import Base, get_all_subclasses


class FileSystem(object):
    """A class for interacting with the file system."""

    def __init__(self):
        # The default file name is along the lines of 2017-06-17_01-04-14-dsc_1234-some-title.jpg
        self.default_file_name_definition = {
            'date': '%Y-%m-%d_%H-%M-%S',
            'name': '%date-%original_name-%title.%extension',
        }
        # The default folder path is along the lines of 2015-01-Jan/Chicago
        self.default_folder_path_definition = {
            'date': '%Y-%m-%b',
            'location': '%city',
            'full_path': '%date/%album|%location|"{}"'.format(
                            geolocation.__DEFAULT_LOCATION__
                         ),
        }
        self.cached_file_name_definition = None
        self.cached_folder_path_definition = None
        # Python3 treats the regex \s differently than Python2.
        # It captures some additional characters like the unicode checkmark \u2713.
        # See build failures in Python3 here.
        #  https://travis-ci.org/jmathai/elodie/builds/483012902
        self.whitespace_regex = '[ \t\n\r\f\v]+'

    def create_directory(self, directory_path):
        """Create a directory if it does not already exist.

        :param str directory_path: A fully qualified path of the directory
            to create.
        :returns: bool
        """
        try:
            if os.path.exists(directory_path):
                return True
            else:
                os.makedirs(directory_path)
                return True
        except OSError:
            # OSError is thrown for cases like no permission
            pass

        return False

    def delete_directory_if_empty(self, directory_path):
        """Delete a directory only if it's empty.

        Instead of checking first using `len([name for name in
        os.listdir(directory_path)]) == 0`, we catch the OSError exception.

        :param str directory_path: A fully qualified path of the directory
            to delete.
        """
        try:
            os.rmdir(directory_path)
            return True
        except OSError:
            pass

        return False

    def get_all_files(self, path, extensions=None):
        """Recursively get all files which match a path and extension.

        :param str path: Path to start recursive file listing
        :param tuple(str) extensions: File extensions to include (whitelist)
        :returns: generator
        """
        # If extensions is None then we get all supported extensions
        if not extensions:
            extensions = set()
            subclasses = get_all_subclasses(Base)
            for cls in subclasses:
                extensions.update(cls.extensions)

        for dirname, dirnames, filenames in os.walk(path):
            for filename in filenames:
                # If file extension is in `extensions` then yield the path
                if os.path.splitext(filename)[1][1:].lower() in extensions:
                    yield os.path.join(dirname, filename)

    def get_current_directory(self):
        """Get the current working directory.

        :returns: str
        """
        return os.getcwd()

    def get_file_name(self, media):
        """Generate file name for a photo or video using its metadata.

        Originally we hardcoded the file name to include an ISO date format.
        We use an ISO8601-like format for the file name prefix. Instead of
        colons as the separator for hours, minutes and seconds we use a hyphen.
        https://en.wikipedia.org/wiki/ISO_8601#General_principles

        PR #225 made the file name customizable and fixed issues #107 #110 #111.
        https://github.com/jmathai/elodie/pull/225

        :param media: A Photo or Video instance
        :type media: :class:`~elodie.media.photo.Photo` or
            :class:`~elodie.media.video.Video`
        :returns: str or None for non-photo or non-videos
        """
        if(not media.is_valid()):
            return None

        metadata = media.get_metadata()
        if(metadata is None):
            return None

        # Get the name template and definition.
        # Name template is in the form %date-%original_name-%title.%extension
        # Definition is in the form
        #  [
        #    [('date', '%Y-%m-%d_%H-%M-%S')],
        #    [('original_name', '')], [('title', '')], // contains a fallback
        #    [('extension', '')]
        #  ]
        name_template, definition = self.get_file_name_definition()

        name = name_template
        for parts in definition:
            this_value = None
            for this_part in parts:
                part, mask = this_part
                if part in ('date', 'day', 'month', 'year'):
                    this_value = time.strftime(mask, metadata['date_taken'])
                    break
                elif part in ('location', 'city', 'state', 'country'):
                    place_name = geolocation.place_name(
                        metadata['latitude'],
                        metadata['longitude']
                    )

                    location_parts = re.findall('(%[^%]+)', mask)
                    this_value = self.parse_mask_for_location(
                        mask,
                        location_parts,
                        place_name,
                    )
                    break
                elif part in ('album', 'extension', 'title'):
                    if metadata[part]:
                        this_value = re.sub(self.whitespace_regex, '-', metadata[part].strip())
                        break
                # Compare with == here: `part in ('original_name')` is a
                #  substring test because ('original_name') is a plain string.
                elif part == 'original_name':
                    # First we check if we have metadata['original_name'].
                    # We have to do this for backwards compatibility because
                    #   we originally did not store this back into EXIF.
                    if metadata[part]:
                        this_value = os.path.splitext(metadata['original_name'])[0]
                    else:
                        # We didn't always store original_name so this is
                        #  for backwards compatibility.
                        # We want to remove the hardcoded date prefix we used
                        #  to add to the name.
                        # This helps when re-running the program on files
                        #  which were already processed.
                        this_value = re.sub(
                            r'^\d{4}-\d{2}-\d{2}_\d{2}-\d{2}-\d{2}-',
                            '',
                            metadata['base_name']
                        )
                        if(len(this_value) == 0):
                            this_value = metadata['base_name']

                    # Lastly we want to sanitize the name
                    this_value = re.sub(self.whitespace_regex, '-', this_value.strip())
                elif part.startswith('"') and part.endswith('"'):
                    this_value = part[1:-1]
                    break

            # Here we replace the placeholder with its corresponding value.
            # Check if this_value was not set so that the placeholder
            #  can be removed completely.
            # For example, -%title will be replaced with ''
            # Else replace the placeholder (i.e. %title) with the value.
            if this_value is None:
                name = re.sub(
                    #'[^a-z_]+%{}'.format(part),
                    '[^a-zA-Z0-9_]+%{}'.format(part),
                    # Review note (introduced by Jaisen Mathai): the variable
                    #  part does not seem to be defined for all execution paths.
                    '',
                    name,
                )
            else:
                name = re.sub(
                    '%{}'.format(part),
                    this_value,
                    name,
                )

        config = load_config()

        if('File' in config and 'capitalization' in config['File'] and config['File']['capitalization'] == 'upper'):
            return name.upper()
        else:
            return name.lower()

    def get_file_name_definition(self):
        """Returns the name template and a list of file name part definitions.

        Each element in the list represents one part of the file name.
        Return values take the following form.
        (
            '%date-%original_name-%title.%extension',
            [
                [('date', '%Y-%m-%d_%H-%M-%S')],
                [('original_name', '')],
                [('title', '')],
                [('extension', '')]
            ]
        )

        :returns: tuple
        """
        # If we've done this already then return it immediately without
        # incurring any extra work
        if self.cached_file_name_definition is not None:
            return self.cached_file_name_definition

        config = load_config()

        # If File is in the config we assume name and its
        #  corresponding values are also present
        config_file = self.default_file_name_definition
        if('File' in config):
            config_file = config['File']

        # Find all subpatterns of name that map to the components of the file's
        #  name.
        #  I.e. %date-%original_name-%title.%extension => ['%date', '%original_name', '%title', '%extension'] #noqa
        path_parts = re.findall(
                         r'(%[a-z_]+)',
                         config_file['name']
                     )

        if not path_parts:
            return (config_file['name'], self.default_file_name_definition)

        self.cached_file_name_definition = []
        for part in path_parts:
            if part in config_file:
                part = part[1:]
                self.cached_file_name_definition.append(
                    [(part, config_file[part])]
                )
            else:
                this_part = []
                for p in part.split('|'):
                    p = p[1:]
                    this_part.append(
                        (p, config_file[p] if p in config_file else '')
                    )
                self.cached_file_name_definition.append(this_part)

        self.cached_file_name_definition = (config_file['name'], self.cached_file_name_definition)
        return self.cached_file_name_definition

    def get_folder_path_definition(self):
        """Returns a list of folder definitions.

        Each element in the list represents a folder.
        Fallback folders are supported and are nested lists.
        Return values take the following form.
        [
            ('date', '%Y-%m-%d'),
            [
                ('location', '%city'),
                ('album', ''),
                ('"Unknown Location"', '')
            ]
        ]

        :returns: list
        """
        # If we've done this already then return it immediately without
        # incurring any extra work
        if self.cached_folder_path_definition is not None:
            return self.cached_folder_path_definition

        config = load_config()

        # If Directory is in the config we assume full_path and its
        #  corresponding values (date, location) are also present
        config_directory = self.default_folder_path_definition
        if('Directory' in config):
            config_directory = config['Directory']

        # Find all subpatterns of full_path that map to directories.
        #  I.e. %foo/%bar => ['foo', 'bar']
        #  I.e. %foo/%bar|%example|"something" => ['foo', 'bar|example|"something"']
        path_parts = re.findall(
                         r'(%[^/]+)',
                         config_directory['full_path']
                     )

        if not path_parts:
            return self.default_folder_path_definition

        self.cached_folder_path_definition = []
        for part in path_parts:
            part = part.replace('%', '')
            if part in config_directory:
                self.cached_folder_path_definition.append(
                    [(part, config_directory[part])]
                )
            else:
                this_part = []
                for p in part.split('|'):
                    this_part.append(
                        (p, config_directory[p] if p in config_directory else '')
                    )
                self.cached_folder_path_definition.append(this_part)

        return self.cached_folder_path_definition

    def get_folder_path(self, metadata, path_parts=None):
        """Given a media's metadata this function returns the folder path as a string.

        :param dict metadata: Metadata dictionary.
        :returns: str
        """
        if path_parts is None:
            path_parts = self.get_folder_path_definition()
        path = []
        for path_part in path_parts:
            # We support fallback values so that
            #  %album|%city|"Unknown Location" results in
            #  My Album - when an album exists
            #  Sunnyvale - when no album exists but a city exists
            #  Unknown Location - when neither an album nor location exist
            for this_part in path_part:
                part, mask = this_part
                this_path = self.get_dynamic_path(part, mask, metadata)
                if this_path:
                    path.append(this_path.strip())
                    # We break as soon as we have a value to append
                    # Else we continue for fallbacks
                    break
        return os.path.join(*path)

    def get_dynamic_path(self, part, mask, metadata):
        """Parse a specific folder's name given a mask and metadata.

        :param part: Name of the part as defined in the path (i.e. date from %date)
        :param mask: Mask representing the template for the path (i.e. %city %state)
        :param metadata: Metadata dictionary.
        :returns: str
        """

        # Each part has its own custom logic and we evaluate a single part and return
        #  the evaluated string.
        # Note: `part in ('custom')` would be a substring test against a plain
        #  string, so single names are compared with == instead.
        if part == 'custom':
            custom_parts = re.findall('(%[a-z_]+)', mask)
            folder = mask
            for i in custom_parts:
                folder = folder.replace(
                    i,
                    self.get_dynamic_path(i[1:], i, metadata)
                )
            return folder
        elif part == 'date':
            config = load_config()
            # If Directory is in the config we assume full_path and its
            #  corresponding values (date, location) are also present
            config_directory = self.default_folder_path_definition
            if('Directory' in config):
                config_directory = config['Directory']
            date_mask = ''
            if 'date' in config_directory:
                date_mask = config_directory['date']
            return time.strftime(date_mask, metadata['date_taken'])
        elif part in ('day', 'month', 'year'):
            return time.strftime(mask, metadata['date_taken'])
        elif part in ('location', 'city', 'state', 'country'):
            place_name = geolocation.place_name(
                metadata['latitude'],
                metadata['longitude']
            )

            location_parts = re.findall('(%[^%]+)', mask)
            parsed_folder_name = self.parse_mask_for_location(
                mask,
                location_parts,
                place_name,
            )
            return parsed_folder_name
        elif part in ('album', 'camera_make', 'camera_model'):
            if metadata[part]:
                return metadata[part]
        elif part.startswith('"') and part.endswith('"'):
            # Fallback string
            return part[1:-1]

        return ''

    def parse_mask_for_location(self, mask, location_parts, place_name):
        """Takes a mask for a location and interpolates the actual place names.

        Given these parameters here are the outputs.

        mask=%city
        location_parts=[('%city','%city','city')]
        place_name={'city': u'Sunnyvale'}
        output=Sunnyvale

        mask=%city-%state
        location_parts=[('%city-','%city','city'), ('%state','%state','state')]
        place_name={'city': u'Sunnyvale', 'state': u'California'}
        output=Sunnyvale-California

        mask=%country
        location_parts=[('%country','%country','country')]
        place_name={'default': u'Sunnyvale', 'city': u'Sunnyvale'}
        output=Sunnyvale

        :param str mask: The location mask in the form of %city-%state, etc
        :param list location_parts: A list of tuples in the form of
            [('%city-', '%city', 'city'), ('%state', '%state', 'state')]
        :param dict place_name: A dictionary of place keywords and names like
            {'default': u'California', 'state': u'California'}
        :returns: str
        """
        found = False
        folder_name = mask
        for loc_part in location_parts:
            # We assume the search returns a tuple of length 3.
            # If not then it's a bad mask in config.ini.
            # loc_part = '%country-random'
            # component_full = '%country-random'
            # component = '%country'
            # key = 'country'
            component_full, component, key = re.search(
                '((%([a-z]+))[^%]*)',
                loc_part
            ).groups()

            if(key in place_name):
                found = True
                replace_target = component
                replace_with = place_name[key]
            else:
                replace_target = component_full
                replace_with = ''

            folder_name = folder_name.replace(
                replace_target,
                replace_with,
            )

        if(not found and folder_name == ''):
            folder_name = place_name['default']

        return folder_name

    def process_checksum(self, _file, allow_duplicate):
        db = Db()
        checksum = db.checksum(_file)
        if(checksum is None):
            log.info('Could not get checksum for %s.' % _file)
            return None

        # If duplicates are not allowed then we check if we've seen this file
        #  before via checksum. We also check that the file exists at the
        #   location we believe it to be.
        # If we find a checksum match but the file doesn't exist where we
        #  believe it to be then we write a debug log and proceed to import.
        checksum_file = db.get_hash(checksum)
        if(allow_duplicate is False and checksum_file is not None):
            if(os.path.isfile(checksum_file)):
                log.info('%s already at %s.' % (
                    _file,
                    checksum_file
                ))
                return None
            else:
                log.info('%s matched checksum but file not found at %s.' % (  # noqa
                    _file,
                    checksum_file
                ))
        return checksum

    def process_file(self, _file, destination, media, **kwargs):

        move = False
        if('move' in kwargs):
            move = kwargs['move']

        allow_duplicate = False
        if('allowDuplicate' in kwargs):
            allow_duplicate = kwargs['allowDuplicate']

        stat_info_original = os.stat(_file)

        if(not media.is_valid()):
            print('%s is not a valid media file. Skipping...' % _file)
            return

        checksum = self.process_checksum(_file, allow_duplicate)
        if(checksum is None):
            log.info('Original checksum returned None for %s. Skipping...' %
                     _file)
            return

        media.set_original_name()
        metadata = media.get_metadata()

        directory_name = self.get_folder_path(metadata)

        dest_directory = os.path.join(destination, directory_name)
        file_name = self.get_file_name(media)
        dest_path = os.path.join(dest_directory, file_name)

        # If source and destination are identical then
        #  we should not write the file. gh-210
        if(_file == dest_path):
            print('Final source and destination path should not be identical')
            return

        self.create_directory(dest_directory)

        # exiftool renames the original file by appending '_original' to the
        # file name. A new file is written with new tags with the initial file
        # name. See exiftool man page for more details.
        exif_original_file = _file + '_original'

        # Check if the source file was processed by exiftool and an _original
        # file was created.
        exif_original_file_exists = False
        if(os.path.exists(exif_original_file)):
            exif_original_file_exists = True

        if(move is True):
            stat = os.stat(_file)
            # Move the processed file into the destination directory
            shutil.move(_file, dest_path)

            if(exif_original_file_exists is True):
                # We can remove it as we don't need the initial file.
                os.remove(exif_original_file)
            os.utime(dest_path, (stat.st_atime, stat.st_mtime))
        else:
            if(exif_original_file_exists is True):
                # Move the newly processed file with any updated tags to the
                # destination directory
                shutil.move(_file, dest_path)
                # Move the exif _original back to the initial source file
                shutil.move(exif_original_file, _file)
            else:
                compatability._copyfile(_file, dest_path)

            # Set the utime based on what the original file contained
            #  before we made any changes.
            # Then set the utime on the destination file based on metadata.
            os.utime(_file, (stat_info_original.st_atime, stat_info_original.st_mtime))
            self.set_utime_from_metadata(media.get_metadata(), dest_path)

        db = Db()
        db.add_hash(checksum, dest_path)
        db.update_hash_db()

        return dest_path

    def set_utime_from_metadata(self, metadata, file_path):
        """Set the modification time on the file.

        The time is taken from the file name when it starts with a date
        prefix and from the metadata's date_taken otherwise.
        """

        # Initialize date taken to what's returned from the metadata function.
        # If the folder and file name follow a time format of
        #   YYYY-MM-DD_HH-MM-SS-IMG_0001.JPG then we override the date_taken
        date_taken = metadata['date_taken']
        base_name = metadata['base_name']
        year_month_day_match = re.search(
            r'^(\d{4})-(\d{2})-(\d{2})_(\d{2})-(\d{2})-(\d{2})',
            base_name
        )
        if(year_month_day_match is not None):
            (year, month, day, hour, minute, second) = year_month_day_match.groups()  # noqa
            date_taken = time.strptime(
                '{}-{}-{} {}:{}:{}'.format(year, month, day, hour, minute, second),  # noqa
                '%Y-%m-%d %H:%M:%S'
            )

            os.utime(file_path, (time.time(), time.mktime(date_taken)))
        else:
            # We don't make any assumptions about time zones and
            # assume local time zone.
            date_taken_in_seconds = time.mktime(date_taken)
            os.utime(file_path, (time.time(), (date_taken_in_seconds)))
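
The review note in get_file_name() says that `part` does not seem to be defined for all execution paths. The inner loop is the only place that binds `part`, so an empty fallback group would leave it unbound on the first pass, or stale on later passes. Below is a minimal sketch of one way to guard against that. The function `fill_template` and its arguments are illustrative and not taken from elodie.

```python
import re


def fill_template(template, definition, values):
    """Replace %placeholders using the first fallback that has a value.

    `definition` is a list of fallback groups, each a list of part names.
    An empty group is skipped instead of reusing a stale or unbound name.
    """
    name = template
    for parts in definition:
        if not parts:
            # Nothing to resolve, so there is no placeholder to touch.
            continue

        this_value = None
        for part in parts:
            if values.get(part):
                this_value = values[part]
                break

        # `part` is always bound here because `parts` is non-empty. It is
        # either the part that matched or the last fallback that was tried.
        if this_value is None:
            # Drop the placeholder together with its leading separator.
            name = re.sub('[^a-zA-Z0-9_]+%{}'.format(part), '', name)
        else:
            name = name.replace('%{}'.format(part), this_value)
    return name
```

The explicit `continue` makes the empty-group case visible to both readers and static analysis, which is the situation the review note points at.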