Passed
Push — master ( 33232c...bac56a )
by Steffen
01:23
created

SauceNao   A

Complexity

Total Complexity 33

Size/Duplication

Total Lines 261
Duplicated Lines 0 %

Importance

Changes 30
Bugs 3 Features 9
Metric Value
c 30
b 3
f 9
dl 0
loc 261
rs 9.3999
wmc 33

10 Methods

Rating   Name   Duplication   Size   Complexity  
A check_file() 0 20 2
B __init__() 0 32 1
A parse_results_json() 0 10 3
B get_http_data() 0 36 3
B parse_results_html_to_json() 0 43 6
A filter_results() 0 14 3
A merge_two_dicts() 0 11 1
B get_content_value() 0 15 5
B check_image() 0 32 5
A merge_results() 0 17 4
1
#!/usr/bin/python
2
# -*- coding: utf-8 -*-
3
import json
4
import logging
5
import os
6
import os.path
7
import re
8
import time
9
from mimetypes import MimeTypes
10
from typing import Generator
11
12
import requests
13
from bs4 import BeautifulSoup as Soup
14
from bs4 import element
15
16
from saucenao import http
17
from saucenao.exceptions import *
18
19
20
class SauceNao(object):
    """
    Small script to work with SauceNao locally.
    """

    SEARCH_POST_URL = 'http://saucenao.com/search.php'

    # basic account allows currently 20 images within 30 seconds
    # you can increase this value if you have a premium account
    LIMIT_30_SECONDS = 20

    # 0=html, 2=json but json is omitting important data but includes more data about authors
    # taken from the API documentation(requires login): https://saucenao.com/user.php?page=search-api
    API_HTML_TYPE = 0
    API_JSON_TYPE = 2

    # keys SauceNao uses in the content section for categorisation
    CONTENT_CATEGORY_KEY = 'Material'
    CONTENT_CHARACTERS_KEY = 'Characters'

    # both are set up in __init__ (MimeTypes helper and module logger)
    mime = None
    logger = None
    def __init__(self, directory, databases=999, minimum_similarity=65, combine_api_types=False, api_key=None,
43
                 exclude_categories='', move_to_categories=False, output_type=API_HTML_TYPE, start_file=None,
44
                 log_level=logging.ERROR, title_minimum_similarity=90):
45
        """Initializing function
46
47
        :type directory: str
48
        :type databases: int
49
        :type minimum_similarity: float
50
        :type combine_api_types: bool
51
        :type api_key: str
52
        :type exclude_categories: str
53
        :type move_to_categories: bool
54
        :type start_file: str
55
        :type log_level: int
56
        :type title_minimum_similarity: float
57
        """
58
        self._directory = directory
59
        self._databases = databases
60
        self._minimum_similarity = minimum_similarity
61
        self._combine_api_types = combine_api_types
62
        self._api_key = api_key
63
        self._exclude_categories = exclude_categories
64
        self._move_to_categories = move_to_categories
65
        self._output_type = output_type
66
        self._start_file = start_file
67
        self._title_minimum_similarity = title_minimum_similarity
68
69
        self._previous_status_code = None
70
71
        self.mime = MimeTypes()
72
        logging.basicConfig(level=log_level)
73
        self.logger = logging.getLogger("saucenao_logger")
74
75
    def check_file(self, file_name: str) -> list:
        """Look up a single file on SauceNAO and return the filtered results.

        :type file_name: str
        :return: results at or above the minimum similarity
        """
        self.logger.info("checking file: {0:s}".format(file_name))

        if not self._combine_api_types:
            raw = self.check_image(file_name, self._output_type)
            return self.filter_results(self.parse_results_json(raw))

        # query both output types and merge their entries pairwise
        html_results = self.parse_results_json(self.check_image(file_name, self.API_HTML_TYPE))
        json_results = self.parse_results_json(self.check_image(file_name, self.API_JSON_TYPE))
        merged_results = self.merge_results(html_results, json_results)
        return self.filter_results(merged_results)
    def get_http_data(self, file_path: str, output_type: int):
97
        """Prepare the http relevant data(files, headers, params) for the given file path and output type
98
99
        :param file_path:
100
        :param output_type:
101
        :return:
102
        """
103
        with open(file_path, 'rb') as file_object:
104
            files = {'file': file_object.read()}
105
106
        headers = {
107
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) '
108
                          'Chrome/63.0.3239.84 Safari/537.36',
109
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8',
110
            'Accept-Language': 'en-DE,en-US;q=0.9,en;q=0.8',
111
            'Accept-Encoding': 'gzip, deflate, br',
112
            'DNT': '1',
113
            'Connection': 'keep-alive'
114
        }
115
116
        params = {
117
            'file': file_path,
118
            'Content-Type': self.mime.guess_type(file_path),
119
            # parameters taken from form on main page: https://saucenao.com/
120
            'url': None,
121
            'frame': 1,
122
            'hide': 0,
123
            # parameters taken from API documentation: https://saucenao.com/user.php?page=search-api
124
            'output_type': output_type,
125
            'db': self._databases,
126
        }
127
128
        if self._api_key:
129
            params['api_key'] = self._api_key
130
131
        return files, params, headers
132
133
    def check_image(self, file_name: str, output_type: int) -> str:
        """Upload the given file to SauceNao and return the results as a JSON string.

        HTML responses are converted to the JSON API layout first, so the
        return value has the same shape for both output types.

        :type output_type: int
        :type file_name: str
        :return: JSON string with a 'results' list (empty list on a skipped request)
        """
        file_path = os.path.join(self._directory, file_name)

        files, params, headers = self.get_http_data(file_path=file_path, output_type=output_type)
        link = requests.post(url=self.SEARCH_POST_URL, files=files, params=params, headers=headers)

        code, msg = http.verify_status_code(link, file_name)

        if code == http.STATUS_CODE_SKIP:
            # unusable response: log it and pretend there were no results
            self.logger.error(msg)
            return json.dumps({'results': []})
        elif code == http.STATUS_CODE_REPEAT:
            if not self._previous_status_code:
                # first unexpected status code in a row: remember it and retry once
                self._previous_status_code = code
                self.logger.info("Received an unexpected status code, repeating after 10 seconds...")
                time.sleep(10)
                return self.check_image(file_name, output_type)
            else:
                # second unexpected status code in a row: give up
                raise UnknownStatusCodeException(msg)
        else:
            # request went through, reset the retry marker
            self._previous_status_code = None

        if output_type == self.API_HTML_TYPE:
            # normalize the HTML page to the JSON API response format
            return self.parse_results_html_to_json(link.text)

        return link.text
    @staticmethod
    def parse_results_html_to_json(html: str) -> str:
        """Convert a SauceNao HTML results page into the JSON API response format.

        Produces a JSON string shaped like the JSON output type
        ({'header': ..., 'results': [...]}) so both can be parsed identically
        downstream (the actual sorting happens in parse_results_json).

        :type html: str
        :return: JSON string with one entry per result table cell
        """
        soup = Soup(html, 'html.parser')
        # basic format of json API response
        results = {'header': {}, 'results': []}

        for res in soup.find_all('td', attrs={"class": "resulttablecontent"}):  # type: element.Tag
            # optional field in SauceNao
            # NOTE(review): find_next searches forward through the whole document,
            # not only inside `res` — presumably each result's tags directly follow
            # its table cell; verify against the live page layout
            title_tag = res.find_next('div', attrs={"class": "resulttitle"})
            if title_tag:
                title = title_tag.text
            else:
                title = ''

            # mandatory field in SauceNao; drop the percent sign so the
            # similarity can later be compared as a number
            similarity = res.find_next('div', attrs={"class": "resultsimilarityinfo"}).text.replace('%', '')
            # all external links offered for this result
            alternate_links = [a_tag['href'] for a_tag in
                               res.find_next('div', attrs={"class": "resultmiscinfo"}).find_all('a', href=True)]
            content_column = []
            content_column_tags = res.find_all('div', attrs={"class": "resultcontentcolumn"})
            for content_column_tag in content_column_tags:
                # keep line breaks: <br> tags would otherwise vanish in .text
                for br in content_column_tag.find_all('br'):
                    br.replace_with('\n')
                content_column.append(content_column_tag.text)

            # mirror the structure of a single JSON API result entry
            result = {
                'header': {
                    'similarity': similarity
                },
                'data': {
                    'title': title,
                    'content': content_column,
                    'ext_urls': alternate_links
                }
            }
            results['results'].append(result)

        return json.dumps(results)
    @staticmethod
211
    def parse_results_json(text: str) -> list:
212
        """Parse the results and sort them descending by similarity
213
214
        :type text: str
215
        :return:
216
        """
217
        result = json.loads(text)
218
        results = [res for res in result['results']]
219
        return sorted(results, key=lambda k: float(k['header']['similarity']), reverse=True)
220
221
    def filter_results(self, sorted_results) -> list:
222
        """Return results with a similarity bigger or the same as the defined similarity from the arguments (default 65%)
223
224
        :type sorted_results: list|tuple|Generator
225
        :return:
226
        """
227
        filtered_results = []
228
        for res in sorted_results:
229
            if float(res['header']['similarity']) >= float(self._minimum_similarity):
230
                filtered_results.append(res)
231
            else:
232
                # we can break here since the results are sorted by similarity anyways
233
                break
234
        return filtered_results
235
236
    @staticmethod
237
    def get_content_value(results, key: str):
238
        """Return the first match of Material in content
239
        multiple sites have a categorisation which SauceNao utilizes to provide it in the content section
240
241
        :type results: list|tuple|Generator
242
        :type key: str
243
        :return:
244
        """
245
        for result in results:
246
            if 'content' in list(result['data'].keys()):
247
                for content in result['data']['content']:
248
                    if re.match('{0:s}: .*'.format(key), content):
249
                        return ''.join(re.split(r'{0:s}: '.format(key), content)[1:]).rstrip("\n").split('\n')
250
        return ''
251
252
    @staticmethod
253
    def merge_two_dicts(x: dict, y: dict) -> dict:
254
        """Take x dictionary and insert/overwrite y dictionary values
255
256
        :type x: dict
257
        :type y: dict
258
        :return:
259
        """
260
        z = x.copy()
261
        z.update(y)
262
        return z
263
264
    def merge_results(self, result: list, additional_result: list) -> list:
265
        """Merge two result arrays
266
267
        :type result: list
268
        :type additional_result: list
269
        :return:
270
        """
271
        if len(result) <= len(additional_result):
272
            length = len(result)
273
        else:
274
            length = len(additional_result)
275
276
        for i in range(length):
277
            for key in list(result[i].keys()):
278
                result[i][key] = self.merge_two_dicts(result[i][key], additional_result[i][key])
279
280
        return result
281