Issues (157)

elodie/filesystem.py (1 issue)

Severity
1
"""
2
General file system methods.
3
4
.. moduleauthor:: Jaisen Mathai <[email protected]>
5
"""
6
from __future__ import print_function
7
from builtins import object
8
9
import os
10
import re
11
import shutil
12
import time
13
14
from elodie import compatability
15
from elodie import geolocation
16
from elodie import log
17
from elodie.config import load_config
18
from elodie.localstorage import Db
19
from elodie.media.base import Base, get_all_subclasses
20
from elodie.plugins.plugins import Plugins
21
22
class FileSystem(object):
23
    """A class for interacting with the file system."""
24
25
    def __init__(self):
        """Set up the default naming templates, the caches and the plugins."""
        # The default file name is along the lines of
        #  2017-06-17_01-04-14-dsc_1234-some-title.jpg
        file_name_defaults = dict(
            date='%Y-%m-%d_%H-%M-%S',
            name='%date-%original_name-%title.%extension',
        )
        self.default_file_name_definition = file_name_defaults

        # The default folder path is along the lines of 2015-01-Jan/Chicago
        fallback_location = geolocation.__DEFAULT_LOCATION__
        folder_path_defaults = dict(
            date='%Y-%m-%b',
            location='%city',
            full_path='%date/%album|%location|"{}"'.format(fallback_location),
        )
        self.default_folder_path_definition = folder_path_defaults

        # Both definitions are parsed lazily and memoized on first use.
        self.cached_file_name_definition = None
        self.cached_folder_path_definition = None

        # Python3 treats the regex \s differently than Python2.
        # It captures some additional characters like the unicode
        #  checkmark \u2713, so the whitespace class is spelled out.
        # See build failures in Python3 here.
        #  https://travis-ci.org/jmathai/elodie/builds/483012902
        self.whitespace_regex = '[ \t\n\r\f\v]+'

        # Instantiate a plugins object
        self.plugins = Plugins()
49
50
    def create_directory(self, directory_path):
        """Create a directory (and missing parents) if it does not exist.

        The directory is created first and a failure is inspected afterwards
        instead of checking for existence up front. That way a directory
        which appears between the check and the creation is still reported
        as success.

        :param str directory_path: A fully qualified path of the directory
            to create.
        :returns: bool -- True when the directory exists once the call
            returns, False otherwise (e.g. no permission, or the path is
            occupied by something that is not a directory).
        """
        try:
            os.makedirs(directory_path)
        except OSError:
            # OSError is thrown when the path already exists as well as for
            #  cases like no permission. Only an existing directory counts
            #  as success.
            return os.path.isdir(directory_path)

        return True
68
69
    def delete_directory_if_empty(self, directory_path):
        """Remove a directory, but only when it has no content.

        No listing is done beforehand; `os.rmdir` is attempted and the
        OSError it raises (directory not empty, missing, ...) is treated as
        "nothing deleted".

        :param str directory_path: A fully qualified path of the directory
            to delete.
        :returns: bool -- True if the directory was removed, else False.
        """
        try:
            os.rmdir(directory_path)
        except OSError:
            return False

        return True
85
86
    def get_all_files(self, path, extensions=None, exclude_regex_list=set()):
87
        """Recursively get all files which match a path and extension.
88
89
        :param str path string: Path to start recursive file listing
90
        :param tuple(str) extensions: File extensions to include (whitelist)
91
        :returns: generator
92
        """
93
        # If extensions is None then we get all supported extensions
94
        if not extensions:
95
            extensions = set()
96
            subclasses = get_all_subclasses(Base)
97
            for cls in subclasses:
98
                extensions.update(cls.extensions)
99
100
        # Create a list of compiled regular expressions to match against the file path
101
        compiled_regex_list = [re.compile(regex) for regex in exclude_regex_list]
102
        for dirname, dirnames, filenames in os.walk(path):
103
            for filename in filenames:
104
                # If file extension is in `extensions` 
105
                # And if file path is not in exclude regexes
106
                # Then append to the list
107
                filename_path = os.path.join(dirname, filename)
108
                if (
109
                        os.path.splitext(filename)[1][1:].lower() in extensions and
110
                        not self.should_exclude(filename_path, compiled_regex_list, False)
111
                    ):
112
                    yield filename_path
113
114
    def get_current_directory(self):
        """Return the working directory of the running process.

        :returns: str
        """
        working_directory = os.getcwd()
        return working_directory
120
121
    def get_file_name(self, metadata):
        """Generate file name for a photo or video using its metadata.

        Originally we hardcoded the file name to include an ISO date format.
        We use an ISO8601-like format for the file name prefix. Instead of
        colons as the separator for hours, minutes and seconds we use a hyphen.
        https://en.wikipedia.org/wiki/ISO_8601#General_principles

        PR #225 made the file name customizable and fixed issues #107 #110 #111.
        https://github.com/jmathai/elodie/pull/225

        :param dict metadata: Metadata dictionary of the media file. Depending
            on the placeholders in the name template the keys date_taken,
            latitude, longitude, album, extension, title, original_name and
            base_name are read.
        :returns: str or None when metadata is None
        """
        if metadata is None:
            return None

        # Get the name template and definition.
        # Name template is in the form %date-%original_name-%title.%extension
        # Definition is in the form
        #  [
        #    [('date', '%Y-%m-%d_%H-%M-%S')],
        #    [('original_name', '')], [('title', '')], // contains a fallback
        #    [('extension', '')]
        #  ]
        name_template, definition = self.get_file_name_definition()

        name = name_template
        for parts in definition:
            this_value = None
            # Stays None when `parts` is empty so that we never format a
            #  placeholder from an unbound or stale `part`.
            part = None
            for this_part in parts:
                part, mask = this_part
                if part in ('date', 'day', 'month', 'year'):
                    this_value = time.strftime(mask, metadata['date_taken'])
                    break
                elif part in ('location', 'city', 'state', 'country'):
                    place_name = geolocation.place_name(
                        metadata['latitude'],
                        metadata['longitude']
                    )

                    location_parts = re.findall('(%[^%]+)', mask)
                    this_value = self.parse_mask_for_location(
                        mask,
                        location_parts,
                        place_name,
                    )
                    break
                elif part in ('album', 'extension', 'title'):
                    if metadata[part]:
                        this_value = re.sub(
                            self.whitespace_regex,
                            '-',
                            metadata[part].strip()
                        )
                        break
                elif part == 'original_name':
                    # An equality check is required here. The former
                    #  `part in ('original_name')` was a substring test on a
                    #  string and also matched parts such as 'name'.
                    #
                    # First we check if we have metadata['original_name'].
                    # We have to do this for backwards compatibility because
                    #   we originally did not store this back into EXIF.
                    if metadata[part]:
                        this_value = os.path.splitext(
                            metadata['original_name']
                        )[0]
                    else:
                        # We didn't always store original_name so this is
                        #  for backwards compatibility.
                        # We want to remove the hardcoded date prefix we used
                        #  to add to the name.
                        # This helps when re-running the program on files
                        #  which were already processed.
                        this_value = re.sub(
                            r'^\d{4}-\d{2}-\d{2}_\d{2}-\d{2}-\d{2}-',
                            '',
                            metadata['base_name']
                        )
                        if not this_value:
                            this_value = metadata['base_name']

                    # Lastly we want to sanitize the name
                    this_value = re.sub(
                        self.whitespace_regex,
                        '-',
                        this_value.strip()
                    )
                elif part.startswith('"') and part.endswith('"'):
                    this_value = part[1:-1]
                    break

            if part is None:
                # Empty fallback group, there is no placeholder to replace.
                continue

            # Here we replace the placeholder with its corresponding value.
            # Check if this_value was not set so that the placeholder
            #  can be removed completely.
            # For example, -%title will be replaced with ''
            # Else replace the placeholder (i.e. %title) with the value.
            if this_value is None:
                name = re.sub(
                    '[^a-zA-Z0-9_]+%{}'.format(re.escape(part)),
                    '',
                    name,
                )
            else:
                # A plain string replacement is used on purpose. With re.sub
                #  a backslash in the value (i.e. in a title) would be
                #  interpreted as an escape of the replacement template.
                name = name.replace('%{}'.format(part), this_value)

        config = load_config()

        if (
                'File' in config and
                'capitalization' in config['File'] and
                config['File']['capitalization'] == 'upper'
        ):
            return name.upper()

        return name.lower()
227
228
    def get_file_name_definition(self):
229
        """Returns a list of folder definitions.
230
231
        Each element in the list represents a folder.
232
        Fallback folders are supported and are nested lists.
233
        Return values take the following form.
234
        [
235
            ('date', '%Y-%m-%d'),
236
            [
237
                ('location', '%city'),
238
                ('album', ''),
239
                ('"Unknown Location", '')
240
            ]
241
        ]
242
243
        :returns: list
244
        """
245
        # If we've done this already then return it immediately without
246
        # incurring any extra work
247
        if self.cached_file_name_definition is not None:
248
            return self.cached_file_name_definition
249
250
        config = load_config()
251
252
        # If File is in the config we assume name and its
253
        #  corresponding values are also present
254
        config_file = self.default_file_name_definition
255
        if('File' in config):
256
            config_file = config['File']
257
258
        # Find all subpatterns of name that map to the components of the file's
259
        #  name.
260
        #  I.e. %date-%original_name-%title.%extension => ['date', 'original_name', 'title', 'extension'] #noqa
261
        path_parts = re.findall(
262
                         '(\%[a-z_]+)',
263
                         config_file['name']
264
                     )
265
266
        if not path_parts or len(path_parts) == 0:
267
            return (config_file['name'], self.default_file_name_definition)
268
269
        self.cached_file_name_definition = []
270
        for part in path_parts:
271
            if part in config_file:
272
                part = part[1:]
273
                self.cached_file_name_definition.append(
274
                    [(part, config_file[part])]
275
                )
276
            else:
277
                this_part = []
278
                for p in part.split('|'):
279
                    p = p[1:]
280
                    this_part.append(
281
                        (p, config_file[p] if p in config_file else '')
282
                    )
283
                self.cached_file_name_definition.append(this_part)
284
285
        self.cached_file_name_definition = (config_file['name'], self.cached_file_name_definition)
286
        return self.cached_file_name_definition
287
288
    def get_folder_path_definition(self):
289
        """Returns a list of folder definitions.
290
291
        Each element in the list represents a folder.
292
        Fallback folders are supported and are nested lists.
293
        Return values take the following form.
294
        [
295
            ('date', '%Y-%m-%d'),
296
            [
297
                ('location', '%city'),
298
                ('album', ''),
299
                ('"Unknown Location", '')
300
            ]
301
        ]
302
303
        :returns: list
304
        """
305
        # If we've done this already then return it immediately without
306
        # incurring any extra work
307
        if self.cached_folder_path_definition is not None:
308
            return self.cached_folder_path_definition
309
310
        config = load_config()
311
312
        # If Directory is in the config we assume full_path and its
313
        #  corresponding values (date, location) are also present
314
        config_directory = self.default_folder_path_definition
315
        if('Directory' in config):
316
            config_directory = config['Directory']
317
318
        # Find all subpatterns of full_path that map to directories.
319
        #  I.e. %foo/%bar => ['foo', 'bar']
320
        #  I.e. %foo/%bar|%example|"something" => ['foo', 'bar|example|"something"']
321
        path_parts = re.findall(
322
                         '(\%[^/]+)',
323
                         config_directory['full_path']
324
                     )
325
326
        if not path_parts or len(path_parts) == 0:
327
            return self.default_folder_path_definition
328
329
        self.cached_folder_path_definition = []
330
        for part in path_parts:
331
            part = part.replace('%', '')
332
            if part in config_directory:
333
                self.cached_folder_path_definition.append(
334
                    [(part, config_directory[part])]
335
                )
336
            else:
337
                this_part = []
338
                for p in part.split('|'):
339
                    this_part.append(
340
                        (p, config_directory[p] if p in config_directory else '')
341
                    )
342
                self.cached_folder_path_definition.append(this_part)
343
344
        return self.cached_folder_path_definition
345
346
    def get_folder_path(self, metadata, path_parts=None):
347
        """Given a media's metadata this function returns the folder path as a string.
348
349
        :param dict metadata: Metadata dictionary.
350
        :returns: str
351
        """
352
        if path_parts is None:
353
            path_parts = self.get_folder_path_definition()
354
        path = []
355
        for path_part in path_parts:
356
            # We support fallback values so that
357
            #  'album|city|"Unknown Location"
358
            #  %album|%city|"Unknown Location" results in
359
            #  My Album - when an album exists
360
            #  Sunnyvale - when no album exists but a city exists
361
            #  Unknown Location - when neither an album nor location exist
362
            for this_part in path_part:
363
                part, mask = this_part
364
                this_path = self.get_dynamic_path(part, mask, metadata)
365
                if this_path:
366
                    path.append(this_path.strip())
367
                    # We break as soon as we have a value to append
368
                    # Else we continue for fallbacks
369
                    break
370
        return os.path.join(*path)
371
372
    def get_dynamic_path(self, part, mask, metadata):
373
        """Parse a specific folder's name given a mask and metadata.
374
375
        :param part: Name of the part as defined in the path (i.e. date from %date)
376
        :param mask: Mask representing the template for the path (i.e. %city %state
377
        :param metadata: Metadata dictionary.
378
        :returns: str
379
        """
380
381
        # Each part has its own custom logic and we evaluate a single part and return
382
        #  the evaluated string.
383
        if part in ('custom'):
384
            custom_parts = re.findall('(%[a-z_]+)', mask)
385
            folder = mask
386
            for i in custom_parts:
387
                folder = folder.replace(
388
                    i,
389
                    self.get_dynamic_path(i[1:], i, metadata)
390
                )
391
            return folder
392
        elif part in ('date'):
393
            config = load_config()
394
            # If Directory is in the config we assume full_path and its
395
            #  corresponding values (date, location) are also present
396
            config_directory = self.default_folder_path_definition
397
            if('Directory' in config):
398
                config_directory = config['Directory']
399
            date_mask = ''
400
            if 'date' in config_directory:
401
                date_mask = config_directory['date']
402
            return time.strftime(date_mask, metadata['date_taken'])
403
        elif part in ('day', 'month', 'year'):
404
            return time.strftime(mask, metadata['date_taken'])
405
        elif part in ('location', 'city', 'state', 'country'):
406
            place_name = geolocation.place_name(
407
                metadata['latitude'],
408
                metadata['longitude']
409
            )
410
411
            location_parts = re.findall('(%[^%]+)', mask)
412
            parsed_folder_name = self.parse_mask_for_location(
413
                mask,
414
                location_parts,
415
                place_name,
416
            )
417
            return parsed_folder_name
418
        elif part in ('album', 'camera_make', 'camera_model'):
419
            if metadata[part]:
420
                return metadata[part]
421
        elif part.startswith('"') and part.endswith('"'):
422
            # Fallback string
423
            return part[1:-1]
424
425
        return ''
426
427
    def parse_mask_for_location(self, mask, location_parts, place_name):
        """Interpolate place names into a location mask.

        Every entry of `location_parts` is a fragment of the mask which
        starts with a placeholder, i.e. '%city-' for the mask '%city-%state'.
        A placeholder with a known place name is replaced by that name. For
        an unknown one the whole fragment, including the text trailing the
        placeholder, is removed.

        mask=%city
        location_parts=['%city']
        place_name={'city': u'Sunnyvale'}
        output=Sunnyvale

        mask=%city-%state
        location_parts=['%city-', '%state']
        place_name={'city': u'Sunnyvale', 'state': u'California'}
        output=Sunnyvale-California

        mask=%country
        location_parts=['%country']
        place_name={'default': u'Sunnyvale', 'city': u'Sunnyvale'}
        output=Sunnyvale

        :param str mask: The location mask in the form of %city-%state, etc
        :param list location_parts: The fragments of the mask, each of them
            starting with a placeholder.
        :param dict place_name: A dictionary of place keywords and names like
            {'default': u'California', 'state': u'California'}
        :returns: str
        """
        any_key_resolved = False
        result = mask
        for fragment in location_parts:
            # For the fragment '%country-random' the groups are
            #  whole = '%country-random'
            #  token = '%country'
            #  key = 'country'
            # A fragment without a lower case placeholder is a bad mask in
            #  config.ini and is not handled here.
            match = re.search('((%([a-z]+))[^%]*)', fragment)
            whole, token, key = match.groups()

            if key in place_name:
                any_key_resolved = True
                result = result.replace(token, place_name[key])
            else:
                result = result.replace(whole, '')

        # Nothing could be resolved and nothing is left of the mask
        if result == '' and not any_key_resolved:
            result = place_name['default']

        return result
486
487
    def process_checksum(self, _file, allow_duplicate):
        """Compute the checksum of a file and look for an earlier import.

        :param str _file: Path of the file to get the checksum for.
        :param bool allow_duplicate: When False, a file whose checksum is
            already stored and whose stored path still exists is rejected.
        :returns: The checksum, or None when it could not be computed or the
            file is a rejected duplicate.
        """
        hash_db = Db()
        digest = hash_db.checksum(_file)
        if digest is None:
            log.info('Could not get checksum for %s.' % _file)
            return None

        # If duplicates are not allowed then we check if we've seen this file
        #  before via checksum. We also check that the file exists at the
        #  location we believe it to be.
        known_path = hash_db.get_hash(digest)
        if allow_duplicate is not False or known_path is None:
            return digest

        if os.path.isfile(known_path):
            log.info('%s already at %s.' % (
                _file,
                known_path
            ))
            return None

        # The checksum matched but the file is gone from where we believe it
        #  to be so we log that and proceed with the import.
        log.info('%s matched checksum but file not found at %s.' % (  # noqa
            _file,
            known_path
        ))
        return digest
513
514
    def process_file(self, _file, destination, media, **kwargs):
        """Import a single media file into the destination library.

        The file is copied (or moved) to a folder and file name derived from
        its metadata, its checksum is recorded and the `before` and `after`
        hooks of all plugins are run.

        :param str _file: Path of the source file.
        :param str destination: Root directory of the library to import to.
        :param media: Media object of the file. `get_metadata`, `is_valid`
            and `set_original_name` are called on it.
        :param bool move: Keyword argument, move the file instead of copying
            it. Defaults to False.
        :param bool allowDuplicate: Keyword argument, import the file even if
            its checksum is already known. Defaults to False.
        :returns: The destination path as str, or None when the file was
            skipped or a plugin failed.
        """
        move = kwargs.get('move', False)
        allow_duplicate = kwargs.get('allowDuplicate', False)

        # Captured before anything touches the file so that the times of the
        #  source can be restored when we copy.
        stat_info_original = os.stat(_file)
        metadata = media.get_metadata()

        if not media.is_valid():
            print('%s is not a valid media file. Skipping...' % _file)
            return

        checksum = self.process_checksum(_file, allow_duplicate)
        if checksum is None:
            log.info('Original checksum returned None for %s. Skipping...' %
                     _file)
            return

        # Run `before()` for every loaded plugin and if any of them raise an exception
        #  then we skip importing the file and log a message.
        plugins_run_before_status = self.plugins.run_all_before(_file, destination)
        if plugins_run_before_status == False:
            log.warn('At least one plugin pre-run failed for %s' % _file)
            return

        directory_name = self.get_folder_path(metadata)
        dest_directory = os.path.join(destination, directory_name)
        file_name = self.get_file_name(metadata)
        dest_path = os.path.join(dest_directory, file_name)

        media.set_original_name()

        # If source and destination are identical then
        #  we should not write the file. gh-210
        if _file == dest_path:
            print('Final source and destination path should not be identical')
            return

        self.create_directory(dest_directory)

        # exiftool renames the original file by appending '_original' to the
        # file name. A new file is written with new tags with the initial file
        # name. See exiftool man page for more details.
        exif_original_file = _file + '_original'

        # Check if the source file was processed by exiftool and an _original
        # file was created.
        exif_original_file_exists = os.path.exists(exif_original_file)

        if move is True:
            stat = os.stat(_file)
            # Move the processed file into the destination directory
            shutil.move(_file, dest_path)

            if exif_original_file_exists:
                # We can remove it as we don't need the initial file.
                os.remove(exif_original_file)
            os.utime(dest_path, (stat.st_atime, stat.st_mtime))
        else:
            if exif_original_file_exists:
                # Move the newly processed file with any updated tags to the
                # destination directory
                shutil.move(_file, dest_path)
                # Move the exif _original back to the initial source file
                shutil.move(exif_original_file, _file)
            else:
                compatability._copyfile(_file, dest_path)

            # Set the utime based on what the original file contained
            #  before we made any changes.
            # Then set the utime on the destination file based on metadata.
            os.utime(_file, (stat_info_original.st_atime, stat_info_original.st_mtime))
            self.set_utime_from_metadata(metadata, dest_path)

        db = Db()
        db.add_hash(checksum, dest_path)
        db.update_hash_db()

        # Run `after()` for every loaded plugin. The file is already imported
        #  at this point so a failure is only logged and reported to the
        #  caller by returning None.
        plugins_run_after_status = self.plugins.run_all_after(_file, destination, dest_path, metadata)
        if plugins_run_after_status == False:
            log.warn('At least one plugin post-run failed for %s' % _file)
            return

        return dest_path
607
608
    def set_utime_from_metadata(self, metadata, file_path):
        """Set the access and modification time of a file.

        The access time is set to now. The modification time is taken from
        metadata['date_taken'] unless metadata['base_name'] starts with a
        time stamp in the form YYYY-MM-DD_HH-MM-SS, in which case that time
        stamp wins.

        :param dict metadata: Metadata dictionary, the keys date_taken
            (a time tuple as accepted by `time.mktime`) and base_name are
            read.
        :param str file_path: Path of the file to update.
        """
        # Initialize date taken to what's returned from the metadata function.
        # If the file name follows a time format of
        #   YYYY-MM-DD_HH-MM-SS-IMG_0001.JPG then we override the date_taken
        date_taken = metadata['date_taken']
        timestamp_match = re.search(
            r'^(\d{4})-(\d{2})-(\d{2})_(\d{2})-(\d{2})-(\d{2})',
            metadata['base_name']
        )
        if timestamp_match is not None:
            date_taken = time.strptime(
                '{}-{}-{} {}:{}:{}'.format(*timestamp_match.groups()),
                '%Y-%m-%d %H:%M:%S'
            )

        # We don't make any assumptions about time zones and
        # assume local time zone.
        os.utime(file_path, (time.time(), time.mktime(date_taken)))
634
635
    def should_exclude(self, path, regex_list=set(), needs_compiled=False):
        """Tell whether a path matches any of the exclusion expressions.

        :param str path: The path to check.
        :param regex_list: Regular expressions to search for in the path,
            either compiled patterns or, if needs_compiled is set, strings.
        :param bool needs_compiled: Compile the entries of regex_list before
            they are used.
        :returns: bool
        """
        if len(regex_list) < 1:
            return False

        patterns = regex_list
        if needs_compiled:
            patterns = [re.compile(expression) for expression in regex_list]

        for pattern in patterns:
            if pattern.search(path):
                return True

        return False
646