1 | """Look up geolocation information for media objects.""" |
||
2 | from __future__ import print_function |
||
3 | from __future__ import division |
||
4 | from future import standard_library |
||
5 | from past.utils import old_div |
||
6 | |||
7 | standard_library.install_aliases() # noqa |
||
8 | |||
9 | from os import path |
||
10 | |||
11 | import requests |
||
12 | import urllib.request |
||
13 | import urllib.parse |
||
14 | import urllib.error |
||
15 | |||
16 | from elodie.config import load_config |
||
17 | from elodie import constants |
||
18 | from elodie import log |
||
19 | from elodie.localstorage import Db |
||
20 | |||
21 | __KEY__ = None |
||
22 | __DEFAULT_LOCATION__ = 'Unknown Location' |
||
23 | __PREFER_ENGLISH_NAMES__ = None |
||
24 | |||
25 | |||
26 | def coordinates_by_name(name): |
||
27 | # Try to get cached location first |
||
28 | db = Db() |
||
29 | cached_coordinates = db.get_location_coordinates(name) |
||
30 | if(cached_coordinates is not None): |
||
31 | return { |
||
32 | 'latitude': cached_coordinates[0], |
||
33 | 'longitude': cached_coordinates[1] |
||
34 | } |
||
35 | |||
36 | # If the name is not cached then we go ahead with an API lookup |
||
37 | geolocation_info = lookup(location=name) |
||
38 | |||
39 | if(geolocation_info is not None): |
||
40 | if( |
||
41 | 'results' in geolocation_info and |
||
42 | len(geolocation_info['results']) != 0 and |
||
43 | 'locations' in geolocation_info['results'][0] and |
||
44 | len(geolocation_info['results'][0]['locations']) != 0 |
||
45 | ): |
||
46 | |||
47 | # By default we use the first entry unless we find one with |
||
48 | # geocodeQuality=city. |
||
49 | geolocation_result = geolocation_info['results'][0] |
||
50 | use_location = geolocation_result['locations'][0]['latLng'] |
||
51 | # Loop over the locations to see if we come accross a |
||
52 | # geocodeQuality=city. |
||
53 | # If we find a city we set that to the use_location and break |
||
54 | for location in geolocation_result['locations']: |
||
55 | if( |
||
56 | 'latLng' in location and |
||
57 | 'lat' in location['latLng'] and |
||
58 | 'lng' in location['latLng'] and |
||
59 | location['geocodeQuality'].lower() == 'city' |
||
60 | ): |
||
61 | use_location = location['latLng'] |
||
62 | break |
||
63 | |||
64 | return { |
||
65 | 'latitude': use_location['lat'], |
||
66 | 'longitude': use_location['lng'] |
||
67 | } |
||
68 | |||
69 | return None |
||
70 | |||
71 | |||
72 | def decimal_to_dms(decimal): |
||
73 | decimal = float(decimal) |
||
74 | decimal_abs = abs(decimal) |
||
75 | minutes, seconds = divmod(decimal_abs*3600, 60) |
||
76 | degrees, minutes = divmod(minutes, 60) |
||
77 | degrees = degrees |
||
78 | sign = 1 if decimal >= 0 else -1 |
||
79 | return (degrees, minutes, seconds, sign) |
||
80 | |||
81 | |||
82 | def dms_to_decimal(degrees, minutes, seconds, direction=' '): |
||
83 | sign = 1 |
||
84 | if(direction[0] in 'WSws'): |
||
85 | sign = -1 |
||
86 | return ( |
||
87 | float(degrees) + old_div(float(minutes), 60) + |
||
88 | old_div(float(seconds), 3600) |
||
89 | ) * sign |
||
90 | |||
91 | |||
92 | def dms_string(decimal, type='latitude'): |
||
93 | # Example string -> 38 deg 14' 27.82" S |
||
94 | dms = decimal_to_dms(decimal) |
||
95 | if type == 'latitude': |
||
96 | direction = 'N' if decimal >= 0 else 'S' |
||
97 | elif type == 'longitude': |
||
98 | direction = 'E' if decimal >= 0 else 'W' |
||
99 | return '{} deg {}\' {}" {}'.format(dms[0], dms[1], dms[2], direction) |
||
0 ignored issues
–
show
introduced
by
Loading history...
|
|||
100 | |||
101 | |||
102 | def get_key(): |
||
103 | global __KEY__ |
||
104 | if __KEY__ is not None: |
||
105 | return __KEY__ |
||
106 | |||
107 | if constants.mapquest_key is not None: |
||
108 | __KEY__ = constants.mapquest_key |
||
109 | return __KEY__ |
||
110 | |||
111 | config = load_config() |
||
112 | if('MapQuest' not in config): |
||
113 | return None |
||
114 | |||
115 | __KEY__ = config['MapQuest']['key'] |
||
116 | return __KEY__ |
||
117 | |||
118 | def get_prefer_english_names(): |
||
119 | global __PREFER_ENGLISH_NAMES__ |
||
120 | if __PREFER_ENGLISH_NAMES__ is not None: |
||
121 | return __PREFER_ENGLISH_NAMES__ |
||
122 | |||
123 | config_file = '%s/config.ini' % constants.application_directory |
||
124 | if not path.exists(config_file): |
||
125 | return False |
||
126 | |||
127 | config = load_config() |
||
128 | if('MapQuest' not in config): |
||
129 | return False |
||
130 | |||
131 | if('prefer_english_names' not in config['MapQuest']): |
||
132 | return False |
||
133 | |||
134 | __PREFER_ENGLISH_NAMES__ = bool(config['MapQuest']['prefer_english_names']) |
||
135 | return __PREFER_ENGLISH_NAMES__ |
||
136 | |||
137 | def place_name(lat, lon): |
||
138 | lookup_place_name_default = {'default': __DEFAULT_LOCATION__} |
||
139 | if(lat is None or lon is None): |
||
140 | return lookup_place_name_default |
||
141 | |||
142 | # Convert lat/lon to floats |
||
143 | if(not isinstance(lat, float)): |
||
144 | lat = float(lat) |
||
145 | if(not isinstance(lon, float)): |
||
146 | lon = float(lon) |
||
147 | |||
148 | # Try to get cached location first |
||
149 | db = Db() |
||
150 | # 3km distace radious for a match |
||
151 | cached_place_name = db.get_location_name(lat, lon, 3000) |
||
152 | # We check that it's a dict to coerce an upgrade of the location |
||
153 | # db from a string location to a dictionary. See gh-160. |
||
154 | if(isinstance(cached_place_name, dict)): |
||
155 | return cached_place_name |
||
156 | |||
157 | lookup_place_name = {} |
||
158 | geolocation_info = lookup(lat=lat, lon=lon) |
||
159 | if(geolocation_info is not None and 'address' in geolocation_info): |
||
160 | address = geolocation_info['address'] |
||
161 | # gh-386 adds support for town |
||
162 | # taking precedence after city for backwards compatability |
||
163 | for loc in ['city', 'town', 'state', 'country']: |
||
164 | if(loc in address): |
||
165 | lookup_place_name[loc] = address[loc] |
||
166 | # In many cases the desired key is not available so we |
||
167 | # set the most specific as the default. |
||
168 | if('default' not in lookup_place_name): |
||
169 | lookup_place_name['default'] = address[loc] |
||
170 | |||
171 | if(lookup_place_name): |
||
172 | db.add_location(lat, lon, lookup_place_name) |
||
173 | # TODO: Maybe this should only be done on exit and not for every write. |
||
174 | db.update_location_db() |
||
175 | |||
176 | if('default' not in lookup_place_name): |
||
177 | lookup_place_name = lookup_place_name_default |
||
178 | |||
179 | return lookup_place_name |
||
180 | |||
181 | |||
182 | def lookup(**kwargs): |
||
183 | if( |
||
184 | 'location' not in kwargs and |
||
185 | 'lat' not in kwargs and |
||
186 | 'lon' not in kwargs |
||
187 | ): |
||
188 | return None |
||
189 | |||
190 | if('lat' in kwargs and 'lon' in kwargs): |
||
191 | kwargs['location'] = '{},{}'.format(kwargs['lat'], kwargs['lon']) |
||
192 | |||
193 | key = get_key() |
||
194 | prefer_english_names = get_prefer_english_names() |
||
195 | |||
196 | if(key is None): |
||
197 | return None |
||
198 | |||
199 | try: |
||
200 | headers = {} |
||
201 | params = {'format': 'json', 'key': key} |
||
202 | if(prefer_english_names): |
||
203 | headers = {'Accept-Language':'en-EN,en;q=0.8'} |
||
204 | params['locale'] = 'en_US' |
||
205 | params.update(kwargs) |
||
206 | path = '/geocoding/v1/address' |
||
207 | if('lat' in kwargs and 'lon' in kwargs): |
||
208 | path = '/geocoding/v1/reverse' |
||
209 | url = '%s%s?%s' % ( |
||
210 | constants.mapquest_base_url, |
||
211 | path, |
||
212 | urllib.parse.urlencode(params) |
||
213 | ) |
||
214 | r = requests.get(url, headers=headers) |
||
215 | return parse_result(r.json()) |
||
216 | except requests.exceptions.RequestException as e: |
||
217 | log.error(e) |
||
218 | return None |
||
219 | except ValueError as e: |
||
220 | log.error(r.text) |
||
221 | log.error(e) |
||
222 | return None |
||
223 | |||
224 | |||
225 | def parse_result(result): |
||
226 | # gh-421 |
||
227 | # Return None if statusCode is not 0 |
||
228 | # https://developer.mapquest.com/documentation/geocoding-api/status-codes/ |
||
229 | if( 'info' not in result or |
||
230 | 'statuscode' not in result['info'] or |
||
231 | result['info']['statuscode'] != 0 |
||
232 | ): |
||
233 | return None |
||
234 | |||
235 | address = parse_result_address(result) |
||
236 | if(address is None): |
||
237 | return None |
||
238 | |||
239 | result['address'] = address |
||
240 | result['latLng'] = parse_result_latlon(result) |
||
241 | |||
242 | return result |
||
243 | |||
244 | def parse_result_address(result): |
||
245 | # We want to store the city, state and country |
||
246 | # The only way determined to identify an unfound address is |
||
247 | # that none of the indicies were found |
||
248 | if( 'results' not in result or |
||
249 | len(result['results']) == 0 or |
||
250 | 'locations' not in result['results'][0] or |
||
251 | len(result['results'][0]['locations']) == 0 |
||
252 | ): |
||
253 | return None |
||
254 | |||
255 | index_found = False |
||
256 | addresses = {'city': None, 'state': None, 'country': None} |
||
257 | result_compat = {} |
||
258 | result_compat['address'] = {} |
||
259 | |||
260 | |||
261 | locations = result['results'][0]['locations'][0] |
||
262 | # We are looping over locations to find the adminAreaNType key which |
||
263 | # has a value of City, State or Country. |
||
264 | # Once we find it then we obtain the value from the key adminAreaN |
||
265 | # where N is a numeric index. |
||
266 | # For example |
||
267 | # * adminArea1Type = 'City' |
||
268 | # * adminArea1 = 'Sunnyvale' |
||
269 | for key in locations: |
||
270 | # Check if the key is of the form adminArea1Type |
||
271 | if(key[-4:] == 'Type'): |
||
272 | # If it's a type then check if it corresponds to one we are intereated in |
||
273 | # and store the index by parsing the key |
||
274 | key_prefix = key[:-4] |
||
275 | key_index = key[-5:-4] |
||
276 | if(locations[key].lower() in addresses): |
||
277 | addresses[locations[key].lower()] = locations[key_prefix] |
||
278 | index_found = True |
||
279 | |||
280 | if(index_found is False): |
||
281 | return None |
||
282 | |||
283 | return addresses |
||
284 | |||
285 | def parse_result_latlon(result): |
||
286 | if( 'results' not in result or |
||
287 | len(result['results']) == 0 or |
||
288 | 'locations' not in result['results'][0] or |
||
289 | len(result['results'][0]['locations']) == 0 or |
||
290 | 'latLng' not in result['results'][0]['locations'][0] |
||
291 | ): |
||
292 | return None |
||
293 | |||
294 | latLng = result['results'][0]['locations'][0]['latLng']; |
||
295 | |||
296 | return {'lat': latLng['lat'], 'lon': latLng['lng']} |
||
297 |