1 | import logging |
||
2 | import requests |
||
3 | import os |
||
4 | from enum import Enum |
||
5 | import random |
||
6 | import string |
||
7 | import time |
||
8 | |||
9 | |||
10 | class Purity(Enum): |
||
11 | sfw = "sfw" |
||
12 | sketchy = "sketchy" |
||
13 | nsfw = "nsfw" |
||
14 | |||
15 | |||
16 | class Category(Enum): |
||
17 | general = "general" |
||
18 | anime = "anime" |
||
19 | people = "people" |
||
20 | |||
21 | |||
22 | class Sorting(Enum): |
||
23 | date_added = "date_added" |
||
24 | relevance = "relevance" |
||
25 | random = "random" |
||
26 | views = "views" |
||
27 | favorites = "favorites" |
||
28 | toplist = "toplist" |
||
29 | |||
30 | |||
31 | class Order(Enum): |
||
32 | # desc used by default |
||
33 | desc = "desc" |
||
34 | asc = "asc" |
||
35 | |||
36 | |||
37 | class TopRange(Enum): |
||
38 | one_day = "1d" |
||
39 | three_days = "3d" |
||
40 | one_week = "1w" |
||
41 | one_month = "1M" |
||
42 | three_months = "3M" |
||
43 | six_months = "6M" |
||
44 | one_year = "1y" |
||
45 | |||
46 | |||
47 | class Color(Enum): |
||
48 | # Color names from http://chir.ag/projects/name-that-color |
||
49 | lonestar = "660000" |
||
50 | red_berry = "990000" |
||
51 | guardsman_red = "cc0000" |
||
52 | persian_red = "cc3333" |
||
53 | french_rose = "ea4c88" |
||
54 | plum = "993399" |
||
55 | royal_purple = "663399" |
||
56 | sapphire = "333399" |
||
57 | science_blue = "0066cc" |
||
58 | pacific_blue = "0099cc" |
||
59 | downy = "66cccc" |
||
60 | atlantis = "77cc33" |
||
61 | limeade = "669900" |
||
62 | verdun_green = "336600" |
||
63 | verdun_green_2 = "666600" |
||
64 | olive = "999900" |
||
65 | earls_green = "cccc33" |
||
66 | yellow = "ffff00" |
||
67 | sunglow = "ffcc33" |
||
68 | orange_peel = "ff9900" |
||
69 | blaze_orange = "ff6600" |
||
70 | tuscany = "cc6633" |
||
71 | potters_clay = "996633" |
||
72 | nutmeg_wood_finish = "663300" |
||
73 | black = "000000" |
||
74 | dusty_gray = "999999" |
||
75 | silver = "cccccc" |
||
76 | white = "ffffff" |
||
77 | gun_powder = "424153" |
||
78 | |||
79 | |||
80 | class Type(Enum): |
||
81 | jpeg = "jpeg" |
||
82 | jpg = "jpg" # the same as jpeg |
||
83 | png = "png" |
||
84 | |||
85 | |||
86 | class Seed(object): |
||
87 | @staticmethod |
||
88 | def generate(): |
||
89 | # [a-zA-Z0-9]{6} |
||
90 | return ''.join(random.choice(string.ascii_letters + string.digits) for _ in range(6)) |
||
91 | |||
92 | |||
93 | class RequestsLimitError(Exception): |
||
94 | def __init__(self): |
||
95 | super().__init__("You have exceeded requests limit. Please try later.") |
||
96 | |||
97 | |||
98 | class ApiKeyError(Exception): |
||
99 | def __init__(self): |
||
100 | super().__init__("Bad api key. Check it please.") |
||
101 | |||
102 | |||
103 | class UnhandledException(Exception): |
||
104 | def __init__(self): |
||
105 | super().__init__("Somthing went wrong. Please submit this issue to " |
||
106 | "https://github.com/Goblenus/WallhavenApi/issues.") |
||
107 | |||
108 | |||
109 | class NoWallpaperError(Exception): |
||
110 | def __init__(self, wallpaper_id): |
||
111 | super().__init__("No wallpaper with id {}".format(wallpaper_id)) |
||
112 | |||
113 | |||
114 | class WallhavenApiV1: |
||
115 | def __init__(self, api_key=None, verify_connection=True, base_url="https://wallhaven.cc/api/v1", |
||
116 | timeout=(2,5), requestslimit_timeout=None): |
||
117 | self.verify_connection = verify_connection |
||
118 | self.api_key = api_key |
||
119 | self.base_url = base_url |
||
120 | self.timeout = timeout |
||
121 | self.requestslimit_timeout = requestslimit_timeout |
||
122 | |||
123 | def _request(self, to_json, **kwargs): |
||
124 | for i in range(self.requestslimit_timeout[0] if self.requestslimit_timeout is not None else 1): |
||
125 | if self.api_key is not None: |
||
126 | if "params" in kwargs: |
||
127 | kwargs["params"]["apikey"] = self.api_key |
||
128 | else: |
||
129 | kwargs["params"] = {"apikey": self.api_key} |
||
130 | |||
131 | if "timeout" not in kwargs: |
||
132 | kwargs["timeout"] = self.timeout |
||
133 | |||
134 | if "verify" not in kwargs: |
||
135 | kwargs["verify"] = self.verify_connection |
||
136 | |||
137 | response = requests.request(**kwargs) |
||
138 | |||
139 | if response.status_code == 429: |
||
140 | if self.requestslimit_timeout is None\ |
||
141 | or i == (self.requestslimit_timeout[0] - 1) if self.requestslimit_timeout is not None else 0: |
||
142 | raise RequestsLimitError |
||
143 | |||
144 | time.sleep(self.requestslimit_timeout[1]) |
||
145 | continue |
||
146 | |||
147 | if response.status_code == 401: |
||
148 | raise ApiKeyError |
||
149 | |||
150 | if response.status_code != 200: |
||
151 | raise UnhandledException |
||
152 | |||
153 | if to_json: |
||
154 | try: |
||
155 | return response.json() |
||
156 | except: |
||
157 | raise UnhandledException |
||
158 | |||
159 | return response |
||
160 | |||
161 | def _url_format(self, *args): |
||
162 | url = self.base_url |
||
163 | url += "/" if not url.endswith("/") else "" |
||
164 | |||
165 | return url + "/".join((str(x) for x in args)) |
||
0 ignored issues
–
show
Comprehensibility
Best Practice
introduced
by
![]() |
|||
166 | |||
167 | @staticmethod |
||
168 | def _category(general=True, anime=True, people=False): |
||
169 | return "{}{}{}".format(int(general), int(anime), int(people)) |
||
170 | |||
171 | @staticmethod |
||
172 | def _purity(sfw=True, sketchy=True, nsfw=False): |
||
173 | return "{}{}{}".format(int(sfw), int(sketchy), int(nsfw)) |
||
174 | |||
175 | def search(self, q=None, categories=None, purities=None, sorting=None, order=None, top_range=None, atleast=None, |
||
176 | resolutions=None, ratios=None, colors=None, page=None, seed=None): |
||
177 | params = {} |
||
178 | if q is not None: |
||
179 | params["q"] = q |
||
180 | |||
181 | if categories is not None: |
||
182 | categories = categories if type(categories) is list else [categories] |
||
183 | params["categories"] = self._category(Category.general in categories, Category.anime in categories, |
||
184 | Category.people in categories) |
||
185 | |||
186 | if purities is not None: |
||
187 | purities = purities if type(purities) is list else [purities] |
||
188 | params["purity"] = self._purity(Purity.sfw in purities, Purity.sketchy in purities, |
||
189 | Purity.nsfw in purities) |
||
190 | |||
191 | if sorting is not None: |
||
192 | params["sorting"] = sorting.value |
||
193 | |||
194 | if order is not None: |
||
195 | params["order"] = order.value |
||
196 | |||
197 | if top_range is not None: |
||
198 | params["topRange"] = top_range.value |
||
199 | |||
200 | if atleast is not None: |
||
201 | params["atleast"] = "{}x{}".format(atleast[0], atleast[1]) |
||
202 | |||
203 | if resolutions is not None: |
||
204 | params["resolutions"] = ",".join(["{}x{}".format(x[0], x[1]) \ |
||
205 | for x in (resolutions if type(resolutions) is list else [resolutions])]) |
||
206 | |||
207 | if ratios is not None: |
||
208 | params["ratios"] = ",".join(["{}x{}".format(x[0], x[1]) \ |
||
209 | for x in (ratios if type(ratios) is list else [ratios])]) |
||
210 | |||
211 | if colors is not None: |
||
212 | params["colors"] = colors.value |
||
213 | |||
214 | if page is not None: |
||
215 | params["page"] = str(page) |
||
216 | |||
217 | if seed is not None: |
||
218 | params["seed"] = seed |
||
219 | |||
220 | return self._request(True, method="get", url=self._url_format("search"), params=params) |
||
221 | |||
222 | def wallpaper(self, wallpaper_id): |
||
223 | return self._request(True, method="get", url=self._url_format("w", wallpaper_id)) |
||
224 | |||
225 | def is_walpaper_exists(self, wallpaper_id): |
||
226 | return "error" not in self.wallpaper(wallpaper_id) |
||
227 | |||
228 | def download_walpaper(self, *args, **kwargs): |
||
229 | logging.warning('Please use "download_wallpaper" method instead "download_walpaper"') |
||
230 | return self.download_wallpaper(*args, **kwargs) |
||
231 | |||
232 | def download_wallpaper(self, wallpaper_id, file_path, chunk_size=4096): |
||
233 | wallpaper_data = self.wallpaper(wallpaper_id) |
||
234 | |||
235 | if "error" in wallpaper_data: |
||
236 | raise NoWallpaperError(wallpaper_id) |
||
237 | |||
238 | wallpaper = requests.get(wallpaper_data["data"]["path"], stream=True, timeout=self.timeout, |
||
239 | verify=self.verify_connection) |
||
240 | |||
241 | if wallpaper.status_code != 200: |
||
242 | raise UnhandledException |
||
243 | |||
244 | if file_path is not None: |
||
245 | save_path = os.path.abspath(file_path) |
||
246 | save_directory_path = os.path.dirname(save_path) |
||
247 | |||
248 | if not os.path.exists(save_directory_path): |
||
249 | os.makedirs(save_directory_path) |
||
250 | |||
251 | with open(save_path, "wb") as image_file: |
||
252 | for chunk in wallpaper.iter_content(chunk_size): |
||
253 | image_file.write(chunk) |
||
254 | |||
255 | return save_path |
||
256 | |||
257 | return wallpaper.content |
||
258 | |||
259 | def tag(self, tag_id): |
||
260 | return self._request(True, method="get", url=self._url_format("tag", tag_id)) |
||
261 | |||
262 | def settings(self): |
||
263 | return None if self.api_key is None else self._request(True, method="get", url=self._url_format("settings")) |
||
264 | |||
265 | def collections(self, user_name): |
||
266 | return self._request(True, method="get", url=self._url_format(f"collections/{user_name}")) |
||
267 | |||
268 | def collection_wallpapers(self, user_name, collection_id, page=None): |
||
269 | return self._request(True, method="get", url=self._url_format(f"collections/{user_name}/{collection_id}"), |
||
270 | params={"page": str(page)} if page is not None else {}) |
||
271 | |||
272 | def my_collections(self): |
||
273 | return None if self.api_key is None else self._request(True, method="get", url=self._url_format(f"collections")) |
||
274 |