Passed
Pull Request — master (#481)
by Jaisen
03:48 queued 17s
created

elodie.geolocation.exiftool_geolocation()   F

Complexity

Conditions 29

Size

Total Lines 93
Code Lines 63

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 29
eloc 63
nop 2
dl 0
loc 93
rs 0
c 0
b 0
f 0

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include Extract Method.

Complexity

Complex code units like elodie.geolocation.exiftool_geolocation() often do a lot of different things. To break such a unit down, we need to identify a cohesive component within it. A common approach to find such a component is to look for fields, methods, or local variables that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""Look up geolocation information for media objects."""
2
from __future__ import print_function
3
from __future__ import division
4
from future import standard_library
5
from past.utils import old_div
6
7
standard_library.install_aliases()  # noqa
8
9
from os import path
10
11
import requests
12
import urllib.request
13
import urllib.parse
14
import urllib.error
15
16
from elodie.config import load_config
17
from elodie import constants
18
from elodie import log
19
from elodie.localstorage import Db
20
from elodie.external.pyexiftool import ExifTool
21
22
# Module-level cache for the MapQuest API key; filled in by get_key().
__KEY__ = None
23
# Place name used by place_name() when no location can be determined.
__DEFAULT_LOCATION__ = 'Unknown Location'
24
# Module-level cache for the prefer_english_names setting; filled in by
# get_prefer_english_names() once the setting has been read from config.
__PREFER_ENGLISH_NAMES__ = None
25
26
27
def coordinates_by_name(name):
    """Resolve a place name to latitude/longitude coordinates.

    The local location database is consulted first; on a cache miss the
    name is geocoded through :func:`lookup`.

    :param str name: Free-form place name to resolve.
    :returns: dict with ``latitude`` and ``longitude`` keys, or None when
        neither the cache nor the lookup yields usable coordinates.
    """
    # Try to get cached location first
    db = Db()
    cached_coordinates = db.get_location_coordinates(name)
    if cached_coordinates is not None:
        return {
            'latitude': cached_coordinates[0],
            'longitude': cached_coordinates[1]
        }

    # If the name is not cached then we go ahead with an API lookup
    geolocation_info = lookup(location=name)
    if (
        geolocation_info is None or
        'results' not in geolocation_info or
        len(geolocation_info['results']) == 0 or
        'locations' not in geolocation_info['results'][0]
    ):
        return None

    # Only consider entries that actually carry both coordinates, so a
    #   partially populated entry cannot raise a KeyError further down.
    usable_locations = [
        location for location in geolocation_info['results'][0]['locations']
        if (
            isinstance(location.get('latLng'), dict) and
            'lat' in location['latLng'] and
            'lng' in location['latLng']
        )
    ]
    if not usable_locations:
        return None

    # By default we use the first usable entry unless we find one with
    #   geocodeQuality=city, in which case the first such entry wins.
    use_location = usable_locations[0]['latLng']
    for location in usable_locations:
        # geocodeQuality may be absent; treat that as "not a city".
        if str(location.get('geocodeQuality', '')).lower() == 'city':
            use_location = location['latLng']
            break

    return {
        'latitude': use_location['lat'],
        'longitude': use_location['lng']
    }
71
72
73
def decimal_to_dms(decimal):
    """Split a decimal coordinate into degrees, minutes, seconds and sign.

    :param decimal: Coordinate in decimal degrees; any value accepted by
        ``float()`` (number or numeric string).
    :returns: tuple ``(degrees, minutes, seconds, sign)``. The first three
        are non-negative floats computed from the absolute value; ``sign``
        is 1 for values >= 0 and -1 otherwise.
    """
    decimal = float(decimal)
    # Convert the magnitude to seconds of arc, then peel off minutes and
    # degrees; the sign is reported separately.
    minutes, seconds = divmod(abs(decimal) * 3600, 60)
    degrees, minutes = divmod(minutes, 60)
    sign = 1 if decimal >= 0 else -1
    return (degrees, minutes, seconds, sign)
81
82
83
def dms_to_decimal(degrees, minutes, seconds, direction=' '):
    """Convert degrees/minutes/seconds to a signed decimal coordinate.

    :param degrees: Degrees component; any value accepted by ``float()``.
    :param minutes: Minutes component; any value accepted by ``float()``.
    :param seconds: Seconds component; any value accepted by ``float()``.
    :param str direction: Compass direction. Only the first character is
        inspected; ``W``/``S`` (either case) make the result negative. An
        empty value is treated as positive.
    :returns: float decimal degrees.
    """
    # Check for emptiness before indexing so '' does not raise IndexError.
    sign = -1 if direction and direction[0] in 'WSws' else 1
    # The operands are floats, so plain `/` is true division.
    return (
        float(degrees) + float(minutes) / 60 + float(seconds) / 3600
    ) * sign
91
92
93
def dms_string(decimal, type='latitude'):
    """Format a decimal coordinate as a ``D deg M' S" <direction>`` string.

    The degree, minute and second components come from
    :func:`decimal_to_dms` and are formatted as returned by it.

    :param decimal: Coordinate in decimal degrees; any value accepted by
        ``decimal_to_dms``.
    :param str type: ``'latitude'`` (N/S) or ``'longitude'`` (E/W).
    :returns: str such as ``<deg> deg <min>' <sec>" S``.
    :raises ValueError: if ``type`` is neither ``'latitude'`` nor
        ``'longitude'``.
    """
    directions = {'latitude': ('N', 'S'), 'longitude': ('E', 'W')}
    if type not in directions:
        # Previously this fell through and failed on an unbound `direction`.
        raise ValueError(
            "type must be 'latitude' or 'longitude', got {!r}".format(type)
        )

    dms = decimal_to_dms(decimal)
    positive, negative = directions[type]
    # Use the sign computed from the float value so numeric strings are
    # handled the same way as numbers.
    direction = positive if dms[3] > 0 else negative
    return '{} deg {}\' {}" {}'.format(dms[0], dms[1], dms[2], direction)
0 ignored issues
show
introduced by
The variable direction does not seem to be defined for all execution paths.
Loading history...
101
102
103
def get_key():
    """Return the MapQuest API key, or None when none is configured.

    Resolution order: the value cached in ``__KEY__``, then
    ``constants.mapquest_key``, then the ``key`` option of the ``MapQuest``
    section returned by ``load_config()``. A key that is found is cached
    in ``__KEY__`` for subsequent calls.

    :returns: the configured key, or None.
    """
    global __KEY__
    if __KEY__ is not None:
        return __KEY__

    if constants.mapquest_key is not None:
        __KEY__ = constants.mapquest_key
        return __KEY__

    config = load_config()
    if 'MapQuest' not in config:
        return None

    section = config['MapQuest']
    # A MapQuest section without a key option means "not configured"
    # rather than an error.
    if 'key' not in section:
        return None

    __KEY__ = section['key']
    return __KEY__
118
119
def get_prefer_english_names():
    """Return whether English place names are preferred for lookups.

    Reads the ``prefer_english_names`` option of the ``MapQuest`` config
    section. The value is cached in ``__PREFER_ENGLISH_NAMES__`` once it
    has been read; a missing config file, section or option yields False
    without caching.

    :returns: bool
    """
    global __PREFER_ENGLISH_NAMES__
    if __PREFER_ENGLISH_NAMES__ is not None:
        return __PREFER_ENGLISH_NAMES__

    config_file = '%s/config.ini' % constants.application_directory
    if not path.exists(config_file):
        return False

    config = load_config()
    if 'MapQuest' not in config:
        return False

    section = config['MapQuest']
    if 'prefer_english_names' not in section:
        return False

    raw_value = section['prefer_english_names']
    if hasattr(raw_value, 'strip'):
        # NOTE(review): presumably load_config() yields text values (as an
        # INI parser would) -- confirm. bool() of any non-empty text is
        # True, so explicit "off" spellings must be recognised by hand.
        normalized = raw_value.strip().lower()
        __PREFER_ENGLISH_NAMES__ = normalized not in (
            '', '0', 'false', 'no', 'off'
        )
    else:
        __PREFER_ENGLISH_NAMES__ = bool(raw_value)
    return __PREFER_ENGLISH_NAMES__
137
138
# A candidate closer than this (in degrees) ends the scan early.
_GEO_CLOSE_ENOUGH_DEGREES = 0.1
# A best candidate at or beyond this distance (in degrees) is rejected.
_GEO_MAX_DISTANCE_DEGREES = 2.0


def _nearest_geo_entry(lines, lat, lon):
    """Scan ``-listgeo`` CSV data rows for the entry closest to lat/lon.

    Expected column order:
    City,Region,Subregion,CountryCode,Country,TimeZone,FeatureCode,
    Population,Latitude,Longitude

    :returns: tuple ``(entry, distance)``; ``entry`` is None when no row
        could be parsed, in which case ``distance`` is infinity.
    """
    best_match = None
    min_distance = float('inf')

    for line in lines:
        parts = line.split(',')
        # Skip short rows and a header row if it appears again
        if len(parts) < 10 or parts[8] == 'Latitude':
            continue
        try:
            db_lat = float(parts[8])
            db_lon = float(parts[9])
        except ValueError:
            # Unparsable coordinates; ignore this row
            continue

        # Simple planar distance in degrees (not exact but good enough
        # for picking a nearby city).
        distance = ((lat - db_lat) ** 2 + (lon - db_lon) ** 2) ** 0.5
        if distance >= min_distance:
            continue

        min_distance = distance
        best_match = {
            'city': parts[0],
            'region': parts[1],
            'subregion': parts[2],
            'country': parts[4],
            'country_code': parts[3]
        }
        # NOTE: the first sufficiently close row wins, even if a later row
        # would be closer still.
        if distance < _GEO_CLOSE_ENOUGH_DEGREES:
            break

    return best_match, min_distance


def _place_name_from_geo_entry(entry):
    """Build the place-name dict (city/state/country/default) for an entry.

    Only the most specific non-blank value among city, region and
    subregion is used; country is added whenever it is non-blank.
    ``default`` is the most specific value that was found.

    :returns: dict, or None when every field is blank.
    """
    place = {}

    # Priority order: City > Region > Subregion > Country
    if entry['city'] and entry['city'].strip():
        place['city'] = entry['city']
        place['default'] = entry['city']
    elif entry['region'] and entry['region'].strip():
        place['state'] = entry['region']
        place['default'] = entry['region']
    elif entry['subregion'] and entry['subregion'].strip():
        place['state'] = entry['subregion']
        place['default'] = entry['subregion']

    if entry['country'] and entry['country'].strip():
        place['country'] = entry['country']
        place.setdefault('default', entry['country'])

    return place or None


def exiftool_geolocation(lat, lon):
    """Use ExifTool's geolocation database to look up place name from coordinates.

    The database is dumped with ``-listgeo -csv`` and scanned for a nearby
    entry; a match is only accepted when it lies within 2.0 degrees.

    :param float lat: Latitude coordinate
    :param float lon: Longitude coordinate
    :returns: dict with location information or None if not found
    """
    if lat is None or lon is None:
        return None

    # Convert lat/lon to floats
    lat = float(lat)
    lon = float(lon)

    try:
        with ExifTool() as et:
            geo_data = et.execute(b"-listgeo", b"-csv").decode('utf-8')

        if not geo_data:
            return None

        lines = geo_data.strip().split('\n')
        if len(lines) < 2:  # Header + at least one data line
            return None

        # Skip header line
        best_match, min_distance = _nearest_geo_entry(lines[1:], lat, lon)
        if best_match is None or min_distance >= _GEO_MAX_DISTANCE_DEGREES:
            return None

        return _place_name_from_geo_entry(best_match)
    except Exception as e:
        # Best-effort lookup: any ExifTool failure is logged, not raised.
        log.error("ExifTool geolocation failed: {}".format(e))
        return None
231
232
233
def place_name(lat, lon):
    """Return a place-name dict for the given coordinates.

    The location database is checked first (3 km radius). On a miss, the
    name is resolved through MapQuest when an API key is configured and
    through ExifTool's geolocation database otherwise. A successful
    resolution is stored in the location database.

    :param lat: Latitude; converted to float when not already one.
    :param lon: Longitude; converted to float when not already one.
    :returns: dict that always contains a ``default`` key; it is
        ``{'default': __DEFAULT_LOCATION__}`` when nothing could be resolved.
    """
    fallback = {'default': __DEFAULT_LOCATION__}
    if lat is None or lon is None:
        return fallback

    # Convert lat/lon to floats
    lat = lat if isinstance(lat, float) else float(lat)
    lon = lon if isinstance(lon, float) else float(lon)

    # Try to get cached location first, using a 3km distance radius
    db = Db()
    cached = db.get_location_name(lat, lon, 3000)
    # We check that it's a dict to coerce an upgrade of the location
    #  db from a string location to a dictionary. See gh-160.
    if isinstance(cached, dict):
        return cached

    if get_key() is None:
        # Use ExifTool geolocation if no MapQuest key is configured
        resolved = exiftool_geolocation(lat, lon) or {}
    else:
        # Use MapQuest if key is available
        resolved = {}
        geolocation_info = lookup(lat=lat, lon=lon)
        if geolocation_info is not None and 'address' in geolocation_info:
            address = geolocation_info['address']
            # gh-386 adds support for town
            # taking precedence after city for backwards compatibility
            for loc in ('city', 'town', 'state', 'country'):
                if loc not in address:
                    continue
                resolved[loc] = address[loc]
                # In many cases the desired key is not available so we
                #  set the most specific as the default.
                resolved.setdefault('default', address[loc])

    if resolved:
        db.add_location(lat, lon, resolved)
        # TODO: Maybe this should only be done on exit and not for every write.
        db.update_location_db()

    if 'default' not in resolved:
        return fallback
    return resolved
286
287
288
def lookup(**kwargs):
    """Query the MapQuest geocoding API.

    With both ``lat`` and ``lon`` the reverse-geocoding endpoint is used;
    otherwise the forward endpoint is queried. All keyword arguments are
    sent as query parameters.

    :param kwargs: ``location`` (str) and/or ``lat``/``lon`` coordinates.
    :returns: the dict produced by :func:`parse_result`, or None when no
        search term was supplied, no API key is configured, the request
        fails, or the response is not valid JSON.
    """
    if (
        'location' not in kwargs and
        'lat' not in kwargs and
        'lon' not in kwargs
    ):
        return None

    is_reverse = 'lat' in kwargs and 'lon' in kwargs
    if is_reverse:
        kwargs['location'] = '{},{}'.format(kwargs['lat'], kwargs['lon'])

    key = get_key()
    prefer_english_names = get_prefer_english_names()

    if key is None:
        return None

    headers = {}
    params = {'format': 'json', 'key': key}
    if prefer_english_names:
        headers = {'Accept-Language':'en-EN,en;q=0.8'}
        params['locale'] = 'en_US'
    params.update(kwargs)

    # Named `endpoint` so the module-level `path` import is not shadowed.
    endpoint = '/geocoding/v1/reverse' if is_reverse else '/geocoding/v1/address'
    url = '%s%s?%s' % (
        constants.mapquest_base_url,
        endpoint,
        urllib.parse.urlencode(params)
    )
    # log the MapQuest url gh-446
    log.info('MapQuest url: %s' % (url))

    try:
        # Without a timeout a stalled connection would block forever; a
        # timeout surfaces as a RequestException and is handled below.
        r = requests.get(url, headers=headers, timeout=30)
    except requests.exceptions.RequestException as e:
        log.error(e)
        return None

    # Decoding is handled separately so `r` is always bound when the
    # response body is logged.
    try:
        return parse_result(r.json())
    except ValueError as e:
        log.error(r.text)
        log.error(e)
        return None
331
332
333
def parse_result(result):
    """Validate a MapQuest response and attach normalized fields to it.

    On success ``result`` is modified in place: ``address`` receives the
    output of :func:`parse_result_address` and ``latLng`` the output of
    :func:`parse_result_latlon`.

    :param result: Decoded MapQuest JSON response.
    :returns: the augmented ``result``, or None when the status code is
        missing or non-zero, or no address could be extracted.
    """
    # gh-421
    # Return None if statusCode is not 0
    #   https://developer.mapquest.com/documentation/geocoding-api/status-codes/
    if 'info' not in result:
        return None
    info = result['info']
    if 'statuscode' not in info or info['statuscode'] != 0:
        return None

    parsed_address = parse_result_address(result)
    if parsed_address is None:
        return None

    result['address'] = parsed_address
    result['latLng'] = parse_result_latlon(result)
    return result
351
352
def parse_result_address(result):
    """Extract city, state and country from a MapQuest response.

    Only the first location of the first result is inspected.

    :param result: Decoded MapQuest JSON response.
    :returns: dict with ``city``, ``state`` and ``country`` keys (None for
        any that were not found), or None when the response has no
        locations or none of the three could be identified.
    """
    # We want to store the city, state and country
    # The only way determined to identify an unfound address is
    #   that none of the indices were found
    if (
        'results' not in result or
        len(result['results']) == 0 or
        'locations' not in result['results'][0] or
        len(result['results'][0]['locations']) == 0
    ):
        return None

    index_found = False
    addresses = {'city': None, 'state': None, 'country': None}
    location = result['results'][0]['locations'][0]

    # We loop over the location's keys to find the adminAreaNType keys
    #   which have a value of City, State or Country.
    # Once we find one we obtain the value from the key adminAreaN
    #   where N is a numeric index.
    # For example
    #   * adminArea1Type = 'City'
    #   * adminArea1 = 'Sunnyvale'
    for key, area_type in location.items():
        # Check if the key is of the form adminArea1Type
        if not key.endswith('Type'):
            continue
        # Ignore type values that are not text (e.g. null)
        if not hasattr(area_type, 'lower'):
            continue

        value_key = key[:-4]
        area_type = area_type.lower()
        # Only record the types we are interested in, and only when the
        #   matching value key is actually present.
        if area_type in addresses and value_key in location:
            addresses[area_type] = location[value_key]
            index_found = True

    if not index_found:
        return None

    return addresses
392
393
def parse_result_latlon(result):
    """Extract the coordinates of the first location in a MapQuest response.

    :param result: Decoded MapQuest JSON response.
    :returns: dict with ``lat`` and ``lon`` keys, or None when the response
        has no first location or its ``latLng`` entry is missing or
        incomplete.
    """
    if (
        'results' not in result or
        len(result['results']) == 0 or
        'locations' not in result['results'][0] or
        len(result['results'][0]['locations']) == 0 or
        'latLng' not in result['results'][0]['locations'][0]
    ):
        return None

    lat_lng = result['results'][0]['locations'][0]['latLng']
    # A malformed latLng entry is treated as "no coordinates" instead of
    # raising.
    if (
        not isinstance(lat_lng, dict) or
        'lat' not in lat_lng or
        'lng' not in lat_lng
    ):
        return None

    # Note the key rename: MapQuest's 'lng' is exposed as 'lon'.
    return {'lat': lat_lng['lat'], 'lon': lat_lng['lng']}
405