Passed
Pull Request — master (#301)
by
unknown
01:23
created

elodie.filesystem.FileSystem.delete_file()   A

Complexity

Conditions 3

Size

Total Lines 13
Code Lines 9

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 3
eloc 9
nop 2
dl 0
loc 13
rs 9.95
c 0
b 0
f 0
1
"""
2
General file system methods.
3
4
.. moduleauthor:: Jaisen Mathai <[email protected]>
5
"""
6
from __future__ import print_function
from builtins import object

import os
import re
import shutil
import time

from elodie import compatability
from elodie import constants
from elodie import geolocation
from elodie import log
from elodie.config import load_config
from elodie.localstorage import Db
from elodie.media.base import Base, get_all_subclasses
20
21
22
class FileSystem(object):
23
    """A class for interacting with the file system."""
24
25
    def __init__(self):
        """Set up default naming templates, definition caches and the
        whitespace pattern used to sanitize names.
        """
        # Quoted literal used as the last fallback of the folder path.
        quoted_default_location = '"{}"'.format(
            geolocation.__DEFAULT_LOCATION__
        )

        # The default file name is along the lines of
        #  2017-06-17_01-04-14-dsc_1234-some-title.jpg
        self.default_file_name_definition = {
            'date': '%Y-%m-%d_%H-%M-%S',
            'name': '%date-%original_name-%title.%extension',
        }

        # The default folder path is along the lines of 2015-01-Jan/Chicago
        self.default_folder_path_definition = {
            'date': '%Y-%m-%b',
            'location': '%city',
            'full_path': '%date/%album|%location|' + quoted_default_location,
        }

        # Parsed definitions are computed lazily and stored here.
        self.cached_file_name_definition = None
        self.cached_folder_path_definition = None

        # Python3 treats the regex \s differently than Python2.
        # It captures some additional characters like the unicode
        #  checkmark \u2713, so the whitespace characters are listed
        #  explicitly instead.
        # See build failures in Python3 here.
        #  https://travis-ci.org/jmathai/elodie/builds/483012902
        self.whitespace_regex = '[ \t\n\r\f\v]+'
46
47
    def create_directory(self, directory_path):
        """Make sure a directory exists, creating it (and parents) if needed.

        :param str directory_path: A fully qualified path of the directory
            to create.
        :returns: bool, True when the path exists afterwards and False when
            an OSError prevented its creation.
        """
        try:
            if not os.path.exists(directory_path):
                os.makedirs(directory_path)
        except OSError:
            # OSError is thrown for cases like no permission
            return False

        return True
65
66
    def delete_directory_if_empty(self, directory_path):
        """Remove a directory, but only when it has no contents.

        Rather than listing the directory first, we simply attempt the
        removal and treat the OSError raised by `os.rmdir` as "not deleted".

        :param str directory_path: A fully qualified path of the directory
            to delete.
        :returns: bool, True if the directory was removed, False otherwise.
        """
        try:
            os.rmdir(directory_path)
        except OSError:
            return False

        return True
82
83
    def delete_file(self, file_path):
        """Permanently delete a file if it exists.

        :param str file_path: Path of the file to remove.
        :returns: bool, True when the file was removed; False when it did
            not exist (a message is printed) or an OSError occurred.
        """
        removed = False
        try:
            if not os.path.exists(file_path):
                print("The file does not exist.")
            else:
                os.remove(file_path)
                removed = True
        except OSError:
            # Best effort: a failed removal is reported through the
            #  return value only.
            pass

        return removed
96
97
    def get_all_files(self, path, extensions=None):
        """Walk a directory tree and yield every file with a wanted extension.

        :param str path: Path to start recursive file listing
        :param tuple(str) extensions: File extensions to include (whitelist),
            without the leading dot. When empty or None, the extensions of
            every subclass of `Base` are used.
        :returns: generator of file paths
        """
        # Without an explicit whitelist we collect all supported extensions
        if not extensions:
            extensions = set()
            for media_class in get_all_subclasses(Base):
                extensions.update(media_class.extensions)

        for root, _subdirs, files in os.walk(path):
            for entry in files:
                # Compare the lower-cased extension (dot removed)
                suffix = os.path.splitext(entry)[1][1:].lower()
                if suffix in extensions:
                    yield os.path.join(root, entry)
116
117
    def get_current_directory(self):
        """Return the process's current working directory.

        :returns: str
        """
        working_directory = os.getcwd()
        return working_directory
123
124
    def get_file_name(self, media):
125
        """Generate file name for a photo or video using its metadata.
126
127
        Originally we hardcoded the file name to include an ISO date format.
128
        We use an ISO8601-like format for the file name prefix. Instead of
129
        colons as the separator for hours, minutes and seconds we use a hyphen.
130
        https://en.wikipedia.org/wiki/ISO_8601#General_principles
131
132
        PR #225 made the file name customizable and fixed issues #107 #110 #111.
133
        https://github.com/jmathai/elodie/pull/225
134
135
        :param media: A Photo or Video instance
136
        :type media: :class:`~elodie.media.photo.Photo` or
137
            :class:`~elodie.media.video.Video`
138
        :returns: str or None for non-photo or non-videos
139
        """
140
        if(not media.is_valid()):
141
            return None
142
143
        metadata = media.get_metadata()
144
        if(metadata is None):
145
            return None
146
147
        # Get the name template and definition.
148
        # Name template is in the form %date-%original_name-%title.%extension
149
        # Definition is in the form
150
        #  [
151
        #    [('date', '%Y-%m-%d_%H-%M-%S')],
152
        #    [('original_name', '')], [('title', '')], // contains a fallback
153
        #    [('extension', '')]
154
        #  ]
155
        name_template, definition = self.get_file_name_definition()
156
157
        name = name_template
158
        for parts in definition:
159
            this_value = None
160
            for this_part in parts:
161
                part, mask = this_part
162
                if part in ('date', 'day', 'month', 'year'):
163
                    this_value = time.strftime(mask, metadata['date_taken'])
164
                    break
165
                elif part in ('location', 'city', 'state', 'country'):
166
                    place_name = geolocation.place_name(
167
                        metadata['latitude'],
168
                        metadata['longitude']
169
                    )
170
171
                    location_parts = re.findall('(%[^%]+)', mask)
172
                    this_value = self.parse_mask_for_location(
173
                        mask,
174
                        location_parts,
175
                        place_name,
176
                    )
177
                    break
178
                elif part in ('album', 'extension', 'title'):
179
                    if metadata[part]:
180
                        this_value = re.sub(self.whitespace_regex, '-', metadata[part].strip())
181
                        break
182
                elif part in ('original_name'):
183
                    # First we check if we have metadata['original_name'].
184
                    # We have to do this for backwards compatibility because
185
                    #   we original did not store this back into EXIF.
186
                    if metadata[part]:
187
                        this_value = os.path.splitext(metadata['original_name'])[0]
188
                    else:
189
                        # We didn't always store original_name so this is 
190
                        #  for backwards compatability.
191
                        # We want to remove the hardcoded date prefix we used 
192
                        #  to add to the name.
193
                        # This helps when re-running the program on file 
194
                        #  which were already processed.
195
                        this_value = re.sub(
196
                            '^\d{4}-\d{2}-\d{2}_\d{2}-\d{2}-\d{2}-',
197
                            '',
198
                            metadata['base_name']
199
                        )
200
                        if(len(this_value) == 0):
201
                            this_value = metadata['base_name']
202
203
                    # Lastly we want to sanitize the name
204
                    this_value = re.sub(self.whitespace_regex, '-', this_value.strip())
205
                elif part.startswith('"') and part.endswith('"'):
206
                    this_value = part[1:-1]
207
                    break
208
209
            # Here we replace the placeholder with it's corresponding value.
210
            # Check if this_value was not set so that the placeholder
211
            #  can be removed completely.
212
            # For example, %title- will be replaced with ''
213
            # Else replace the placeholder (i.e. %title) with the value.
214
            if this_value is None:
215
                name = re.sub(
216
                    #'[^a-z_]+%{}'.format(part),
217
                    '[^a-zA-Z0-9_]+%{}'.format(part),
0 ignored issues
show
introduced by
The variable part does not seem to be defined for all execution paths.
Loading history...
218
                    '',
219
                    name,
220
                )
221
            else:
222
                name = re.sub(
223
                    '%{}'.format(part),
224
                    this_value,
225
                    name,
226
                )
227
228
        return name.lower()
229
230
    def get_file_name_definition(self):
231
        """Returns a list of folder definitions.
232
233
        Each element in the list represents a folder.
234
        Fallback folders are supported and are nested lists.
235
        Return values take the following form.
236
        [
237
            ('date', '%Y-%m-%d'),
238
            [
239
                ('location', '%city'),
240
                ('album', ''),
241
                ('"Unknown Location", '')
242
            ]
243
        ]
244
245
        :returns: list
246
        """
247
        # If we've done this already then return it immediately without
248
        # incurring any extra work
249
        if self.cached_file_name_definition is not None:
250
            return self.cached_file_name_definition
251
252
        config = load_config()
253
254
        # If File is in the config we assume name and its
255
        #  corresponding values are also present
256
        config_file = self.default_file_name_definition
257
        if('File' in config):
258
            config_file = config['File']
259
260
        # Find all subpatterns of name that map to the components of the file's
261
        #  name.
262
        #  I.e. %date-%original_name-%title.%extension => ['date', 'original_name', 'title', 'extension'] #noqa
263
        path_parts = re.findall(
264
                         '(\%[a-z_]+)',
265
                         config_file['name']
266
                     )
267
268
        if not path_parts or len(path_parts) == 0:
269
            return (config_file['name'], self.default_file_name_definition)
270
271
        self.cached_file_name_definition = []
272
        for part in path_parts:
273
            if part in config_file:
274
                part = part[1:]
275
                self.cached_file_name_definition.append(
276
                    [(part, config_file[part])]
277
                )
278
            else:
279
                this_part = []
280
                for p in part.split('|'):
281
                    p = p[1:]
282
                    this_part.append(
283
                        (p, config_file[p] if p in config_file else '')
284
                    )
285
                self.cached_file_name_definition.append(this_part)
286
287
        self.cached_file_name_definition = (config_file['name'], self.cached_file_name_definition)
288
        return self.cached_file_name_definition
289
290
    def get_folder_path_definition(self):
291
        """Returns a list of folder definitions.
292
293
        Each element in the list represents a folder.
294
        Fallback folders are supported and are nested lists.
295
        Return values take the following form.
296
        [
297
            ('date', '%Y-%m-%d'),
298
            [
299
                ('location', '%city'),
300
                ('album', ''),
301
                ('"Unknown Location", '')
302
            ]
303
        ]
304
305
        :returns: list
306
        """
307
        # If we've done this already then return it immediately without
308
        # incurring any extra work
309
        if self.cached_folder_path_definition is not None:
310
            return self.cached_folder_path_definition
311
312
        config = load_config()
313
314
        # If Directory is in the config we assume full_path and its
315
        #  corresponding values (date, location) are also present
316
        config_directory = self.default_folder_path_definition
317
        if('Directory' in config):
318
            config_directory = config['Directory']
319
320
        # Find all subpatterns of full_path that map to directories.
321
        #  I.e. %foo/%bar => ['foo', 'bar']
322
        #  I.e. %foo/%bar|%example|"something" => ['foo', 'bar|example|"something"']
323
        path_parts = re.findall(
324
                         '(\%[^/]+)',
325
                         config_directory['full_path']
326
                     )
327
328
        if not path_parts or len(path_parts) == 0:
329
            return self.default_folder_path_definition
330
331
        self.cached_folder_path_definition = []
332
        for part in path_parts:
333
            part = part.replace('%', '')
334
            if part in config_directory:
335
                self.cached_folder_path_definition.append(
336
                    [(part, config_directory[part])]
337
                )
338
            else:
339
                this_part = []
340
                for p in part.split('|'):
341
                    this_part.append(
342
                        (p, config_directory[p] if p in config_directory else '')
343
                    )
344
                self.cached_folder_path_definition.append(this_part)
345
346
        return self.cached_folder_path_definition
347
348
    def get_folder_path(self, metadata, path_parts=None):
349
        """Given a media's metadata this function returns the folder path as a string.
350
351
        :param dict metadata: Metadata dictionary.
352
        :returns: str
353
        """
354
        if path_parts is None:
355
            path_parts = self.get_folder_path_definition()
356
        path = []
357
        for path_part in path_parts:
358
            # We support fallback values so that
359
            #  'album|city|"Unknown Location"
360
            #  %album|%city|"Unknown Location" results in
361
            #  My Album - when an album exists
362
            #  Sunnyvale - when no album exists but a city exists
363
            #  Unknown Location - when neither an album nor location exist
364
            for this_part in path_part:
365
                part, mask = this_part
366
                this_path = self.get_dynamic_path(part, mask, metadata)
367
                if this_path:
368
                    path.append(this_path.strip())
369
                    # We break as soon as we have a value to append
370
                    # Else we continue for fallbacks
371
                    break
372
        return os.path.join(*path)
373
374
    def get_dynamic_path(self, part, mask, metadata):
        """Parse a specific folder's name given a mask and metadata.

        :param part: Name of the part as defined in the path (i.e. date from %date)
        :param mask: Mask representing the template for the path (i.e. %city %state)
        :param metadata: Metadata dictionary.
        :returns: str, an empty string when the part is unknown or has no
            value in the metadata.
        """

        # Each part has its own custom logic and we evaluate a single part and return
        #  the evaluated string.
        # Note: exact comparisons are used for single names. `part in ('date')`
        #  would be a substring test against the string 'date'.
        if part == 'custom':
            # Replace every %placeholder in the mask with its evaluated value.
            custom_parts = re.findall('(%[a-z_]+)', mask)
            folder = mask
            for i in custom_parts:
                folder = folder.replace(
                    i,
                    self.get_dynamic_path(i[1:], i, metadata)
                )
            return folder
        elif part == 'date':
            config = load_config()
            # If Directory is in the config we assume full_path and its
            #  corresponding values (date, location) are also present
            config_directory = self.default_folder_path_definition
            if 'Directory' in config:
                config_directory = config['Directory']
            # The date mask is read from the configuration and not from
            #  the mask argument.
            date_mask = ''
            if 'date' in config_directory:
                date_mask = config_directory['date']
            return time.strftime(date_mask, metadata['date_taken'])
        elif part in ('day', 'month', 'year'):
            return time.strftime(mask, metadata['date_taken'])
        elif part in ('location', 'city', 'state', 'country'):
            place_name = geolocation.place_name(
                metadata['latitude'],
                metadata['longitude']
            )

            location_parts = re.findall('(%[^%]+)', mask)
            parsed_folder_name = self.parse_mask_for_location(
                mask,
                location_parts,
                place_name,
            )
            return parsed_folder_name
        elif part in ('album', 'camera_make', 'camera_model'):
            # Falls through to the empty string when the value is not set.
            if metadata[part]:
                return metadata[part]
        elif part.startswith('"') and part.endswith('"'):
            # Fallback string
            return part[1:-1]

        return ''
428
429
    def parse_mask_for_location(self, mask, location_parts, place_name):
        """Interpolate place names into a location mask.

        Given these parameters here are the outputs.

        mask=%city
        location_parts=['%city']
        place_name={'city': u'Sunnyvale'}
        output=Sunnyvale

        mask=%city-%state
        location_parts=['%city-', '%state']
        place_name={'city': u'Sunnyvale', 'state': u'California'}
        output=Sunnyvale-California

        mask=%country
        location_parts=['%country']
        place_name={'default': u'Sunnyvale', 'city': u'Sunnyvale'}
        output=Sunnyvale

        :param str mask: The location mask in the form of %city-%state, etc
        :param list location_parts: The pieces of the mask, each starting
            with a placeholder, in the form of ['%city-', '%state']
        :param dict place_name: A dictionary of place keywords and names like
            {'default': u'California', 'state': u'California'}
        :returns: str
        """
        # For a piece such as '%country-random' the groups are
        #   whole = '%country-random'
        #   placeholder = '%country'
        #   key = 'country'
        # A piece that does not match is a bad mask in config.ini.
        piece_pattern = '((%([a-z]+))[^%]*)'

        matched_any = False
        result = mask
        for piece in location_parts:
            whole, placeholder, key = re.search(piece_pattern, piece).groups()

            if key in place_name:
                # Known key: swap only the placeholder for the place name.
                matched_any = True
                result = result.replace(placeholder, place_name[key])
            else:
                # Unknown key: drop the placeholder and its trailing text.
                result = result.replace(whole, '')

        # Nothing resolved and nothing left over: use the default name.
        if result == '' and not matched_any:
            result = place_name['default']

        return result
488
489
    def process_file(self, _file, destination, media, **kwargs):
        """Import a media file into the destination library.

        The destination folder and file name are derived from the media's
        metadata. Unless duplicates are allowed, a file whose checksum is
        already recorded (and still present on disk) is skipped, or deleted
        from the source when `constants.delete_duplicates` is enabled.

        :param str _file: Path of the source file.
        :param str destination: Root directory of the library.
        :param media: Media instance wrapping `_file`.
        :param bool move: Keyword argument. Move the file instead of
            copying it. Defaults to False.
        :param bool allowDuplicate: Keyword argument. Import the file even
            if its checksum is already known. Defaults to False.
        :returns: str destination path on success, None when the file was
            skipped.
        """
        move = kwargs.get('move', False)
        allow_duplicate = kwargs.get('allowDuplicate', False)

        if not media.is_valid():
            print('%s is not a valid media file. Skipping...' % _file)
            return

        media.set_original_name()
        metadata = media.get_metadata()

        directory_name = self.get_folder_path(metadata)

        dest_directory = os.path.join(destination, directory_name)
        file_name = self.get_file_name(media)
        dest_path = os.path.join(dest_directory, file_name)

        db = Db()
        checksum = db.checksum(_file)
        if checksum is None:
            log.info('Could not get checksum for %s. Skipping...' % _file)
            return

        # If duplicates are not allowed then we check if we've seen this file
        #  before via checksum. We also check that the file exists at the
        #   location we believe it to be.
        # If we find a checksum match but the file doesn't exist where we
        #  believe it to be then we write a debug log and proceed to import.
        checksum_file = db.get_hash(checksum)
        if allow_duplicate is False and checksum_file is not None:
            if os.path.isfile(checksum_file):
                # NOTE(review): `delete_duplicates` is not visible in this
                #  file; confirm it is defined in elodie.constants. Until
                #  then we default to the non-destructive behavior (skip).
                if getattr(constants, 'delete_duplicates', False):
                    log.info('%s already exists at %s. Deleting...' % (
                        _file,
                        checksum_file
                    ))
                    self.delete_file(_file)
                else:
                    log.info('%s already exists at %s. Skipping...' % (
                        _file,
                        checksum_file
                    ))
                return
            else:
                log.info('%s matched checksum but file not found at %s. Importing again...' % (  # noqa
                    _file,
                    checksum_file
                ))

        # If source and destination are identical then
        #  we should not write the file. gh-210
        if _file == dest_path:
            print('Final source and destination path should not be identical')
            return

        self.create_directory(dest_directory)

        if move is True:
            # Preserve the source's access and modification times.
            stat = os.stat(_file)
            shutil.move(_file, dest_path)
            os.utime(dest_path, (stat.st_atime, stat.st_mtime))
        else:
            compatability._copyfile(_file, dest_path)
            self.set_utime_from_metadata(media.get_metadata(), dest_path)

        db.add_hash(checksum, dest_path)
        db.update_hash_db()

        return dest_path
563
564
    def set_utime_from_metadata(self, metadata, file_path):
        """Set the modification time of a file from its name or metadata.

        The access time is set to the current time. The modification time
        is taken from metadata['date_taken'] unless metadata['base_name']
        starts with a time stamp in the form YYYY-MM-DD_HH-MM-SS, in which
        case that time stamp is used instead.

        :param dict metadata: Metadata dictionary. The keys `date_taken`
            and `base_name` are read.
        :param str file_path: Path of the file to update.
        """
        # Initialize date taken to what's returned from the metadata function.
        # If the file name follows a time format of
        #   YYYY-MM-DD_HH-MM-SS-IMG_0001.JPG then we override the date_taken
        date_taken = metadata['date_taken']
        name_stamp = re.search(
            r'^(\d{4})-(\d{2})-(\d{2})_(\d{2})-(\d{2})-(\d{2})',
            metadata['base_name']
        )
        if name_stamp is not None:
            date_taken = time.strptime(
                '{}-{}-{} {}:{}:{}'.format(*name_stamp.groups()),
                '%Y-%m-%d %H:%M:%S'
            )

        # We don't make any assumptions about time zones and
        # assume local time zone.
        os.utime(file_path, (time.time(), time.mktime(date_taken)))
590