Completed
Push — master ( 16f402...3f1efe )
by Jaisen
8s
created

FileSystem.process_file()   F

Complexity

Conditions 12

Duplication

Lines 0
Ratio 0 %

Size

Total Lines 66

Importance

Changes 5
Bugs 0 Features 1
Metric Value
cc 12
c 5
b 0
f 1
dl 0
loc 66
rs 2.6292

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include Extract Method: move a cohesive part of the long method's body into a new, well-named method.

Complexity

Complex classes like FileSystem.process_file() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""
2
General file system methods.
3
4
.. moduleauthor:: Jaisen Mathai <[email protected]>
5
"""
6
from __future__ import print_function
7
from builtins import object
8
9
import os
10
import re
11
import shutil
12
import time
13
14
from elodie import geolocation
15
from elodie import constants
16
from elodie.localstorage import Db
17
18
19
class FileSystem(object):

    """A class for interacting with the file system."""

    def create_directory(self, directory_path):
        """Create a directory (and missing parents) if it does not exist.

        :param str directory_path: A fully qualified path of the directory
            to create.
        :returns: bool -- True if the path exists when the call returns,
            False if it could not be created.
        """
        try:
            os.makedirs(directory_path)
        except OSError:
            # OSError is raised when the path already exists as well as for
            #   cases like no permission. Checking for existence after the
            #   failure (instead of before the call) avoids reporting False
            #   when something else created the directory in the meantime.
            return os.path.exists(directory_path)

        return True

    def delete_directory_if_empty(self, directory_path):
        """Delete a directory only if it's empty.

        Instead of checking first using `len([name for name in
        os.listdir(directory_path)]) == 0`, we catch the OSError exception.

        :param str directory_path: A fully qualified path of the directory
            to delete.
        :returns: bool -- True if the directory was removed, else False.
        """
        try:
            os.rmdir(directory_path)
        except OSError:
            return False

        return True

    def get_all_files(self, path, extensions=None):
        """Recursively get all files which match a path and extension.

        File names are lower-cased before the comparison, so extensions
        should be given in lower case.

        :param str path: Path to start recursive file listing
        :param tuple(str) extensions: File extensions to include (whitelist).
            A list or set is accepted as well. None includes every file.
        :returns: list of file paths
        """
        # str.endswith() only accepts a str or a tuple of str.
        if isinstance(extensions, (list, set, frozenset)):
            extensions = tuple(extensions)

        files = []
        for dirname, _dirnames, filenames in os.walk(path):
            for filename in filenames:
                if (
                    extensions is None or
                    filename.lower().endswith(extensions)
                ):
                    files.append(os.path.join(dirname, filename))
        return files

    def get_current_directory(self):
        """Get the current working directory.

        :returns: str
        """
        return os.getcwd()

    def get_file_name(self, media):
        """Generate file name for a photo or video using its metadata.

        We use an ISO8601-like format for the file name prefix. Instead of
        colons as the separator for hours, minutes and seconds we use a hyphen.
        https://en.wikipedia.org/wiki/ISO_8601#General_principles

        :param media: A Photo or Video instance
        :type media: :class:`~elodie.media.photo.Photo` or
            :class:`~elodie.media.video.Video`
        :returns: str or None for non-photo or non-videos
        """
        if not media.is_valid():
            return None

        metadata = media.get_metadata()
        if metadata is None:
            return None

        # We want to remove the date prefix we add to the name.
        # This helps when re-running the program on file which were already
        #   processed.
        base_name = re.sub(
            r'^\d{4}-\d{2}-\d{2}_\d{2}-\d{2}-\d{2}-',
            '',
            metadata['base_name']
        )
        if not base_name:
            base_name = metadata['base_name']

        # If the file has EXIF title we use that in the file name
        #   (i.e. my-favorite-photo-img_1234.jpg)
        if 'title' in metadata and metadata['title']:
            # The returned name is lower cased, so compare in lower case.
            # Otherwise a title with capital letters is never found in the
            #   name of an already processed file and gets appended again.
            title_sanitized = re.sub(
                r'\W+',
                '-',
                metadata['title'].strip()
            ).lower()
            base_name = base_name.lower().replace(
                '-%s' % title_sanitized,
                ''
            )
            base_name = '%s-%s' % (base_name, title_sanitized)

        file_name = '%s-%s.%s' % (
            time.strftime('%Y-%m-%d_%H-%M-%S', metadata['date_taken']),
            base_name,
            metadata['extension']
        )
        return file_name.lower()

    def get_folder_name_by_date(self, time_obj):
        """Get date based folder name.

        :param time time_obj: Time object to be used to determine folder name.
        :returns: str
        """
        return time.strftime('%Y-%m-%b', time_obj)

    def get_folder_path(self, metadata):
        """Get folder path by various parameters.

        The first level is the date based folder name (when `date_taken` is
        set). The second level is the album if there is one, else the place
        name looked up from latitude and longitude.

        :param metadata: Metadata of the media file. The keys `date_taken`,
            `album`, `latitude` and `longitude` are read.
        :returns: str
        """
        path = []
        if metadata['date_taken'] is not None:
            path.append(self.get_folder_name_by_date(metadata['date_taken']))

        if metadata['album'] is not None:
            path.append(metadata['album'])
        elif (
            metadata['latitude'] is not None and
            metadata['longitude'] is not None
        ):
            place_name = geolocation.place_name(
                metadata['latitude'],
                metadata['longitude']
            )
            if place_name is not None:
                path.append(place_name)

        # if we don't have a 2nd level directory we use 'Unknown Location'
        if len(path) < 2:
            path.append('Unknown Location')

        return os.path.join(*path)

    def process_file(self, _file, destination, media, **kwargs):
        """Copy or move a media file into the destination directory.

        The target folder and file name are derived from the metadata of
        `media`. The checksum of the file is stored in the hash database
        together with the path it was written to.

        :param str _file: Path of the file to import.
        :param str destination: Base directory to import the file into.
        :param media: A Photo or Video instance for `_file`
        :param bool move: Keyword argument. Move the file instead of
            copying it. Defaults to False.
        :param bool allowDuplicate: Keyword argument. Import the file even
            if its checksum is already known. Defaults to False.
        :returns: str path the file was written to, or None if it was
            skipped.
        """
        move = kwargs.get('move', False)
        allow_duplicate = kwargs.get('allowDuplicate', False)

        if not media.is_valid():
            print('%s is not a valid media file. Skipping...' % _file)
            return None

        metadata = media.get_metadata()
        if metadata is None:
            # get_file_name() treats missing metadata as a possible state,
            #   so we skip here instead of failing on the lookups below.
            if constants.debug is True:
                print('Could not get metadata for %s. Skipping...' % _file)
            return None

        directory_name = self.get_folder_path(metadata)

        dest_directory = os.path.join(destination, directory_name)
        file_name = self.get_file_name(media)
        dest_path = os.path.join(dest_directory, file_name)

        db = Db()
        checksum = db.checksum(_file)
        if checksum is None:
            if constants.debug is True:
                print('Could not get checksum for %s. Skipping...' % _file)
            return None

        # If duplicates are not allowed then we check if we've seen this file
        #  before via checksum.
        checksum_file = db.get_hash(checksum)
        if (
            allow_duplicate is False and
            checksum_file is not None and
            self._already_imported(_file, checksum_file)
        ):
            return None

        self.create_directory(dest_directory)
        self._transfer_file(_file, dest_path, media, move)

        db.add_hash(checksum, dest_path)
        db.update_hash_db()

        return dest_path

    def _already_imported(self, _file, checksum_file):
        """Check that a file with a known checksum still exists on disk.

        If we find a checksum match but the file doesn't exist where we
        believe it to be then we write a debug log and the caller proceeds
        to import.

        :param str _file: Path of the file being imported (for logging).
        :param str checksum_file: Path stored for the matching checksum.
        :returns: bool
        """
        if os.path.isfile(checksum_file):
            if constants.debug is True:
                print('%s already exists at %s. Skipping...' % (
                    _file,
                    checksum_file
                ))
            return True

        if constants.debug is True:
            print('%s matched checksum but file not found at %s. Importing again...' % (  # noqa
                _file,
                checksum_file
            ))
        return False

    def _transfer_file(self, _file, dest_path, media, move):
        """Move or copy `_file` to `dest_path` and set the file times.

        :param str _file: Path of the file to transfer.
        :param str dest_path: Path to write the file to.
        :param media: A Photo or Video instance for `_file`
        :param bool move: Only the value True moves the file.
        """
        if move is True:
            stat = os.stat(_file)
            shutil.move(_file, dest_path)
            os.utime(dest_path, (stat.st_atime, stat.st_mtime))
        else:
            # Do not use copy2(), will have an issue when copying to a
            # network/mounted drive using copy and manual
            # set_utime gets the job done.
            shutil.copy(_file, dest_path)
            # Set the time on the copy. The source file is left untouched.
            self.set_utime(media, dest_path)

    def set_utime(self, media, file_path=None):
        """Set the modification time on the file based on the file name.

        The access time is set to the current time.

        :param media: A Photo or Video instance
        :param str file_path: File to update. Defaults to the path of
            `media`.
        """
        if file_path is None:
            file_path = media.get_file_path()

        # Initialize date taken to what's returned from the metadata function.
        # If the file name follows a time format of
        #   YYYY-MM-DD_HH-MM-SS-IMG_0001.JPG then we override the date_taken
        metadata = media.get_metadata()
        date_taken = metadata['date_taken']
        name_match = re.search(
            r'^(\d{4})-(\d{2})-(\d{2})_(\d{2})-(\d{2})-(\d{2})',
            metadata['base_name']
        )
        if name_match is not None:
            date_taken = time.strptime(
                '{}-{}-{} {}:{}:{}'.format(*name_match.groups()),
                '%Y-%m-%d %H:%M:%S'
            )

        # We don't make any assumptions about time zones and
        # assume local time zone.
        os.utime(file_path, (time.time(), time.mktime(date_taken)))
266