Test Failed
Push — master ( 16370d...2bdc04 )
by Steffen
02:30
created

saucenao.saucenao.SauceNao.__init__()   A

Complexity

Conditions 1

Size

Total Lines 35
Code Lines 18

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 18
dl 0
loc 35
rs 9.5
c 0
b 0
f 0
cc 1
nop 13

How to fix

Many Parameters

Methods with many parameters are not only hard to understand, but their parameters also often become inconsistent when you need more or different data.

There are several approaches to avoid long parameter lists, for example grouping related parameters into a dedicated parameter object; a minimal sketch of that approach follows the code listing below.

#!/usr/bin/python
# -*- coding: utf-8 -*-
import json
import logging
import os
import re
import time
from mimetypes import MimeTypes
from typing import Generator

import requests
from bs4 import BeautifulSoup as Soup
from bs4 import element

from saucenao import http
from saucenao.exceptions import *


class SauceNao(object):
    """
    small script to work with SauceNao locally
    """

    SEARCH_POST_URL = 'http://saucenao.com/search.php'

    # basic account allows currently 20 images within 30 seconds
    # you can increase this value if you have a premium account
    LIMIT_30_SECONDS = 20

    # 0=html, 2=json; json omits some important data but includes more data about authors
    # taken from the API documentation (requires login): https://saucenao.com/user.php?page=search-api
    API_HTML_TYPE = 0
    API_JSON_TYPE = 2

    CONTENT_CATEGORY_KEY = 'Material'
    CONTENT_AUTHOR_KEY = 'Creator'
    CONTENT_CHARACTERS_KEY = 'Characters'

    mime = None
    logger = None

    def __init__(self, directory, databases=999, minimum_similarity=65, combine_api_types=False, api_key=None,
                 exclude_categories='', move_to_categories=False, use_author_as_category=False,
                 output_type=API_HTML_TYPE, start_file=None, log_level=logging.ERROR, title_minimum_similarity=90):
        """Initializing function

        :type directory: str
        :type databases: int
        :type minimum_similarity: float
        :type combine_api_types: bool
        :type api_key: str
        :type exclude_categories: str
        :type move_to_categories: bool
        :type use_author_as_category: bool
        :type output_type: int
        :type start_file: str
        :type log_level: int
        :type title_minimum_similarity: float
        """
        self._directory = directory
        self._databases = databases
        self._minimum_similarity = minimum_similarity
        self._combine_api_types = combine_api_types
        self._api_key = api_key
        self._exclude_categories = exclude_categories
        self._move_to_categories = move_to_categories
        self._use_author_as_category = use_author_as_category
        self._output_type = output_type
        self._start_file = start_file
        self._title_minimum_similarity = title_minimum_similarity

        self._previous_status_code = None

        self.mime = MimeTypes()
        logging.basicConfig(level=log_level)
        self.logger = logging.getLogger("saucenao_logger")

    def check_file(self, file_name: str) -> list:
        """Check the given file for results on SauceNAO

        :type file_name: str
        :return:
        """
        self.logger.info("checking file: {0:s}".format(file_name))
        if self._combine_api_types:
            result = self.check_image(file_name, self.API_HTML_TYPE)
            sorted_results = self.parse_results_json(result)

            additional_result = self.check_image(file_name, self.API_JSON_TYPE)
            additional_sorted_results = self.parse_results_json(additional_result)
            sorted_results = self.merge_results(sorted_results, additional_sorted_results)
        else:
            result = self.check_image(file_name, self._output_type)
            sorted_results = self.parse_results_json(result)

        filtered_results = self.filter_results(sorted_results)
        return filtered_results

    def get_http_data(self, file_path: str, output_type: int):
        """Prepare the http relevant data (files, headers, params) for the given file path and output type

        :param file_path:
        :param output_type:
        :return:
        """
        with open(file_path, 'rb') as file_object:
            files = {'file': file_object.read()}

        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) '
                          'Chrome/63.0.3239.84 Safari/537.36',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8',
            'Accept-Language': 'en-DE,en-US;q=0.9,en;q=0.8',
            'Accept-Encoding': 'gzip, deflate, br',
            'DNT': '1',
            'Connection': 'keep-alive'
        }

        params = {
            'file': file_path,
            'Content-Type': self.mime.guess_type(file_path),
            # parameters taken from form on main page: https://saucenao.com/
            'url': None,
            'frame': 1,
            'hide': 0,
            # parameters taken from API documentation: https://saucenao.com/user.php?page=search-api
            'output_type': output_type,
            'db': self._databases,
        }

        if self._api_key:
            params['api_key'] = self._api_key

        return files, params, headers

    def check_image(self, file_name: str, output_type: int) -> str:
        """Check the possible sources for the given file

        :type output_type: int
        :type file_name: str
        :return:
        """
        file_path = os.path.join(self._directory, file_name)

        files, params, headers = self.get_http_data(file_path=file_path, output_type=output_type)
        link = requests.post(url=self.SEARCH_POST_URL, files=files, params=params, headers=headers)

        code, msg = http.verify_status_code(link, file_name)

        if code == http.STATUS_CODE_SKIP:
            self.logger.error(msg)
            return json.dumps({'results': []})
        elif code == http.STATUS_CODE_REPEAT:
            if not self._previous_status_code:
                self._previous_status_code = code
                self.logger.info("Received an unexpected status code, repeating after 10 seconds...")
                time.sleep(10)
                return self.check_image(file_name, output_type)
            else:
                raise UnknownStatusCodeException(msg)
        else:
            self._previous_status_code = None

        if output_type == self.API_HTML_TYPE:
            return self.parse_results_html_to_json(link.text)

        return link.text

    @staticmethod
    def parse_results_html_to_json(html: str) -> str:
        """Parse the results and sort them descending by similarity

        :type html: str
        :return:
        """
        soup = Soup(html, 'html.parser')
        # basic format of json API response
        results = {'header': {}, 'results': []}

        for res in soup.find_all('td', attrs={"class": "resulttablecontent"}):  # type: element.Tag
            # optional field in SauceNao
            title_tag = res.find_next('div', attrs={"class": "resulttitle"})
            if title_tag:
                title = title_tag.text
            else:
                title = ''

            # mandatory field in SauceNao
            similarity = res.find_next('div', attrs={"class": "resultsimilarityinfo"}).text.replace('%', '')
            alternate_links = [a_tag['href'] for a_tag in
                               res.find_next('div', attrs={"class": "resultmiscinfo"}).find_all('a', href=True)]
            content_column = []
            content_column_tags = res.find_all('div', attrs={"class": "resultcontentcolumn"})
            for content_column_tag in content_column_tags:
                for br in content_column_tag.find_all('br'):
                    br.replace_with('\n')
                content_column.append(content_column_tag.text)

            result = {
                'header': {
                    'similarity': similarity
                },
                'data': {
                    'title': title,
                    'content': content_column,
                    'ext_urls': alternate_links
                }
            }
            results['results'].append(result)

        return json.dumps(results)

    @staticmethod
    def parse_results_json(text: str) -> list:
        """Parse the results and sort them descending by similarity

        :type text: str
        :return:
        """
        result = json.loads(text)
        results = [res for res in result['results']]
        return sorted(results, key=lambda k: float(k['header']['similarity']), reverse=True)

    def filter_results(self, sorted_results) -> list:
        """Return results with a similarity bigger or the same as the defined similarity from the arguments (default 65%)

        :type sorted_results: list|tuple|Generator
        :return:
        """
        filtered_results = []
        for res in sorted_results:
            if float(res['header']['similarity']) >= float(self._minimum_similarity):
                filtered_results.append(res)
            else:
                # we can break here since the results are sorted by similarity anyways
                break
        return filtered_results

    @staticmethod
    def get_content_value(results, key: str):
        """Return the first match of Material in content
        multiple sites have a categorisation which SauceNao utilizes to provide it in the content section

        :type results: list|tuple|Generator
        :type key: str
        :return:
        """
        for result in results:
            if 'content' in list(result['data'].keys()):
                for content in result['data']['content']:
                    if re.match('{0:s}: .*'.format(key), content):
                        return ''.join(re.split(r'{0:s}: '.format(key), content)[1:]).rstrip("\n").split('\n')
        return ''

    @staticmethod
    def get_title_value(results, key: str):
        """Return the first match of Material in the title section
        SauceNAO provides the authors name in the title section f.e. if provided by the indexed entry

        :type results: list|tuple|Generator
        :type key: str
        :return:
        """
        for result in results:
            if 'title' in list(result['data'].keys()):
                if re.match('{0:s}: .*'.format(key), result['data']['title']):
                    return ''.join(re.split(r'{0:s}: '.format(key), result['data']['title'])[1:]).rstrip("\n") \
                        .split('\n')
        return ''

    @staticmethod
    def merge_two_dicts(x: dict, y: dict) -> dict:
        """Take x dictionary and insert/overwrite y dictionary values

        :type x: dict
        :type y: dict
        :return:
        """
        z = x.copy()
        z.update(y)
        return z

    def merge_results(self, result: list, additional_result: list) -> list:
        """Merge two result arrays

        :type result: list
        :type additional_result: list
        :return:
        """
        if len(result) <= len(additional_result):
            length = len(result)
        else:
            length = len(additional_result)

        for i in range(length):
            for key in list(result[i].keys()):
                result[i][key] = self.merge_two_dicts(result[i][key], additional_result[i][key])

        return result
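
As an illustration of the parameter-object approach mentioned above, here is a minimal sketch. It is not part of the flagged code: the names SauceNaoSettings and settings are hypothetical, and the remaining methods would still need to be adapted to read their options from self._settings. The idea is that the optional constructor arguments are collected in one small dataclass, so __init__ only takes the directory plus a single settings object.

#!/usr/bin/python
# -*- coding: utf-8 -*-
# Hypothetical sketch only: group the optional SauceNao settings into one object
# so the constructor signature stays short. Names and defaults mirror the flagged
# signature but are illustrative, not the project's actual API.
import logging
from dataclasses import dataclass
from typing import Optional


@dataclass
class SauceNaoSettings:
    """Collects the optional configuration previously passed as individual arguments."""
    databases: int = 999
    minimum_similarity: float = 65
    combine_api_types: bool = False
    api_key: Optional[str] = None
    exclude_categories: str = ''
    move_to_categories: bool = False
    use_author_as_category: bool = False
    output_type: int = 0  # SauceNao.API_HTML_TYPE
    start_file: Optional[str] = None
    log_level: int = logging.ERROR
    title_minimum_similarity: float = 90


class SauceNao(object):
    def __init__(self, directory: str, settings: Optional[SauceNaoSettings] = None):
        """Initializing function

        :type directory: str
        :type settings: SauceNaoSettings
        """
        self._directory = directory
        # fall back to the defaults when no settings object is supplied
        self._settings = settings if settings is not None else SauceNaoSettings()
        self._previous_status_code = None

        logging.basicConfig(level=self._settings.log_level)
        self.logger = logging.getLogger("saucenao_logger")

A caller would then write, for example, SauceNao('/path/to/images', SauceNaoSettings(api_key='...', minimum_similarity=80)). Adding a new option later only touches the settings class, not the constructor signature.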