Passed
Push — master ( 3ad6c0...75e659 )
by Jaisen
01:58
created

elodie.filesystem.FileSystem.should_exclude()   A

Complexity

Conditions 4

Size

Total Lines 11
Code Lines 9

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 4
eloc 9
nop 4
dl 0
loc 11
rs 9.95
c 0
b 0
f 0
1
"""
2
General file system methods.
3
4
.. moduleauthor:: Jaisen Mathai <[email protected]>
5
"""
6
from __future__ import print_function
7
from builtins import object
8
9
import os
10
import re
11
import shutil
12
import time
13
14
from elodie import compatability
15
from elodie import geolocation
16
from elodie import log
17
from elodie.config import load_config
18
from elodie.localstorage import Db
19
from elodie.media.base import Base, get_all_subclasses
20
from elodie.plugins.plugins import Plugins
21
22
23
class FileSystem(object):
    """A class for interacting with the file system."""

    def __init__(self):
        # The default file name is along the lines of
        #  2017-06-17_01-04-14-dsc_1234-some-title.jpg
        self.default_file_name_definition = {
            'date': '%Y-%m-%d_%H-%M-%S',
            'name': '%date-%original_name-%title.%extension',
        }
        # The default folder path is along the lines of 2015-01-Jan/Chicago
        self.default_folder_path_definition = {
            'date': '%Y-%m-%b',
            'location': '%city',
            'full_path': '%date/%album|%location|"{}"'.format(
                            geolocation.__DEFAULT_LOCATION__
                         ),
        }
        # Parsed definitions are cached after the first lookup.
        self.cached_file_name_definition = None
        self.cached_folder_path_definition = None
        # Python3 treats the regex \s differently than Python2.
        # It captures some additional characters like the unicode checkmark \u2713.
        # See build failures in Python3 here.
        #  https://travis-ci.org/jmathai/elodie/builds/483012902
        self.whitespace_regex = '[ \t\n\r\f\v]+'

        # Instantiate a plugins object
        self.plugins = Plugins()

    def create_directory(self, directory_path):
        """Create a directory if it does not already exist.

        :param str directory_path: A fully qualified path of the directory
            to create.
        :returns: bool, True if the path exists or was created and False if
            creating it raised an OSError.
        """
        try:
            if not os.path.exists(directory_path):
                os.makedirs(directory_path)
            return True
        except OSError:
            # OSError is thrown for cases like no permission
            return False

    def delete_directory_if_empty(self, directory_path):
        """Delete a directory only if it's empty.

        Instead of checking first using `len([name for name in
        os.listdir(directory_path)]) == 0`, we catch the OSError exception.

        :param str directory_path: A fully qualified path of the directory
            to delete.
        :returns: bool, True if the directory was removed else False.
        """
        try:
            os.rmdir(directory_path)
        except OSError:
            return False

        return True

    def get_all_files(self, path, extensions=None, exclude_regex_list=None):
        """Recursively get all files which match a path and extension.

        :param str path: Path to start recursive file listing
        :param tuple(str) extensions: File extensions to include (whitelist).
            When empty or None the extensions of every subclass of `Base`
            are used.
        :param exclude_regex_list: Iterable of regular expression strings.
            Files whose full path matches any of them are skipped.
        :returns: generator
        """
        # If extensions is None then we get all supported extensions
        if not extensions:
            extensions = set()
            for cls in get_all_subclasses(Base):
                extensions.update(cls.extensions)

        # Compile the exclusion patterns once instead of once per file
        compiled_regex_list = [
            re.compile(regex) for regex in (exclude_regex_list or ())
        ]
        for dirname, _dirnames, filenames in os.walk(path):
            for filename in filenames:
                # Yield the file if its extension is in `extensions`
                #  and its path does not match an exclude regex.
                filename_path = os.path.join(dirname, filename)
                extension = os.path.splitext(filename)[1][1:].lower()
                if extension not in extensions:
                    continue
                if self.should_exclude(filename_path, compiled_regex_list, False):
                    continue
                yield filename_path

    def get_current_directory(self):
        """Get the current working directory.

        :returns: str
        """
        return os.getcwd()

    def get_file_name(self, metadata):
        """Generate file name for a photo or video using its metadata.

        Originally we hardcoded the file name to include an ISO date format.
        We use an ISO8601-like format for the file name prefix. Instead of
        colons as the separator for hours, minutes and seconds we use a hyphen.
        https://en.wikipedia.org/wiki/ISO_8601#General_principles

        PR #225 made the file name customizable and fixed issues #107 #110 #111.
        https://github.com/jmathai/elodie/pull/225

        :param dict metadata: Metadata dictionary of the media file.
        :returns: str or None when metadata is None
        """
        if metadata is None:
            return None

        # Get the name template and definition.
        # Name template is in the form %date-%original_name-%title.%extension
        # Definition is in the form
        #  [
        #    [('date', '%Y-%m-%d_%H-%M-%S')],
        #    [('original_name', '')], [('title', '')], // contains a fallback
        #    [('extension', '')]
        #  ]
        name_template, definition = self.get_file_name_definition()

        name = name_template
        for parts in definition:
            # An empty group has no placeholder to substitute and would
            #  leave `part` unbound below.
            if not parts:
                continue

            this_value = None
            for part, mask in parts:
                if part in ('date', 'day', 'month', 'year'):
                    this_value = time.strftime(mask, metadata['date_taken'])
                    break
                elif part in ('location', 'city', 'state', 'country'):
                    place_name = geolocation.place_name(
                        metadata['latitude'],
                        metadata['longitude']
                    )

                    location_parts = re.findall('(%[^%]+)', mask)
                    this_value = self.parse_mask_for_location(
                        mask,
                        location_parts,
                        place_name,
                    )
                    break
                elif part in ('album', 'extension', 'title'):
                    # Only stop when there is a value, else try the fallback
                    if metadata[part]:
                        this_value = re.sub(
                            self.whitespace_regex,
                            '-',
                            metadata[part].strip()
                        )
                        break
                elif part == 'original_name':
                    # First we check if we have metadata['original_name'].
                    # We have to do this for backwards compatibility because
                    #   we originally did not store this back into EXIF.
                    if metadata[part]:
                        this_value = os.path.splitext(metadata['original_name'])[0]
                    else:
                        # We didn't always store original_name so this is
                        #  for backwards compatability.
                        # We want to remove the hardcoded date prefix we used
                        #  to add to the name.
                        # This helps when re-running the program on files
                        #  which were already processed.
                        this_value = re.sub(
                            r'^\d{4}-\d{2}-\d{2}_\d{2}-\d{2}-\d{2}-',
                            '',
                            metadata['base_name']
                        )
                        if not this_value:
                            this_value = metadata['base_name']

                    # Lastly we want to sanitize the name
                    this_value = re.sub(
                        self.whitespace_regex,
                        '-',
                        this_value.strip()
                    )
                    # This branch always yields a value so no fallback is
                    #  needed.
                    break
                elif part.startswith('"') and part.endswith('"'):
                    this_value = part[1:-1]
                    break

            # Here we replace the placeholder with its corresponding value.
            # Check if this_value was not set so that the placeholder
            #  can be removed completely.
            # For example, -%title will be replaced with ''
            # Else replace the placeholder (i.e. %title) with the value.
            if this_value is None:
                name = re.sub(
                    r'[^a-zA-Z0-9_]+%{}'.format(part),
                    '',
                    name,
                )
            else:
                # Use a function as the replacement so that backslashes in
                #  the value are inserted literally instead of being parsed
                #  as regex replacement escapes.
                name = re.sub(
                    '%{}'.format(part),
                    lambda _match, value=this_value: value,
                    name,
                )

        config = load_config()

        if(
            'File' in config and
            'capitalization' in config['File'] and
            config['File']['capitalization'] == 'upper'
        ):
            return name.upper()

        return name.lower()

    def get_file_name_definition(self):
        """Returns the file name template and its parsed definition.

        The return value is a tuple of the name template and a list with one
        entry per placeholder found in the template. Each entry is a list of
        (part, mask) tuples. For the default configuration this is

        (
            '%date-%original_name-%title.%extension',
            [
                [('date', '%Y-%m-%d_%H-%M-%S')],
                [('original_name', '')],
                [('title', '')],
                [('extension', '')]
            ]
        )

        A template without placeholders yields an empty list.

        :returns: tuple(str, list)
        """
        # If we've done this already then return it immediately without
        # incurring any extra work
        if self.cached_file_name_definition is not None:
            return self.cached_file_name_definition

        config = load_config()

        # If File is in the config we assume name and its
        #  corresponding values are also present
        config_file = self.default_file_name_definition
        if 'File' in config:
            config_file = config['File']

        # Find all subpatterns of name that map to the components of the file's
        #  name.
        #  I.e. %date-%original_name-%title.%extension => ['%date', '%original_name', '%title', '%extension'] #noqa
        name_template = config_file['name']
        path_parts = re.findall(r'(%[a-z_]+)', name_template)

        definition = []
        for part in path_parts:
            # Drop the leading % to get the key used in the config
            key = part[1:]
            definition.append(
                [(key, config_file[key] if key in config_file else '')]
            )

        self.cached_file_name_definition = (name_template, definition)
        return self.cached_file_name_definition

    def get_folder_path_definition(self):
        """Returns a list of folder definitions.

        Each element in the list represents a folder and is a list of
        (part, mask) tuples. A folder with fallbacks has more than one tuple.
        Return values take the following form.
        [
            [('date', '%Y-%m-%d')],
            [
                ('album', ''),
                ('location', '%city'),
                ('"Unknown Location"', '')
            ]
        ]

        :returns: list
        """
        # If we've done this already then return it immediately without
        # incurring any extra work
        if self.cached_folder_path_definition is not None:
            return self.cached_folder_path_definition

        config = load_config()

        # If Directory is in the config we assume full_path and its
        #  corresponding values (date, location) are also present
        config_directory = self.default_folder_path_definition
        if 'Directory' in config:
            config_directory = config['Directory']

        definition = self._parse_folder_path_definition(config_directory)
        if not definition:
            # The configured full_path has no placeholders so we fall back
            #  to the built in default, parsed into the same list form.
            definition = self._parse_folder_path_definition(
                self.default_folder_path_definition
            )

        self.cached_folder_path_definition = definition
        return self.cached_folder_path_definition

    def _parse_folder_path_definition(self, config_directory):
        """Parse the full_path of a directory config into folder definitions.

        :param config_directory: Mapping with a full_path key and optional
            masks for the parts used in it.
        :returns: list
        """
        # Find all subpatterns of full_path that map to directories.
        #  I.e. %foo/%bar => ['foo', 'bar']
        #  I.e. %foo/%bar|%example|"something" => ['foo', 'bar|example|"something"']
        path_parts = re.findall(r'(%[^/]+)', config_directory['full_path'])

        definition = []
        for part in path_parts:
            part = part.replace('%', '')
            if part in config_directory:
                definition.append([(part, config_directory[part])])
            else:
                definition.append([
                    (p, config_directory[p] if p in config_directory else '')
                    for p in part.split('|')
                ])

        return definition

    def get_folder_path(self, metadata, path_parts=None):
        """Given a media's metadata this function returns the folder path as a string.

        :param dict metadata: Metadata dictionary.
        :param list path_parts: Folder definitions in the form returned by
            get_folder_path_definition which is used when this is None.
        :returns: str, empty when no part resolves to a value
        """
        if path_parts is None:
            path_parts = self.get_folder_path_definition()

        path = []
        for path_part in path_parts:
            # We support fallback values so that
            #  %album|%city|"Unknown Location" results in
            #  My Album - when an album exists
            #  Sunnyvale - when no album exists but a city exists
            #  Unknown Location - when neither an album nor location exist
            for part, mask in path_part:
                this_path = self.get_dynamic_path(part, mask, metadata)
                if this_path:
                    path.append(this_path.strip())
                    # We break as soon as we have a value to append
                    # Else we continue for fallbacks
                    break

        # os.path.join requires at least one argument
        if not path:
            return ''

        return os.path.join(*path)

    def get_dynamic_path(self, part, mask, metadata):
        """Parse a specific folder's name given a mask and metadata.

        :param part: Name of the part as defined in the path (i.e. date from %date)
        :param mask: Mask representing the template for the path (i.e. %city %state)
        :param metadata: Metadata dictionary.
        :returns: str, empty when the part is unknown or has no value
        """

        # Each part has its own custom logic and we evaluate a single part and return
        #  the evaluated string.
        if part == 'custom':
            # Every %placeholder in the mask is resolved as its own part
            folder = mask
            for placeholder in re.findall('(%[a-z_]+)', mask):
                folder = folder.replace(
                    placeholder,
                    self.get_dynamic_path(placeholder[1:], placeholder, metadata)
                )
            return folder
        elif part == 'date':
            config = load_config()
            # If Directory is in the config we assume full_path and its
            #  corresponding values (date, location) are also present
            config_directory = self.default_folder_path_definition
            if 'Directory' in config:
                config_directory = config['Directory']
            # The date mask is read from the config and not from `mask`
            date_mask = ''
            if 'date' in config_directory:
                date_mask = config_directory['date']
            return time.strftime(date_mask, metadata['date_taken'])
        elif part in ('day', 'month', 'year'):
            return time.strftime(mask, metadata['date_taken'])
        elif part in ('location', 'city', 'state', 'country'):
            place_name = geolocation.place_name(
                metadata['latitude'],
                metadata['longitude']
            )

            location_parts = re.findall('(%[^%]+)', mask)
            return self.parse_mask_for_location(
                mask,
                location_parts,
                place_name,
            )
        elif part in ('album', 'camera_make', 'camera_model'):
            if metadata[part]:
                return metadata[part]
        elif part.startswith('"') and part.endswith('"'):
            # Fallback string
            return part[1:-1]

        return ''

    def parse_mask_for_location(self, mask, location_parts, place_name):
        """Takes a mask for a location and interpolates the actual place names.

        Given these parameters here are the outputs.

        mask=%city
        location_parts=['%city']
        place_name={'city': u'Sunnyvale'}
        output=Sunnyvale

        mask=%city-%state
        location_parts=['%city-', '%state']
        place_name={'city': u'Sunnyvale', 'state': u'California'}
        output=Sunnyvale-California

        mask=%country
        location_parts=['%country']
        place_name={'default': u'Sunnyvale', 'city': u'Sunnyvale'}
        output=Sunnyvale


        :param str mask: The location mask in the form of %city-%state, etc
        :param list location_parts: The components of the mask, each one
            starting at a % like ['%city-', '%state']
        :param dict place_name: A dictionary of place keywords and names like
            {'default': u'California', 'state': u'California'}
        :returns: str
        """
        found = False
        folder_name = mask
        for loc_part in location_parts:
            # For loc_part = '%country-random' the groups are
            # component_full = '%country-random'
            # component = '%country'
            # key = 'country'
            match = re.search('((%([a-z]+))[^%]*)', loc_part)
            if match is None:
                # It's a bad mask in config.ini so we drop the component
                #  the same way we drop a key without a place name.
                folder_name = folder_name.replace(loc_part, '')
                continue

            component_full, component, key = match.groups()

            if key in place_name:
                found = True
                replace_target = component
                replace_with = place_name[key]
            else:
                # Remove the placeholder along with its trailing separator
                replace_target = component_full
                replace_with = ''

            folder_name = folder_name.replace(
                replace_target,
                replace_with,
            )

        if not found and folder_name == '':
            folder_name = place_name['default']

        return folder_name

    def process_checksum(self, _file, allow_duplicate):
        """Get the checksum of a file unless it is a known duplicate.

        :param str _file: Path of the file to checksum.
        :param bool allow_duplicate: When False a file whose checksum is
            already stored and still exists on disk is rejected.
        :returns: The checksum or None if it could not be computed or the
            file is a rejected duplicate.
        """
        db = Db()
        checksum = db.checksum(_file)
        if checksum is None:
            log.info('Could not get checksum for %s.' % _file)
            return None

        # If duplicates are not allowed then we check if we've seen this file
        #  before via checksum. We also check that the file exists at the
        #   location we believe it to be.
        # If we find a checksum match but the file doesn't exist where we
        #  believe it to be then we write a log message and proceed to import.
        checksum_file = db.get_hash(checksum)
        if allow_duplicate is False and checksum_file is not None:
            if os.path.isfile(checksum_file):
                log.info('%s already at %s.' % (
                    _file,
                    checksum_file
                ))
                return None

            log.info('%s matched checksum but file not found at %s.' % (  # noqa
                _file,
                checksum_file
            ))
        return checksum

    def process_file(self, _file, destination, media, **kwargs):
        """Import a media file into the destination library.

        The file is copied, or moved when `move` is passed as True, into a
        folder and file name derived from its metadata. Its checksum is
        stored in the hash db and the plugin hooks are run before and after.

        :param str _file: Path of the source file.
        :param str destination: Root directory of the library.
        :param media: Media object for the file. It has to provide
            get_metadata, is_valid and set_original_name.
        :param kwargs: Optional `move` and `allowDuplicate` booleans which
            both default to False.
        :returns: str path of the imported file or None if it was skipped.
        """
        move = kwargs.get('move', False)
        allow_duplicate = kwargs.get('allowDuplicate', False)

        # Taken before anything touches the file so that it can be restored
        stat_info_original = os.stat(_file)
        metadata = media.get_metadata()

        if not media.is_valid():
            print('%s is not a valid media file. Skipping...' % _file)
            return

        checksum = self.process_checksum(_file, allow_duplicate)
        if checksum is None:
            log.info('Original checksum returned None for %s. Skipping...' %
                     _file)
            return

        # Run `before()` for every loaded plugin and if any of them raise an exception
        #  then we skip importing the file and log a message.
        plugins_run_before_status = self.plugins.run_all_before(_file, destination)
        if plugins_run_before_status == False:  # noqa
            log.warn('At least one plugin pre-run failed for %s' % _file)
            return

        directory_name = self.get_folder_path(metadata)
        dest_directory = os.path.join(destination, directory_name)
        file_name = self.get_file_name(metadata)
        dest_path = os.path.join(dest_directory, file_name)

        media.set_original_name()

        # If source and destination are identical then
        #  we should not write the file. gh-210
        if _file == dest_path:
            print('Final source and destination path should not be identical')
            return

        self.create_directory(dest_directory)

        # exiftool renames the original file by appending '_original' to the
        # file name. A new file is written with new tags with the initial file
        # name. See exiftool man page for more details.
        exif_original_file = _file + '_original'

        # Check if the source file was processed by exiftool and an _original
        # file was created.
        exif_original_file_exists = os.path.exists(exif_original_file)

        if move is True:
            stat = os.stat(_file)
            # Move the processed file into the destination directory
            shutil.move(_file, dest_path)

            if exif_original_file_exists:
                # We can remove it as we don't need the initial file.
                os.remove(exif_original_file)
            os.utime(dest_path, (stat.st_atime, stat.st_mtime))
        else:
            if exif_original_file_exists:
                # Move the newly processed file with any updated tags to the
                # destination directory
                shutil.move(_file, dest_path)
                # Move the exif _original back to the initial source file
                shutil.move(exif_original_file, _file)
            else:
                compatability._copyfile(_file, dest_path)

            # Set the utime based on what the original file contained
            #  before we made any changes.
            # Then set the utime on the destination file based on metadata.
            os.utime(_file, (stat_info_original.st_atime, stat_info_original.st_mtime))
            self.set_utime_from_metadata(media.get_metadata(), dest_path)

        db = Db()
        db.add_hash(checksum, dest_path)
        db.update_hash_db()

        # Run `after()` for every loaded plugin and if any of them raise an exception
        #  then we log a message and return None. The file has already been
        #  written to dest_path at this point.
        plugins_run_after_status = self.plugins.run_all_after(_file, destination, dest_path, metadata)
        if plugins_run_after_status == False:  # noqa
            log.warn('At least one plugin post-run failed for %s' % _file)
            return

        return dest_path

    def set_utime_from_metadata(self, metadata, file_path):
        """Set the modification time on the file based on its metadata.

        The access time is set to the current time.

        :param dict metadata: Metadata dictionary with the keys date_taken
            and base_name.
        :param str file_path: Path of the file to update.
        """

        # Initialize date taken to what's returned from the metadata function.
        # If the file name follows a time format of
        #   YYYY-MM-DD_HH-MM-SS-IMG_0001.JPG then we override the date_taken
        date_taken = metadata['date_taken']
        year_month_day_match = re.search(
            r'^(\d{4})-(\d{2})-(\d{2})_(\d{2})-(\d{2})-(\d{2})',
            metadata['base_name']
        )
        if year_month_day_match is not None:
            date_taken = time.strptime(
                '{}-{}-{} {}:{}:{}'.format(*year_month_day_match.groups()),
                '%Y-%m-%d %H:%M:%S'
            )

        # We don't make any assumptions about time zones and
        # assume local time zone.
        os.utime(file_path, (time.time(), time.mktime(date_taken)))

    def should_exclude(self, path, regex_list=None, needs_compiled=False):
        """Check if a path matches any of the given regular expressions.

        :param str path: Path to test.
        :param regex_list: Compiled patterns, or pattern strings when
            needs_compiled is True.
        :param bool needs_compiled: Compile the entries of regex_list first.
        :returns: bool, True if any pattern is found in the path.
        """
        if not regex_list:
            return False

        if needs_compiled:
            regex_list = [re.compile(regex) for regex in regex_list]

        return any(regex.search(path) for regex in regex_list)
648