elodie.filesystem.FileSystem.get_file_name()   F
last analyzed

Complexity

Conditions 17

Size

Total Lines 106
Code Lines 52

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 17
eloc 52
nop 2
dl 0
loc 106
rs 1.8
c 0
b 0
f 0

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

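Commonly applied refactorings include Extract Method and, when a method carries many parameters or temporary variables, Replace Temp with Query, Introduce Parameter Object, and Preserve Whole Object.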

Complexity

Complex methods like elodie.filesystem.FileSystem.get_file_name() often do a lot of different things. To break such a method (or the class that contains it) down, we need to identify a cohesive component within it. A common approach to finding such a component is to look for fields/methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""
2
General file system methods.
3
4
.. moduleauthor:: Jaisen Mathai <[email protected]>
5
"""
6
from __future__ import print_function
7
from builtins import object
8
9
import os
10
import re
11
import shutil
12
import time
13
14
from elodie import compatability
15
from elodie import geolocation
16
from elodie import log
17
from elodie.config import load_config
18
from elodie.localstorage import Db
19
from elodie.media.base import Base, get_all_subclasses
20
from elodie.plugins.plugins import Plugins
21
22
23
class FileSystem(object):
    """A class for interacting with the file system.

    It builds destination folder paths and file names from media metadata
    and copies or moves media files into place.
    """

    def __init__(self):
        # The default file name is along the lines of
        #  2017-06-17_01-04-14-dsc_1234-some-title.jpg
        self.default_file_name_definition = {
            'date': '%Y-%m-%d_%H-%M-%S',
            'name': '%date-%original_name-%title.%extension',
        }
        # The default folder path is along the lines of 2015-01-Jan/Chicago
        self.default_folder_path_definition = {
            'date': '%Y-%m-%b',
            'location': '%city',
            'full_path': '%date/%album|%location|"{}"'.format(
                            geolocation.__DEFAULT_LOCATION__
                         ),
        }
        self.cached_file_name_definition = None
        self.cached_folder_path_definition = None
        # Python3 treats the regex \s differently than Python2.
        # It captures some additional characters like the unicode checkmark
        #  \u2713, so we spell out the whitespace characters explicitly.
        # See build failures in Python3 here.
        #  https://travis-ci.org/jmathai/elodie/builds/483012902
        self.whitespace_regex = '[ \t\n\r\f\v]+'

        # Instantiate a plugins object
        self.plugins = Plugins()

    def create_directory(self, directory_path):
        """Create a directory if it does not already exist.

        :param str directory_path: A fully qualified path of the directory
            to create.
        :returns: bool True if the path exists when this method returns,
            False if it could not be created (i.e. no permission).
        """
        if os.path.exists(directory_path):
            return True

        try:
            os.makedirs(directory_path)
        except OSError:
            # OSError is thrown for cases like no permission. It is also
            #  thrown if something else created the directory after the
            #  check above, which is still a success for the caller.
            return os.path.isdir(directory_path)

        return True

    def delete_directory_if_empty(self, directory_path):
        """Delete a directory only if it's empty.

        Instead of checking first using `len([name for name in
        os.listdir(directory_path)]) == 0`, we catch the OSError exception.

        :param str directory_path: A fully qualified path of the directory
            to delete.
        :returns: bool True if the directory was removed, else False.
        """
        try:
            os.rmdir(directory_path)
        except OSError:
            return False

        return True

    def get_all_files(self, path, extensions=None):
        """Recursively get all files which match a path and extension.

        :param str path: Path to start recursive file listing
        :param tuple(str) extensions: File extensions to include (whitelist),
            without the leading dot. Extensions found on disk are lower
            cased before they are compared.
        :returns: generator
        """
        # If extensions is None (or empty) then we collect the extensions
        #  declared by every subclass of Base
        if not extensions:
            extensions = set()
            for cls in get_all_subclasses(Base):
                extensions.update(cls.extensions)

        for dirname, _dirnames, filenames in os.walk(path):
            for filename in filenames:
                # If file extension is in `extensions` then yield the path
                if os.path.splitext(filename)[1][1:].lower() in extensions:
                    yield os.path.join(dirname, filename)

    def get_current_directory(self):
        """Get the current working directory.

        :returns: str
        """
        return os.getcwd()

    def get_file_name(self, metadata):
        """Generate file name for a photo or video using its metadata.

        Originally we hardcoded the file name to include an ISO date format.
        We use an ISO8601-like format for the file name prefix. Instead of
        colons as the separator for hours, minutes and seconds we use a hyphen.
        https://en.wikipedia.org/wiki/ISO_8601#General_principles

        PR #225 made the file name customizable and fixed issues #107 #110 #111.
        https://github.com/jmathai/elodie/pull/225

        :param dict metadata: Metadata dictionary of the media file.
        :returns: str or None when metadata is None
        """
        if metadata is None:
            return None

        # Get the name template and definition.
        # Name template is in the form %date-%original_name-%title.%extension
        # Definition is in the form
        #  [
        #    [('date', '%Y-%m-%d_%H-%M-%S')],
        #    [('original_name', '')], [('title', '')],
        #    [('extension', '')]
        #  ]
        name_template, definition = self.get_file_name_definition()

        name = name_template
        for parts in definition:
            # An empty group has no placeholder to substitute.
            if not parts:
                continue

            # Walk the fallbacks and stop at the first one with a value.
            this_value = None
            for part, mask in parts:
                this_value = self._get_file_name_part_value(
                    part,
                    mask,
                    metadata
                )
                if this_value is not None:
                    break

            # Here we replace the placeholder with its corresponding value.
            # If no value was found the placeholder is removed together with
            #  the separator(s) in front of it.
            # For example, -%title will be replaced with ''
            # Else replace the placeholder (i.e. %title) with the value.
            placeholder = '%{}'.format(part)
            if this_value is None:
                name = re.sub(
                    '[^a-zA-Z0-9_]+{}'.format(re.escape(placeholder)),
                    '',
                    name,
                )
            else:
                # A plain string replacement is used so that backslashes in
                #  the value are not interpreted as regex group references.
                name = name.replace(placeholder, this_value)

        config = load_config()

        if('File' in config and 'capitalization' in config['File'] and
                config['File']['capitalization'] == 'upper'):
            return name.upper()

        return name.lower()

    def _get_file_name_part_value(self, part, mask, metadata):
        """Resolve the value of a single file name placeholder.

        :param str part: Name of the placeholder without the leading %.
        :param str mask: Mask configured for this placeholder.
        :param dict metadata: Metadata dictionary.
        :returns: str or None when the placeholder has no value
        """
        if part in ('date', 'day', 'month', 'year'):
            return time.strftime(mask, metadata['date_taken'])

        if part in ('location', 'city', 'state', 'country'):
            place_name = geolocation.place_name(
                metadata['latitude'],
                metadata['longitude']
            )
            location_parts = re.findall('(%[^%]+)', mask)
            return self.parse_mask_for_location(
                mask,
                location_parts,
                place_name,
            )

        if part in ('album', 'extension', 'title'):
            if metadata[part]:
                return re.sub(
                    self.whitespace_regex,
                    '-',
                    metadata[part].strip()
                )
            return None

        if part == 'original_name':
            # First we check if we have metadata['original_name'].
            # We have to do this for backwards compatibility because
            #  we originally did not store this back into EXIF.
            if metadata[part]:
                value = os.path.splitext(metadata['original_name'])[0]
            else:
                # We want to remove the hardcoded date prefix we used
                #  to add to the name.
                # This helps when re-running the program on files
                #  which were already processed.
                value = re.sub(
                    r'^\d{4}-\d{2}-\d{2}_\d{2}-\d{2}-\d{2}-',
                    '',
                    metadata['base_name']
                )
                if len(value) == 0:
                    value = metadata['base_name']

            # Lastly we want to sanitize the name
            return re.sub(self.whitespace_regex, '-', value.strip())

        if part.startswith('"') and part.endswith('"'):
            # Literal fallback string
            return part[1:-1]

        return None

    def get_file_name_definition(self):
        """Returns the file name template and its parsed definition.

        The return value is a tuple of the name template and a list with one
        entry per placeholder found in the template. Each entry is a list of
        (part, mask) tuples and takes the following form.
        (
            '%date-%original_name-%title.%extension',
            [
                [('date', '%Y-%m-%d_%H-%M-%S')],
                [('original_name', '')],
                [('title', '')],
                [('extension', '')]
            ]
        )

        The definition is an empty list when the template does not contain
        any placeholder.

        :returns: tuple(str, list)
        """
        # If we've done this already then return it immediately without
        # incurring any extra work
        if self.cached_file_name_definition is not None:
            return self.cached_file_name_definition

        config = load_config()

        # The File section is only used when it defines a name. A section
        #  which only holds other options (i.e. capitalization) falls back
        #  to the defaults.
        config_file = self.default_file_name_definition
        if('File' in config and 'name' in config['File']):
            config_file = config['File']

        name_template = config_file['name']

        # Find all subpatterns of name that map to the components of the
        #  file's name.
        #  I.e. %date-%original_name-%title.%extension =>
        #    ['%date', '%original_name', '%title', '%extension']
        placeholders = re.findall(r'(\%[a-z_]+)', name_template)

        definition = []
        for placeholder in placeholders:
            # Strip the leading % to get the key used in the config
            part = placeholder[1:]
            mask = config_file[part] if part in config_file else ''
            definition.append([(part, mask)])

        self.cached_file_name_definition = (name_template, definition)
        return self.cached_file_name_definition

    def get_folder_path_definition(self):
        """Returns a list of folder definitions.

        Each element in the list represents a folder and is itself a list
        of (part, mask) tuples. More than one tuple means the later ones
        are fallbacks.
        Return values take the following form.
        [
            [('date', '%Y-%m-%d')],
            [
                ('album', ''),
                ('location', '%city'),
                ('"Unknown Location"', '')
            ]
        ]

        :returns: list
        """
        # If we've done this already then return it immediately without
        # incurring any extra work
        if self.cached_folder_path_definition is not None:
            return self.cached_folder_path_definition

        config = load_config()

        # The Directory section is only used when it defines full_path.
        # Its corresponding values (date, location) are looked up in the
        #  same section.
        config_directory = self.default_folder_path_definition
        if('Directory' in config and 'full_path' in config['Directory']):
            config_directory = config['Directory']

        definition = self._parse_folder_path_definition(config_directory)
        if not definition:
            # The configured full_path has no placeholders so we use
            #  the parsed default definition instead.
            definition = self._parse_folder_path_definition(
                self.default_folder_path_definition
            )

        self.cached_folder_path_definition = definition
        return self.cached_folder_path_definition

    def _parse_folder_path_definition(self, config_directory):
        """Parse the full_path of a directory configuration.

        :param config_directory: Mapping which holds full_path and the masks
            of the parts used in it.
        :returns: list
        """
        # Find all subpatterns of full_path that map to directories.
        #  I.e. %foo/%bar => ['foo', 'bar']
        #  I.e. %foo/%bar|%example|"something" =>
        #    ['foo', 'bar|example|"something"']
        path_parts = re.findall(
            r'(\%[^/]+)',
            config_directory['full_path']
        )

        definition = []
        for part in path_parts:
            part = part.replace('%', '')
            if part in config_directory:
                definition.append([(part, config_directory[part])])
            else:
                definition.append([
                    (p, config_directory[p] if p in config_directory else '')
                    for p in part.split('|')
                ])

        return definition

    def get_folder_path(self, metadata, path_parts=None):
        """Given a media's metadata this function returns the folder path as a string.

        :param dict metadata: Metadata dictionary.
        :param list path_parts: Folder definitions in the form returned by
            get_folder_path_definition. Defaults to that method's result.
        :returns: str An empty string when no part has a value.
        """
        if path_parts is None:
            path_parts = self.get_folder_path_definition()

        path = []
        for path_part in path_parts:
            # We support fallback values so that
            #  %album|%city|"Unknown Location" results in
            #  My Album - when an album exists
            #  Sunnyvale - when no album exists but a city exists
            #  Unknown Location - when neither an album nor location exist
            for part, mask in path_part:
                this_path = self.get_dynamic_path(part, mask, metadata)
                if this_path:
                    path.append(this_path.strip())
                    # We break as soon as we have a value to append
                    # Else we continue for fallbacks
                    break

        # os.path.join needs at least one argument
        if not path:
            return ''

        return os.path.join(*path)

    def get_dynamic_path(self, part, mask, metadata):
        """Parse a specific folder's name given a mask and metadata.

        :param part: Name of the part as defined in the path (i.e. date from %date)
        :param mask: Mask representing the template for the path (i.e. %city %state)
        :param metadata: Metadata dictionary.
        :returns: str An empty string when the part has no value.
        """

        # Each part has its own custom logic and we evaluate a single part and return
        #  the evaluated string.
        if part == 'custom':
            # Every placeholder in the mask is resolved on its own
            custom_parts = re.findall('(%[a-z_]+)', mask)
            folder = mask
            for i in custom_parts:
                folder = folder.replace(
                    i,
                    self.get_dynamic_path(i[1:], i, metadata)
                )
            return folder
        elif part == 'date':
            config = load_config()
            # The date mask is read from the config and not from the mask
            #  argument. If Directory is in the config we use its date value.
            config_directory = self.default_folder_path_definition
            if('Directory' in config):
                config_directory = config['Directory']
            date_mask = ''
            if 'date' in config_directory:
                date_mask = config_directory['date']
            return time.strftime(date_mask, metadata['date_taken'])
        elif part in ('day', 'month', 'year'):
            return time.strftime(mask, metadata['date_taken'])
        elif part in ('location', 'city', 'state', 'country'):
            place_name = geolocation.place_name(
                metadata['latitude'],
                metadata['longitude']
            )

            location_parts = re.findall('(%[^%]+)', mask)
            return self.parse_mask_for_location(
                mask,
                location_parts,
                place_name,
            )
        elif part in ('album', 'camera_make', 'camera_model'):
            if metadata[part]:
                return metadata[part]
        elif part.startswith('"') and part.endswith('"'):
            # Fallback string
            return part[1:-1]

        return ''

    def parse_mask_for_location(self, mask, location_parts, place_name):
        """Takes a mask for a location and interpolates the actual place names.

        Given these parameters here are the outputs.

        mask=%city
        location_parts=['%city']
        place_name={'city': u'Sunnyvale'}
        output=Sunnyvale

        mask=%city-%state
        location_parts=['%city-', '%state']
        place_name={'city': u'Sunnyvale', 'state': u'California'}
        output=Sunnyvale-California

        mask=%country
        location_parts=['%country']
        place_name={'default': u'Sunnyvale', 'city': u'Sunnyvale'}
        output=Sunnyvale

        :param str mask: The location mask in the form of %city-%state, etc
        :param list location_parts: The pieces of the mask, each starting
            with a placeholder, in the form of ['%city-', '%state']
        :param dict place_name: A dictionary of place keywords and names like
            {'default': u'California', 'state': u'California'}
        :returns: str
        """
        found = False
        folder_name = mask
        for loc_part in location_parts:
            # For loc_part = '%country-random' the groups are
            #  component_full = '%country-random'
            #  component = '%country'
            #  key = 'country'
            match = re.search('((%([a-z]+))[^%]*)', loc_part)
            if match is None:
                # It's a bad mask in config.ini so we leave this piece as is.
                continue

            component_full, component, key = match.groups()

            if(key in place_name):
                found = True
                replace_target = component
                replace_with = place_name[key]
            else:
                # Drop the placeholder together with its trailing separator
                replace_target = component_full
                replace_with = ''

            folder_name = folder_name.replace(
                replace_target,
                replace_with,
            )

        if(not found and folder_name == ''):
            folder_name = place_name['default']

        return folder_name

    def process_checksum(self, _file, allow_duplicate):
        """Get the checksum of a file and check it against the hash db.

        :param str _file: Path of the file to get the checksum for.
        :param bool allow_duplicate: When False a file whose checksum is
            already in the db, and which still exists at the stored path,
            is rejected.
        :returns: str or None when no checksum could be calculated or the
            file is a rejected duplicate
        """
        db = Db()
        checksum = db.checksum(_file)
        if(checksum is None):
            log.info('Could not get checksum for %s.' % _file)
            return None

        # If duplicates are not allowed then we check if we've seen this file
        #  before via checksum. We also check that the file exists at the
        #   location we believe it to be.
        # If we find a checksum match but the file doesn't exist where we
        #  believe it to be then we write a log message and proceed to import.
        checksum_file = db.get_hash(checksum)
        if(allow_duplicate is False and checksum_file is not None):
            if(os.path.isfile(checksum_file)):
                log.info('%s already at %s.' % (
                    _file,
                    checksum_file
                ))
                return None
            else:
                log.info('%s matched checksum but file not found at %s.' % (  # noqa
                    _file,
                    checksum_file
                ))
        return checksum

    def process_file(self, _file, destination, media, **kwargs):
        """Copy or move a media file into its folder below destination.

        :param str _file: Path of the source file.
        :param str destination: Root folder the file is imported into.
        :param media: Media object which provides get_metadata, is_valid
            and set_original_name.
        :param bool move: Keyword argument, move the file instead of
            copying it. Defaults to False.
        :param bool allowDuplicate: Keyword argument, import the file even
            if its checksum is already known. Defaults to False.
        :returns: str Path of the imported file, or None when the file was
            skipped or a plugin reported a failure.
        """
        move = kwargs.get('move', False)
        allow_duplicate = kwargs.get('allowDuplicate', False)

        stat_info_original = os.stat(_file)
        metadata = media.get_metadata()

        if(not media.is_valid()):
            print('%s is not a valid media file. Skipping...' % _file)
            return

        checksum = self.process_checksum(_file, allow_duplicate)
        if(checksum is None):
            log.info('Original checksum returned None for %s. Skipping...' %
                     _file)
            return

        # Run `before()` for every loaded plugin and if any of them raise an exception
        #  then we skip importing the file and log a message.
        plugins_run_before_status = self.plugins.run_all_before(_file, destination)
        if(plugins_run_before_status == False):
            log.warn('At least one plugin pre-run failed for %s' % _file)
            return

        directory_name = self.get_folder_path(metadata)
        dest_directory = os.path.join(destination, directory_name)
        file_name = self.get_file_name(metadata)
        dest_path = os.path.join(dest_directory, file_name)

        media.set_original_name()

        # If source and destination are identical then
        #  we should not write the file. gh-210
        if(_file == dest_path):
            print('Final source and destination path should not be identical')
            return

        self.create_directory(dest_directory)

        # exiftool renames the original file by appending '_original' to the
        # file name. A new file is written with new tags with the initial file
        # name. See exiftool man page for more details.
        exif_original_file = _file + '_original'

        # Check if the source file was processed by exiftool and an _original
        # file was created.
        exif_original_file_exists = os.path.exists(exif_original_file)

        if(move is True):
            stat = os.stat(_file)
            # Move the processed file into the destination directory
            shutil.move(_file, dest_path)

            if(exif_original_file_exists is True):
                # We can remove it as we don't need the initial file.
                os.remove(exif_original_file)
            os.utime(dest_path, (stat.st_atime, stat.st_mtime))
        else:
            if(exif_original_file_exists is True):
                # Move the newly processed file with any updated tags to the
                # destination directory
                shutil.move(_file, dest_path)
                # Move the exif _original back to the initial source file
                shutil.move(exif_original_file, _file)
            else:
                compatability._copyfile(_file, dest_path)

            # Set the utime based on what the original file contained
            #  before we made any changes.
            # Then set the utime on the destination file based on metadata.
            os.utime(_file, (stat_info_original.st_atime, stat_info_original.st_mtime))
            self.set_utime_from_metadata(media.get_metadata(), dest_path)

        db = Db()
        db.add_hash(checksum, dest_path)
        db.update_hash_db()

        # Run `after()` for every loaded plugin and if any of them raise an exception
        #  then we log a message and return nothing. The file has already
        #  been written to dest_path at this point.
        plugins_run_after_status = self.plugins.run_all_after(_file, destination, dest_path, metadata)
        if(plugins_run_after_status == False):
            log.warn('At least one plugin post-run failed for %s' % _file)
            return

        return dest_path

    def set_utime_from_metadata(self, metadata, file_path):
        """Set the modification time of a file from its metadata.

        The access time is set to the current time. The modification time
        comes from the date prefix of metadata['base_name'] when it has one
        and from metadata['date_taken'] otherwise.

        :param dict metadata: Metadata dictionary with the keys date_taken
            and base_name.
        :param str file_path: Path of the file to update.
        """

        # Initialize date taken to what's returned from the metadata function.
        # If the file name follows a time format of
        #   YYYY-MM-DD_HH-MM-SS-IMG_0001.JPG then we override the date_taken
        date_taken = metadata['date_taken']
        year_month_day_match = re.search(
            r'^(\d{4})-(\d{2})-(\d{2})_(\d{2})-(\d{2})-(\d{2})',
            metadata['base_name']
        )
        if(year_month_day_match is not None):
            date_taken = time.strptime(
                '{}-{}-{} {}:{}:{}'.format(*year_month_day_match.groups()),
                '%Y-%m-%d %H:%M:%S'
            )

        # We don't make any assumptions about time zones and
        # assume local time zone.
        os.utime(file_path, (time.time(), time.mktime(date_taken)))
628