Completed
Push — master ( e5e811...fbc7b0 )
by Jaisen
29s
created

FileSystem.set_utime_from_metadata()   B

Complexity

Conditions 2

Duplication

Lines 0
Ratio 0 %

Size

Total Lines 26

Importance

Changes 0
Metric Value
cc 2
dl 0
loc 26
rs 8.8571
c 0
b 0
f 0
1
"""
2
General file system methods.
3
4
.. moduleauthor:: Jaisen Mathai <[email protected]>
5
"""
6
from __future__ import print_function
7
from builtins import object
8
9
import os
10
import re
11
import shutil
12
import time
13
14
from elodie import compatability
15
from elodie import geolocation
16
from elodie import log
17
from elodie.config import load_config
18
from elodie.localstorage import Db
19
from elodie.media.base import Base, get_all_subclasses
20
21
22
class FileSystem(object):
23
    """A class for interacting with the file system."""
24
25
    def __init__(self):
        """Set up the built-in folder layout and an empty definition cache."""
        # Without any configuration, files are filed under something
        # along the lines of 2015-01-Jan/Chicago.
        unknown_location = geolocation.__DEFAULT_LOCATION__
        full_path_mask = '%date/%album|%location|"{}"'.format(unknown_location)

        # Part names that get_folder_path_definition accepts with an empty
        # mask when they are not keys of the directory configuration.
        self.default_parts = ['album', 'city', 'state', 'country']
        # Filled in lazily by get_folder_path_definition.
        self.cached_folder_path_definition = None
        self.default_folder_path_definition = {
            'date': '%Y-%m-%b',
            'location': '%city',
            'full_path': full_path_mask,
        }
36
37
    def create_directory(self, directory_path):
        """Create a directory (and any missing parents) if it does not exist.

        :param str directory_path: A fully qualified path of the directory
            to create.
        :returns: bool -- True when the path already exists or was created,
            False when it could not be created.
        """
        if os.path.exists(directory_path):
            return True

        try:
            os.makedirs(directory_path)
        except OSError:
            # OSError is raised for cases like no permission. It is also
            # raised when something else created the directory between the
            # check above and makedirs; that case is still a success.
            return os.path.isdir(directory_path)

        return True
55
56
    def delete_directory_if_empty(self, directory_path):
        """Remove a directory, but only when it has no contents.

        Rather than listing the directory first to see whether it is empty,
        the removal is simply attempted and the OSError raised by
        `os.rmdir` is treated as "not deleted".

        :param str directory_path: A fully qualified path of the directory
            to delete.
        :returns: bool -- True if the directory was removed, else False.
        """
        try:
            os.rmdir(directory_path)
        except OSError:
            return False
        return True
72
73
    def get_all_files(self, path, extensions=None):
        """Walk `path` recursively and yield files with a wanted extension.

        :param str path: Directory at which the recursive listing starts.
        :param extensions: Collection of file extensions (without the
            leading dot) to include. When empty or None, the extensions
            declared by every subclass of `Base` are used.
        :returns: generator of file paths
        """
        wanted = extensions
        if not wanted:
            # No whitelist supplied: gather every supported extension
            wanted = set()
            for media_class in get_all_subclasses(Base):
                wanted.update(media_class.extensions)

        for root, _subdirs, names in os.walk(path):
            for name in names:
                # Compare the lower-cased extension, minus its leading dot
                suffix = os.path.splitext(name)[1]
                if suffix[1:].lower() in wanted:
                    yield os.path.join(root, name)
92
93
    def get_current_directory(self):
        """Return the process's current working directory.

        :returns: str
        """
        working_directory = os.getcwd()
        return working_directory
99
100
    def get_file_name(self, media):
        """Generate file name for a photo or video using its metadata.

        We use an ISO8601-like format for the file name prefix. Instead of
        colons as the separator for hours, minutes and seconds we use a hyphen.
        https://en.wikipedia.org/wiki/ISO_8601#General_principles

        The result has the form `<date prefix>-<base name>[-<title>].<ext>`
        and is lower-cased.

        :param media: A Photo or Video instance
        :type media: :class:`~elodie.media.photo.Photo` or
            :class:`~elodie.media.video.Video`
        :returns: str or None for non-photo or non-videos
        """
        if not media.is_valid():
            return None

        metadata = media.get_metadata()
        if metadata is None:
            return None

        # First we check if we have metadata['original_name'].
        # We have to do this for backwards compatibility because
        #   we originally did not store this back into EXIF.
        if metadata.get('original_name'):
            base_name = os.path.splitext(metadata['original_name'])[0]
        else:
            # We want to remove the date prefix we add to the name.
            # This helps when re-running the program on files which were
            #   already processed.
            base_name = re.sub(
                r'^\d{4}-\d{2}-\d{2}_\d{2}-\d{2}-\d{2}-',
                '',
                metadata['base_name']
            )
            # If nothing is left after stripping, keep the name untouched
            if len(base_name) == 0:
                base_name = metadata['base_name']

        # If the file has a title we use that in the file name
        #   (i.e. my-favorite-photo-img_1234.jpg)
        title = metadata.get('title')
        if title is not None and len(title) > 0:
            title_sanitized = re.sub(r'\W+', '-', title.strip())
            # Drop a previously appended title first so that re-processing
            #   does not append it a second time.
            base_name = base_name.replace('-%s' % title_sanitized, '')
            base_name = '%s-%s' % (base_name, title_sanitized)

        file_name = '%s-%s.%s' % (
            time.strftime('%Y-%m-%d_%H-%M-%S', metadata['date_taken']),
            base_name,
            metadata['extension'],
        )
        return file_name.lower()
155
156
    def get_folder_path_definition(self):
157
        """Returns a list of folder definitions.
158
159
        Each element in the list represents a folder.
160
        Fallback folders are supported and are nested lists.
161
        Return values take the following form.
162
        [
163
            ('date', '%Y-%m-%d'),
164
            [
165
                ('location', '%city'),
166
                ('album', ''),
167
                ('"Unknown Location", '')
168
            ]
169
        ]
170
171
        :returns: list
172
        """
173
        # If we've done this already then return it immediately without
174
        # incurring any extra work
175
        if self.cached_folder_path_definition is not None:
176
            return self.cached_folder_path_definition
177
178
        config = load_config()
179
180
        # If Directory is in the config we assume full_path and its
181
        #  corresponding values (date, location) are also present
182
        config_directory = self.default_folder_path_definition
183
        if('Directory' in config):
184
            config_directory = config['Directory']
185
186
        # Find all subpatterns of full_path that map to directories.
187
        #  I.e. %foo/%bar => ['foo', 'bar']
188
        #  I.e. %foo/%bar|%example|"something" => ['foo', 'bar|example|"something"']
189
        path_parts = re.findall(
190
                         '(\%[^/]+)',
191
                         config_directory['full_path']
192
                     )
193
194
        if not path_parts or len(path_parts) == 0:
195
            return self.default_folder_path_definition
196
197
        self.cached_folder_path_definition = []
198
        for part in path_parts:
199
            part = part.replace('%', '')
200
            if part in config_directory:
201
                self.cached_folder_path_definition.append(
202
                    [(part, config_directory[part])]
203
                )
204
            elif part in self.default_parts:
205
                self.cached_folder_path_definition.append(
206
                    [(part, '')]
207
                )
208
            else:
209
                this_part = []
210
                for p in part.split('|'):
211
                    this_part.append(
212
                        (p, config_directory[p] if p in config_directory else '')
213
                    )
214
                self.cached_folder_path_definition.append(this_part)
215
216
        return self.cached_folder_path_definition
217
218
    def get_folder_path(self, metadata):
219
        """Given a media's metadata this function returns the folder path as a string.
220
221
        :param metadata dict: Metadata dictionary.
222
        :returns: str
223
        """
224
        path_parts = self.get_folder_path_definition()
225
        path = []
226
        for path_part in path_parts:
227
            # We support fallback values so that
228
            #  'album|city|"Unknown Location"
229
            #  %album|%city|"Unknown Location" results in
230
            #  My Album - when an album exists
231
            #  Sunnyvale - when no album exists but a city exists
232
            #  Unknown Location - when neither an album nor location exist
233
            for this_part in path_part:
234
                part, mask = this_part
235
                if part in ('date', 'day', 'month', 'year'):
236
                    path.append(
237
                        time.strftime(mask, metadata['date_taken'])
238
                    )
239
                    break
240
                elif part in ('location', 'city', 'state', 'country'):
241
                    place_name = geolocation.place_name(
242
                        metadata['latitude'],
243
                        metadata['longitude']
244
                    )
245
246
                    location_parts = re.findall('(%[^%]+)', mask)
247
                    parsed_folder_name = self.parse_mask_for_location(
248
                        mask,
249
                        location_parts,
250
                        place_name,
251
                    )
252
                    path.append(parsed_folder_name)
253
                    break
254
                elif part in ('album'):
255
                    if metadata['album']:
256
                        path.append(metadata['album'])
257
                        break
258
                elif part.startswith('"') and part.endswith('"'):
259
                    path.append(part[1:-1])
260
261
        return os.path.join(*path)
262
263
    def parse_mask_for_location(self, mask, location_parts, place_name):
        """Interpolate actual place names into a location mask.

        Each entry of `location_parts` is searched for a `%name` token.
        When `name` is a key of `place_name` the token is replaced by the
        place name. Otherwise the token and the text trailing it within
        that entry are removed from the result.

        Examples.

        mask=%city
        location_parts=['%city']
        place_name={'city': u'Sunnyvale'}
        output=Sunnyvale

        mask=%city-%state
        location_parts=['%city-', '%state']
        place_name={'city': u'Sunnyvale', 'state': u'California'}
        output=Sunnyvale-California

        mask=%country
        location_parts=['%country']
        place_name={'default': u'Sunnyvale', 'city': u'Sunnyvale'}
        output=Sunnyvale

        :param str mask: The location mask in the form of %city-%state, etc
        :param list location_parts: The pieces of the mask, each starting
            with a token, like ['%city-', '%state']
        :param dict place_name: A dictionary of place keywords and names like
            {'default': u'California', 'state': u'California'}
        :returns: str
        """
        token_pattern = '((%([a-z]+))[^%]*)'
        matched_any = False
        result = mask
        for chunk in location_parts:
            # The search is assumed to match; if it does not then it's a
            # bad mask in config.ini. For chunk = '%country-random':
            #   whole = '%country-random'
            #   token = '%country'
            #   name  = 'country'
            whole, token, name = re.search(token_pattern, chunk).groups()

            if name in place_name:
                matched_any = True
                result = result.replace(token, place_name[name])
            else:
                result = result.replace(whole, '')

        # Nothing was known and nothing is left: use the default name
        if result == '' and not matched_any:
            return place_name['default']

        return result
322
323
    def process_file(self, _file, destination, media, **kwargs):
        """Copy or move a media file into its folder under `destination`.

        :param str _file: Path of the source file.
        :param str destination: Root directory the file is imported into.
        :param media: Media object for `_file`; its `is_valid`,
            `set_original_name` and `get_metadata` methods are used.
        :param bool move: Keyword argument. Move the file instead of
            copying it. Defaults to False.
        :param bool allowDuplicate: Keyword argument. Import the file even
            if its checksum is already recorded. Defaults to False.
        :returns: str path of the imported file, or None when the file
            was skipped.
        """
        move = kwargs.get('move', False)
        allow_duplicate = kwargs.get('allowDuplicate', False)

        if not media.is_valid():
            print('%s is not a valid media file. Skipping...' % _file)
            return

        media.set_original_name()
        metadata = media.get_metadata()

        dest_directory = os.path.join(
            destination,
            self.get_folder_path(metadata)
        )
        dest_path = os.path.join(dest_directory, self.get_file_name(media))

        db = Db()
        checksum = db.checksum(_file)
        if checksum is None:
            log.info('Could not get checksum for %s. Skipping...' % _file)
            return

        # Unless duplicates are allowed, look the checksum up to see whether
        #  this file was imported before and still exists at the recorded
        #  location.
        # A checksum match whose recorded file is gone is only logged and
        #  the import proceeds.
        known_path = db.get_hash(checksum)
        if known_path is not None and allow_duplicate is False:
            if not os.path.isfile(known_path):
                log.info('%s matched checksum but file not found at %s. Importing again...' % (  # noqa
                    _file,
                    known_path
                ))
            else:
                log.info('%s already exists at %s. Skipping...' % (
                    _file,
                    known_path
                ))
                return

        # Never write a file onto itself. gh-210
        if _file == dest_path:
            print('Final source and destination path should not be identical')
            return

        self.create_directory(dest_directory)

        if move is True:
            # Remember the timestamps so they can be restored after the move
            source_stat = os.stat(_file)
            shutil.move(_file, dest_path)
            os.utime(dest_path, (source_stat.st_atime, source_stat.st_mtime))
        else:
            compatability._copyfile(_file, dest_path)
            self.set_utime_from_metadata(media.get_metadata(), dest_path)

        db.add_hash(checksum, dest_path)
        db.update_hash_db()

        return dest_path
390
391
    def set_utime_from_metadata(self, metadata, file_path):
        """Set the access and modification times of a file.

        The modification time is taken from the date encoded at the start
        of `metadata['base_name']` when it follows the format
        YYYY-MM-DD_HH-MM-SS (e.g. 2015-01-19_12-45-11-IMG_0001), and from
        `metadata['date_taken']` otherwise. The access time is set to now.

        :param dict metadata: Metadata dictionary with the keys
            `date_taken` (a time tuple accepted by `time.mktime`) and
            `base_name`.
        :param str file_path: Path of the file to update.
        """
        # Start from what the metadata reports and override it only when
        # the base name carries its own timestamp prefix.
        date_taken = metadata['date_taken']
        base_name = metadata['base_name']
        stamp_match = re.search(
            r'^(\d{4})-(\d{2})-(\d{2})_(\d{2})-(\d{2})-(\d{2})',
            base_name
        )
        if stamp_match is not None:
            date_taken = time.strptime(
                '{}-{}-{} {}:{}:{}'.format(*stamp_match.groups()),
                '%Y-%m-%d %H:%M:%S'
            )

        # We don't make any assumptions about time zones and
        # assume local time zone.
        date_taken_in_seconds = time.mktime(date_taken)
        os.utime(file_path, (time.time(), date_taken_in_seconds))
417