Passed | Push: master (9e42ed...b2d36d) | by Jaisen | 02:09 created

elodie.filesystem.FileSystem.process_file()   D

Complexity

Conditions 12

Size

Total Lines 94
Code Lines 54

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 12
eloc 54
nop 5
dl 0
loc 94
rs 4.8
c 0
b 0
f 0

How to fix: Long Method, Complexity

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign that the commented part should be extracted to a new method. The comment is then a good starting point for naming that method.

Commonly applied refactorings include Extract Method.
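
Extract Method can be sketched against the option handling at the top of process_file(). This is only an illustration: the helper name `read_import_options` is hypothetical and not part of elodie, while the `move` and `allowDuplicate` keys are the ones process_file() reads from its keyword arguments.

```python
def read_import_options(kwargs):
    """Hypothetical helper: collect the import flags from keyword options.

    Both flags default to False when the caller does not pass them,
    mirroring the defaults at the top of process_file().
    """
    move = kwargs.get('move', False)
    allow_duplicate = kwargs.get('allowDuplicate', False)
    return move, allow_duplicate


# The body of the long method could then start with a single call.
move, allow_duplicate = read_import_options({'move': True})
```

The extracted function has a name that says what the four removed lines were doing, and it can be tested without touching the file system.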

Complexity

Complex methods like elodie.filesystem.FileSystem.process_file() often do a lot of different things, and so does the class that contains them. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields and methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
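
In the listing, FileSystem keeps `default_file_name_definition` next to `cached_file_name_definition`, and the same pair again for folder paths, so the shared `default_` and `cached_` prefixes hint at a component. A minimal sketch of Extract Class under that assumption follows; the `CachedDefinition` class and the `build` callable are hypothetical names, not part of elodie.

```python
class CachedDefinition(object):
    """Hypothetical extracted class: a default definition plus a lazy cache."""

    def __init__(self, default, build):
        self.default = default   # fallback values, e.g. the default masks
        self._build = build      # callable that parses a definition
        self._cached = None

    def get(self):
        # Build the parsed definition once and reuse it afterwards.
        if self._cached is None:
            self._cached = self._build(self.default)
        return self._cached


calls = []


def build(default):
    # Stand-in parser for the sketch; it records each invocation.
    calls.append(default['name'])
    return (default['name'], [[('date', default['date'])]])


file_name = CachedDefinition(
    {'date': '%Y-%m-%d_%H-%M-%S', 'name': '%date.%extension'},
    build,
)
first = file_name.get()
second = file_name.get()  # served from the cache, build() is not called again
```

With such a class, FileSystem would hold one object per definition instead of two loosely related attributes each.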

"""
General file system methods.

.. moduleauthor:: Jaisen Mathai <[email protected]>
"""
from __future__ import print_function
from builtins import object

import os
import re
import shutil
import time

from elodie import compatability
from elodie import geolocation
from elodie import log
from elodie.config import load_config
from elodie.localstorage import Db
from elodie.media.base import Base, get_all_subclasses
from elodie.plugins.plugins import Plugins


class FileSystem(object):
    """A class for interacting with the file system."""

    def __init__(self):
        # The default file name is along the lines of 2017-06-17_01-04-14-dsc_1234-some-title.jpg
        self.default_file_name_definition = {
            'date': '%Y-%m-%d_%H-%M-%S',
            'name': '%date-%original_name-%title.%extension',
        }
        # The default folder path is along the lines of 2015-01-Jan/Chicago
        self.default_folder_path_definition = {
            'date': '%Y-%m-%b',
            'location': '%city',
            'full_path': '%date/%album|%location|"{}"'.format(
                            geolocation.__DEFAULT_LOCATION__
                         ),
        }
        self.cached_file_name_definition = None
        self.cached_folder_path_definition = None
        # Python3 treats the regex \s differently than Python2.
        # It captures some additional characters like the unicode checkmark \u2713.
        # See build failures in Python3 here.
        #  https://travis-ci.org/jmathai/elodie/builds/483012902
        self.whitespace_regex = '[ \t\n\r\f\v]+'

        # Instantiate a plugins object
        self.plugins = Plugins()

    def create_directory(self, directory_path):
        """Create a directory if it does not already exist.

        :param str directory_path: A fully qualified path of the directory
            to create.
        :returns: bool
        """
        try:
            if os.path.exists(directory_path):
                return True
            else:
                os.makedirs(directory_path)
                return True
        except OSError:
            # OSError is thrown for cases like no permission
            pass

        return False

    def delete_directory_if_empty(self, directory_path):
        """Delete a directory only if it's empty.

        Instead of checking first using `len([name for name in
        os.listdir(directory_path)]) == 0`, we catch the OSError exception.

        :param str directory_path: A fully qualified path of the directory
            to delete.
        :returns: bool
        """
        try:
            os.rmdir(directory_path)
            return True
        except OSError:
            pass

        return False

    def get_all_files(self, path, extensions=None):
        """Recursively get all files which match a path and extension.

        :param str path: Path to start recursive file listing
        :param tuple(str) extensions: File extensions to include (whitelist)
        :returns: generator
        """
        # If extensions is None then we get all supported extensions
        if not extensions:
            extensions = set()
            subclasses = get_all_subclasses(Base)
            for cls in subclasses:
                extensions.update(cls.extensions)

        for dirname, dirnames, filenames in os.walk(path):
            for filename in filenames:
                # If file extension is in `extensions` then yield the full path
                if os.path.splitext(filename)[1][1:].lower() in extensions:
                    yield os.path.join(dirname, filename)

    def get_current_directory(self):
        """Get the current working directory.

        :returns: str
        """
        return os.getcwd()

    def get_file_name(self, media):
        """Generate file name for a photo or video using its metadata.

        Originally we hardcoded the file name to include an ISO date format.
        We use an ISO8601-like format for the file name prefix. Instead of
        colons as the separator for hours, minutes and seconds we use a hyphen.
        https://en.wikipedia.org/wiki/ISO_8601#General_principles

        PR #225 made the file name customizable and fixed issues #107 #110 #111.
        https://github.com/jmathai/elodie/pull/225

        :param media: A Photo or Video instance
        :type media: :class:`~elodie.media.photo.Photo` or
            :class:`~elodie.media.video.Video`
        :returns: str or None for non-photo or non-videos
        """
        if(not media.is_valid()):
            return None

        metadata = media.get_metadata()
        if(metadata is None):
            return None

        # Get the name template and definition.
        # Name template is in the form %date-%original_name-%title.%extension
        # Definition is in the form
        #  [
        #    [('date', '%Y-%m-%d_%H-%M-%S')],
        #    [('original_name', '')], [('title', '')], // contains a fallback
        #    [('extension', '')]
        #  ]
        name_template, definition = self.get_file_name_definition()

        name = name_template
        for parts in definition:
            this_value = None
            for this_part in parts:
                part, mask = this_part
                if part in ('date', 'day', 'month', 'year'):
                    this_value = time.strftime(mask, metadata['date_taken'])
                    break
                elif part in ('location', 'city', 'state', 'country'):
                    place_name = geolocation.place_name(
                        metadata['latitude'],
                        metadata['longitude']
                    )

                    location_parts = re.findall('(%[^%]+)', mask)
                    this_value = self.parse_mask_for_location(
                        mask,
                        location_parts,
                        place_name,
                    )
                    break
                elif part in ('album', 'extension', 'title'):
                    if metadata[part]:
                        this_value = re.sub(self.whitespace_regex, '-', metadata[part].strip())
                        break
                # Compare with == here: `part in ('original_name')` is a
                #  substring test because the parentheses do not make a tuple.
                elif part == 'original_name':
                    # First we check if we have metadata['original_name'].
                    # We have to do this for backwards compatibility because
                    #   we originally did not store this back into EXIF.
                    if metadata[part]:
                        this_value = os.path.splitext(metadata['original_name'])[0]
                    else:
                        # We didn't always store original_name so this is
                        #  for backwards compatibility.
                        # We want to remove the hardcoded date prefix we used
                        #  to add to the name.
                        # This helps when re-running the program on files
                        #  which were already processed.
                        this_value = re.sub(
                            r'^\d{4}-\d{2}-\d{2}_\d{2}-\d{2}-\d{2}-',
                            '',
                            metadata['base_name']
                        )
                        if(len(this_value) == 0):
                            this_value = metadata['base_name']

                    # Lastly we want to sanitize the name
                    this_value = re.sub(self.whitespace_regex, '-', this_value.strip())
                elif part.startswith('"') and part.endswith('"'):
                    this_value = part[1:-1]
                    break

            # Here we replace the placeholder with its corresponding value.
            # Check if this_value was not set so that the placeholder
            #  can be removed completely.
            # For example, -%title will be replaced with ''
            # Else replace the placeholder (i.e. %title) with the value.
            if this_value is None:
                name = re.sub(
                    #'[^a-z_]+%{}'.format(part),
                    # Analyzer issue reported on the next line: the variable
                    #  part does not seem to be defined for all execution paths.
                    '[^a-zA-Z0-9_]+%{}'.format(part),
                    '',
                    name,
                )
            else:
                name = re.sub(
                    '%{}'.format(part),
                    this_value,
                    name,
                )

        config = load_config()

        if('File' in config and 'capitalization' in config['File'] and config['File']['capitalization'] == 'upper'):
            return name.upper()
        else:
            return name.lower()

    def get_file_name_definition(self):
        """Returns the file name template and the definition of its parts.

        Each element in the definition list represents one part of the name.
        Return values take the following form.
        (
            '%date-%original_name-%title.%extension',
            [
                [('date', '%Y-%m-%d_%H-%M-%S')],
                [('original_name', '')],
                [('title', '')],
                [('extension', '')]
            ]
        )

        :returns: tuple
        """
        # If we've done this already then return it immediately without
        # incurring any extra work
        if self.cached_file_name_definition is not None:
            return self.cached_file_name_definition

        config = load_config()

        # If File is in the config we assume name and its
        #  corresponding values are also present
        config_file = self.default_file_name_definition
        if('File' in config):
            config_file = config['File']

        # Find all subpatterns of name that map to the components of the file's
        #  name.
        #  I.e. %date-%original_name-%title.%extension => ['%date', '%original_name', '%title', '%extension'] #noqa
        path_parts = re.findall(
                         r'(%[a-z_]+)',
                         config_file['name']
                     )

        if not path_parts:
            return (config_file['name'], self.default_file_name_definition)

        self.cached_file_name_definition = []
        for part in path_parts:
            # Strip the leading '%' before the lookup; the config keys are
            #  stored without it, so testing '%date' would never match.
            part = part[1:]
            if part in config_file:
                self.cached_file_name_definition.append(
                    [(part, config_file[part])]
                )
            else:
                this_part = []
                for p in part.split('|'):
                    this_part.append(
                        (p, config_file[p] if p in config_file else '')
                    )
                self.cached_file_name_definition.append(this_part)

        self.cached_file_name_definition = (config_file['name'], self.cached_file_name_definition)
        return self.cached_file_name_definition

    def get_folder_path_definition(self):
        """Returns a list of folder definitions.

        Each element in the list represents a folder.
        Fallback folders are supported and share a nested list.
        Return values take the following form.
        [
            [('date', '%Y-%m-%d')],
            [
                ('location', '%city'),
                ('album', ''),
                ('"Unknown Location"', '')
            ]
        ]

        :returns: list
        """
        # If we've done this already then return it immediately without
        # incurring any extra work
        if self.cached_folder_path_definition is not None:
            return self.cached_folder_path_definition

        config = load_config()

        # If Directory is in the config we assume full_path and its
        #  corresponding values (date, location) are also present
        config_directory = self.default_folder_path_definition
        if('Directory' in config):
            config_directory = config['Directory']

        # Find all subpatterns of full_path that map to directories.
        #  I.e. %foo/%bar => ['foo', 'bar']
        #  I.e. %foo/%bar|%example|"something" => ['foo', 'bar|example|"something"']
        path_parts = re.findall(
                         r'(%[^/]+)',
                         config_directory['full_path']
                     )

        if not path_parts:
            return self.default_folder_path_definition

        self.cached_folder_path_definition = []
        for part in path_parts:
            part = part.replace('%', '')
            if part in config_directory:
                self.cached_folder_path_definition.append(
                    [(part, config_directory[part])]
                )
            else:
                this_part = []
                for p in part.split('|'):
                    this_part.append(
                        (p, config_directory[p] if p in config_directory else '')
                    )
                self.cached_folder_path_definition.append(this_part)

        return self.cached_folder_path_definition

    def get_folder_path(self, metadata, path_parts=None):
        """Given a media's metadata this function returns the folder path as a string.

        :param dict metadata: Metadata dictionary.
        :param list path_parts: Optional folder path definition. Defaults to
            the result of get_folder_path_definition().
        :returns: str
        """
        if path_parts is None:
            path_parts = self.get_folder_path_definition()
        path = []
        for path_part in path_parts:
            # We support fallback values so that
            #  %album|%city|"Unknown Location" results in
            #  My Album - when an album exists
            #  Sunnyvale - when no album exists but a city exists
            #  Unknown Location - when neither an album nor location exist
            for this_part in path_part:
                part, mask = this_part
                this_path = self.get_dynamic_path(part, mask, metadata)
                if this_path:
                    path.append(this_path.strip())
                    # We break as soon as we have a value to append
                    # Else we continue for fallbacks
                    break
        return os.path.join(*path)

    def get_dynamic_path(self, part, mask, metadata):
        """Parse a specific folder's name given a mask and metadata.

        :param part: Name of the part as defined in the path (i.e. date from %date)
        :param mask: Mask representing the template for the path (i.e. %city %state)
        :param metadata: Metadata dictionary.
        :returns: str
        """

        # Each part has its own custom logic and we evaluate a single part and return
        #  the evaluated string.
        # Single names are compared with == because `part in ('custom')` is a
        #  substring test, not a tuple membership test.
        if part == 'custom':
            custom_parts = re.findall('(%[a-z_]+)', mask)
            folder = mask
            for i in custom_parts:
                folder = folder.replace(
                    i,
                    self.get_dynamic_path(i[1:], i, metadata)
                )
            return folder
        elif part == 'date':
            config = load_config()
            # If Directory is in the config we assume full_path and its
            #  corresponding values (date, location) are also present
            config_directory = self.default_folder_path_definition
            if('Directory' in config):
                config_directory = config['Directory']
            date_mask = ''
            if 'date' in config_directory:
                date_mask = config_directory['date']
            return time.strftime(date_mask, metadata['date_taken'])
        elif part in ('day', 'month', 'year'):
            return time.strftime(mask, metadata['date_taken'])
        elif part in ('location', 'city', 'state', 'country'):
            place_name = geolocation.place_name(
                metadata['latitude'],
                metadata['longitude']
            )

            location_parts = re.findall('(%[^%]+)', mask)
            parsed_folder_name = self.parse_mask_for_location(
                mask,
                location_parts,
                place_name,
            )
            return parsed_folder_name
        elif part in ('album', 'camera_make', 'camera_model'):
            if metadata[part]:
                return metadata[part]
        elif part.startswith('"') and part.endswith('"'):
            # Fallback string
            return part[1:-1]

        return ''

    def parse_mask_for_location(self, mask, location_parts, place_name):
        """Takes a mask for a location and interpolates the actual place names.

        Given these parameters here are the outputs.

        mask=%city
        location_parts=[('%city','%city','city')]
        place_name={'city': u'Sunnyvale'}
        output=Sunnyvale

        mask=%city-%state
        location_parts=[('%city-','%city','city'), ('%state','%state','state')]
        place_name={'city': u'Sunnyvale', 'state': u'California'}
        output=Sunnyvale-California

        mask=%country
        location_parts=[('%country','%country','country')]
        place_name={'default': u'Sunnyvale', 'city': u'Sunnyvale'}
        output=Sunnyvale


        :param str mask: The location mask in the form of %city-%state, etc
        :param list location_parts: A list of tuples in the form of
            [('%city-', '%city', 'city'), ('%state', '%state', 'state')]
        :param dict place_name: A dictionary of place keywords and names like
            {'default': u'California', 'state': u'California'}
        :returns: str
        """
        found = False
        folder_name = mask
        for loc_part in location_parts:
            # We assume the search returns a tuple of length 3.
            # If not then it's a bad mask in config.ini.
            # loc_part = '%country-random'
            # component_full = '%country-random'
            # component = '%country'
            # key = 'country'
            component_full, component, key = re.search(
                '((%([a-z]+))[^%]*)',
                loc_part
            ).groups()

            if(key in place_name):
                found = True
                replace_target = component
                replace_with = place_name[key]
            else:
                replace_target = component_full
                replace_with = ''

            folder_name = folder_name.replace(
                replace_target,
                replace_with,
            )

        if(not found and folder_name == ''):
            folder_name = place_name['default']

        return folder_name

    def process_checksum(self, _file, allow_duplicate):
        """Get the checksum of a file and check it against known imports.

        :param str _file: Path of the file to check.
        :param bool allow_duplicate: Whether previously imported files may be
            imported again.
        :returns: str or None if the checksum could not be computed or the
            file is a disallowed duplicate
        """
        db = Db()
        checksum = db.checksum(_file)
        if(checksum is None):
            log.info('Could not get checksum for %s.' % _file)
            return None

        # If duplicates are not allowed then we check if we've seen this file
        #  before via checksum. We also check that the file exists at the
        #   location we believe it to be.
        # If we find a checksum match but the file doesn't exist where we
        #  believe it to be then we write a debug log and proceed to import.
        checksum_file = db.get_hash(checksum)
        if(allow_duplicate is False and checksum_file is not None):
            if(os.path.isfile(checksum_file)):
                log.info('%s already at %s.' % (
                    _file,
                    checksum_file
                ))
                return None
            else:
                log.info('%s matched checksum but file not found at %s.' % (  # noqa
                    _file,
                    checksum_file
                ))
        return checksum

    def process_file(self, _file, destination, media, **kwargs):
        """Import a media file into the destination folder.

        :param str _file: Path of the source file.
        :param str destination: Root folder to import into.
        :param media: A media instance for the source file.
        :returns: str destination path or None if the file was skipped
        """
        move = False
        if('move' in kwargs):
            move = kwargs['move']

        allow_duplicate = False
        if('allowDuplicate' in kwargs):
            allow_duplicate = kwargs['allowDuplicate']

        stat_info_original = os.stat(_file)

        if(not media.is_valid()):
            print('%s is not a valid media file. Skipping...' % _file)
            return

        checksum = self.process_checksum(_file, allow_duplicate)
        if(checksum is None):
            log.info('Original checksum returned None for %s. Skipping...' %
                     _file)
            return

        # Run `before()` for every loaded plugin and if any of them raise an exception
        #  then we skip importing the file and log a message.
        plugins_run_before_status = self.plugins.run_all_before(_file, destination)
        if(plugins_run_before_status == False):
            log.warn('At least one plugin pre-run failed for %s' % _file)
            return

        media.set_original_name()
        metadata = media.get_metadata()

        directory_name = self.get_folder_path(metadata)

        dest_directory = os.path.join(destination, directory_name)
        file_name = self.get_file_name(media)
        dest_path = os.path.join(dest_directory, file_name)

        # If source and destination are identical then
        #  we should not write the file. gh-210
        if(_file == dest_path):
            print('Final source and destination path should not be identical')
            return

        self.create_directory(dest_directory)

        # exiftool renames the original file by appending '_original' to the
        # file name. A new file is written with new tags with the initial file
        # name. See exiftool man page for more details.
        exif_original_file = _file + '_original'

        # Check if the source file was processed by exiftool and an _original
        # file was created.
        exif_original_file_exists = False
        if(os.path.exists(exif_original_file)):
            exif_original_file_exists = True

        if(move is True):
            stat = os.stat(_file)
            # Move the processed file into the destination directory
            shutil.move(_file, dest_path)

            if(exif_original_file_exists is True):
                # We can remove it as we don't need the initial file.
                os.remove(exif_original_file)
            os.utime(dest_path, (stat.st_atime, stat.st_mtime))
        else:
            if(exif_original_file_exists is True):
                # Move the newly processed file with any updated tags to the
                # destination directory
                shutil.move(_file, dest_path)
                # Move the exif _original back to the initial source file
                shutil.move(exif_original_file, _file)
            else:
                compatability._copyfile(_file, dest_path)

            # Set the utime based on what the original file contained
            #  before we made any changes.
            # Then set the utime on the destination file based on metadata.
            os.utime(_file, (stat_info_original.st_atime, stat_info_original.st_mtime))
            self.set_utime_from_metadata(media.get_metadata(), dest_path)

        db = Db()
        db.add_hash(checksum, dest_path)
        db.update_hash_db()

        # Run `after()` for every loaded plugin and if any of them raise an exception
        #  then we log a message and return None. The file has already been
        #  written to the destination at this point.
        plugins_run_after_status = self.plugins.run_all_after(_file, destination, dest_path, metadata)
        if(plugins_run_after_status == False):
            log.warn('At least one plugin post-run failed for %s' % _file)
            return


        return dest_path

    def set_utime_from_metadata(self, metadata, file_path):
        """ Set the modification time on the file based on the file name.
        """

        # Initialize date taken to what's returned from the metadata function.
        # If the folder and file name follow a time format of
        #   YYYY-MM-DD_HH-MM-SS-IMG_0001.JPG then we override the date_taken
        date_taken = metadata['date_taken']
        base_name = metadata['base_name']
        year_month_day_match = re.search(
            r'^(\d{4})-(\d{2})-(\d{2})_(\d{2})-(\d{2})-(\d{2})',
            base_name
        )
        if(year_month_day_match is not None):
            (year, month, day, hour, minute, second) = year_month_day_match.groups()  # noqa
            date_taken = time.strptime(
                '{}-{}-{} {}:{}:{}'.format(year, month, day, hour, minute, second),  # noqa
                '%Y-%m-%d %H:%M:%S'
            )

            os.utime(file_path, (time.time(), time.mktime(date_taken)))
        else:
            # We don't make any assumptions about time zones and
            # assume local time zone.
            date_taken_in_seconds = time.mktime(date_taken)
            os.utime(file_path, (time.time(), (date_taken_in_seconds)))
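
The location-mask examples in the parse_mask_for_location() docstring can be reproduced outside the class. The function below is a standalone restatement of the same steps for illustration only; the name `interpolate_location` is hypothetical, and unlike the method it derives `location_parts` from the mask itself.

```python
import re


def interpolate_location(mask, place_name):
    """Hypothetical standalone version of the location mask interpolation."""
    found = False
    folder_name = mask
    # Split the mask into chunks that each start with a % placeholder.
    for loc_part in re.findall(r'(%[^%]+)', mask):
        component_full, component, key = re.search(
            r'((%([a-z]+))[^%]*)', loc_part
        ).groups()
        if key in place_name:
            # Known key: swap only the placeholder, keep any separator.
            found = True
            folder_name = folder_name.replace(component, place_name[key])
        else:
            # Unknown key: drop the placeholder together with its separator.
            folder_name = folder_name.replace(component_full, '')
    # Nothing matched and nothing is left, so fall back to the default name.
    if not found and folder_name == '':
        folder_name = place_name['default']
    return folder_name


city_only = interpolate_location('%city', {'city': 'Sunnyvale'})
city_state = interpolate_location(
    '%city-%state', {'city': 'Sunnyvale', 'state': 'California'}
)
fallback = interpolate_location(
    '%country', {'default': 'Sunnyvale', 'city': 'Sunnyvale'}
)
```

The three calls correspond to the three docstring cases; the last one shows the fallback to `place_name['default']` when no placeholder in the mask can be resolved.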