Completed
Push — master ( 328aed...cd5ba9 )
by Jaisen
01:01
created

FileSystem.parse_mask_for_location()   B

Complexity

Conditions 5

Duplication

Lines 0
Ratio 0 %

Size

Total Lines 59

Importance

Changes 0
Metric Value
cc 5
c 0
b 0
f 0
dl 0
loc 59
rs 8.3736

How to fix   Long Method   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
"""
2
General file system methods.
3
4
.. moduleauthor:: Jaisen Mathai <[email protected]>
5
"""
6
from __future__ import print_function
7
from builtins import object
8
9
import os
10
import re
11
import shutil
12
import time
13
14
from elodie import geolocation
15
from elodie import log
16
from elodie.config import load_config
17
from elodie.localstorage import Db
18
from elodie.media.base import Base, get_all_subclasses
19
20
21
class FileSystem(object):
22
    """A class for interacting with the file system."""
23
24
    def __init__(self):
25
        # The default folder path is along the lines of 2015-01-Jan/Chicago
26
        self.default_folder_path_definition = [
27
            ('date', '%Y-%m-%b'), ('location', '%city')
28
        ]
29
        self.cached_folder_path_definition = None
30
31
    def create_directory(self, directory_path):
        """Create a directory (and any missing parents) if it does not exist.

        :param str directory_path: A fully qualified path of the directory
            to create.
        :returns: bool -- True when the directory exists once this call
            returns, False otherwise (for example no permission, or the path
            is occupied by a regular file).
        """
        if os.path.isdir(directory_path):
            return True

        try:
            os.makedirs(directory_path)
        except OSError:
            # OSError is thrown for cases like no permission. It is also
            # thrown when something else created the directory between the
            # check above and makedirs(); that case still counts as success,
            # so report whatever is on disk now.
            return os.path.isdir(directory_path)

        return True
49
50
    def delete_directory_if_empty(self, directory_path):
        """Remove a directory, but only when it has no contents.

        Rather than listing the directory first to see whether it is empty,
        the removal is simply attempted and the OSError raised by
        `os.rmdir` is treated as "not deleted".

        :param str directory_path: A fully qualified path of the directory
            to delete.
        :returns: bool -- True if the directory was removed, False if
            `os.rmdir` raised OSError.
        """
        try:
            os.rmdir(directory_path)
        except OSError:
            return False
        else:
            return True
66
67
    def get_all_files(self, path, extensions=None):
        """Walk `path` recursively and yield files with a wanted extension.

        A file is yielded when its extension, lower-cased and without the
        leading dot, is contained in `extensions`.

        :param str path: Directory at which the recursive listing starts.
        :param tuple(str) extensions: File extensions to include (whitelist).
            When empty or None, the extensions declared by every subclass of
            `Base` are used.
        :returns: generator of file paths
        """
        # No explicit whitelist: collect the extensions of all media classes.
        if not extensions:
            extensions = set()
            for media_class in get_all_subclasses(Base):
                extensions.update(media_class.extensions)

        for root, _subdirs, names in os.walk(path):
            for name in names:
                suffix = os.path.splitext(name)[1]
                if suffix[1:].lower() not in extensions:
                    continue
                yield os.path.join(root, name)
86
87
    def get_current_directory(self):
        """Return the current working directory of the process.

        :returns: str
        """
        working_directory = os.getcwd()
        return working_directory
93
94
    def get_file_name(self, media):
        """Generate file name for a photo or video using its metadata.

        We use an ISO8601-like format for the file name prefix. Instead of
        colons as the separator for hours, minutes and seconds we use a hyphen.
        https://en.wikipedia.org/wiki/ISO_8601#General_principles

        The result has the shape `<date>-<base name>[-<title>].<extension>`
        and is always lower case.

        :param media: A Photo or Video instance
        :type media: :class:`~elodie.media.photo.Photo` or
            :class:`~elodie.media.video.Video`
        :returns: str or None for non-photo or non-videos (that is, when
            `media.is_valid()` is false or no metadata is available)
        """
        if not media.is_valid():
            return None

        metadata = media.get_metadata()
        if metadata is None:
            return None

        # Remove the date prefix we add to the name. This helps when
        # re-running the program on files which were already processed.
        base_name = re.sub(
            r'^\d{4}-\d{2}-\d{2}_\d{2}-\d{2}-\d{2}-',
            '',
            metadata['base_name']
        )
        # A name made up of nothing but the prefix is kept as it is.
        if not base_name:
            base_name = metadata['base_name']

        # If the file has a title we use that in the file name
        #   (i.e. my-favorite-photo-img_1234.jpg)
        title = metadata['title'] if 'title' in metadata else None
        if title:
            # The finished name is lower-cased below, so a file that was
            # already processed carries its title in lower case. Compare in
            # lower case as well, otherwise the title is appended a second
            # time on every re-run.
            title_sanitized = re.sub(r'\W+', '-', title.strip()).lower()
            # A whitespace-only title sanitizes to ''; skip it, because
            # replacing '-' + '' would strip every hyphen from the base name.
            if title_sanitized:
                base_name = base_name.lower().replace(
                    '-%s' % title_sanitized,
                    ''
                )
                base_name = '%s-%s' % (base_name, title_sanitized)

        date_prefix = time.strftime('%Y-%m-%d_%H-%M-%S', metadata['date_taken'])
        file_name = '%s-%s.%s' % (
            date_prefix,
            base_name,
            metadata['extension']
        )
        return file_name.lower()
143
144
    def get_folder_path_definition(self):
145
        # If we've done this already then return it immediately without
146
        # incurring any extra work
147
        if self.cached_folder_path_definition is not None:
148
            return self.cached_folder_path_definition
149
150
        config = load_config()
151
152
        # If Directory is in the config we assume full_path and its
153
        #  corresponding values (date, location) are also present
154
        if('Directory' not in config):
155
            return self.default_folder_path_definition
156
157
        config_directory = config['Directory']
158
159
        path_parts = re.search(
160
                         '\%([^/]+)\/\%([^/]+)',
161
                         config_directory['full_path']
162
                     )
163
164
        if not path_parts or len(path_parts.groups()) != 2:
165
            return self.default_folder_path_definition
166
167
        path_part_groups = path_parts.groups()
168
        self.cached_folder_path_definition = [
169
            (path_part_groups[0], config_directory[path_part_groups[0]]),
170
            (path_part_groups[1], config_directory[path_part_groups[1]]),
171
        ]
172
        return self.cached_folder_path_definition
173
174
    def get_folder_path(self, metadata):
175
        """Get folder path by various parameters.
176
177
        :param metadata dict: Metadata dictionary.
178
        :returns: str
179
        """
180
        path_parts = self.get_folder_path_definition()
181
        path = []
182
        for path_part in path_parts:
183
            part, mask = path_part
184
            if part == 'date':
185
                path.append(time.strftime(mask, metadata['date_taken']))
186
            elif part == 'location':
187
                if(
188
                    metadata['latitude'] is not None and
189
                    metadata['longitude'] is not None
190
                ):
191
                    place_name = geolocation.place_name(
192
                        metadata['latitude'],
193
                        metadata['longitude']
194
                    )
195
                    if(place_name is not None):
196
                        location_parts = re.findall('(%[^%]+)', mask)
197
                        parsed_folder_name = self.parse_mask_for_location(
198
                            mask,
199
                            location_parts,
200
                            place_name,
201
                        )
202
                        path.append(parsed_folder_name)
203
204
        # For now we always make the leaf folder an album if it's in the EXIF.
205
        # This is to preserve backwards compatability until we figure out how
206
        # to include %album in the config.ini syntax.
207
        if(metadata['album'] is not None):
208
            if(len(path) == 1):
209
                path.append(metadata['album'])
210
            elif(len(path) == 2):
211
                path[1] = metadata['album']
212
213
        # if we don't have a 2nd level directory we use 'Unknown Location'
214
        if(len(path) < 2):
215
            path.append('Unknown Location')
216
217
        # return '/'.join(path[::-1])
218
        return os.path.join(*path)
219
220
    def parse_mask_for_location(self, mask, location_parts, place_name):
        """Interpolate actual place names into a location mask.

        Every entry of `location_parts` is searched for a `%name` token
        (lower case letters only). When `name` is a key of `place_name` the
        token is replaced by that value inside the mask; otherwise the token
        together with the literal text trailing it in that entry is removed.

        Examples::

            mask=%city
            location_parts=['%city']
            place_name={'city': u'Sunnyvale'}
            output=Sunnyvale

            mask=%city-%state
            location_parts=['%city-', '%state']
            place_name={'city': u'Sunnyvale', 'state': u'California'}
            output=Sunnyvale-California

            mask=%country
            location_parts=['%country']
            place_name={'default': u'Sunnyvale', 'city': u'Sunnyvale'}
            output=Sunnyvale

        In the last example nothing matched and the mask collapsed to an
        empty string, so `place_name['default']` is returned instead.

        :param str mask: The location mask in the form of %city-%state, etc
        :param list location_parts: The pieces of the mask, each starting
            with a token, e.g. ['%city-', '%state']
        :param dict place_name: A dictionary of place keywords and names like
            {'default': u'California', 'state': u'California'}
        :returns: str
        """
        # Groups: (token plus trailing literal text, token, bare key), e.g.
        # '%country-random' -> ('%country-random', '%country', 'country').
        component_pattern = re.compile('((%([a-z]+))[^%]*)')

        matched_any = False
        result = mask
        for part in location_parts:
            # An entry without a token is a bad mask in config.ini; the
            # search then returns None and the groups() call raises.
            whole, token, name = component_pattern.search(part).groups()

            if name not in place_name:
                result = result.replace(whole, '')
                continue

            matched_any = True
            result = result.replace(token, place_name[name])

        if result == '' and not matched_any:
            return place_name['default']

        return result
279
280
    def process_file(self, _file, destination, media, **kwargs):
        """Copy or move one media file into its folder under `destination`.

        The target folder and file name are derived from the media's
        metadata. The file is skipped when the media is not valid, when no
        checksum can be computed, or when duplicates are not allowed and a
        file with the same checksum is already present on disk.

        :param str _file: Path of the file to import.
        :param str destination: Root directory to import into.
        :param media: Media object wrapping `_file`.
        :param bool move: Keyword argument; move the file when True, copy it
            otherwise (default).
        :param bool allowDuplicate: Keyword argument; import the file even if
            its checksum is already known (default False).
        :returns: str path of the imported file, or None if it was skipped
        """
        move = kwargs.get('move', False)
        allow_duplicate = kwargs.get('allowDuplicate', False)

        if not media.is_valid():
            print('%s is not a valid media file. Skipping...' % _file)
            return

        metadata = media.get_metadata()

        dest_directory = os.path.join(
            destination,
            self.get_folder_path(metadata)
        )
        dest_path = os.path.join(dest_directory, self.get_file_name(media))

        db = Db()
        checksum = db.checksum(_file)
        if checksum is None:
            log.info('Could not get checksum for %s. Skipping...' % _file)
            return

        # If duplicates are not allowed then we check if we've seen this file
        #  before via checksum. We also check that the file exists at the
        #  location we believe it to be. A checksum match whose file is gone
        #  is only logged, and the import proceeds.
        known_path = db.get_hash(checksum)
        if known_path is not None and allow_duplicate is False:
            if os.path.isfile(known_path):
                log.info('%s already exists at %s. Skipping...' % (
                    _file,
                    known_path
                ))
                return
            log.info('%s matched checksum but file not found at %s. Importing again...' % (  # noqa
                _file,
                known_path
            ))

        self.create_directory(dest_directory)

        if move is True:
            # Carry the original access/modification times over to the
            # moved file.
            source_stat = os.stat(_file)
            shutil.move(_file, dest_path)
            os.utime(dest_path, (source_stat.st_atime, source_stat.st_mtime))
        else:
            # Do not use copy2(), will have an issue when copying to a
            # network/mounted drive; plain copy plus set_utime gets the job
            # done.
            # NOTE(review): set_utime() takes its path from
            # media.get_file_path() -- confirm which file that refers to
            # after the copy.
            shutil.copy(_file, dest_path)
            self.set_utime(media)

        db.add_hash(checksum, dest_path)
        db.update_hash_db()

        return dest_path
343
344
    def set_utime(self, media):
        """Set the modification time of the media's file.

        The time is taken from `metadata['date_taken']` unless the base
        name starts with a YYYY-MM-DD_HH-MM-SS timestamp (as in
        2015-01-02_03-04-05-img_0001), in which case that timestamp wins.
        The access time is set to the current time.

        :param media: Media object providing `get_file_path()` and
            `get_metadata()`.
        """
        file_path = media.get_file_path()
        metadata = media.get_metadata()

        # Start from the metadata value; override it below when the name
        # carries a timestamp.
        date_taken = metadata['date_taken']
        timestamp_match = re.search(
            r'^(\d{4})-(\d{2})-(\d{2})_(\d{2})-(\d{2})-(\d{2})',
            metadata['base_name']
        )
        if timestamp_match is not None:
            date_taken = time.strptime(
                '{}-{}-{} {}:{}:{}'.format(*timestamp_match.groups()),
                '%Y-%m-%d %H:%M:%S'
            )

        # We don't make any assumptions about time zones and assume the
        # local time zone, which is what time.mktime() does.
        os.utime(file_path, (time.time(), time.mktime(date_taken)))
372