Passed
Push — master ( e2840a...6a0a44 )
by Steffen
02:19
created

saucenao.saucenao   A

Complexity

Total Complexity 38

Size/Duplication

Total Lines 325
Duplicated Lines 0 %

Test Coverage

Coverage 74.5%

Importance

Changes 0
Metric Value
wmc 38
eloc 178
dl 0
loc 325
ccs 111
cts 149
cp 0.745
rs 9.36
c 0
b 0
f 0

11 Methods

Rating   Name   Duplication   Size   Complexity  
A SauceNao.merge_dicts() 0 11 1
A SauceNao.__merge_results() 0 17 4
A SauceNao.__filter_results() 0 14 3
A SauceNao.__init__() 0 50 4
B SauceNao.parse_results_html_to_json() 0 43 5
A SauceNao.get_title_value() 0 15 4
A SauceNao.check_file() 0 20 2
B SauceNao.__check_image() 0 34 5
A SauceNao.__get_http_data() 0 36 3
A SauceNao.parse_results_json() 0 10 2
A SauceNao.get_content_value() 0 15 5
1
#!/usr/bin/python
2
# -*- coding: utf-8 -*-
3 1
import json
4 1
import logging
5 1
import os
6 1
import re
7 1
import time
8 1
from mimetypes import MimeTypes
9 1
from typing import Generator
10
11 1
import requests
12 1
from bs4 import BeautifulSoup as Soup
13 1
from bs4 import element
14
15 1
from saucenao import http
16 1
from saucenao.exceptions import *
17
18
19 1
class SauceNao(object):
    """Small client to look up local image files on SauceNAO.

    An instance is configured once (directory, databases, similarity
    thresholds, API key, ...) and then used via :meth:`check_file`, which
    uploads a single file and returns the matching results as a list of
    dictionaries shaped like the SauceNAO JSON API response entries
    (``{'header': {...}, 'data': {...}}``).
    """

    SEARCH_POST_URL = 'http://saucenao.com/search.php'

    # all available account types, unregistered (always if no API key is passed), basic or premium
    ACCOUNT_TYPE_UNREGISTERED = ""
    ACCOUNT_TYPE_BASIC = "basic"
    ACCOUNT_TYPE_PREMIUM = "premium"

    # individual search usage limitations
    LIMIT_30_SECONDS = {
        ACCOUNT_TYPE_UNREGISTERED: 4,
        ACCOUNT_TYPE_BASIC: 6,
        ACCOUNT_TYPE_PREMIUM: 15,
    }

    # 0=html, 2=json but json is omitting important data but includes more data about authors
    # taken from the API documentation(requires login): https://saucenao.com/user.php?page=search-api
    API_HTML_TYPE = 0
    API_JSON_TYPE = 2

    # labels used to look up values in the result content/title sections
    CONTENT_CATEGORY_KEY = 'Material'
    CONTENT_AUTHOR_KEY = 'Creator'
    CONTENT_CHARACTERS_KEY = 'Characters'

    # both are replaced with real objects per instance in __init__
    mime = None
    logger = None

    def __init__(self, directory, databases=999, minimum_similarity=65, combine_api_types=False, api_key=None,
                 is_premium=False, exclude_categories='', move_to_categories=False, use_author_as_category=False,
                 output_type=API_HTML_TYPE, start_file=None, log_level=logging.ERROR, title_minimum_similarity=90):
        """Store the configuration and derive the request limit per 30 seconds

        :type directory: str
        :type databases: int
        :type minimum_similarity: float
        :type combine_api_types: bool
        :type api_key: str
        :type is_premium: bool
        :type exclude_categories: str
        :type move_to_categories: bool
        :type use_author_as_category: bool
        :type output_type: int
        :type start_file: str
        :type log_level: int
        :type title_minimum_similarity: float
        """
        self.directory = directory
        self.databases = databases
        self.minimum_similarity = minimum_similarity
        self.combine_api_types = combine_api_types
        self.api_key = api_key
        self.is_premium = is_premium
        self.exclude_categories = exclude_categories
        self.move_to_categories = move_to_categories
        self.use_author_as_category = use_author_as_category
        self.output_type = output_type
        self.start_file = start_file
        self.title_minimum_similarity = title_minimum_similarity

        # without an API key the account is always treated as unregistered
        if not self.api_key:
            account_type = self.ACCOUNT_TYPE_UNREGISTERED
        elif self.is_premium:
            account_type = self.ACCOUNT_TYPE_PREMIUM
        else:
            account_type = self.ACCOUNT_TYPE_BASIC
        self.search_limit_30s = self.LIMIT_30_SECONDS[account_type]

        if self.combine_api_types:
            # if we combine the API types we require twice as many API requests, so half the limit per 30 seconds
            # (floor division keeps the limit a whole number of requests instead of e.g. 7.5)
            self.search_limit_30s //= 2

        # remembers a "repeat" status so __check_image retries a request only once
        self.previous_status_code = None

        self.mime = MimeTypes()
        logging.basicConfig(level=log_level)
        self.logger = logging.getLogger("saucenao_logger")

    def check_file(self, file_name: str) -> list:
        """Check the given file for results on SauceNAO

        If ``combine_api_types`` is set, the file is checked with both the HTML and
        the JSON output type and the two result lists are merged entry by entry.

        :param file_name: name of the file, relative to ``self.directory``
        :return: results with at least ``minimum_similarity``, sorted descending by similarity
        """
        self.logger.info("checking file: %s", file_name)
        if self.combine_api_types:
            html_results = self.parse_results_json(self.__check_image(file_name, self.API_HTML_TYPE))
            json_results = self.parse_results_json(self.__check_image(file_name, self.API_JSON_TYPE))
            sorted_results = self.__merge_results(html_results, json_results)
        else:
            sorted_results = self.parse_results_json(self.__check_image(file_name, self.output_type))

        return self.__filter_results(sorted_results)

    def __get_http_data(self, file_path: str, output_type: int):
        """Prepare the http relevant data(files, headers, params) for the given file path and output type

        :param file_path: path of the file to upload, its content is read completely into memory
        :param output_type: one of ``API_HTML_TYPE``/``API_JSON_TYPE``
        :return: tuple of (files, params, headers)
        """
        with open(file_path, 'rb') as file_object:
            files = {'file': file_object.read()}

        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) '
                          'Chrome/63.0.3239.84 Safari/537.36',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8',
            'Accept-Language': 'en-DE,en-US;q=0.9,en;q=0.8',
            'Accept-Encoding': 'gzip, deflate, br',
            'DNT': '1',
            'Connection': 'keep-alive'
        }

        # guess_type returns a (type, encoding) tuple, only the type belongs into the parameter
        content_type = self.mime.guess_type(file_path)[0]

        params = {
            'file': file_path,
            'Content-Type': content_type,
            # parameters taken from form on main page: https://saucenao.com/
            'url': None,
            'frame': 1,
            'hide': 0,
            # parameters taken from API documentation: https://saucenao.com/user.php?page=search-api
            'output_type': output_type,
            'db': self.databases,
        }

        if self.api_key:
            params['api_key'] = self.api_key

        return files, params, headers

    def __check_image(self, file_name: str, output_type: int) -> str:
        """Check the possible sources for the given file

        Uploads the file and returns the response as JSON text. A response of the
        HTML output type is converted with :meth:`parse_results_html_to_json`.
        On a "repeat" status the request is retried once after 10 seconds.

        :param file_name: name of the file, relative to ``self.directory``
        :param output_type: one of ``API_HTML_TYPE``/``API_JSON_TYPE``
        :return: JSON text containing a ``results`` list
        :raises UnknownStatusCodeException: if the retried request gets a "repeat" status again
        """
        file_path = os.path.join(self.directory, file_name)

        files, params, headers = self.__get_http_data(file_path=file_path, output_type=output_type)
        link = requests.post(url=self.SEARCH_POST_URL, files=files, params=params, headers=headers)

        code, msg = http.verify_status_code(link, file_name)

        if code == http.STATUS_CODE_SKIP:
            # nothing usable for this file, hand back an empty result set
            self.logger.error(msg)
            return json.dumps({'results': []})

        if code == http.STATUS_CODE_REPEAT:
            if self.previous_status_code:
                # second failure in a row: reset the marker so the next file gets its own retry again
                self.previous_status_code = None
                raise UnknownStatusCodeException(msg)

            self.previous_status_code = code
            self.logger.info(
                "Received an unexpected status code (message: %s), repeating after 10 seconds...", msg
            )
            time.sleep(10)
            return self.__check_image(file_name, output_type)

        self.previous_status_code = None

        if output_type == self.API_HTML_TYPE:
            return self.parse_results_html_to_json(link.text)

        return link.text

    @staticmethod
    def parse_results_html_to_json(html: str) -> str:
        """Convert the HTML result page into the format of the JSON API response

        Every ``td.resulttablecontent`` cell becomes one entry with the similarity
        in ``header`` and title, content columns and external links in ``data``.

        :param html: HTML text of the result page
        :return: JSON text of the form ``{"header": {}, "results": [...]}``
        """
        soup = Soup(html, 'html.parser')
        # basic format of json API response
        results = {'header': {}, 'results': []}

        # NOTE(review): find_next searches the rest of the document, not only the current cell, so a
        # result without a title presumably picks up the title of the following result - confirm
        # against the real page structure before switching to find()
        for res in soup.find_all('td', attrs={"class": "resulttablecontent"}):  # type: element.Tag
            # optional field in SauceNao
            title_tag = res.find_next('div', attrs={"class": "resulttitle"})
            title = title_tag.text if title_tag else ''

            # mandatory field in SauceNao
            similarity = res.find_next('div', attrs={"class": "resultsimilarityinfo"}).text.replace('%', '')
            misc_info = res.find_next('div', attrs={"class": "resultmiscinfo"})
            alternate_links = [a_tag['href'] for a_tag in misc_info.find_all('a', href=True)]

            content_column = []
            for content_column_tag in res.find_all('div', attrs={"class": "resultcontentcolumn"}):
                # keep the line structure of the column in the extracted text
                for br in content_column_tag.find_all('br'):
                    br.replace_with('\n')
                content_column.append(content_column_tag.text)

            results['results'].append({
                'header': {
                    'similarity': similarity
                },
                'data': {
                    'title': title,
                    'content': content_column,
                    'ext_urls': alternate_links
                }
            })

        return json.dumps(results)

    @staticmethod
    def parse_results_json(text: str) -> list:
        """Parse the results and sort them descending by similarity

        :param text: JSON text, a missing or empty ``results`` entry yields an empty list
        :return: list of result dictionaries, highest similarity first
        """
        results = json.loads(text).get('results') or []
        return sorted(results, key=lambda k: float(k['header']['similarity']), reverse=True)

    def __filter_results(self, sorted_results) -> list:
        """Return results with a similarity bigger or the same as the defined similarity from the arguments (default 65%)

        :type sorted_results: list|tuple|Generator
        :param sorted_results: results sorted descending by similarity
        :return: the leading results reaching ``self.minimum_similarity``
        """
        minimum = float(self.minimum_similarity)
        filtered_results = []
        for res in sorted_results:
            if float(res['header']['similarity']) < minimum:
                # we can break here since the results are sorted by similarity anyways
                break
            filtered_results.append(res)
        return filtered_results

    @staticmethod
    def get_content_value(results, key: str):
        """Return the first match of the given key in the content sections

        multiple sites have a categorisation which SauceNao utilizes to provide it in the content section

        :type results: list|tuple|Generator
        :param key: label in front of the value, f.e. ``Material``, matched literally
        :return: lines following ``"<key>: "`` of the first matching content entry, or ``''`` without a match
        """
        # escape the key so labels containing regex metacharacters are matched literally
        pattern = re.compile(r'{0:s}: '.format(re.escape(key)))
        for result in results:
            if 'content' not in result['data']:
                continue
            for content in result['data']['content']:
                if pattern.search(content):
                    return ''.join(pattern.split(content)[1:]).rstrip("\n").split('\n')
        return ''

    @staticmethod
    def get_title_value(results, key: str):
        """Return the first match of the given key in the title section

        SauceNAO provides the authors name in the title section f.e. if provided by the indexed entry

        :type results: list|tuple|Generator
        :param key: label the title has to start with, f.e. ``Creator``, matched literally
        :return: lines following ``"<key>: "`` of the first matching title, or ``''`` without a match
        """
        # escape the key so labels containing regex metacharacters are matched literally
        pattern = re.compile(r'{0:s}: '.format(re.escape(key)))
        for result in results:
            if 'title' not in result['data']:
                continue
            title = result['data']['title']
            # match() anchors at the start, the title has to begin with the label
            if pattern.match(title):
                return ''.join(pattern.split(title)[1:]).rstrip("\n").split('\n')
        return ''

    @staticmethod
    def merge_dicts(x: dict, y: dict) -> dict:
        """Take x dictionary and insert/overwrite y dictionary values

        :type x: dict
        :type y: dict
        :return: new dictionary, neither ``x`` nor ``y`` is modified
        """
        z = x.copy()
        z.update(y)
        return z

    def __merge_results(self, result: list, additional_result: list) -> list:
        """Merge two result arrays

        Entries are paired by position up to the length of the shorter list. For
        every key of an entry in ``result`` the values of the paired entry in
        ``additional_result`` are inserted/overwritten; keys the paired entry
        does not have are left untouched.

        :type result: list
        :type additional_result: list
        :return: ``result``, its entries are updated in place
        """
        for entry, additional_entry in zip(result, additional_result):
            for key in list(entry.keys()):
                entry[key] = self.merge_dicts(entry[key], additional_entry.get(key, {}))

        return result
325