Test Failed
Push — master ( 16370d...2bdc04 )
by Steffen
02:30
created

saucenao.saucenao   A

Complexity

Total Complexity 37

Size/Duplication

Total Lines 300
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 37
eloc 162
dl 0
loc 300
rs 9.44
c 0
b 0
f 0

11 Methods

Rating   Name   Duplication   Size   Complexity  
A SauceNao.merge_results() 0 17 4
A SauceNao.filter_results() 0 14 3
A SauceNao.__init__() 0 35 1
A SauceNao.merge_two_dicts() 0 11 1
B SauceNao.check_image() 0 32 5
B SauceNao.parse_results_html_to_json() 0 43 6
A SauceNao.get_title_value() 0 15 4
A SauceNao.get_http_data() 0 36 3
A SauceNao.check_file() 0 20 2
A SauceNao.parse_results_json() 0 10 3
A SauceNao.get_content_value() 0 15 5
1
#!/usr/bin/python
2
# -*- coding: utf-8 -*-
3
import json
4
import logging
5
import os
6
import re
7
import time
8
from mimetypes import MimeTypes
9
from typing import Generator
10
11
import requests
12
from bs4 import BeautifulSoup as Soup
13
from bs4 import element
14
15
from saucenao import http
16
from saucenao.exceptions import *
17
18
19
class SauceNao(object):
    """Small client for looking up the source of local images on SauceNAO.

    Files are uploaded to the SauceNAO search endpoint and the HTML or JSON response is normalised
    into a list of result dictionaries sorted by similarity.
    """

    # HTTPS so the optional API key and the uploaded image are not transferred in clear text
    SEARCH_POST_URL = 'https://saucenao.com/search.php'

    # a basic account currently allows 20 images within 30 seconds
    # you can increase this value if you have a premium account
    LIMIT_30_SECONDS = 20

    # 0=html, 2=json; json omits some important data but includes more data about authors
    # taken from the API documentation(requires login): https://saucenao.com/user.php?page=search-api
    API_HTML_TYPE = 0
    API_JSON_TYPE = 2

    # labels used as "<label>: <value>" prefixes in the content/title of a result
    CONTENT_CATEGORY_KEY = 'Material'
    CONTENT_AUTHOR_KEY = 'Creator'
    CONTENT_CHARACTERS_KEY = 'Characters'

    # both are replaced with real objects in __init__
    mime = None
    logger = None

    def __init__(self, directory, databases=999, minimum_similarity=65, combine_api_types=False, api_key=None,
                 exclude_categories='', move_to_categories=False, use_author_as_category=False,
                 output_type=API_HTML_TYPE, start_file=None, log_level=logging.ERROR, title_minimum_similarity=90):
        """Store the search configuration and set up the mime type guesser and the logger

        :param directory: directory the checked file names are resolved against
        :param databases: value sent as the ``db`` request parameter
        :param minimum_similarity: lowest similarity a result needs to pass :meth:`filter_results`
        :param combine_api_types: query both the HTML and the JSON output and merge the results
        :param api_key: sent as ``api_key`` request parameter if set
        :param exclude_categories: stored only, not read by any method of this class
        :param move_to_categories: stored only, not read by any method of this class
        :param use_author_as_category: stored only, not read by any method of this class
        :param output_type: output type requested when ``combine_api_types`` is disabled
        :param start_file: stored only, not read by any method of this class
        :param log_level: level passed to ``logging.basicConfig``
        :param title_minimum_similarity: stored only, not read by any method of this class
        :type directory: str
        :type databases: int
        :type minimum_similarity: float
        :type combine_api_types: bool
        :type api_key: str
        :type exclude_categories: str
        :type move_to_categories: bool
        :type use_author_as_category: bool
        :type output_type: int
        :type start_file: str
        :type log_level: int
        :type title_minimum_similarity: float
        """
        self._directory = directory
        self._databases = databases
        self._minimum_similarity = minimum_similarity
        self._combine_api_types = combine_api_types
        self._api_key = api_key
        self._exclude_categories = exclude_categories
        self._move_to_categories = move_to_categories
        self._use_author_as_category = use_author_as_category
        self._output_type = output_type
        self._start_file = start_file
        self._title_minimum_similarity = title_minimum_similarity

        # remembers that the previous request already triggered a retry, see check_image
        self._previous_status_code = None

        self.mime = MimeTypes()
        logging.basicConfig(level=log_level)
        self.logger = logging.getLogger("saucenao_logger")

    def check_file(self, file_name: str) -> list:
        """Check the given file for results on SauceNAO

        :param file_name: file name relative to the configured directory
        :type file_name: str
        :return: results sorted descending by similarity and cut off at the minimum similarity
        """
        self.logger.info("checking file: %s", file_name)
        if self._combine_api_types:
            # the HTML request is sent first, the JSON request second
            sorted_results = self.parse_results_json(self.check_image(file_name, self.API_HTML_TYPE))
            additional_sorted_results = self.parse_results_json(self.check_image(file_name, self.API_JSON_TYPE))
            sorted_results = self.merge_results(sorted_results, additional_sorted_results)
        else:
            sorted_results = self.parse_results_json(self.check_image(file_name, self._output_type))

        return self.filter_results(sorted_results)

    def get_http_data(self, file_path: str, output_type: int):
        """Prepare the http relevant data(files, headers, params) for the given file path and output type

        :param file_path: path of the file to upload, its whole content is read into memory
        :param output_type: value sent as ``output_type`` request parameter
        :type file_path: str
        :type output_type: int
        :return: tuple of (files, params, headers)
        """
        with open(file_path, 'rb') as file_object:
            files = {'file': file_object.read()}

        # NOTE(review): 'br' is advertised in Accept-Encoding; confirm the HTTP client in use can decode brotli
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) '
                          'Chrome/63.0.3239.84 Safari/537.36',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8',
            'Accept-Language': 'en-DE,en-US;q=0.9,en;q=0.8',
            'Accept-Encoding': 'gzip, deflate, br',
            'DNT': '1',
            'Connection': 'keep-alive'
        }

        # guess_type returns a (type, encoding) tuple, only the mime type belongs into Content-Type
        content_type = self.mime.guess_type(file_path)[0]

        params = {
            'file': file_path,
            'Content-Type': content_type,
            # parameters taken from form on main page: https://saucenao.com/
            'url': None,
            'frame': 1,
            'hide': 0,
            # parameters taken from API documentation: https://saucenao.com/user.php?page=search-api
            'output_type': output_type,
            'db': self._databases,
        }

        if self._api_key:
            params['api_key'] = self._api_key

        return files, params, headers

    def check_image(self, file_name: str, output_type: int) -> str:
        """Check the possible sources for the given file

        A "repeat" status is retried exactly once after waiting 10 seconds. The retry marker is cleared
        on every final outcome so that the next checked file gets its own retry again.

        :param file_name: file name relative to the configured directory
        :param output_type: API_HTML_TYPE or API_JSON_TYPE
        :type output_type: int
        :type file_name: str
        :return: JSON string with a ``results`` list (HTML responses are converted to the same format)
        :raises UnknownStatusCodeException: if the retried request is answered with a "repeat" status again
        """
        file_path = os.path.join(self._directory, file_name)

        files, params, headers = self.get_http_data(file_path=file_path, output_type=output_type)
        link = requests.post(url=self.SEARCH_POST_URL, files=files, params=params, headers=headers)

        code, msg = http.verify_status_code(link, file_name)

        if code == http.STATUS_CODE_SKIP:
            self._previous_status_code = None
            self.logger.error(msg)
            return json.dumps({'results': []})

        if code == http.STATUS_CODE_REPEAT:
            if self._previous_status_code is not None:
                # second failure in a row, give up but do not leak the marker into the next file
                self._previous_status_code = None
                # presumably provided by the wildcard import of saucenao.exceptions - verify
                raise UnknownStatusCodeException(msg)

            self._previous_status_code = code
            self.logger.info("Received an unexpected status code, repeating after 10 seconds...")
            time.sleep(10)
            return self.check_image(file_name, output_type)

        self._previous_status_code = None

        if output_type == self.API_HTML_TYPE:
            return self.parse_results_html_to_json(link.text)

        return link.text

    @staticmethod
    def parse_results_html_to_json(html: str) -> str:
        """Convert the HTML result page into the basic format of the JSON API response

        The results are kept in document order, sorting happens in :meth:`parse_results_json`.

        :param html: HTML of the result page
        :type html: str
        :return: JSON string of the form ``{"header": {}, "results": [...]}``
        """
        soup = Soup(html, 'html.parser')
        # basic format of json API response
        results = {'header': {}, 'results': []}

        for res in soup.find_all('td', attrs={"class": "resulttablecontent"}):  # type: element.Tag
            # optional field in SauceNao
            # NOTE(review): find_next also searches past the end of this cell, so a result without a title
            # may pick up the title of a following result - confirm whether res.find is meant here
            title_tag = res.find_next('div', attrs={"class": "resulttitle"})
            title = title_tag.text if title_tag else ''

            # mandatory field in SauceNao
            similarity = res.find_next('div', attrs={"class": "resultsimilarityinfo"}).text.replace('%', '')
            misc_info = res.find_next('div', attrs={"class": "resultmiscinfo"})
            alternate_links = [a_tag['href'] for a_tag in misc_info.find_all('a', href=True)]

            content_column = []
            for content_column_tag in res.find_all('div', attrs={"class": "resultcontentcolumn"}):
                # turn <br> tags into line breaks so multi line values survive the text extraction
                for br in content_column_tag.find_all('br'):
                    br.replace_with('\n')
                content_column.append(content_column_tag.text)

            results['results'].append({
                'header': {
                    'similarity': similarity
                },
                'data': {
                    'title': title,
                    'content': content_column,
                    'ext_urls': alternate_links
                }
            })

        return json.dumps(results)

    @staticmethod
    def parse_results_json(text: str) -> list:
        """Parse the results and sort them descending by similarity

        A response without a ``results`` entry is treated like a response without any result.

        :param text: JSON string in the format of the JSON API response
        :type text: str
        :return: list of result dictionaries, highest similarity first
        """
        response = json.loads(text)
        return sorted(response.get('results') or [], key=lambda k: float(k['header']['similarity']), reverse=True)

    def filter_results(self, sorted_results) -> list:
        """Return results with a similarity bigger or the same as the defined similarity from the arguments (default 65%)

        :param sorted_results: results sorted descending by similarity
        :type sorted_results: list|tuple|Generator
        :return: leading results up to (excluding) the first one below the minimum similarity
        """
        minimum_similarity = float(self._minimum_similarity)
        filtered_results = []
        for res in sorted_results:
            if float(res['header']['similarity']) < minimum_similarity:
                # we can stop here since the results are sorted by similarity anyways
                break
            filtered_results.append(res)
        return filtered_results

    @staticmethod
    def _split_labelled_value(text: str, key: str):
        """Return the lines following the "<key>: " label, None if the text does not start with the label

        The key is compared literally (no regular expression), every occurrence of the label is removed
        and trailing line breaks are stripped before the value is split into lines.

        :type text: str
        :type key: str
        :return: list of lines or None
        """
        label = '{0:s}: '.format(key)
        if not text.startswith(label):
            return None
        return text.replace(label, '').rstrip("\n").split('\n')

    @staticmethod
    def get_content_value(results, key: str):
        """Return the value of the first content entry labelled with the given key

        multiple sites have a categorisation which SauceNao utilizes to provide it in the content section

        :param results: result dictionaries, entries without ``content`` are skipped
        :param key: label to look for, f.e. CONTENT_CATEGORY_KEY
        :type results: list|tuple|Generator
        :type key: str
        :return: list of the value lines of the first match, empty string if nothing matched
        """
        for result in results:
            if 'content' not in result['data']:
                continue
            for content in result['data']['content']:
                value = SauceNao._split_labelled_value(content, key)
                if value is not None:
                    return value
        return ''

    @staticmethod
    def get_title_value(results, key: str):
        """Return the value of the first title labelled with the given key

        SauceNAO provides the authors name in the title section f.e. if provided by the indexed entry

        :param results: result dictionaries, entries without ``title`` are skipped
        :param key: label to look for, f.e. CONTENT_AUTHOR_KEY
        :type results: list|tuple|Generator
        :type key: str
        :return: list of the value lines of the first match, empty string if nothing matched
        """
        for result in results:
            if 'title' not in result['data']:
                continue
            value = SauceNao._split_labelled_value(result['data']['title'], key)
            if value is not None:
                return value
        return ''

    @staticmethod
    def merge_two_dicts(x: dict, y: dict) -> dict:
        """Take x dictionary and insert/overwrite y dictionary values

        :type x: dict
        :type y: dict
        :return: new dictionary, neither x nor y is modified
        """
        z = x.copy()
        z.update(y)
        return z

    def merge_results(self, result: list, additional_result: list) -> list:
        """Merge two result arrays

        Entries are paired by position up to the length of the shorter list. Each section of an entry
        of ``result`` (f.e. ``header``, ``data``) is updated with the same section of its counterpart,
        sections missing in the counterpart are left unchanged.

        NOTE(review): pairing by position assumes both lists describe the same matches in the same order.

        :param result: primary results, its entries are updated in place
        :param additional_result: results whose values are inserted/overwrite the primary values
        :type result: list
        :type additional_result: list
        :return: the passed ``result`` list
        """
        for primary, additional in zip(result, additional_result):
            for key in primary:
                primary[key] = self.merge_two_dicts(primary[key], additional.get(key, {}))

        return result
300