Passed
Push — master ( 95920f...059214 )
by Steffen
04:34
created

saucenao.saucenao.SauceNao.__init__()   A

Complexity

Conditions 3

Size

Total Lines 44
Code Lines 23

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 19
CRAP Score 3.0077

Importance

Changes 0
Metric Value
eloc 23
dl 0
loc 44
ccs 19
cts 21
cp 0.9048
rs 9.328
c 0
b 0
f 0
cc 3
nop 13
crap 3.0077

How to fix   Many Parameters   

Many Parameters

Methods with many parameters are not only hard to understand, but their parameters also often become inconsistent when you need more or different data.

There are several approaches to avoid long parameter lists:

1
#!/usr/bin/python
2
# -*- coding: utf-8 -*-
3 1
import json
4 1
import logging
5 1
import os
6 1
import re
7 1
import time
8 1
from mimetypes import MimeTypes
9 1
from typing import Generator
10
11 1
import requests
12 1
from bs4 import BeautifulSoup as Soup
13 1
from bs4 import element
14
15 1
from saucenao import http
16 1
from saucenao.exceptions import *
17
18
19 1
class SauceNao(object):
20
    """
21
    small script to work with SauceNao locally
22
    """
23
24 1
    SEARCH_POST_URL = 'http://saucenao.com/search.php'
25
26 1
    ACCOUNT_TYPE_UNREGISTERED = ""
27 1
    ACCOUNT_TYPE_REGISTERED = "basic"
28
29
    # individual search usage limitations
30 1
    LIMIT_30_SECONDS = {
31
        ACCOUNT_TYPE_UNREGISTERED: 4,
32
        ACCOUNT_TYPE_REGISTERED: 15,
33
    }
34
35
    # 0=html, 2=json; the json output omits some important data but includes more data about authors
36
    # taken from the API documentation(requires login): https://saucenao.com/user.php?page=search-api
37 1
    API_HTML_TYPE = 0
38 1
    API_JSON_TYPE = 2
39
40 1
    CONTENT_CATEGORY_KEY = 'Material'
41 1
    CONTENT_AUTHOR_KEY = 'Creator'
42 1
    CONTENT_CHARACTERS_KEY = 'Characters'
43
44 1
    mime = None
45 1
    logger = None
46
47 1
    def __init__(self, directory, databases=999, minimum_similarity=65, combine_api_types=False, api_key=None,
48
                 exclude_categories='', move_to_categories=False, use_author_as_category=False,
49
                 output_type=API_HTML_TYPE, start_file=None, log_level=logging.ERROR, title_minimum_similarity=90):
50
        """Initializing function
51
52
        :type directory: str
53
        :type databases: int
54
        :type minimum_similarity: float
55
        :type combine_api_types: bool
56
        :type api_key: str
57
        :type exclude_categories: str
58
        :type move_to_categories: bool
59
        :type use_author_as_category: bool
60
        :type output_type: int
61
        :type start_file: str
62
        :type log_level: int
63
        :type title_minimum_similarity: float
64
        """
65 1
        self.directory = directory
66 1
        self.databases = databases
67 1
        self.minimum_similarity = minimum_similarity
68 1
        self.combine_api_types = combine_api_types
69 1
        self.api_key = api_key
70 1
        self.exclude_categories = exclude_categories
71 1
        self.move_to_categories = move_to_categories
72 1
        self.use_author_as_category = use_author_as_category
73 1
        self.output_type = output_type
74 1
        self.start_file = start_file
75 1
        self.title_minimum_similarity = title_minimum_similarity
76
77 1
        if self.api_key:
78
            self.search_limit_30s = self.LIMIT_30_SECONDS[self.ACCOUNT_TYPE_REGISTERED]
79
        else:
80 1
            self.search_limit_30s = self.LIMIT_30_SECONDS[self.ACCOUNT_TYPE_UNREGISTERED]
81
82 1
        if self.combine_api_types:
83
            # if we combine the API types we require twice as many API requests, so half the limit per 30 seconds
84
            self.search_limit_30s /= 2
85
86 1
        self.previous_status_code = None
87
88 1
        self.mime = MimeTypes()
89 1
        logging.basicConfig(level=log_level)
90 1
        self.logger = logging.getLogger("saucenao_logger")
91
92 1
    def check_file(self, file_name: str) -> list:
93
        """Check the given file for results on SauceNAO
94
95
        :type file_name: str
96
        :return:
97
        """
98 1
        self.logger.info("checking file: {0:s}".format(file_name))
99 1
        if self.combine_api_types:
100
            result = self.__check_image(file_name, self.API_HTML_TYPE)
101
            sorted_results = self.parse_results_json(result)
102
103
            additional_result = self.__check_image(file_name, self.API_JSON_TYPE)
104
            additional_sorted_results = self.parse_results_json(additional_result)
105
            sorted_results = self.__merge_results(sorted_results, additional_sorted_results)
106
        else:
107 1
            result = self.__check_image(file_name, self.output_type)
108 1
            sorted_results = self.parse_results_json(result)
109
110 1
        filtered_results = self.__filter_results(sorted_results)
111 1
        return filtered_results
112
113 1
    def __get_http_data(self, file_path: str, output_type: int):
114
        """Prepare the http relevant data(files, headers, params) for the given file path and output type
115
116
        :param file_path:
117
        :param output_type:
118
        :return:
119
        """
120 1
        with open(file_path, 'rb') as file_object:
121 1
            files = {'file': file_object.read()}
122
123 1
        headers = {
124
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) '
125
                          'Chrome/63.0.3239.84 Safari/537.36',
126
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8',
127
            'Accept-Language': 'en-DE,en-US;q=0.9,en;q=0.8',
128
            'Accept-Encoding': 'gzip, deflate, br',
129
            'DNT': '1',
130
            'Connection': 'keep-alive'
131
        }
132
133 1
        params = {
134
            'file': file_path,
135
            'Content-Type': self.mime.guess_type(file_path),
136
            # parameters taken from form on main page: https://saucenao.com/
137
            'url': None,
138
            'frame': 1,
139
            'hide': 0,
140
            # parameters taken from API documentation: https://saucenao.com/user.php?page=search-api
141
            'output_type': output_type,
142
            'db': self.databases,
143
        }
144
145 1
        if self.api_key:
146
            params['api_key'] = self.api_key
147
148 1
        return files, params, headers
149
150 1
    def __check_image(self, file_name: str, output_type: int) -> str:
        """Check the possible sources for the given file

        Uploads the file to SEARCH_POST_URL and evaluates the response with
        ``http.verify_status_code``. A "skip" code yields an empty result set, a "repeat"
        code triggers exactly one retry after 10 seconds and raises on the second occurrence.

        :param file_name: name of the file inside the configured directory
        :type file_name: str
        :param output_type: requested output type (API_HTML_TYPE or API_JSON_TYPE)
        :type output_type: int
        :return: JSON string in the format of the JSON API response
        :raises UnknownStatusCodeException: if the repeated request has to be repeated again
        """
        file_path = os.path.join(self.directory, file_name)

        files, params, headers = self.__get_http_data(file_path=file_path, output_type=output_type)
        link = requests.post(url=self.SEARCH_POST_URL, files=files, params=params, headers=headers)

        code, msg = http.verify_status_code(link, file_name)

        if code == http.STATUS_CODE_SKIP:
            # this file is finished, so a retry marker of a previous attempt must not leak into the next file
            self.previous_status_code = None
            self.logger.error(msg)
            return json.dumps({'results': []})

        if code == http.STATUS_CODE_REPEAT:
            if self.previous_status_code:
                # already retried once: give up, but reset the marker so later files get their own retry
                self.previous_status_code = None
                raise UnknownStatusCodeException(msg)

            self.previous_status_code = code
            self.logger.info(
                "Received an unexpected status code (message: {msg}), repeating after 10 seconds...".format(msg=msg)
            )
            time.sleep(10)
            return self.__check_image(file_name, output_type)

        self.previous_status_code = None

        if output_type == self.API_HTML_TYPE:
            # the HTML response gets converted into the JSON API format
            return self.parse_results_html_to_json(link.text)

        return link.text
184
185 1
    @staticmethod
    def parse_results_html_to_json(html: str) -> str:
        """Convert the HTML search result page into the format of the JSON API response

        The results are collected in document order, no sorting happens here.

        :param html: HTML of the search result page
        :type html: str
        :return: JSON string with the keys "header" and "results"
        """
        soup = Soup(html, 'html.parser')
        # basic format of json API response
        results = {'header': {}, 'results': []}

        for res in soup.find_all('td', attrs={"class": "resulttablecontent"}):  # type: element.Tag
            # optional field in SauceNao
            # search only inside of this result cell: find_next continues through the rest of the
            # document and would return the title of a following result if this one has none
            title_tag = res.find('div', attrs={"class": "resulttitle"})
            title = title_tag.text if title_tag else ''

            # mandatory field in SauceNao
            similarity = res.find_next('div', attrs={"class": "resultsimilarityinfo"}).text.replace('%', '')
            alternate_links = [a_tag['href'] for a_tag in
                               res.find_next('div', attrs={"class": "resultmiscinfo"}).find_all('a', href=True)]

            content_column = []
            for content_column_tag in res.find_all('div', attrs={"class": "resultcontentcolumn"}):
                # keep the line breaks of the content in the extracted text
                for br in content_column_tag.find_all('br'):
                    br.replace_with('\n')
                content_column.append(content_column_tag.text)

            results['results'].append({
                'header': {
                    'similarity': similarity
                },
                'data': {
                    'title': title,
                    'content': content_column,
                    'ext_urls': alternate_links
                }
            })

        return json.dumps(results)
228
229 1
    @staticmethod
230 1
    def parse_results_json(text: str) -> list:
231
        """Parse the results and sort them descending by similarity
232
233
        :type text: str
234
        :return:
235
        """
236 1
        result = json.loads(text)
237 1
        results = [res for res in result['results']]
238 1
        return sorted(results, key=lambda k: float(k['header']['similarity']), reverse=True)
239
240 1
    def __filter_results(self, sorted_results) -> list:
241
        """Return results with a similarity bigger or the same as the defined similarity from the arguments (default 65%)
242
243
        :type sorted_results: list|tuple|Generator
244
        :return:
245
        """
246 1
        filtered_results = []
247 1
        for res in sorted_results:
248 1
            if float(res['header']['similarity']) >= float(self.minimum_similarity):
249 1
                filtered_results.append(res)
250
            else:
251
                # we can break here since the results are sorted by similarity anyways
252
                break
253 1
        return filtered_results
254
255 1
    @staticmethod
256 1
    def get_content_value(results, key: str):
257
        """Return the first match of Material in content
258
        multiple sites have a categorisation which SauceNao utilizes to provide it in the content section
259
260
        :type results: list|tuple|Generator
261
        :type key: str
262
        :return:
263
        """
264 1
        for result in results:
265 1
            if 'content' in list(result['data'].keys()):
266 1
                for content in result['data']['content']:
267 1
                    if re.search(r'{0:s}: .*'.format(key), content):
268
                        return ''.join(re.split(r'{0:s}: '.format(key), content)[1:]).rstrip("\n").split('\n')
269 1
        return ''
270
271 1
    @staticmethod
272 1
    def get_title_value(results, key: str):
273
        """Return the first match of Material in the title section
274
        SauceNAO provides the authors name in the title section f.e. if provided by the indexed entry
275
276
        :type results: list|tuple|Generator
277
        :type key: str
278
        :return:
279
        """
280
        for result in results:
281
            if 'title' in list(result['data'].keys()):
282
                if re.match('{0:s}: .*'.format(key), result['data']['title']):
283
                    return ''.join(re.split(r'{0:s}: '.format(key), result['data']['title'])[1:]).rstrip("\n") \
284
                        .split('\n')
285
        return ''
286
287 1
    @staticmethod
288 1
    def merge_dicts(x: dict, y: dict) -> dict:
289
        """Take x dictionary and insert/overwrite y dictionary values
290
291
        :type x: dict
292
        :type y: dict
293
        :return:
294
        """
295
        z = x.copy()
296
        z.update(y)
297
        return z
298
299 1
    def __merge_results(self, result: list, additional_result: list) -> list:
300
        """Merge two result arrays
301
302
        :type result: list
303
        :type additional_result: list
304
        :return:
305
        """
306
        if len(result) <= len(additional_result):
307
            length = len(result)
308
        else:
309
            length = len(additional_result)
310
311
        for i in range(length):
312
            for key in list(result[i].keys()):
313
                result[i][key] = self.merge_dicts(result[i][key], additional_result[i][key])
314
315
        return result
316