Test Failed
Push — master ( 1967f0...88899d )
by Steffen
04:26
created

saucenao.saucenao.SauceNao.__init__()   B

Complexity

Conditions 5

Size

Total Lines 53
Code Lines 29

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 21
CRAP Score 5.1777

Importance

Changes 0
Metric Value
eloc 29
dl 0
loc 53
ccs 21
cts 26
cp 0.8077
rs 8.7173
c 0
b 0
f 0
cc 5
nop 14
crap 5.1777

How to fix   Long Method    Many Parameters   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

Many Parameters

Methods with many parameters are not only hard to understand; their parameter lists also tend to become inconsistent over time as you need more, or different, data.

There are several approaches to avoid long parameter lists:

1
#!/usr/bin/python
2
# -*- coding: utf-8 -*-
3 1
import enum
4 1
import json
5 1
import logging
6 1
import os
7 1
import re
8 1
import time
9 1
from typing import Generator, BinaryIO
10 1
11
import requests
12 1
from bs4 import BeautifulSoup as Soup
13 1
from bs4 import element
14 1
15
from saucenao import http
16 1
from saucenao.exceptions import *
17 1
18
19
class SauceNaoDatabase(enum.Enum):
    """
    database index supported by SauceNao
    """

    HMagazines = 0
    HGameCG = 2
    DoujinshiDB = 3
    PixivImages = 5
    NicoNicoSeiga = 8
    Danbooru = 9
    DrawrImages = 10
    NijieImages = 11
    YandeRe = 12
    Shutterstock = 15
    FAKKU = 16
    HMisc = 18
    TwoDMarket = 19
    MediBang = 20
    Anime = 21
    HAnime = 22
    Movies = 23
    Shows = 24
    Gelbooru = 25
    Konachan = 26
    SankakuChannel = 27
    AnimePicturesNet = 28
    E621Net = 29
    IdolComplex = 30
    BcyNetIllust = 31
    BcyNetCosplay = 32
    PortalGraphicsNet = 33
    DeviantArt = 34
    PawooNet = 35
    MadokamiManga = 36
    MangaDex = 37
    All = 999

    @classmethod
    def is_uncompleted(cls, databases):
        """Check whether the given database index is uncompleted on SauceNao
        and therefore should not be used for searches.

        :type databases: SauceNaoDatabase
        :return: bool

        .. note:: membership only matches enum members — a plain int value
           never compares equal to an ``Enum`` member, so passing an int
           always returns False.
        """
        # set membership instead of a list scan; same result, O(1) lookup
        return databases in {cls.HMagazines, cls.HGameCG, cls.DoujinshiDB, cls.Shutterstock, cls.Movies,
                             cls.Shows, cls.SankakuChannel, cls.IdolComplex, cls.BcyNetIllust,
                             cls.BcyNetCosplay, cls.DeviantArt, cls.PawooNet, cls.MangaDex}
67
68
69
class SauceNao(object):
70 1
    """"
71
    small script to work with SauceNao locally
72
    """
73
74
    SEARCH_POST_URL = 'http://saucenao.com/search.php'
75 1
76
    # all available account types, unregistered (always if no API key is passed), basic or premium
77
    ACCOUNT_TYPE_UNREGISTERED = ""
78 1
    ACCOUNT_TYPE_BASIC = "basic"
79 1
    ACCOUNT_TYPE_PREMIUM = "premium"
80 1
81
    # individual search usage limitations
82
    LIMIT_30_SECONDS = {
83 1
        ACCOUNT_TYPE_UNREGISTERED: 4,
84
        ACCOUNT_TYPE_BASIC: 6,
85
        ACCOUNT_TYPE_PREMIUM: 15,
86
    }
87
88
    # 0=html, 2=json but json is omitting important data but includes more data about authors
89
    # taken from the API documentation(requires login): https://saucenao.com/user.php?page=search-api
90
    API_HTML_TYPE = 0
91 1
    API_JSON_TYPE = 2
92 1
93
    CONTENT_CATEGORY_KEY = 'Material'
94 1
    CONTENT_AUTHOR_KEY = 'Creator'
95 1
    CONTENT_CHARACTERS_KEY = 'Characters'
96 1
97
    logger = None
98 1
99 1
    def __init__(self, directory='', databases=SauceNaoDatabase.All, minimum_similarity=65, combine_api_types=False,
                 api_key=None, is_premium=False, exclude_categories='', move_to_categories=False,
                 use_author_as_category=False, output_type=API_HTML_TYPE, start_file=None, log_level=logging.ERROR,
                 title_minimum_similarity=90):
        """Set up a SauceNao search instance.

        :type directory: str
        :type databases: SauceNaoDatabase|int
        :type minimum_similarity: float
        :type combine_api_types: bool
        :type api_key: str
        :type is_premium: bool
        :type exclude_categories: str
        :type move_to_categories: bool
        :type use_author_as_category: bool
        :type output_type: int
        :type start_file: str
        :type log_level: int
        :type title_minimum_similarity: float
        """
        self.directory = directory
        self.databases = databases
        self.minimum_similarity = minimum_similarity
        self.combine_api_types = combine_api_types
        self.api_key = api_key
        self.is_premium = is_premium
        self.exclude_categories = exclude_categories
        self.move_to_categories = move_to_categories
        self.use_author_as_category = use_author_as_category
        self.output_type = output_type
        self.start_file = start_file
        self.title_minimum_similarity = title_minimum_similarity

        # the per-30-seconds search allowance depends on the account tier
        if self.api_key:
            account_type = self.ACCOUNT_TYPE_PREMIUM if self.is_premium else self.ACCOUNT_TYPE_BASIC
        else:
            account_type = self.ACCOUNT_TYPE_UNREGISTERED
        self.search_limit_30s = self.LIMIT_30_SECONDS[account_type]

        if self.combine_api_types:
            # combining both API types doubles the request count, so halve the allowance
            self.search_limit_30s /= 2

        # remembers the last unexpected HTTP status so a repeat can abort instead of looping
        self.previous_status_code = None

        logging.basicConfig(level=log_level)
        self.logger = logging.getLogger("saucenao_logger")

        if SauceNaoDatabase.is_uncompleted(self.databases):
            self.logger.warning("Database #{db} is uncompleted and should not be used.".format(db=self.databases))
153 1
    def check_file(self, file_name: str) -> list:
        """Look up a file from the configured directory on SauceNAO.

        :type file_name: str
        :return: results above the configured minimum similarity
        """
        self.logger.info("checking file: {0:s}".format(file_name))
        full_path = os.path.join(self.directory, file_name)
        with open(full_path, 'rb') as handle:
            return self.check_file_object(handle)
    def check_file_object(self, file_content: BinaryIO) -> list:
        """Run a SauceNAO lookup for an already opened file object.

        :type file_content: typing.BinaryIO
        :return: results above the configured minimum similarity
        """
        if self.combine_api_types:
            # query both output types and merge the two result sets together
            sorted_results = self.parse_results_json(self.__check_image(file_content, self.API_HTML_TYPE))
            file_content.seek(0)
            additional = self.parse_results_json(self.__check_image(file_content, self.API_JSON_TYPE))
            sorted_results = self.__merge_results(sorted_results, additional)
        else:
            sorted_results = self.parse_results_json(self.__check_image(file_content, self.output_type))

        return self.__filter_results(sorted_results)
    def __get_http_data(self, file_object: BinaryIO, output_type: int):
186
        """Prepare the http relevant data(files, headers, params) for the given file path and output type
187 1
188
        :param file_object:
189
        :param output_type:
190
        :return:
191
        """
192
        files = {'file': file_object.read()}
193
194
        headers = {
195
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) '
196
                          'Chrome/63.0.3239.84 Safari/537.36',
197 1
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8',
198
            'Accept-Language': 'en-DE,en-US;q=0.9,en;q=0.8',
199
            'Accept-Encoding': 'gzip, deflate, br',
200
            'DNT': '1',
201
            'Connection': 'keep-alive'
202
        }
203
204
        params = {
205
            'file': file_object,
206
            # parameters taken from form on main page: https://saucenao.com/
207
            'url': None,
208
            'frame': 1,
209 1
            'hide': 0,
210
            # parameters taken from API documentation: https://saucenao.com/user.php?page=search-api
211
            'output_type': output_type,
212 1
            'db': self.databases,
213
        }
214 1
215
        if self.api_key:
216
            params['api_key'] = self.api_key
217
218
        return files, params, headers
219
220
    def __check_image(self, file_object: BinaryIO, output_type: int) -> str:
        """Query SauceNAO once for the given file object and return the raw result text.

        :type file_object: typing.BinaryIO
        :type output_type: int
        :return: JSON string (html responses are converted to the JSON layout first)
        """
        files, params, headers = self.__get_http_data(file_object=file_object, output_type=output_type)
        response = requests.post(url=self.SEARCH_POST_URL, files=files, params=params, headers=headers)

        code, msg = http.verify_status_code(response)

        if code == http.STATUS_CODE_SKIP:
            # unrecoverable for this file: log it and behave as if no results came back
            self.logger.error(msg)
            return json.dumps({'results': []})

        if code == http.STATUS_CODE_REPEAT:
            if self.previous_status_code:
                # second unexpected status in a row: give up
                raise UnknownStatusCodeException(msg)
            self.previous_status_code = code
            self.logger.info(
                "Received an unexpected status code (message: {msg}), repeating after 10 seconds...".format(msg=msg)
            )
            time.sleep(10)
            return self.__check_image(file_object, output_type)

        self.previous_status_code = None

        if output_type == self.API_HTML_TYPE:
            return self.parse_results_html_to_json(response.text)
        return response.text
    @staticmethod
    def parse_results_html_to_json(html: str) -> str:
        """Convert the HTML result page of SauceNAO into the layout of the JSON API.

        Each result table cell becomes a ``{'header': ..., 'data': ...}`` entry so
        both output types can be handled by the same parsing code afterwards.
        Unlike :meth:`parse_results_json`, this does not sort the results.

        :type html: str
        :return: JSON encoded string mirroring the API response structure
        """
        soup = Soup(html, 'html.parser')
        # basic format of json API response
        results = {'header': {}, 'results': []}

        for res in soup.find_all('td', attrs={"class": "resulttablecontent"}):  # type: element.Tag
            # optional field in SauceNao
            # NOTE(review): find_next scans forward in document order rather than only
            # inside this cell — it could pick up a later result's title; confirm
            title_tag = res.find_next('div', attrs={"class": "resulttitle"})
            if title_tag:
                title = title_tag.text
            else:
                title = ''

            # mandatory field in SauceNao
            similarity = res.find_next('div', attrs={"class": "resultsimilarityinfo"}).text.replace('%', '')
            # every external link listed in the misc info column of this result
            alternate_links = [a_tag['href'] for a_tag in
                               res.find_next('div', attrs={"class": "resultmiscinfo"}).find_all('a', href=True)]
            content_column = []
            content_column_tags = res.find_all('div', attrs={"class": "resultcontentcolumn"})
            for content_column_tag in content_column_tags:
                # preserve line breaks: .text would silently drop the <br> tags otherwise
                for br in content_column_tag.find_all('br'):
                    br.replace_with('\n')
                content_column.append(content_column_tag.text)

            result = {
                'header': {
                    'similarity': similarity
                },
                'data': {
                    'title': title,
                    'content': content_column,
                    'ext_urls': alternate_links
                }
            }
            results['results'].append(result)

        return json.dumps(results)
    @staticmethod
298
    def parse_results_json(text: str) -> list:
299
        """Parse the results and sort them descending by similarity
300 1
301 1
        :type text: str
302 1
        :return:
303
        """
304 1
        result = json.loads(text)
305
        results = [res for res in result['results']]
306
        return sorted(results, key=lambda k: float(k['header']['similarity']), reverse=True)
307
308
    def __filter_results(self, sorted_results) -> list:
309
        """Return results with a similarity bigger or the same as the defined similarity from the arguments (default 65%)
310 1
311 1
        :type sorted_results: list|tuple|Generator
312 1
        :return:
313 1
        """
314
        filtered_results = []
315
        for res in sorted_results:
316
            if float(res['header']['similarity']) >= float(self.minimum_similarity):
317 1
                filtered_results.append(res)
318
            else:
319 1
                # we can break here since the results are sorted by similarity anyways
320 1
                break
321
        return filtered_results
322
323
    @staticmethod
324
    def get_content_value(results, key: str):
325
        """Return the first match of Material in content
326
        multiple sites have a categorisation which SauceNao utilizes to provide it in the content section
327
328
        :type results: list|tuple|Generator
329
        :type key: str
330
        :return:
331
        """
332
        for result in results:
333
            if 'content' in list(result['data'].keys()):
334
                for content in result['data']['content']:
335 1
                    if re.search(r'{0:s}: .*'.format(key), content):
336 1
                        return ''.join(re.split(r'{0:s}: '.format(key), content)[1:]).rstrip("\n").split('\n')
337
        return ''
338
339
    @staticmethod
340
    def get_title_value(results, key: str):
341
        """Return the first match of Material in the title section
342
        SauceNAO provides the authors name in the title section f.e. if provided by the indexed entry
343
344
        :type results: list|tuple|Generator
345
        :type key: str
346
        :return:
347
        """
348
        for result in results:
349
            if 'title' in list(result['data'].keys()):
350
                if re.match('{0:s}: .*'.format(key), result['data']['title']):
351 1
                    return ''.join(re.split(r'{0:s}: '.format(key), result['data']['title'])[1:]).rstrip("\n") \
352 1
                        .split('\n')
353
        return ''
354
355
    @staticmethod
356
    def merge_dicts(x: dict, y: dict) -> dict:
357
        """Take x dictionary and insert/overwrite y dictionary values
358
359
        :type x: dict
360
        :type y: dict
361
        :return:
362
        """
363 1
        z = x.copy()
364
        z.update(y)
365
        return z
366
367
    def __merge_results(self, result: list, additional_result: list) -> list:
368
        """Merge two result arrays
369
370
        :type result: list
371
        :type additional_result: list
372
        :return:
373
        """
374
        if len(result) <= len(additional_result):
375
            length = len(result)
376
        else:
377
            length = len(additional_result)
378
379
        for i in range(length):
380
            for key in list(result[i].keys()):
381
                result[i][key] = self.merge_dicts(result[i][key], additional_result[i][key])
382
383
        return result
384