Passed
Pull Request — master (#351)
by
unknown
01:59
created

elodie.filesystem.FileSystem.__init__()   A

Complexity

Conditions 1

Size

Total Lines 32
Code Lines 17

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 17
nop 1
dl 0
loc 32
rs 9.55
c 0
b 0
f 0
1
"""
2
General file system methods.
3
4
.. moduleauthor:: Jaisen Mathai <[email protected]>
5
"""
6
from __future__ import print_function
7
from builtins import object
8
9
import os
10
import re
11
import shutil
12
import time
13
14
from elodie import compatability
15
from elodie import geolocation
16
from elodie import log
17
from elodie.config import load_config
18
from elodie.localstorage import Db
19
from elodie.media.base import Base, get_all_subclasses
20
from elodie.plugins.plugins import Plugins
21
from elodie.external.pyexiftool import ExifTool
22
from elodie.dependencies import get_exiftool
23
from elodie import constants
24
25
class FileSystem(object):
26
    """A class for interacting with the file system."""
27
28
    def __init__(self):
        """Set up naming defaults, load plugins and start ExifTool.

        Initializes the default file name and folder path definitions, the
        (initially empty) caches for their parsed forms, the whitespace
        pattern used to sanitize names, a `Plugins` instance and finally
        starts the ExifTool subprocess.
        """
        # The default file name is along the lines of
        #  2017-06-17_01-04-14-dsc_1234-some-title.jpg
        file_name_defaults = {}
        file_name_defaults['date'] = '%Y-%m-%d_%H-%M-%S'
        file_name_defaults['name'] = '%date-%original_name-%title.%extension'
        self.default_file_name_definition = file_name_defaults

        # The default folder path is along the lines of 2015-01-Jan/Chicago.
        # The quoted default location is the last fallback for the folder.
        full_path_mask = '%date/%album|%location|"{}"'.format(
            geolocation.__DEFAULT_LOCATION__
        )
        folder_path_defaults = {}
        folder_path_defaults['date'] = '%Y-%m-%b'
        folder_path_defaults['location'] = '%city'
        folder_path_defaults['full_path'] = full_path_mask
        self.default_folder_path_definition = folder_path_defaults

        # Parsed definitions are computed lazily and memoized here.
        self.cached_file_name_definition = None
        self.cached_folder_path_definition = None

        # Python3 treats the regex \s differently than Python2.
        # It captures some additional characters like the unicode checkmark
        #  \u2713 so we spell out the whitespace characters explicitly.
        # See build failures in Python3 here.
        #  https://travis-ci.org/jmathai/elodie/builds/483012902
        self.whitespace_regex = '[ \t\n\r\f\v]+'

        # Instantiate a plugins object
        self.plugins = Plugins()

        # Initialize the ExifTool subprocess with our custom config file.
        config_argument = u'"{}"'.format(constants.exiftool_config)
        exiftool_addedargs = [u'-config', config_argument]
        exiftool = ExifTool(
            executable_=get_exiftool(),
            addedargs=exiftool_addedargs
        )
        exiftool.start()
60
61
    def create_directory(self, directory_path):
        """Create a directory (and missing parents) if it does not exist.

        :param str directory_path: A fully qualified path of the directory
            to create.
        :returns: bool -- True if the path already exists or was created,
            False if it could not be created.
        """
        if os.path.exists(directory_path):
            return True

        try:
            os.makedirs(directory_path)
        except OSError:
            # OSError is thrown for cases like no permission.
            # It is also thrown when something else created the directory
            #  between the check above and makedirs(); the directory is
            #  there in that case so we report success.
            return os.path.isdir(directory_path)

        return True
79
80
    def delete_directory_if_empty(self, directory_path):
        """Remove a directory, but only when it has no entries.

        No emptiness check is done up front. We simply attempt `os.rmdir`
        and treat the OSError it raises (e.g. for a non-empty directory) as
        "nothing was deleted".

        :param str directory_path: A fully qualified path of the directory
            to delete.
        :returns: bool -- True if the directory was removed, else False.
        """
        try:
            os.rmdir(directory_path)
        except OSError:
            removed = False
        else:
            removed = True

        return removed
96
97
    def get_all_files(self, path, extensions=None, exclude_regex_list=set()):
98
        """Recursively get all files which match a path and extension.
99
100
        :param str path string: Path to start recursive file listing
101
        :param tuple(str) extensions: File extensions to include (whitelist)
102
        :returns: generator
103
        """
104
        # If extensions is None then we get all supported extensions
105
        if not extensions:
106
            extensions = set()
107
            subclasses = get_all_subclasses(Base)
108
            for cls in subclasses:
109
                extensions.update(cls.extensions)
110
111
        # Create a list of compiled regular expressions to match against the file path
112
        compiled_regex_list = [re.compile(regex) for regex in exclude_regex_list]
113
        for dirname, dirnames, filenames in os.walk(path):
114
            for filename in filenames:
115
                # If file extension is in `extensions` 
116
                # And if file path is not in exclude regexes
117
                # Then append to the list
118
                filename_path = os.path.join(dirname, filename)
119
                if (
120
                        os.path.splitext(filename)[1][1:].lower() in extensions and
121
                        not self.should_exclude(filename_path, compiled_regex_list, False)
122
                    ):
123
                    yield filename_path
124
125
    def get_current_directory(self):
        """Return the current working directory of the process.

        :returns: str
        """
        working_directory = os.getcwd()
        return working_directory
131
132
    def get_file_name(self, metadata):
133
        """Generate file name for a photo or video using its metadata.
134
135
        Originally we hardcoded the file name to include an ISO date format.
136
        We use an ISO8601-like format for the file name prefix. Instead of
137
        colons as the separator for hours, minutes and seconds we use a hyphen.
138
        https://en.wikipedia.org/wiki/ISO_8601#General_principles
139
140
        PR #225 made the file name customizable and fixed issues #107 #110 #111.
141
        https://github.com/jmathai/elodie/pull/225
142
143
        :param media: A Photo or Video instance
144
        :type media: :class:`~elodie.media.photo.Photo` or
145
            :class:`~elodie.media.video.Video`
146
        :returns: str or None for non-photo or non-videos
147
        """
148
        if(metadata is None):
149
            return None
150
151
        # Get the name template and definition.
152
        # Name template is in the form %date-%original_name-%title.%extension
153
        # Definition is in the form
154
        #  [
155
        #    [('date', '%Y-%m-%d_%H-%M-%S')],
156
        #    [('original_name', '')], [('title', '')], // contains a fallback
157
        #    [('extension', '')]
158
        #  ]
159
        name_template, definition = self.get_file_name_definition()
160
161
        name = name_template
162
        for parts in definition:
163
            this_value = None
164
            for this_part in parts:
165
                part, mask = this_part
166
                if part in ('date', 'day', 'month', 'year'):
167
                    this_value = time.strftime(mask, metadata['date_taken'])
168
                    break
169
                elif part in ('location', 'city', 'state', 'country'):
170
                    place_name = geolocation.place_name(
171
                        metadata['latitude'],
172
                        metadata['longitude']
173
                    )
174
175
                    location_parts = re.findall('(%[^%]+)', mask)
176
                    this_value = self.parse_mask_for_location(
177
                        mask,
178
                        location_parts,
179
                        place_name,
180
                    )
181
                    break
182
                elif part in ('album', 'extension', 'title'):
183
                    if metadata[part]:
184
                        this_value = re.sub(self.whitespace_regex, '-', metadata[part].strip())
185
                        break
186
                elif part in ('original_name'):
187
                    # First we check if we have metadata['original_name'].
188
                    # We have to do this for backwards compatibility because
189
                    #   we original did not store this back into EXIF.
190
                    if metadata[part]:
191
                        this_value = os.path.splitext(metadata['original_name'])[0]
192
                    else:
193
                        # We didn't always store original_name so this is 
194
                        #  for backwards compatability.
195
                        # We want to remove the hardcoded date prefix we used 
196
                        #  to add to the name.
197
                        # This helps when re-running the program on file 
198
                        #  which were already processed.
199
                        this_value = re.sub(
200
                            '^\d{4}-\d{2}-\d{2}_\d{2}-\d{2}-\d{2}-',
201
                            '',
202
                            metadata['base_name']
203
                        )
204
                        if(len(this_value) == 0):
205
                            this_value = metadata['base_name']
206
207
                    # Lastly we want to sanitize the name
208
                    this_value = re.sub(self.whitespace_regex, '-', this_value.strip())
209
                elif part.startswith('"') and part.endswith('"'):
210
                    this_value = part[1:-1]
211
                    break
212
213
            # Here we replace the placeholder with it's corresponding value.
214
            # Check if this_value was not set so that the placeholder
215
            #  can be removed completely.
216
            # For example, %title- will be replaced with ''
217
            # Else replace the placeholder (i.e. %title) with the value.
218
            if this_value is None:
219
                name = re.sub(
220
                    #'[^a-z_]+%{}'.format(part),
221
                    '[^a-zA-Z0-9_]+%{}'.format(part),
0 ignored issues
show
introduced by
The variable part does not seem to be defined for all execution paths.
Loading history...
222
                    '',
223
                    name,
224
                )
225
            else:
226
                name = re.sub(
227
                    '%{}'.format(part),
228
                    this_value,
229
                    name,
230
                )
231
232
        config = load_config()
233
234
        if('File' in config and 'capitalization' in config['File'] and config['File']['capitalization'] == 'upper'):
235
            return name.upper()
236
        else:
237
            return name.lower()
238
239
    def get_file_name_definition(self):
240
        """Returns a list of folder definitions.
241
242
        Each element in the list represents a folder.
243
        Fallback folders are supported and are nested lists.
244
        Return values take the following form.
245
        [
246
            ('date', '%Y-%m-%d'),
247
            [
248
                ('location', '%city'),
249
                ('album', ''),
250
                ('"Unknown Location", '')
251
            ]
252
        ]
253
254
        :returns: list
255
        """
256
        # If we've done this already then return it immediately without
257
        # incurring any extra work
258
        if self.cached_file_name_definition is not None:
259
            return self.cached_file_name_definition
260
261
        config = load_config()
262
263
        # If File is in the config we assume name and its
264
        #  corresponding values are also present
265
        config_file = self.default_file_name_definition
266
        if('File' in config):
267
            config_file = config['File']
268
269
        # Find all subpatterns of name that map to the components of the file's
270
        #  name.
271
        #  I.e. %date-%original_name-%title.%extension => ['date', 'original_name', 'title', 'extension'] #noqa
272
        path_parts = re.findall(
273
                         '(\%[a-z_]+)',
274
                         config_file['name']
275
                     )
276
277
        if not path_parts or len(path_parts) == 0:
278
            return (config_file['name'], self.default_file_name_definition)
279
280
        self.cached_file_name_definition = []
281
        for part in path_parts:
282
            if part in config_file:
283
                part = part[1:]
284
                self.cached_file_name_definition.append(
285
                    [(part, config_file[part])]
286
                )
287
            else:
288
                this_part = []
289
                for p in part.split('|'):
290
                    p = p[1:]
291
                    this_part.append(
292
                        (p, config_file[p] if p in config_file else '')
293
                    )
294
                self.cached_file_name_definition.append(this_part)
295
296
        self.cached_file_name_definition = (config_file['name'], self.cached_file_name_definition)
297
        return self.cached_file_name_definition
298
299
    def get_folder_path_definition(self):
300
        """Returns a list of folder definitions.
301
302
        Each element in the list represents a folder.
303
        Fallback folders are supported and are nested lists.
304
        Return values take the following form.
305
        [
306
            ('date', '%Y-%m-%d'),
307
            [
308
                ('location', '%city'),
309
                ('album', ''),
310
                ('"Unknown Location", '')
311
            ]
312
        ]
313
314
        :returns: list
315
        """
316
        # If we've done this already then return it immediately without
317
        # incurring any extra work
318
        if self.cached_folder_path_definition is not None:
319
            return self.cached_folder_path_definition
320
321
        config = load_config()
322
323
        # If Directory is in the config we assume full_path and its
324
        #  corresponding values (date, location) are also present
325
        config_directory = self.default_folder_path_definition
326
        if('Directory' in config):
327
            config_directory = config['Directory']
328
329
        # Find all subpatterns of full_path that map to directories.
330
        #  I.e. %foo/%bar => ['foo', 'bar']
331
        #  I.e. %foo/%bar|%example|"something" => ['foo', 'bar|example|"something"']
332
        path_parts = re.findall(
333
                         '(\%[^/]+)',
334
                         config_directory['full_path']
335
                     )
336
337
        if not path_parts or len(path_parts) == 0:
338
            return self.default_folder_path_definition
339
340
        self.cached_folder_path_definition = []
341
        for part in path_parts:
342
            part = part.replace('%', '')
343
            if part in config_directory:
344
                self.cached_folder_path_definition.append(
345
                    [(part, config_directory[part])]
346
                )
347
            else:
348
                this_part = []
349
                for p in part.split('|'):
350
                    this_part.append(
351
                        (p, config_directory[p] if p in config_directory else '')
352
                    )
353
                self.cached_folder_path_definition.append(this_part)
354
355
        return self.cached_folder_path_definition
356
357
    def get_folder_path(self, metadata, path_parts=None):
358
        """Given a media's metadata this function returns the folder path as a string.
359
360
        :param dict metadata: Metadata dictionary.
361
        :returns: str
362
        """
363
        if path_parts is None:
364
            path_parts = self.get_folder_path_definition()
365
        path = []
366
        for path_part in path_parts:
367
            # We support fallback values so that
368
            #  'album|city|"Unknown Location"
369
            #  %album|%city|"Unknown Location" results in
370
            #  My Album - when an album exists
371
            #  Sunnyvale - when no album exists but a city exists
372
            #  Unknown Location - when neither an album nor location exist
373
            for this_part in path_part:
374
                part, mask = this_part
375
                this_path = self.get_dynamic_path(part, mask, metadata)
376
                if this_path:
377
                    path.append(this_path.strip())
378
                    # We break as soon as we have a value to append
379
                    # Else we continue for fallbacks
380
                    break
381
        return os.path.join(*path)
382
383
    def get_dynamic_path(self, part, mask, metadata):
384
        """Parse a specific folder's name given a mask and metadata.
385
386
        :param part: Name of the part as defined in the path (i.e. date from %date)
387
        :param mask: Mask representing the template for the path (i.e. %city %state
388
        :param metadata: Metadata dictionary.
389
        :returns: str
390
        """
391
392
        # Each part has its own custom logic and we evaluate a single part and return
393
        #  the evaluated string.
394
        if part in ('custom'):
395
            custom_parts = re.findall('(%[a-z_]+)', mask)
396
            folder = mask
397
            for i in custom_parts:
398
                folder = folder.replace(
399
                    i,
400
                    self.get_dynamic_path(i[1:], i, metadata)
401
                )
402
            return folder
403
        elif part in ('date'):
404
            config = load_config()
405
            # If Directory is in the config we assume full_path and its
406
            #  corresponding values (date, location) are also present
407
            config_directory = self.default_folder_path_definition
408
            if('Directory' in config):
409
                config_directory = config['Directory']
410
            date_mask = ''
411
            if 'date' in config_directory:
412
                date_mask = config_directory['date']
413
            return time.strftime(date_mask, metadata['date_taken'])
414
        elif part in ('day', 'month', 'year'):
415
            return time.strftime(mask, metadata['date_taken'])
416
        elif part in ('location', 'city', 'state', 'country'):
417
            place_name = geolocation.place_name(
418
                metadata['latitude'],
419
                metadata['longitude']
420
            )
421
422
            location_parts = re.findall('(%[^%]+)', mask)
423
            parsed_folder_name = self.parse_mask_for_location(
424
                mask,
425
                location_parts,
426
                place_name,
427
            )
428
            return parsed_folder_name
429
        elif part in ('album', 'camera_make', 'camera_model'):
430
            if metadata[part]:
431
                return metadata[part]
432
        elif part.startswith('"') and part.endswith('"'):
433
            # Fallback string
434
            return part[1:-1]
435
436
        return ''
437
438
    def parse_mask_for_location(self, mask, location_parts, place_name):
        """Interpolate place names into a location mask.

        Each element of `location_parts` is a piece of the mask which starts
        with a placeholder. When the placeholder's key exists in
        `place_name` the placeholder is replaced by the place; otherwise the
        whole piece (placeholder plus trailing text) is dropped.

        Given these parameters here are the outputs.

        mask=%city
        location_parts=['%city']
        place_name={'city': u'Sunnyvale'}
        output=Sunnyvale

        mask=%city-%state
        location_parts=['%city-', '%state']
        place_name={'city': u'Sunnyvale', 'state': u'California'}
        output=Sunnyvale-California

        mask=%country
        location_parts=['%country']
        place_name={'default': u'Sunnyvale', 'city': u'Sunnyvale'}
        output=Sunnyvale

        :param str mask: The location mask in the form of %city-%state, etc
        :param list location_parts: The pieces of the mask, each one starting
            with a placeholder, like ['%city-', '%state']
        :param dict place_name: A dictionary of place keywords and names like
            {'default': u'California', 'state': u'California'}
        :returns: str
        """
        # For a piece like '%country-random' the groups are
        #  whole = '%country-random', placeholder = '%country'
        #  and key = 'country'.
        # A piece without a placeholder is a bad mask in config.ini.
        placeholder_pattern = '((%([a-z]+))[^%]*)'

        matched_any = False
        result = mask
        for piece in location_parts:
            whole, placeholder, key = re.search(
                placeholder_pattern,
                piece
            ).groups()

            if key in place_name:
                matched_any = True
                result = result.replace(placeholder, place_name[key])
            else:
                result = result.replace(whole, '')

        # Nothing could be interpolated and nothing is left of the mask
        if result == '' and not matched_any:
            result = place_name['default']

        return result
497
498
    def process_checksum(self, _file, allow_duplicate):
        """Compute the checksum of a file and screen it for duplicates.

        :param str _file: Path of the file to checksum.
        :param bool allow_duplicate: When False, a file whose checksum is
            already recorded for an existing file is rejected.
        :returns: the checksum, or None when it could not be computed or the
            file is a duplicate of a file which still exists.
        """
        db = Db()
        checksum = db.checksum(_file)
        if checksum is None:
            log.info('Could not get checksum for %s.' % _file)
            return None

        # If duplicates are not allowed then we check if we've seen this file
        #  before via checksum.
        checksum_file = db.get_hash(checksum)
        seen_before = allow_duplicate is False and checksum_file is not None
        if not seen_before:
            return checksum

        # We also check that the file exists at the location we believe it
        #  to be. Only then is it a real duplicate.
        if os.path.isfile(checksum_file):
            log.info('%s already at %s.' % (_file, checksum_file))
            return None

        # A checksum match whose file is gone is logged and we proceed
        #  to import.
        log.info('%s matched checksum but file not found at %s.' % (  # noqa
            _file,
            checksum_file
        ))
        return checksum
524
525
    def process_file(self, _file, destination, media, **kwargs):
        """Import a single media file into the destination folder.

        The file is validated, checked for duplicates via its checksum and
        handed to the plugins' `before` hooks. It is then copied (or moved)
        to a folder and name derived from its metadata, recorded in the hash
        db and handed to the plugins' `after` hooks.

        :param str _file: Path of the source file.
        :param str destination: Root folder to import into.
        :param media: Media object of the file; `get_metadata`, `is_valid`
            and `set_original_name` are called on it.
        :param bool move: (keyword) Move instead of copy. Defaults to False.
        :param bool allowDuplicate: (keyword) Import even if the checksum
            is already known. Defaults to False.
        :returns: str path of the imported file, or None if the file was
            skipped or an `after` plugin hook reported failure.
        """
        move = kwargs.get('move', False)
        allow_duplicate = kwargs.get('allowDuplicate', False)

        # Captured before anything touches the file so that the source's
        #  timestamps can be restored after a copy.
        stat_info_original = os.stat(_file)
        metadata = media.get_metadata()

        if(not media.is_valid()):
            print('%s is not a valid media file. Skipping...' % _file)
            return

        checksum = self.process_checksum(_file, allow_duplicate)
        if(checksum is None):
            log.info('Original checksum returned None for %s. Skipping...' %
                     _file)
            return

        # Run `before()` for every loaded plugin and if any of them raise an exception
        #  then we skip importing the file and log a message.
        plugins_run_before_status = self.plugins.run_all_before(_file, destination)
        if(plugins_run_before_status == False):  # noqa
            log.warn('At least one plugin pre-run failed for %s' % _file)
            return

        directory_name = self.get_folder_path(metadata)
        dest_directory = os.path.join(destination, directory_name)
        file_name = self.get_file_name(metadata)
        dest_path = os.path.join(dest_directory, file_name)

        media.set_original_name()

        # If source and destination are identical then
        #  we should not write the file. gh-210
        if(_file == dest_path):
            print('Final source and destination path should not be identical')
            return

        self.create_directory(dest_directory)

        # exiftool renames the original file by appending '_original' to the
        # file name. A new file is written with new tags with the initial file
        # name. See exiftool man page for more details.
        exif_original_file = _file + '_original'

        # Check if the source file was processed by exiftool and an _original
        # file was created.
        exif_original_file_exists = os.path.exists(exif_original_file)

        if(move is True):
            # Read the timestamps right before moving so they can be
            #  re-applied to the moved file.
            stat = os.stat(_file)
            # Move the processed file into the destination directory
            shutil.move(_file, dest_path)

            if(exif_original_file_exists is True):
                # We can remove it as we don't need the initial file.
                os.remove(exif_original_file)
            os.utime(dest_path, (stat.st_atime, stat.st_mtime))
        else:
            if(exif_original_file_exists is True):
                # Move the newly processed file with any updated tags to the
                # destination directory
                shutil.move(_file, dest_path)
                # Move the exif _original back to the initial source file
                shutil.move(exif_original_file, _file)
            else:
                compatability._copyfile(_file, dest_path)

            # Set the utime based on what the original file contained
            #  before we made any changes.
            # Then set the utime on the destination file based on metadata.
            os.utime(_file, (stat_info_original.st_atime, stat_info_original.st_mtime))
            self.set_utime_from_metadata(media.get_metadata(), dest_path)

        db = Db()
        db.add_hash(checksum, dest_path)
        db.update_hash_db()

        # Run `after()` for every loaded plugin. The file is already imported
        #  and recorded at this point so on failure we only log a message
        #  and return None instead of the destination path.
        plugins_run_after_status = self.plugins.run_all_after(_file, destination, dest_path, metadata)
        if(plugins_run_after_status == False):  # noqa
            log.warn('At least one plugin post-run failed for %s' % _file)
            return

        return dest_path
618
619
    def set_utime_from_metadata(self, metadata, file_path):
        """Set the modification time on the file from its name or metadata.

        The access time is set to the current time. The modification time is
        taken from a YYYY-MM-DD_HH-MM-SS prefix of metadata['base_name'] when
        there is a valid one, else from metadata['date_taken']. We don't
        make any assumptions about time zones and assume local time zone.

        :param dict metadata: Metadata dictionary; `date_taken` (a time
            tuple accepted by `time.mktime`) and `base_name` are read.
        :param str file_path: Path of the file to update.
        """
        # Initialize date taken to what's returned from the metadata function.
        # If the file name follows a time format of
        #   YYYY-MM-DD_HH-MM-SS-IMG_0001.JPG then we override the date_taken
        date_taken = metadata['date_taken']
        base_name = metadata['base_name']
        year_month_day_match = re.search(
            r'^(\d{4})-(\d{2})-(\d{2})_(\d{2})-(\d{2})-(\d{2})',
            base_name
        )
        if year_month_day_match is not None:
            try:
                date_taken = time.strptime(
                    '{}-{}-{} {}:{}:{}'.format(*year_month_day_match.groups()),
                    '%Y-%m-%d %H:%M:%S'
                )
            except ValueError:
                # The prefix looks like a date but isn't one (i.e. month 13)
                #  so we keep the date from the metadata.
                pass

        os.utime(file_path, (time.time(), time.mktime(date_taken)))
645
646
    def should_exclude(self, path, regex_list=set(), needs_compiled=False):
        """Tell whether a path is matched by any of the exclude patterns.

        :param str path: Path to test.
        :param regex_list: Exclude patterns; compiled regular expressions,
            or pattern strings when `needs_compiled` is True.
        :param bool needs_compiled: Compile the elements of `regex_list`
            before searching.
        :returns: bool -- True if any pattern is found in the path.
        """
        if len(regex_list) == 0:
            return False

        patterns = regex_list
        if needs_compiled:
            patterns = [re.compile(expr) for expr in regex_list]

        for pattern in patterns:
            if pattern.search(path):
                return True

        return False
657
658
    def __del__(self):
        """Terminate the ExifTool subprocess when this object is destroyed."""
        # `terminate` has to be called; merely referencing the bound method
        #  (as was done before) does nothing and leaves the process running.
        # NOTE(review): presumably ExifTool() returns the shared instance
        #  started in __init__ -- confirm it is a singleton.
        ExifTool().terminate()