saucenao.saucenao.SauceNao.__init__()   B
last analyzed

Complexity

Conditions 5

Size

Total Lines 53
Code Lines 29

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 20
CRAP Score 5.3073

Importance

Changes 0
Metric Value
eloc 29
dl 0
loc 53
ccs 20
cts 26
cp 0.7692
rs 8.7173
c 0
b 0
f 0
cc 5
nop 14
crap 5.3073

How to fix   Long Method    Many Parameters   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include Extract Method: move a coherent part of the long method into a new, well-named method.

Many Parameters

Methods with many parameters are not only hard to understand, but their parameters also often become inconsistent when you need more, or different data.

There are several approaches to avoid long parameter lists, such as grouping related parameters into a parameter object.

1
#!/usr/bin/python
2
# -*- coding: utf-8 -*-
3 1
import enum
4 1
import json
5 1
import logging
6 1
import os
7 1
import re
8 1
import time
9 1
from typing import Generator, BinaryIO, Iterable
10
11 1
import requests
12 1
from bs4 import BeautifulSoup as Soup
13 1
from bs4 import element
14
15 1
from saucenao import http
16 1
from saucenao.exceptions import *
17
18
19 1
class SauceNaoDatabase(enum.Enum):
    """
    database index supported by SauceNao
    """

    HMagazines = 0
    HGameCG = 2
    DoujinshiDB = 3
    PixivImages = 5
    NicoNicoSeiga = 8
    Danbooru = 9
    DrawrImages = 10
    NijieImages = 11
    YandeRe = 12
    Shutterstock = 15
    FAKKU = 16
    HMisc = 18
    TwoDMarket = 19
    MediBang = 20
    Anime = 21
    HAnime = 22
    Movies = 23
    Shows = 24
    Gelbooru = 25
    Konachan = 26
    SankakuChannel = 27
    AnimePicturesNet = 28
    E621Net = 29
    IdolComplex = 30
    BcyNetIllust = 31
    BcyNetCosplay = 32
    PortalGraphicsNet = 33
    DeviantArt = 34
    PawooNet = 35
    MadokamiManga = 36
    MangaDex = 37
    All = 999

    @classmethod
    def is_uncompleted(cls, databases):
        """Tell whether the given database index is uncompleted on SauceNao
        and should therefore not be used on its own.

        :type databases: SauceNaoDatabase
        :return: bool
        """
        incomplete_members = (
            cls.HMagazines, cls.HGameCG, cls.DoujinshiDB, cls.Shutterstock,
            cls.Movies, cls.Shows, cls.SankakuChannel, cls.IdolComplex,
            cls.BcyNetIllust, cls.BcyNetCosplay, cls.DeviantArt,
            cls.PawooNet, cls.MangaDex,
        )
        return databases in incomplete_members
67
68
69 1
class SauceNao(object):
70
    """"
71
    small script to work with SauceNao locally
72
    """
73
74 1
    SEARCH_POST_URL = 'http://saucenao.com/search.php'
75
76
    # all available account types, unregistered (always if no API key is passed), basic or premium
77 1
    ACCOUNT_TYPE_UNREGISTERED = ""
78 1
    ACCOUNT_TYPE_BASIC = "basic"
79 1
    ACCOUNT_TYPE_PREMIUM = "premium"
80
81
    # individual search usage limitations
82 1
    LIMIT_30_SECONDS = {
83
        ACCOUNT_TYPE_UNREGISTERED: 4,
84
        ACCOUNT_TYPE_BASIC: 6,
85
        ACCOUNT_TYPE_PREMIUM: 15,
86
    }
87
88
    # 0=html, 2=json but json is omitting important data but includes more data about authors
89
    # taken from the API documentation(requires login): https://saucenao.com/user.php?page=search-api
90 1
    API_HTML_TYPE = 0
91 1
    API_JSON_TYPE = 2
92
93 1
    CONTENT_CATEGORY_KEY = 'Material'
94 1
    CONTENT_AUTHOR_KEY = 'Creator'
95 1
    CONTENT_CHARACTERS_KEY = 'Characters'
96
97 1
    logger = None
98
99 1
    def __init__(self, directory='', databases=SauceNaoDatabase.All, minimum_similarity=65, combine_api_types=False,
                 api_key=None, is_premium=False, exclude_categories='', move_to_categories=False,
                 use_author_as_category=False, output_type=API_HTML_TYPE, start_file=None, log_level=logging.ERROR,
                 title_minimum_similarity=90):
        """Initializing function

        :type directory: str
        :type databases: SauceNaoDatabase|int
        :type minimum_similarity: float
        :type combine_api_types: bool
        :type api_key: str
        :type is_premium: bool
        :type exclude_categories: str
        :type move_to_categories: bool
        :type use_author_as_category: bool
        :type output_type: int
        :type start_file: str
        :type log_level: int
        :type title_minimum_similarity: float
        """
        self.directory = directory
        self.databases = databases
        self.minimum_similarity = minimum_similarity
        self.combine_api_types = combine_api_types
        self.api_key = api_key
        self.is_premium = is_premium
        self.exclude_categories = exclude_categories
        self.move_to_categories = move_to_categories
        self.use_author_as_category = use_author_as_category
        self.output_type = output_type
        self.start_file = start_file
        self.title_minimum_similarity = title_minimum_similarity

        self.search_limit_30s = self.__resolve_search_limit()
        if self.combine_api_types:
            # if we combine the API types we require twice as many API requests, so half the limit per 30 seconds
            self.search_limit_30s /= 2

        # remembers the last unexpected HTTP status so a repeated failure raises instead of looping
        self.previous_status_code = None

        logging.basicConfig(level=log_level)
        self.logger = logging.getLogger("saucenao_logger")

        if SauceNaoDatabase.is_uncompleted(self.databases):
            self.logger.warning("Database #{db} is uncompleted and should not be used.".format(db=self.databases))

    def __resolve_search_limit(self):
        """Return the allowed number of searches per 30 seconds for the configured account type.

        Unregistered (no API key) gets the smallest budget; with a key the premium flag
        selects between the basic and premium limits.

        :return: int
        """
        if not self.api_key:
            return self.LIMIT_30_SECONDS[self.ACCOUNT_TYPE_UNREGISTERED]
        if self.is_premium:
            return self.LIMIT_30_SECONDS[self.ACCOUNT_TYPE_PREMIUM]
        return self.LIMIT_30_SECONDS[self.ACCOUNT_TYPE_BASIC]
152
153 1
    def check_file(self, file_name: str) -> list:
        """Look up a single file from self.directory on SauceNAO.

        :type file_name: str
        :return: filtered result list
        """
        self.logger.info("checking file: {0:s}".format(file_name))
        full_path = os.path.join(self.directory, file_name)
        with open(full_path, 'rb') as handle:
            return self.check_file_object(handle)
163
164 1
    def check_file_object(self, file_content: BinaryIO) -> list:
        """Check the passed file content for results on SauceNAO.

        When combine_api_types is set, both the HTML and JSON API responses are
        fetched and merged before filtering; otherwise a single request with the
        configured output type is used.

        :type file_content: typing.BinaryIO
        :return: results filtered by minimum similarity
        """
        if not self.combine_api_types:
            response = self.__check_image(file_content, self.output_type)
            return self.__filter_results(self.parse_results_json(response))

        html_response = self.__check_image(file_content, self.API_HTML_TYPE)
        merged_results = self.parse_results_json(html_response)

        # rewind so the second request re-reads the same bytes
        file_content.seek(0)
        json_response = self.__check_image(file_content, self.API_JSON_TYPE)
        merged_results = self.__merge_results(merged_results, self.parse_results_json(json_response))

        return self.__filter_results(merged_results)
184
185 1
    def __get_http_data(self, file_object: BinaryIO, output_type: int):
        """Prepare the http relevant data(files, headers, params) for the given file object and output type.

        :param file_object: opened binary file, fully read into the upload payload
        :param output_type: API_HTML_TYPE or API_JSON_TYPE
        :return: (files, params, headers) tuple for requests.post
        """
        files = {'file': file_object.read()}

        # browser-like headers; SauceNao serves the regular search page
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) '
                          'Chrome/63.0.3239.84 Safari/537.36',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8',
            'Accept-Language': 'en-DE,en-US;q=0.9,en;q=0.8',
            'Accept-Encoding': 'gzip, deflate, br',
            'DNT': '1',
            'Connection': 'keep-alive'
        }

        params = {
            'file': file_object,
            # parameters taken from form on main page: https://saucenao.com/
            'url': None,
            'frame': 1,
            'hide': 0,
            # parameters taken from API documentation: https://saucenao.com/user.php?page=search-api
            'output_type': output_type,
            'db': self.databases,
        }
        if self.api_key:
            params['api_key'] = self.api_key

        return files, params, headers
219
220 1
    def __check_image(self, file_object: BinaryIO, output_type: int) -> str:
        """Check the possible sources for the given file object.

        On a skip status an empty JSON result set is returned; on a repeat status
        the request is retried once after 10 seconds, then raised.

        :type file_object: typing.BinaryIO
        :type output_type: int
        :return: JSON text (HTML responses are converted first)
        """
        files, params, headers = self.__get_http_data(file_object=file_object, output_type=output_type)
        response = requests.post(url=self.SEARCH_POST_URL, files=files, params=params, headers=headers)

        code, msg = http.verify_status_code(response)

        if code == http.STATUS_CODE_SKIP:
            self.logger.error(msg)
            return json.dumps({'results': []})

        if code == http.STATUS_CODE_REPEAT:
            if self.previous_status_code:
                # second unexpected status in a row: give up
                raise UnknownStatusCodeException(msg)
            self.previous_status_code = code
            self.logger.info(
                "Received an unexpected status code (message: {msg}), repeating after 10 seconds...".format(msg=msg)
            )
            time.sleep(10)
            return self.__check_image(file_object, output_type)

        self.previous_status_code = None

        if output_type == self.API_HTML_TYPE:
            return self.parse_results_html_to_json(response.text)
        return response.text
252
253 1
    @staticmethod
    def parse_results_html_to_json(html: str) -> str:
        """Convert an HTML result page into the JSON layout of the API response.

        :type html: str
        :return: JSON string shaped like the JSON API response
        """
        document = Soup(html, 'html.parser')
        # basic format of json API response
        parsed = {'header': {}, 'results': []}

        for cell in document.find_all('td', attrs={"class": "resulttablecontent"}):  # type: element.Tag
            # the title block is optional on SauceNao
            title_tag = cell.find_next('div', attrs={"class": "resulttitle"})
            title = title_tag.text if title_tag else ''

            # similarity is a mandatory field on SauceNao
            similarity = cell.find_next('div', attrs={"class": "resultsimilarityinfo"}).text.replace('%', '')
            misc_info = cell.find_next('div', attrs={"class": "resultmiscinfo"})
            ext_urls = [anchor['href'] for anchor in misc_info.find_all('a', href=True)]

            content_column = []
            for column_tag in cell.find_all('div', attrs={"class": "resultcontentcolumn"}):
                # keep line structure by turning <br> into explicit newlines
                for line_break in column_tag.find_all('br'):
                    line_break.replace_with('\n')
                content_column.append(column_tag.text)

            parsed['results'].append({
                'header': {
                    'similarity': similarity
                },
                'data': {
                    'title': title,
                    'content': content_column,
                    'ext_urls': ext_urls
                }
            })

        return json.dumps(parsed)
296
297 1
    @staticmethod
    def parse_results_json(text: str) -> list:
        """Deserialize an API response and sort the results descending by similarity.

        :type text: str
        :return: list of result dicts, best match first
        """
        parsed = json.loads(text)
        return sorted(parsed['results'],
                      key=lambda entry: float(entry['header']['similarity']),
                      reverse=True)
307
308 1
    def __filter_results(self, sorted_results) -> list:
        """Return results with a similarity bigger or the same as the defined similarity
        from the arguments (default 65%).

        :type sorted_results: list|tuple|Generator
        :return: list of results meeting the threshold
        """
        threshold = float(self.minimum_similarity)
        kept = []
        for entry in sorted_results:
            # input is sorted descending by similarity, so the first miss ends the scan
            if float(entry['header']['similarity']) < threshold:
                break
            kept.append(entry)
        return kept
323
324 1
    @staticmethod
    def get_content_value(results: Iterable, key: str):
        """Return the first '<key>: ...' match found in the content section of any result.

        Multiple sites have a categorisation which SauceNao provides in the content section.

        :type results: Iterable
        :type key: str
        :return: list of values after the key, or '' when no result matches
        """
        search_pattern = r'{0:s}: .*'.format(key)
        split_pattern = r'{0:s}: '.format(key)
        for result in results:
            if 'content' not in result['data']:
                continue
            for content in result['data']['content']:
                if re.search(search_pattern, content):
                    return ''.join(re.split(split_pattern, content)[1:]).rstrip("\n").split('\n')
        return ''
339
340 1
    @staticmethod
    def get_title_value(results: Iterable, key: str):
        """Return the first '<key>: ...' match found in the title section of any result.

        SauceNAO provides the author's name in the title section, e.g. if provided
        by the indexed entry.

        :type results: Iterable
        :type key: str
        :return: list of values after the key, or '' when no result matches
        """
        match_pattern = '{0:s}: .*'.format(key)
        split_pattern = r'{0:s}: '.format(key)
        for result in results:
            if 'title' not in result['data']:
                continue
            title = result['data']['title']
            if re.match(match_pattern, title):
                return ''.join(re.split(split_pattern, title)[1:]).rstrip("\n").split('\n')
        return ''
355
356 1
    @staticmethod
    def merge_dicts(x: dict, y: dict) -> dict:
        """Return a copy of x with y's entries inserted/overwriting on key clashes.

        :type x: dict
        :type y: dict
        :return: merged dict; neither input is modified
        """
        return {**x, **y}
367
368 1
    def __merge_results(self, result: list, additional_result: list) -> list:
        """Merge two result arrays entry by entry, up to the shorter list's length.

        The entries of `result` are updated in place and the same list is returned.

        :type result: list
        :type additional_result: list
        :return: the (mutated) first list
        """
        # zip stops at the shorter list, matching the original min-length loop
        for entry, extra in zip(result, additional_result):
            for key in list(entry.keys()):
                entry[key] = self.merge_dicts(entry[key], extra[key])
        return result
385