Passed
Push — master ( e2840a...6a0a44 )
by Steffen
02:19
created

saucenao.saucenao.SauceNao.__init__()   A

Complexity

Conditions 4

Size

Total Lines 50
Code Lines 27

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 20
CRAP Score 4.128

Importance

Changes 0
Metric Value
eloc 27
dl 0
loc 50
ccs 20
cts 25
cp 0.8
rs 9.232
c 0
b 0
f 0
cc 4
nop 14
crap 4.128

How to fix   Many Parameters   

Many Parameters

Methods with many parameters are not only hard to understand, but their parameters also often become inconsistent when you need more, or different data.

There are several approaches to avoid long parameter lists:

1
#!/usr/bin/python
2
# -*- coding: utf-8 -*-
3 1
import json
4 1
import logging
5 1
import os
6 1
import re
7 1
import time
8 1
from mimetypes import MimeTypes
9 1
from typing import Generator
10
11 1
import requests
12 1
from bs4 import BeautifulSoup as Soup
13 1
from bs4 import element
14
15 1
from saucenao import http
16 1
from saucenao.exceptions import *
17
18
19 1
class SauceNao(object):
    """Small client to look up local image files on SauceNAO.

    A file is uploaded to the SauceNAO search endpoint, the response (HTML or JSON) is normalised into the
    JSON result layout, sorted by similarity and filtered by the configured minimum similarity.
    """

    # HTTPS on purpose: the request carries the uploaded file and, if set, the API key as a query parameter.
    # A plain HTTP POST that gets redirected would additionally be replayed by requests as a GET without the upload.
    SEARCH_POST_URL = 'https://saucenao.com/search.php'

    # all available account types, unregistered (always if no API key is passed), basic or premium
    ACCOUNT_TYPE_UNREGISTERED = ""
    ACCOUNT_TYPE_BASIC = "basic"
    ACCOUNT_TYPE_PREMIUM = "premium"

    # individual search usage limitations (searches per 30 seconds, keyed by account type)
    LIMIT_30_SECONDS = {
        ACCOUNT_TYPE_UNREGISTERED: 4,
        ACCOUNT_TYPE_BASIC: 6,
        ACCOUNT_TYPE_PREMIUM: 15,
    }

    # 0=html, 2=json but json is omitting important data but includes more data about authors
    # taken from the API documentation(requires login): https://saucenao.com/user.php?page=search-api
    API_HTML_TYPE = 0
    API_JSON_TYPE = 2

    # prefixes used by SauceNAO in the content/title sections ("<key>: <value>")
    CONTENT_CATEGORY_KEY = 'Material'
    CONTENT_AUTHOR_KEY = 'Creator'
    CONTENT_CHARACTERS_KEY = 'Characters'

    # class level placeholders, both are replaced per instance in __init__
    mime = None
    logger = None

    def __init__(self, directory, databases=999, minimum_similarity=65, combine_api_types=False, api_key=None,
                 is_premium=False, exclude_categories='', move_to_categories=False, use_author_as_category=False,
                 output_type=API_HTML_TYPE, start_file=None, log_level=logging.ERROR, title_minimum_similarity=90):
        """Store the configuration and derive the search limit per 30 seconds

        :param directory: directory the checked file names are relative to
        :param databases: value sent as the ``db`` request parameter
        :param minimum_similarity: results below this similarity are dropped by check_file
        :param combine_api_types: query the HTML and the JSON API for every file and merge both results
        :param api_key: sent as ``api_key`` request parameter if set, without it the unregistered limit applies
        :param is_premium: only evaluated if an API key is set, selects the premium instead of the basic limit
        :param exclude_categories: stored on the instance only, not used by the methods of this class shown here
        :param move_to_categories: stored on the instance only, not used by the methods of this class shown here
        :param use_author_as_category: stored on the instance only, not used by the methods of this class shown here
        :param output_type: API output type used by check_file if the API types are not combined
        :param start_file: stored on the instance only, not used by the methods of this class shown here
        :param log_level: level passed to logging.basicConfig
        :param title_minimum_similarity: stored on the instance only, not used by the methods of this class shown here
        :type directory: str
        :type databases: int
        :type minimum_similarity: float
        :type combine_api_types: bool
        :type api_key: str
        :type is_premium: bool
        :type exclude_categories: str
        :type move_to_categories: bool
        :type use_author_as_category: bool
        :type output_type: int
        :type start_file: str
        :type log_level: int
        :type title_minimum_similarity: float
        """
        self.directory = directory
        self.databases = databases
        self.minimum_similarity = minimum_similarity
        self.combine_api_types = combine_api_types
        self.api_key = api_key
        self.is_premium = is_premium
        self.exclude_categories = exclude_categories
        self.move_to_categories = move_to_categories
        self.use_author_as_category = use_author_as_category
        self.output_type = output_type
        self.start_file = start_file
        self.title_minimum_similarity = title_minimum_similarity

        # the premium flag is ignored without an API key
        if not self.api_key:
            account_type = self.ACCOUNT_TYPE_UNREGISTERED
        elif self.is_premium:
            account_type = self.ACCOUNT_TYPE_PREMIUM
        else:
            account_type = self.ACCOUNT_TYPE_BASIC
        self.search_limit_30s = self.LIMIT_30_SECONDS[account_type]

        if self.combine_api_types:
            # if we combine the API types we require twice as many API requests, so half the limit per 30 seconds
            # (true division, the limit becomes a float and can be fractional, f.e. 7.5 for premium accounts)
            self.search_limit_30s /= 2

        # set while a request is repeated once after an unexpected status code, see __check_image
        self.previous_status_code = None

        self.mime = MimeTypes()
        logging.basicConfig(level=log_level)
        self.logger = logging.getLogger("saucenao_logger")

    def check_file(self, file_name: str) -> list:
        """Check the given file for results on SauceNAO

        :param file_name: name of the file, relative to the configured directory
        :type file_name: str
        :return: results sorted descending by similarity, limited to those reaching the minimum similarity
        """
        self.logger.info("checking file: %s", file_name)

        if self.combine_api_types:
            # HTML results are the base, the values of the JSON results are merged into them by position
            html_results = self.parse_results_json(self.__check_image(file_name, self.API_HTML_TYPE))
            json_results = self.parse_results_json(self.__check_image(file_name, self.API_JSON_TYPE))
            sorted_results = self.__merge_results(html_results, json_results)
        else:
            sorted_results = self.parse_results_json(self.__check_image(file_name, self.output_type))

        return self.__filter_results(sorted_results)

    def __get_http_data(self, file_path: str, output_type: int):
        """Prepare the http relevant data(files, headers, params) for the given file path and output type

        :param file_path: path of the file to upload, the file is read completely into memory
        :param output_type: API output type, sent as ``output_type`` request parameter
        :type file_path: str
        :type output_type: int
        :return: tuple of (files, params, headers)
        """
        with open(file_path, 'rb') as file_object:
            files = {'file': file_object.read()}

        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) '
                          'Chrome/63.0.3239.84 Safari/537.36',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8',
            'Accept-Language': 'en-DE,en-US;q=0.9,en;q=0.8',
            'Accept-Encoding': 'gzip, deflate, br',
            'DNT': '1',
            'Connection': 'keep-alive'
        }

        # guess_type returns a (type, encoding) tuple, only the mime type itself is wanted here
        mime_type = self.mime.guess_type(file_path)[0]

        params = {
            'file': file_path,
            'Content-Type': mime_type,
            # parameters taken from form on main page: https://saucenao.com/
            'url': None,
            'frame': 1,
            'hide': 0,
            # parameters taken from API documentation: https://saucenao.com/user.php?page=search-api
            'output_type': output_type,
            'db': self.databases,
        }

        if self.api_key:
            params['api_key'] = self.api_key

        return files, params, headers

    def __check_image(self, file_name: str, output_type: int) -> str:
        """Upload the given file and return the results as JSON text

        An unexpected status code is retried exactly once after 10 seconds.

        :param file_name: name of the file, relative to the configured directory
        :param output_type: API output type to request
        :type output_type: int
        :type file_name: str
        :return: JSON text in the layout of the JSON API, HTML responses are converted
        :raises UnknownStatusCodeException: if the repeated request is answered with an unexpected status code too
        """
        file_path = os.path.join(self.directory, file_name)

        files, params, headers = self.__get_http_data(file_path=file_path, output_type=output_type)
        link = requests.post(url=self.SEARCH_POST_URL, files=files, params=params, headers=headers)

        code, msg = http.verify_status_code(link, file_name)

        if code == http.STATUS_CODE_SKIP:
            # nothing usable for this file, hand back an empty result set
            self.logger.error(msg)
            return json.dumps({'results': []})

        if code == http.STATUS_CODE_REPEAT:
            if self.previous_status_code:
                # the repeated request failed as well
                raise UnknownStatusCodeException(msg)

            self.previous_status_code = code
            self.logger.info(
                "Received an unexpected status code (message: {msg}), repeating after 10 seconds...".format(msg=msg)
            )
            time.sleep(10)
            return self.__check_image(file_name, output_type)

        self.previous_status_code = None

        if output_type == self.API_HTML_TYPE:
            return self.parse_results_html_to_json(link.text)

        return link.text

    @staticmethod
    def parse_results_html_to_json(html: str) -> str:
        """Convert the HTML result page into JSON text using the layout of the JSON API

        Every result consists of ``header.similarity`` (text without the percent sign) and
        ``data.title``, ``data.content`` and ``data.ext_urls``. The results are not sorted here.

        :param html: HTML of the result page
        :type html: str
        :return: JSON text
        """
        soup = Soup(html, 'html.parser')
        # basic format of json API response
        results = {'header': {}, 'results': []}

        for res in soup.find_all('td', attrs={"class": "resulttablecontent"}):  # type: element.Tag
            # optional field in SauceNao, so only search inside of this result cell
            # (find_next would continue into the following results and pick up a foreign title)
            title_tag = res.find('div', attrs={"class": "resulttitle"})
            title = title_tag.text if title_tag else ''

            # mandatory fields in SauceNao
            similarity = res.find_next('div', attrs={"class": "resultsimilarityinfo"}).text.replace('%', '')
            misc_info = res.find_next('div', attrs={"class": "resultmiscinfo"})
            alternate_links = [a_tag['href'] for a_tag in misc_info.find_all('a', href=True)]

            content_column = []
            for content_column_tag in res.find_all('div', attrs={"class": "resultcontentcolumn"}):
                # keep the line breaks, the content values are split by them later on
                for br in content_column_tag.find_all('br'):
                    br.replace_with('\n')
                content_column.append(content_column_tag.text)

            results['results'].append({
                'header': {
                    'similarity': similarity
                },
                'data': {
                    'title': title,
                    'content': content_column,
                    'ext_urls': alternate_links
                }
            })

        return json.dumps(results)

    @staticmethod
    def parse_results_json(text: str) -> list:
        """Parse the results and sort them descending by similarity

        A response without results (missing or empty ``results`` entry) is returned as an empty list.

        :param text: JSON text in the layout of the JSON API
        :type text: str
        :return: list of result dictionaries
        """
        results = json.loads(text).get('results') or []
        return sorted(results, key=lambda res: float(res['header']['similarity']), reverse=True)

    def __filter_results(self, sorted_results) -> list:
        """Return results with a similarity bigger or the same as the defined similarity from the arguments (default 65%)

        :param sorted_results: results sorted descending by similarity
        :type sorted_results: list|tuple|Generator
        :return: list of the leading results reaching the minimum similarity
        """
        threshold = float(self.minimum_similarity)
        filtered_results = []
        for res in sorted_results:
            if float(res['header']['similarity']) >= threshold:
                filtered_results.append(res)
            else:
                # we can break here since the results are sorted by similarity anyways
                break
        return filtered_results

    @staticmethod
    def get_content_value(results, key: str):
        """Return the values of the first content entry containing "<key>: "
        multiple sites have a categorisation which SauceNao utilizes to provide it in the content section

        The key is matched literally.

        :param results: result dictionaries
        :param key: prefix to look for, f.e. one of the CONTENT_*_KEY constants
        :type results: list|tuple|Generator
        :type key: str
        :return: list of the lines following the key or an empty string if no entry matches
        """
        prefix = '{0:s}: '.format(key)
        for result in results:
            for content in result['data'].get('content', ()):
                if prefix in content:
                    return ''.join(content.split(prefix)[1:]).rstrip("\n").split('\n')
        return ''

    @staticmethod
    def get_title_value(results, key: str):
        """Return the values of the first title starting with "<key>: "
        SauceNAO provides the authors name in the title section f.e. if provided by the indexed entry

        The key is matched literally.

        :param results: result dictionaries
        :param key: prefix to look for, f.e. one of the CONTENT_*_KEY constants
        :type results: list|tuple|Generator
        :type key: str
        :return: list of the lines following the key or an empty string if no title matches
        """
        prefix = '{0:s}: '.format(key)
        for result in results:
            if 'title' in result['data']:
                title = result['data']['title']
                if title.startswith(prefix):
                    return ''.join(title.split(prefix)[1:]).rstrip("\n").split('\n')
        return ''

    @staticmethod
    def merge_dicts(x: dict, y: dict) -> dict:
        """Take x dictionary and insert/overwrite y dictionary values

        :param x: base dictionary, left unmodified
        :param y: dictionary whose values take precedence
        :type x: dict
        :type y: dict
        :return: shallow copy of x updated with y
        """
        z = x.copy()
        z.update(y)
        return z

    def __merge_results(self, result: list, additional_result: list) -> list:
        """Merge two result arrays

        The entries are merged by position up to the length of the shorter list, values of additional_result
        take precedence. The entries of result are updated in place, sections missing in the additional entry
        are left as they are.

        :param result: base results, modified in place
        :param additional_result: results to merge into the base results
        :type result: list
        :type additional_result: list
        :return: the passed result list
        """
        # zip stops at the shorter list, surplus entries are left untouched
        for entry, additional_entry in zip(result, additional_result):
            for key in entry:
                entry[key] = self.merge_dicts(entry[key], additional_entry.get(key, {}))

        return result
325