saucenao.saucenao.SauceNao.__check_image()   B
last analyzed

Complexity

Conditions 5

Size

Total Lines 32
Code Lines 19

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 9
CRAP Score 8.125

Importance

Changes 0
Metric Value
eloc 19
dl 0
loc 32
ccs 9
cts 18
cp 0.5
rs 8.9833
c 0
b 0
f 0
cc 5
nop 3
crap 8.125
1
#!/usr/bin/python
2
# -*- coding: utf-8 -*-
3 1
import enum
4 1
import json
5 1
import logging
6 1
import os
7 1
import re
8 1
import time
9 1
from typing import Generator, BinaryIO, Iterable
10
11 1
import requests
12 1
from bs4 import BeautifulSoup as Soup
13 1
from bs4 import element
14
15 1
from saucenao import http
16 1
from saucenao.exceptions import *
17
18
19 1
class SauceNaoDatabase(enum.Enum):
    """
    database index supported by SauceNao
    """

    HMagazines = 0
    HGameCG = 2
    DoujinshiDB = 3
    PixivImages = 5
    NicoNicoSeiga = 8
    Danbooru = 9
    DrawrImages = 10
    NijieImages = 11
    YandeRe = 12
    Shutterstock = 15
    FAKKU = 16
    HMisc = 18
    TwoDMarket = 19
    MediBang = 20
    Anime = 21
    HAnime = 22
    Movies = 23
    Shows = 24
    Gelbooru = 25
    Konachan = 26
    SankakuChannel = 27
    AnimePicturesNet = 28
    E621Net = 29
    IdolComplex = 30
    BcyNetIllust = 31
    BcyNetCosplay = 32
    PortalGraphicsNet = 33
    DeviantArt = 34
    PawooNet = 35
    MadokamiManga = 36
    MangaDex = 37
    All = 999

    @classmethod
    def is_uncompleted(cls, databases):
        """Check if the database is uncompleted and the index should not be used

        Accepts either a SauceNaoDatabase member or its plain integer value
        (the ``SauceNao`` constructor documents its ``databases`` argument as
        ``SauceNaoDatabase|int``, so both forms reach this check).

        :type databases: SauceNaoDatabase|int
        :return: bool
        """
        uncompleted = {cls.HMagazines, cls.HGameCG, cls.DoujinshiDB, cls.Shutterstock, cls.Movies, cls.Shows,
                       cls.SankakuChannel, cls.IdolComplex, cls.BcyNetIllust, cls.BcyNetCosplay, cls.DeviantArt,
                       cls.PawooNet, cls.MangaDex}
        if isinstance(databases, cls):
            return databases in uncompleted
        # plain integers: Enum members never compare equal to raw ints, so the
        # previous membership test silently returned False for int input —
        # compare against the member values instead
        return databases in {member.value for member in uncompleted}
67
68
69 1
class SauceNao(object):
    """
    small script to work with SauceNao locally
    """

    SEARCH_POST_URL = 'http://saucenao.com/search.php'

    # all available account types, unregistered (always if no API key is passed), basic or premium
    ACCOUNT_TYPE_UNREGISTERED = ""
    ACCOUNT_TYPE_BASIC = "basic"
    ACCOUNT_TYPE_PREMIUM = "premium"

    # individual search usage limitations: allowed searches per 30 seconds per account type
    LIMIT_30_SECONDS = {
        ACCOUNT_TYPE_UNREGISTERED: 4,
        ACCOUNT_TYPE_BASIC: 6,
        ACCOUNT_TYPE_PREMIUM: 15,
    }

    # 0=html, 2=json but json is omitting important data but includes more data about authors
    # taken from the API documentation(requires login): https://saucenao.com/user.php?page=search-api
    API_HTML_TYPE = 0
    API_JSON_TYPE = 2

    # keys used with get_content_value/get_title_value to extract metadata from parsed results
    CONTENT_CATEGORY_KEY = 'Material'
    CONTENT_AUTHOR_KEY = 'Creator'
    CONTENT_CHARACTERS_KEY = 'Characters'

    # set per instance in __init__ (logging.Logger)
    logger = None

    def __init__(self, directory='', databases=SauceNaoDatabase.All, minimum_similarity=65, combine_api_types=False,
                 api_key=None, is_premium=False, exclude_categories='', move_to_categories=False,
                 use_author_as_category=False, output_type=API_HTML_TYPE, start_file=None, log_level=logging.ERROR,
                 title_minimum_similarity=90):
        """Initializing function

        :type directory: str
        :type databases: SauceNaoDatabase|int
        :type minimum_similarity: float
        :type combine_api_types: bool
        :type api_key: str
        :type is_premium: bool
        :type exclude_categories: str
        :type move_to_categories: bool
        :type use_author_as_category: bool
        :type output_type: int
        :type start_file: str
        :type log_level: int
        :type title_minimum_similarity: float
        """
        self.directory = directory
        self.databases = databases
        self.minimum_similarity = minimum_similarity
        self.combine_api_types = combine_api_types
        self.api_key = api_key
        self.is_premium = is_premium
        self.exclude_categories = exclude_categories
        self.move_to_categories = move_to_categories
        self.use_author_as_category = use_author_as_category
        self.output_type = output_type
        self.start_file = start_file
        self.title_minimum_similarity = title_minimum_similarity

        # the rate limit depends on the account type; an API key upgrades the
        # account from unregistered to basic (or premium if flagged)
        if self.api_key:
            if self.is_premium:
                account_type = self.ACCOUNT_TYPE_PREMIUM
            else:
                account_type = self.ACCOUNT_TYPE_BASIC
            self.search_limit_30s = self.LIMIT_30_SECONDS[account_type]
        else:
            self.search_limit_30s = self.LIMIT_30_SECONDS[self.ACCOUNT_TYPE_UNREGISTERED]

        if self.combine_api_types:
            # if we combine the API types we require twice as many API requests, so half the limit per 30 seconds
            self.search_limit_30s /= 2

        # tracks the last unexpected HTTP status so __check_image retries at most once
        self.previous_status_code = None

        logging.basicConfig(level=log_level)
        self.logger = logging.getLogger("saucenao_logger")

        if SauceNaoDatabase.is_uncompleted(self.databases):
            self.logger.warning("Database #{db} is uncompleted and should not be used.".format(db=self.databases))

    def check_file(self, file_name: str) -> list:
        """Check the given file for results on SauceNAO

        The file is resolved relative to ``self.directory`` and opened in
        binary mode; the actual lookup is delegated to ``check_file_object``.

        :type file_name: str
        :return: list of filtered results (see ``check_file_object``)
        """
        self.logger.info("checking file: {0:s}".format(file_name))
        file_path = os.path.join(self.directory, file_name)
        with open(file_path, 'rb') as file_object:
            return self.check_file_object(file_object)

    def check_file_object(self, file_content: BinaryIO) -> list:
        """Check the passed file content for results on SauceNAO

        With ``combine_api_types`` enabled the image is submitted twice (HTML
        and JSON API) and the two result lists are merged; otherwise a single
        request with ``self.output_type`` is made. Results are then filtered
        by ``minimum_similarity``.

        :type file_content: typing.BinaryIO
        :return: list of result dicts with similarity >= minimum_similarity,
                 sorted descending by similarity
        """
        if self.combine_api_types:
            result = self.__check_image(file_content, self.API_HTML_TYPE)
            sorted_results = self.parse_results_json(result)

            # rewind so the second request re-reads the same bytes
            file_content.seek(0)
            additional_result = self.__check_image(file_content, self.API_JSON_TYPE)
            additional_sorted_results = self.parse_results_json(additional_result)
            sorted_results = self.__merge_results(sorted_results, additional_sorted_results)
        else:
            result = self.__check_image(file_content, self.output_type)
            sorted_results = self.parse_results_json(result)

        filtered_results = self.__filter_results(sorted_results)
        return filtered_results

    def __get_http_data(self, file_object: BinaryIO, output_type: int):
        """Prepare the http relevant data(files, headers, params) for the given file path and output type

        :param file_object: binary file object; consumed here via ``read()``
        :param output_type: API_HTML_TYPE or API_JSON_TYPE
        :return: (files, params, headers) tuple for requests.post
        """
        # NOTE: this consumes file_object; callers must seek(0) before reusing it
        files = {'file': file_object.read()}

        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) '
                          'Chrome/63.0.3239.84 Safari/537.36',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8',
            'Accept-Language': 'en-DE,en-US;q=0.9,en;q=0.8',
            'Accept-Encoding': 'gzip, deflate, br',
            'DNT': '1',
            'Connection': 'keep-alive'
        }

        # NOTE(review): 'file' here is the file *object* placed into the query
        # params while the bytes already go in ``files`` above — looks redundant
        # or accidental; confirm the server ignores it
        params = {
            'file': file_object,
            # parameters taken from form on main page: https://saucenao.com/
            'url': None,
            'frame': 1,
            'hide': 0,
            # parameters taken from API documentation: https://saucenao.com/user.php?page=search-api
            'output_type': output_type,
            # NOTE(review): self.databases may be a SauceNaoDatabase member, not an
            # int — presumably the API expects the numeric index; verify how
            # requests serializes this into the query string
            'db': self.databases,
        }

        if self.api_key:
            params['api_key'] = self.api_key

        return files, params, headers

    def __check_image(self, file_object: BinaryIO, output_type: int) -> str:
        """Check the possible sources for the given file object

        Posts the image to SauceNao and returns the raw result as a JSON
        string (HTML responses are converted via parse_results_html_to_json).
        On an unexpected status code the request is retried once after 10
        seconds; a second unexpected status raises UnknownStatusCodeException.
        A "skip" status is logged and yields an empty result set.

        :type output_type: int
        :type file_object: typing.BinaryIO
        :return: JSON string with a 'results' key
        :raises UnknownStatusCodeException: on a repeated unexpected status code
        """
        files, params, headers = self.__get_http_data(file_object=file_object, output_type=output_type)
        link = requests.post(url=self.SEARCH_POST_URL, files=files, params=params, headers=headers)

        code, msg = http.verify_status_code(link)

        if code == http.STATUS_CODE_SKIP:
            self.logger.error(msg)
            # mimic the empty JSON API response so callers can parse uniformly
            return json.dumps({'results': []})
        elif code == http.STATUS_CODE_REPEAT:
            if not self.previous_status_code:
                # first unexpected status: remember it and retry once
                self.previous_status_code = code
                self.logger.info(
                    "Received an unexpected status code (message: {msg}), repeating after 10 seconds...".format(msg=msg)
                )
                time.sleep(10)
                # NOTE(review): __get_http_data already read() the file object, so
                # this retry re-reads from EOF and presumably posts an empty file
                # unless the object is rewound — confirm intended behavior
                return self.__check_image(file_object, output_type)
            else:
                raise UnknownStatusCodeException(msg)
        else:
            # request succeeded: reset the retry marker
            self.previous_status_code = None

        if output_type == self.API_HTML_TYPE:
            return self.parse_results_html_to_json(link.text)

        return link.text

    @staticmethod
    def parse_results_html_to_json(html: str) -> str:
        """Convert a SauceNao HTML results page into the JSON API response format

        Extracts title, similarity, content columns and external links per
        result. Sorting is NOT done here — see parse_results_json.

        :type html: str
        :return: JSON string shaped like the JSON API response
        """
        soup = Soup(html, 'html.parser')
        # basic format of json API response
        results = {'header': {}, 'results': []}

        for res in soup.find_all('td', attrs={"class": "resulttablecontent"}):  # type: element.Tag
            # optional field in SauceNao
            title_tag = res.find_next('div', attrs={"class": "resulttitle"})
            if title_tag:
                title = title_tag.text
            else:
                title = ''

            # mandatory field in SauceNao
            similarity = res.find_next('div', attrs={"class": "resultsimilarityinfo"}).text.replace('%', '')
            alternate_links = [a_tag['href'] for a_tag in
                               res.find_next('div', attrs={"class": "resultmiscinfo"}).find_all('a', href=True)]
            content_column = []
            content_column_tags = res.find_all('div', attrs={"class": "resultcontentcolumn"})
            for content_column_tag in content_column_tags:
                # <br> separators become newlines so .text keeps the line structure
                for br in content_column_tag.find_all('br'):
                    br.replace_with('\n')
                content_column.append(content_column_tag.text)

            result = {
                'header': {
                    'similarity': similarity
                },
                'data': {
                    'title': title,
                    'content': content_column,
                    'ext_urls': alternate_links
                }
            }
            results['results'].append(result)

        return json.dumps(results)

    @staticmethod
    def parse_results_json(text: str) -> list:
        """Parse the results and sort them descending by similarity

        :type text: str
        :return: list of result dicts, highest similarity first
        """
        result = json.loads(text)
        results = [res for res in result['results']]
        return sorted(results, key=lambda k: float(k['header']['similarity']), reverse=True)

    def __filter_results(self, sorted_results) -> list:
        """Return results with a similarity bigger or the same as the defined similarity from the arguments
        (default 65%)

        :type sorted_results: list|tuple|Generator
        :return: prefix of sorted_results meeting the similarity threshold
        """
        filtered_results = []
        for res in sorted_results:
            if float(res['header']['similarity']) >= float(self.minimum_similarity):
                filtered_results.append(res)
            else:
                # we can break here since the results are sorted by similarity anyways
                break
        return filtered_results

    @staticmethod
    def get_content_value(results: Iterable, key: str):
        """Return the first match of Material in content
        multiple sites have a categorisation which SauceNao utilizes to provide it in the content section

        :type results: Iterable
        :type key: str
        :return: list of values split on newlines for the first matching
                 "<key>: ..." content entry, or '' when no entry matches
        """
        for result in results:
            if 'content' in list(result['data'].keys()):
                for content in result['data']['content']:
                    if re.search(r'{0:s}: .*'.format(key), content):
                        # strip the "<key>: " prefix, drop a trailing newline, split multi-values
                        return ''.join(re.split(r'{0:s}: '.format(key), content)[1:]).rstrip("\n").split('\n')
        return ''

    @staticmethod
    def get_title_value(results: Iterable, key: str):
        """Return the first match of Material in the title section
        SauceNAO provides the authors name in the title section f.e. if provided by the indexed entry

        :type results: Iterable
        :type key: str
        :return: list of values split on newlines for the first matching
                 "<key>: ..." title, or '' when no title matches
        """
        for result in results:
            if 'title' in list(result['data'].keys()):
                # re.match anchors at the start of the title, unlike get_content_value's re.search
                if re.match('{0:s}: .*'.format(key), result['data']['title']):
                    return ''.join(re.split(r'{0:s}: '.format(key), result['data']['title'])[1:]).rstrip("\n") \
                        .split('\n')
        return ''

    @staticmethod
    def merge_dicts(x: dict, y: dict) -> dict:
        """Take x dictionary and insert/overwrite y dictionary values

        :type x: dict
        :type y: dict
        :return: new dict; x's entries overridden by y's on key collision
        """
        z = x.copy()
        z.update(y)
        return z

    def __merge_results(self, result: list, additional_result: list) -> list:
        """Merge two result arrays

        Merges entry-wise up to the shorter list's length; any surplus entries
        of the longer ``result`` list are kept unmerged (surplus entries of
        ``additional_result`` are dropped). ``result`` is mutated in place.

        :type result: list
        :type additional_result: list
        :return: the (mutated) result list
        """
        if len(result) <= len(additional_result):
            length = len(result)
        else:
            length = len(additional_result)

        for i in range(length):
            for key in list(result[i].keys()):
                # merge the 'header' and 'data' sub-dicts of each result pair
                result[i][key] = self.merge_dicts(result[i][key], additional_result[i][key])

        return result
385