Completed
Push8eaba9...3fc43c
passed — Build
created

FileSystem.process_file()   D

↳ Parent: FileSystem

Complexity

Conditions 9

Duplication

Lines 0
Ratio 0 %

Size

Total Lines 63

Importance

Changes 5
Bugs 0 Features 1
Metric Value
cc 9
c 5
b 0
f 1
dl 0
loc 63
rs 4.7173

How to fix   Long Method   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
"""
2
General file system methods.
3
4
.. moduleauthor:: Jaisen Mathai <[email protected]>
5
"""
6
from __future__ import print_function
7
from builtins import object
8
9
import os
10
import re
11
import shutil
12
import time
13
14
from elodie import geolocation
15
from elodie import log
16
from elodie.localstorage import Db
17
18
19
class FileSystem(object):
20
21
    """A class for interacting with the file system."""
22
23
    def create_directory(self, directory_path):
24
        """Create a directory if it does not already exist.
25
26
        :param str directory_name: A fully qualified path of the
27
            to create.
28
        :returns: bool
29
        """
30
        try:
31
            if os.path.exists(directory_path):
32
                return True
33
            else:
34
                os.makedirs(directory_path)
35
                return True
36
        except OSError:
37
            # OSError is thrown for cases like no permission
38
            pass
39
40
        return False
41
42
    def delete_directory_if_empty(self, directory_path):
43
        """Delete a directory only if it's empty.
44
45
        Instead of checking first using `len([name for name in
46
        os.listdir(directory_path)]) == 0`, we catch the OSError exception.
47
48
        :param str directory_name: A fully qualified path of the directory
49
            to delete.
50
        """
51
        try:
52
            os.rmdir(directory_path)
53
            return True
54
        except OSError:
55
            pass
56
57
        return False
58
59
    def get_all_files(self, path, extensions=None):
60
        """Recursively get all files which match a path and extension.
61
62
        :param str path string: Path to start recursive file listing
63
        :param tuple(str) extensions: File extensions to include (whitelist)
64
        """
65
        files = []
66
        for dirname, dirnames, filenames in os.walk(path):
67
            # print path to all filenames.
68
            for filename in filenames:
69
                if(
70
                    extensions is None or
71
                    filename.lower().endswith(extensions)
72
                ):
73
                    files.append(os.path.join(dirname, filename))
74
        return files
75
76
    def get_current_directory(self):
77
        """Get the current working directory.
78
79
        :returns: str
80
        """
81
        return os.getcwd()
82
83
    def get_file_name(self, media):
84
        """Generate file name for a photo or video using its metadata.
85
86
        We use an ISO8601-like format for the file name prefix. Instead of
87
        colons as the separator for hours, minutes and seconds we use a hyphen.
88
        https://en.wikipedia.org/wiki/ISO_8601#General_principles
89
90
        :param media: A Photo or Video instance
91
        :type media: :class:`~elodie.media.photo.Photo` or
92
            :class:`~elodie.media.video.Video`
93
        :returns: str or None for non-photo or non-videos
94
        """
95
        if(not media.is_valid()):
96
            return None
97
98
        metadata = media.get_metadata()
99
        if(metadata is None):
100
            return None
101
102
        # If the file has EXIF title we use that in the file name
103
        #   (i.e. my-favorite-photo-img_1234.jpg)
104
        # We want to remove the date prefix we add to the name.
105
        # This helps when re-running the program on file which were already
106
        #   processed.
107
        base_name = re.sub(
108
            '^\d{4}-\d{2}-\d{2}_\d{2}-\d{2}-\d{2}-',
109
            '',
110
            metadata['base_name']
111
        )
112
        if(len(base_name) == 0):
113
            base_name = metadata['base_name']
114
115
        if(
116
            'title' in metadata and
117
            metadata['title'] is not None and
118
            len(metadata['title']) > 0
119
        ):
120
            title_sanitized = re.sub('\W+', '-', metadata['title'].strip())
121
            base_name = base_name.replace('-%s' % title_sanitized, '')
122
            base_name = '%s-%s' % (base_name, title_sanitized)
123
124
        file_name = '%s-%s.%s' % (
125
            time.strftime(
126
                '%Y-%m-%d_%H-%M-%S',
127
                metadata['date_taken']
128
            ),
129
            base_name,
130
            metadata['extension'])
131
        return file_name.lower()
132
133
    def get_folder_name_by_date(self, time_obj):
134
        """Get date based folder name.
135
136
        :param time time_obj: Time object to be used to determine folder name.
137
        :returns: str
138
        """
139
        return time.strftime('%Y-%m-%b', time_obj)
140
141
    def get_folder_path(self, metadata):
142
        """Get folder path by various parameters.
143
144
        :param time time_obj: Time object to be used to determine folder name.
145
        :returns: str
146
        """
147
        path = []
148
        if(metadata['date_taken'] is not None):
149
            path.append(time.strftime('%Y-%m-%b', metadata['date_taken']))
150
151
        if(metadata['album'] is not None):
152
            path.append(metadata['album'])
153
        elif(
154
            metadata['latitude'] is not None and
155
            metadata['longitude'] is not None
156
        ):
157
            place_name = geolocation.place_name(
158
                metadata['latitude'],
159
                metadata['longitude']
160
            )
161
            if(place_name is not None):
162
                path.append(place_name)
163
164
        # if we don't have a 2nd level directory we use 'Unknown Location'
165
        if(len(path) < 2):
166
            path.append('Unknown Location')
167
168
        # return '/'.join(path[::-1])
169
        return os.path.join(*path)
170
171
    def process_file(self, _file, destination, media, **kwargs):
172
        move = False
173
        if('move' in kwargs):
174
            move = kwargs['move']
175
176
        allow_duplicate = False
177
        if('allowDuplicate' in kwargs):
178
            allow_duplicate = kwargs['allowDuplicate']
179
180
        if(not media.is_valid()):
181
            print('%s is not a valid media file. Skipping...' % _file)
182
            return
183
184
        metadata = media.get_metadata()
185
186
        directory_name = self.get_folder_path(metadata)
187
188
        dest_directory = os.path.join(destination, directory_name)
189
        file_name = self.get_file_name(media)
190
        dest_path = os.path.join(dest_directory, file_name)
191
192
        db = Db()
193
        checksum = db.checksum(_file)
194
        if(checksum is None):
195
            log.info('Could not get checksum for %s. Skipping...' % _file)
196
            return
197
198
        # If duplicates are not allowed then we check if we've seen this file
199
        #  before via checksum. We also check that the file exists at the
200
        #   location we believe it to be.
201
        # If we find a checksum match but the file doesn't exist where we
202
        #  believe it to be then we write a debug log and proceed to import.
203
        checksum_file = db.get_hash(checksum)
204
        if(allow_duplicate is False and checksum_file is not None):
205
            if(os.path.isfile(checksum_file)):
206
                log.info('%s already exists at %s. Skipping...' % (
207
                    _file,
208
                    checksum_file
209
                ))
210
                return
211
            else:
212
                log.info('%s matched checksum but file not found at %s. Importing again...' % (  # noqa
213
                    _file,
214
                    checksum_file
215
                ))
216
217
        self.create_directory(dest_directory)
218
219
        if(move is True):
220
            stat = os.stat(_file)
221
            shutil.move(_file, dest_path)
222
            os.utime(dest_path, (stat.st_atime, stat.st_mtime))
223
        else:
224
            # Do not use copy2(), will have an issue when copying to a
225
            # network/mounted drive using copy and manual
226
            # set_date_from_filename gets the job done
227
            shutil.copy(_file, dest_path)
228
            self.set_utime(media)
229
230
        db.add_hash(checksum, dest_path)
231
        db.update_hash_db()
232
233
        return dest_path
234
235
    def set_utime(self, media):
236
        """ Set the modification time on the file base on the file name.
237
        """
238
239
        # Initialize date taken to what's returned from the metadata function.
240
        # If the folder and file name follow a time format of
241
        #   YYYY-MM-DD_HH-MM-SS-IMG_0001.JPG then we override the date_taken
242
        file_path = media.get_file_path()
243
        metadata = media.get_metadata()
244
        date_taken = metadata['date_taken']
245
        base_name = metadata['base_name']
246
        year_month_day_match = re.search(
247
            '^(\d{4})-(\d{2})-(\d{2})_(\d{2})-(\d{2})-(\d{2})',
248
            base_name
249
        )
250
        if(year_month_day_match is not None):
251
            (year, month, day, hour, minute, second) = year_month_day_match.groups()  # noqa
252
            date_taken = time.strptime(
253
                '{}-{}-{} {}:{}:{}'.format(year, month, day, hour, minute, second),  # noqa
254
                '%Y-%m-%d %H:%M:%S'
255
            )
256
257
            os.utime(file_path, (time.time(), time.mktime(date_taken)))
258
        else:
259
            # We don't make any assumptions about time zones and
260
            # assume local time zone.
261
            date_taken_in_seconds = time.mktime(date_taken)
262
            os.utime(file_path, (time.time(), (date_taken_in_seconds)))
263