Passed
Push — master ( 766b61...1967f0 )
by Steffen
16:49
created

saucenao.saucenao   A

Complexity

Total Complexity 40

Size/Duplication

Total Lines 380
Duplicated Lines 0 %

Test Coverage

Coverage 76.47%

Importance

Changes 0
Metric Value
wmc 40
eloc 220
dl 0
loc 380
ccs 143
cts 187
cp 0.7647
rs 9.2
c 0
b 0
f 0

12 Methods

Rating   Name   Duplication   Size   Complexity  
A SauceNao.__merge_results() 0 17 4
A SauceNao.merge_dicts() 0 11 1
A SauceNao.__filter_results() 0 14 3
B SauceNao.__init__() 0 54 5
A SauceNao.get_title_value() 0 15 4
B SauceNao.parse_results_html_to_json() 0 43 5
A SauceNaoDatabase.is_uncompleted() 0 10 1
B SauceNao.__check_image() 0 34 5
A SauceNao.check_file() 0 20 2
A SauceNao.__get_http_data() 0 36 3
A SauceNao.parse_results_json() 0 10 2
A SauceNao.get_content_value() 0 15 5

How to fix   Complexity   

Complexity

Complex classes like saucenao.saucenao often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
#!/usr/bin/python
2
# -*- coding: utf-8 -*-
3 1
import enum
4 1
import json
5 1
import logging
6 1
import os
7 1
import re
8 1
import time
9 1
from mimetypes import MimeTypes
10 1
from typing import Generator
11
12 1
import requests
13 1
from bs4 import BeautifulSoup as Soup
14 1
from bs4 import element
15
16 1
from saucenao import http
17 1
from saucenao.exceptions import *
18
19
20 1
class SauceNaoDatabase(enum.Enum):
    """Database indexes supported by SauceNao.

    Each member's value is the numeric index that is stored as ``databases`` on the client and sent as the
    ``db`` request parameter; ``All`` (999) is the default used by :class:`SauceNao`.
    """

    HMagazines = 0
    HGameCG = 2
    DoujinshiDB = 3
    PixivImages = 5
    NicoNicoSeiga = 8
    Danbooru = 9
    DrawrImages = 10
    NijieImages = 11
    YandeRe = 12
    Shutterstock = 15
    FAKKU = 16
    HMisc = 18
    TwoDMarket = 19
    MediBang = 20
    Anime = 21
    HAnime = 22
    Movies = 23
    Shows = 24
    Gelbooru = 25
    Konachan = 26
    SankakuChannel = 27
    AnimePicturesNet = 28
    E621Net = 29
    IdolComplex = 30
    BcyNetIllust = 31
    BcyNetCosplay = 32
    PortalGraphicsNet = 33
    DeviantArt = 34
    PawooNet = 35
    MadokamiManga = 36
    MangaDex = 37
    All = 999

    @classmethod
    def is_uncompleted(cls, databases):
        """Check if the database is uncompleted and the index should not be used

        Accepts either a member of this enum or its plain integer index. A plain ``Enum`` member never
        compares equal to its integer value, so the argument is normalised to a member before the lookup.

        :type databases: SauceNaoDatabase|int
        :return: True if the index is one of the databases listed as uncompleted, False otherwise
            (including for values that are not a known database index)
        :rtype: bool
        """
        try:
            database = cls(databases)
        except ValueError:
            # not a known database index, so it cannot be one of the uncompleted ones
            return False

        return database in (cls.HMagazines, cls.HGameCG, cls.DoujinshiDB, cls.Shutterstock, cls.Movies, cls.Shows,
                            cls.SankakuChannel, cls.IdolComplex, cls.BcyNetIllust, cls.BcyNetCosplay, cls.DeviantArt,
                            cls.PawooNet, cls.MangaDex)
68
69
70 1
class SauceNao(object):
    """Small client to check local image files against SauceNao.

    A file from ``directory`` is uploaded to the search endpoint, the HTML or JSON answer is brought into the
    layout of the JSON API (``{'header': ..., 'results': [...]}``) and the results are sorted and filtered
    by their similarity.
    """

    SEARCH_POST_URL = 'http://saucenao.com/search.php'

    # all available account types, unregistered (always if no API key is passed), basic or premium
    ACCOUNT_TYPE_UNREGISTERED = ""
    ACCOUNT_TYPE_BASIC = "basic"
    ACCOUNT_TYPE_PREMIUM = "premium"

    # individual search usage limitations
    LIMIT_30_SECONDS = {
        ACCOUNT_TYPE_UNREGISTERED: 4,
        ACCOUNT_TYPE_BASIC: 6,
        ACCOUNT_TYPE_PREMIUM: 15,
    }

    # 0=html, 2=json but json is omitting important data but includes more data about authors
    # taken from the API documentation(requires login): https://saucenao.com/user.php?page=search-api
    API_HTML_TYPE = 0
    API_JSON_TYPE = 2

    # labels that prefix the matching values in the content/title section of a result
    CONTENT_CATEGORY_KEY = 'Material'
    CONTENT_AUTHOR_KEY = 'Creator'
    CONTENT_CHARACTERS_KEY = 'Characters'

    # both are replaced with real objects per instance in __init__
    mime = None
    logger = None

    def __init__(self, directory, databases=SauceNaoDatabase.All, minimum_similarity=65, combine_api_types=False,
                 api_key=None, is_premium=False, exclude_categories='', move_to_categories=False,
                 use_author_as_category=False, output_type=API_HTML_TYPE, start_file=None, log_level=logging.ERROR,
                 title_minimum_similarity=90):
        """Initializing function

        The options ``exclude_categories``, ``move_to_categories``, ``use_author_as_category``, ``start_file``
        and ``title_minimum_similarity`` are only stored on the instance here; no method of this class reads them.

        :param directory: directory the checked file names are relative to
        :param databases: database index to search in
        :param minimum_similarity: results below this similarity (in percent) are dropped by check_file
        :param combine_api_types: query the HTML and the JSON API for every file and merge both answers
        :param api_key: API key of the account, no key means unregistered usage limits
        :param is_premium: use the premium usage limit, only evaluated if an API key is passed
        :param output_type: API type used when the API types are not combined
        :param log_level: level passed to logging.basicConfig
        :type directory: str
        :type databases: SauceNaoDatabase|int
        :type minimum_similarity: float
        :type combine_api_types: bool
        :type api_key: str
        :type is_premium: bool
        :type exclude_categories: str
        :type move_to_categories: bool
        :type use_author_as_category: bool
        :type output_type: int
        :type start_file: str
        :type log_level: int
        :type title_minimum_similarity: float
        """
        self.directory = directory
        self.databases = databases
        self.minimum_similarity = minimum_similarity
        self.combine_api_types = combine_api_types
        self.api_key = api_key
        self.is_premium = is_premium
        self.exclude_categories = exclude_categories
        self.move_to_categories = move_to_categories
        self.use_author_as_category = use_author_as_category
        self.output_type = output_type
        self.start_file = start_file
        self.title_minimum_similarity = title_minimum_similarity

        if not self.api_key:
            account_type = self.ACCOUNT_TYPE_UNREGISTERED
        elif self.is_premium:
            account_type = self.ACCOUNT_TYPE_PREMIUM
        else:
            account_type = self.ACCOUNT_TYPE_BASIC
        self.search_limit_30s = self.LIMIT_30_SECONDS[account_type]

        if self.combine_api_types:
            # if we combine the API types we require twice as many API requests, so half the limit per 30 seconds
            # floor division keeps the limit a whole number of searches (15 -> 7 instead of 7.5)
            self.search_limit_30s //= 2

        # set while a request is being repeated after an unexpected status code, see __check_image
        self.previous_status_code = None

        self.mime = MimeTypes()
        logging.basicConfig(level=log_level)
        self.logger = logging.getLogger("saucenao_logger")

        if SauceNaoDatabase.is_uncompleted(self.databases):
            self.logger.warning("Database #%s is uncompleted and should not be used.", self.databases)

    def check_file(self, file_name: str) -> list:
        """Check the given file for results on SauceNAO

        With ``combine_api_types`` the file is requested once per API type and both answers are merged,
        otherwise only the configured ``output_type`` is requested.

        :param file_name: name of the file, relative to the configured directory
        :type file_name: str
        :return: results with at least the configured minimum similarity, sorted descending by similarity
        """
        self.logger.info("checking file: %s", file_name)
        if self.combine_api_types:
            html_results = self.parse_results_json(self.__check_image(file_name, self.API_HTML_TYPE))
            json_results = self.parse_results_json(self.__check_image(file_name, self.API_JSON_TYPE))
            sorted_results = self.__merge_results(html_results, json_results)
        else:
            sorted_results = self.parse_results_json(self.__check_image(file_name, self.output_type))

        return self.__filter_results(sorted_results)

    def __get_http_data(self, file_path: str, output_type: int):
        """Prepare the http relevant data(files, headers, params) for the given file path and output type

        :param file_path: path of the file whose content gets uploaded
        :param output_type: API type, API_HTML_TYPE or API_JSON_TYPE
        :type file_path: str
        :type output_type: int
        :return: tuple of (files, params, headers) as passed to requests.post
        """
        with open(file_path, 'rb') as file_object:
            files = {'file': file_object.read()}

        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) '
                          'Chrome/63.0.3239.84 Safari/537.36',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8',
            'Accept-Language': 'en-DE,en-US;q=0.9,en;q=0.8',
            'Accept-Encoding': 'gzip, deflate, br',
            'DNT': '1',
            'Connection': 'keep-alive'
        }

        # an enum member would be sent as its string representation ("SauceNaoDatabase.All"),
        # the request needs the numeric index
        database = self.databases.value if isinstance(self.databases, enum.Enum) else self.databases

        params = {
            'file': file_path,
            # guess_type returns a (type, encoding) tuple, only the mime type belongs here
            'Content-Type': self.mime.guess_type(file_path)[0],
            # parameters taken from form on main page: https://saucenao.com/
            'url': None,
            'frame': 1,
            'hide': 0,
            # parameters taken from API documentation: https://saucenao.com/user.php?page=search-api
            'output_type': output_type,
            'db': database,
        }

        if self.api_key:
            params['api_key'] = self.api_key

        return files, params, headers

    def __check_image(self, file_name: str, output_type: int) -> str:
        """Check the possible sources for the given file

        The request is repeated once after 10 seconds if the status check asks for a repeat; a second
        repeat status in a row raises an exception.

        :param file_name: name of the file, relative to the configured directory
        :param output_type: API type, API_HTML_TYPE or API_JSON_TYPE
        :type output_type: int
        :type file_name: str
        :return: JSON text in the layout of the JSON API, HTML answers are converted
        :raises UnknownStatusCodeException: if the repeated request got a repeat status again
        """
        file_path = os.path.join(self.directory, file_name)

        files, params, headers = self.__get_http_data(file_path=file_path, output_type=output_type)
        link = requests.post(url=self.SEARCH_POST_URL, files=files, params=params, headers=headers)

        code, msg = http.verify_status_code(link, file_name)

        if code == http.STATUS_CODE_SKIP:
            # the retry chain ends here, the next file gets its own retry again
            self.previous_status_code = None
            self.logger.error(msg)
            return json.dumps({'results': []})

        if code == http.STATUS_CODE_REPEAT:
            if self.previous_status_code is not None:
                # already repeated once: reset the state before giving up so a caller that catches
                # the exception and continues with the next file still gets one retry there
                self.previous_status_code = None
                raise UnknownStatusCodeException(msg)

            self.previous_status_code = code
            self.logger.info("Received an unexpected status code (message: %s), repeating after 10 seconds...", msg)
            time.sleep(10)
            return self.__check_image(file_name, output_type)

        self.previous_status_code = None

        if output_type == self.API_HTML_TYPE:
            return self.parse_results_html_to_json(link.text)

        return link.text

    @staticmethod
    def parse_results_html_to_json(html: str) -> str:
        """Convert the HTML result page into the layout of the JSON API

        Every result cell becomes one entry with its similarity in ``header`` and title, content columns
        and linked URLs in ``data``. The results keep the order of the page, they are not sorted here.

        :type html: str
        :return: JSON text of the form {'header': {}, 'results': [...]}
        """
        soup = Soup(html, 'html.parser')
        # basic format of json API response
        results = {'header': {}, 'results': []}

        for res in soup.find_all('td', attrs={"class": "resulttablecontent"}):  # type: element.Tag
            # optional field in SauceNao, so only look inside the current result: find_next would continue
            # through the rest of the document and pick up the title of a later result
            title_tag = res.find('div', attrs={"class": "resulttitle"})
            title = title_tag.text if title_tag is not None else ''

            # mandatory field in SauceNao
            similarity = res.find_next('div', attrs={"class": "resultsimilarityinfo"}).text.replace('%', '')
            misc_info = res.find_next('div', attrs={"class": "resultmiscinfo"})
            alternate_links = [a_tag['href'] for a_tag in misc_info.find_all('a', href=True)]

            content_column = []
            for content_column_tag in res.find_all('div', attrs={"class": "resultcontentcolumn"}):
                # keep the line breaks of the column in the extracted text
                for br in content_column_tag.find_all('br'):
                    br.replace_with('\n')
                content_column.append(content_column_tag.text)

            results['results'].append({
                'header': {
                    'similarity': similarity
                },
                'data': {
                    'title': title,
                    'content': content_column,
                    'ext_urls': alternate_links
                }
            })

        return json.dumps(results)

    @staticmethod
    def parse_results_json(text: str) -> list:
        """Parse the results and sort them descending by similarity

        :param text: JSON text in the layout of the JSON API
        :type text: str
        :return: list of results, empty if the response contains no results
        """
        # a response without (or with an empty) "results" entry is treated as "no results"
        results = json.loads(text).get('results') or []
        return sorted(results, key=lambda res: float(res['header']['similarity']), reverse=True)

    def __filter_results(self, sorted_results) -> list:
        """Return results with a similarity bigger or the same as the defined similarity from the arguments (default 65%)

        :param sorted_results: results sorted descending by similarity
        :type sorted_results: list|tuple|Generator
        :return: leading results which reach the minimum similarity
        """
        minimum_similarity = float(self.minimum_similarity)
        filtered_results = []
        for res in sorted_results:
            if float(res['header']['similarity']) < minimum_similarity:
                # we can stop here since the results are sorted by similarity anyways
                break
            filtered_results.append(res)
        return filtered_results

    @staticmethod
    def get_content_value(results, key: str):
        """Return the values of the first content entry labelled with the given key

        multiple sites have a categorisation which SauceNao utilizes to provide it in the content section,
        the entries look like "<key>: <value>", further values follow on their own lines

        :param key: label to look for, f.e. CONTENT_CATEGORY_KEY; it is used as part of a regular expression
        :type results: list|tuple|Generator
        :type key: str
        :return: list with one value per line of the first match or an empty string if nothing matched
        """
        label = r'{0:s}: '.format(key)
        label_with_value = r'{0:s}: .*'.format(key)

        for result in results:
            # results of the JSON API do not necessarily have a content section
            for content in result['data'].get('content', ()):
                if re.search(label_with_value, content):
                    return ''.join(re.split(label, content)[1:]).rstrip("\n").split('\n')
        return ''

    @staticmethod
    def get_title_value(results, key: str):
        """Return the values of the first title which starts with the given key

        SauceNAO provides the authors name in the title section f.e. if provided by the indexed entry

        :param key: label to look for, f.e. CONTENT_AUTHOR_KEY; it is used as part of a regular expression
        :type results: list|tuple|Generator
        :type key: str
        :return: list with one value per line of the first match or an empty string if nothing matched
        """
        label = r'{0:s}: '.format(key)
        label_with_value = r'{0:s}: .*'.format(key)

        for result in results:
            data = result['data']
            # unlike the content lookup the title has to start with the label
            if 'title' in data and re.match(label_with_value, data['title']):
                return ''.join(re.split(label, data['title'])[1:]).rstrip("\n").split('\n')
        return ''

    @staticmethod
    def merge_dicts(x: dict, y: dict) -> dict:
        """Take x dictionary and insert/overwrite y dictionary values

        :type x: dict
        :type y: dict
        :return: new dictionary, x and y stay untouched
        """
        merged = x.copy()
        merged.update(y)
        return merged

    def __merge_results(self, result: list, additional_result: list) -> list:
        """Merge two result arrays

        The results are paired by their position, results without a counterpart in the other list are left
        as they are. Only sections (keys) of the first result are merged, values of the additional result
        win on conflicts. The entries of ``result`` are updated in place.

        :type result: list
        :type additional_result: list
        :return: the first result list with the merged entries
        """
        # zip stops at the shorter list
        for entry, additional_entry in zip(result, additional_result):
            for key in list(entry):
                # a section missing in the additional result leaves the original section unchanged
                entry[key] = self.merge_dicts(entry[key], additional_entry.get(key, {}))

        return result
380