Completed
Branch master (9117d2)
by Steffen
03:33 queued 12s
created

SauceNao   A

Complexity

Total Complexity 31

Size/Duplication

Total Lines 248
Duplicated Lines 0 %

Importance

Changes 28
Bugs 2 Features 9
Metric Value
c 28
b 2
f 9
dl 0
loc 248
rs 9.8
wmc 31

9 Methods

Rating   Name   Duplication   Size   Complexity  
A parse_results_json() 0 10 3
B parse_results_html_to_json() 0 43 6
A filter_results() 0 14 3
A check_file() 0 20 2
A merge_two_dicts() 0 11 1
B get_content_value() 0 15 5
B check_image() 0 56 6
B __init__() 0 32 1
A merge_results() 0 17 4
1
#!/usr/bin/python
2
# -*- coding: utf-8 -*-
3
import json
4
import logging
5
import os
6
import os.path
7
import re
8
import time
9
from mimetypes import MimeTypes
10
from typing import Generator
11
12
import requests
13
from bs4 import BeautifulSoup as Soup
14
from bs4 import element
15
16
from saucenao import http
17
18
try:
19
    from titlesearch import get_similar_titles
20
except ImportError:
21
    get_similar_titles = None
22
23
from saucenao.exceptions import *
24
25
26
class SauceNao(object):
    """Small script to work with SauceNao locally.

    Uploads image files from a local directory to the SauceNAO search endpoint
    and returns the matches as a list of dictionaries shaped like the entries
    of the JSON API (``{'header': {...}, 'data': {...}}``), sorted and filtered
    by similarity.
    """

    # NOTE(review): plain HTTP endpoint; the API key is sent as a query parameter.
    # Consider switching to https after confirming the endpoint accepts POSTs there.
    SEARCH_POST_URL = 'http://saucenao.com/search.php'

    # basic account allows currently 20 images within 30 seconds
    # you can increase this value if you have a premium account
    LIMIT_30_SECONDS = 20

    # 0=html, 2=json but json is omitting important data but includes more data about authors
    # taken from the API documentation(requires login): https://saucenao.com/user.php?page=search-api
    API_HTML_TYPE = 0
    API_JSON_TYPE = 2

    # prefixes of the content lines looked up with get_content_value()
    CONTENT_CATEGORY_KEY = 'Material'
    CONTENT_CHARACTERS_KEY = 'Characters'

    # both are replaced by real instances in __init__
    mime = None
    logger = None

    def __init__(self, directory, databases=999, minimum_similarity=65, combine_api_types=False, api_key=None,
                 exclude_categories='', move_to_categories=False, output_type=API_HTML_TYPE, start_file=None,
                 log_level=logging.ERROR, title_minimum_similarity=90):
        """Initializing function

        :param directory: directory the checked file names are relative to
        :param databases: value sent as the ``db`` request parameter
        :param minimum_similarity: results below this similarity are dropped by filter_results()
        :param combine_api_types: query both the HTML and the JSON output type and merge the results
        :param api_key: sent as ``api_key`` request parameter if set
        :param exclude_categories: stored only; not used by the methods of this class shown here
        :param move_to_categories: stored only; not used by the methods of this class shown here
        :param output_type: output type requested when combine_api_types is disabled
        :param start_file: stored only; not used by the methods of this class shown here
        :param log_level: level passed to ``logging.basicConfig``
        :param title_minimum_similarity: stored only; not used by the methods of this class shown here
        :type directory: str
        :type databases: int
        :type minimum_similarity: float
        :type combine_api_types: bool
        :type api_key: str
        :type exclude_categories: str
        :type move_to_categories: bool
        :type output_type: int
        :type start_file: str
        :type log_level: int
        :type title_minimum_similarity: float
        """
        self._directory = directory
        self._databases = databases
        self._minimum_similarity = minimum_similarity
        self._combine_api_types = combine_api_types
        self._api_key = api_key
        self._exclude_categories = exclude_categories
        self._move_to_categories = move_to_categories
        self._output_type = output_type
        self._start_file = start_file
        self._title_minimum_similarity = title_minimum_similarity

        # remembers that the previous request already asked for a retry
        self._previous_status_code = None

        self.mime = MimeTypes()
        logging.basicConfig(level=log_level)
        self.logger = logging.getLogger("saucenao_logger")

    def check_file(self, file_name: str) -> list:
        """Check the given file for results on SauceNAO

        :param file_name: file name relative to the configured directory
        :type file_name: str
        :return: results sorted descending by similarity, cut off at the minimum similarity
        :rtype: list
        """
        self.logger.info("checking file: %s", file_name)
        if self._combine_api_types:
            result = self.check_image(file_name, self.API_HTML_TYPE)
            sorted_results = self.parse_results_json(result)

            additional_result = self.check_image(file_name, self.API_JSON_TYPE)
            additional_sorted_results = self.parse_results_json(additional_result)
            sorted_results = self.merge_results(sorted_results, additional_sorted_results)
        else:
            result = self.check_image(file_name, self._output_type)
            sorted_results = self.parse_results_json(result)

        return self.filter_results(sorted_results)

    def check_image(self, file_name: str, output_type: int) -> str:
        """Check the possible sources for the given file

        Posts the file to SEARCH_POST_URL. If the status check asks for a repeat
        the request is retried once after 10 seconds.

        :param file_name: file name relative to the configured directory
        :param output_type: API_HTML_TYPE or API_JSON_TYPE
        :type output_type: int
        :type file_name: str
        :return: JSON document as string (HTML responses are converted to the JSON layout)
        :rtype: str
        :raises UnknownStatusCodeException: if the retry is answered with a repeat status again
        """
        file_path = os.path.join(self._directory, file_name)

        # read inside a context manager so the file handle is closed right away
        with open(file_path, 'rb') as image_file:
            files = {'file': image_file.read()}

        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) '
                          'Chrome/63.0.3239.84 Safari/537.36',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8',
            'Accept-Language': 'en-DE,en-US;q=0.9,en;q=0.8',
            'Accept-Encoding': 'gzip, deflate, br',
            'DNT': '1',
            'Connection': 'keep-alive'
        }
        params = {
            'file': file_path,
            # guess_type returns a (type, encoding) tuple, only the MIME type belongs here
            'Content-Type': self.mime.guess_type(file_path)[0],
            # parameters taken from form on main page: https://saucenao.com/
            'url': None,
            'frame': 1,
            'hide': 0,
            # parameters taken from API documentation: https://saucenao.com/user.php?page=search-api
            'output_type': output_type,
            'db': self._databases,
        }

        if self._api_key:
            params['api_key'] = self._api_key

        link = requests.post(url=self.SEARCH_POST_URL, files=files, params=params, headers=headers)

        code, msg = http.verify_status_code(link, file_name)

        if code == http.STATUS_CODE_SKIP:
            # the retry state must not leak into the check of the next file
            self._previous_status_code = None
            self.logger.error(msg)
            return json.dumps({'results': []})

        if code == http.STATUS_CODE_REPEAT:
            if self._previous_status_code is not None:
                # second repeat in a row: give up, but allow a retry for the next file again
                self._previous_status_code = None
                raise UnknownStatusCodeException(msg)

            self._previous_status_code = code
            self.logger.info("Received an unexpected status code, repeating after 10 seconds...")
            time.sleep(10)
            return self.check_image(file_name, output_type)

        self._previous_status_code = None

        if output_type == self.API_HTML_TYPE:
            return self.parse_results_html_to_json(link.text)

        return link.text

    @staticmethod
    def parse_results_html_to_json(html: str) -> str:
        """Convert the HTML result page into the layout of the JSON API response

        :param html: HTML of the result page
        :type html: str
        :return: JSON document with the keys ``header`` and ``results``
        :rtype: str
        """
        soup = Soup(html, 'html.parser')
        # basic format of json API response
        results = {'header': {}, 'results': []}

        for res in soup.find_all('td', attrs={"class": "resulttablecontent"}):  # type: element.Tag
            # optional field in SauceNao
            # NOTE(review): find_next also looks past the current cell, so a result without
            # a title may pick up the title of a later result - confirm against the live markup
            title_tag = res.find_next('div', attrs={"class": "resulttitle"})
            title = title_tag.text if title_tag else ''

            # mandatory fields in SauceNao
            similarity_tag = res.find_next('div', attrs={"class": "resultsimilarityinfo"})
            similarity = similarity_tag.text.replace('%', '')
            misc_info_tag = res.find_next('div', attrs={"class": "resultmiscinfo"})
            alternate_links = [a_tag['href'] for a_tag in misc_info_tag.find_all('a', href=True)]

            content_column = []
            for content_column_tag in res.find_all('div', attrs={"class": "resultcontentcolumn"}):
                # keep the line breaks of the markup in the extracted text
                for br in content_column_tag.find_all('br'):
                    br.replace_with('\n')
                content_column.append(content_column_tag.text)

            results['results'].append({
                'header': {
                    'similarity': similarity
                },
                'data': {
                    'title': title,
                    'content': content_column,
                    'ext_urls': alternate_links
                }
            })

        return json.dumps(results)

    @staticmethod
    def parse_results_json(text: str) -> list:
        """Parse the results and sort them descending by similarity

        A response without a ``results`` entry is treated as a response without matches.

        :param text: JSON document in the layout of the JSON API response
        :type text: str
        :return: result entries sorted descending by ``header.similarity``
        :rtype: list
        """
        results = json.loads(text).get('results') or []
        return sorted(results, key=lambda res: float(res['header']['similarity']), reverse=True)

    def filter_results(self, sorted_results) -> list:
        """Return results with a similarity bigger or the same as the defined similarity from the arguments (default 65%)

        :param sorted_results: results sorted descending by similarity
        :type sorted_results: list|tuple|Generator
        :return: leading results reaching the minimum similarity
        :rtype: list
        """
        minimum_similarity = float(self._minimum_similarity)
        filtered_results = []
        for res in sorted_results:
            if float(res['header']['similarity']) < minimum_similarity:
                # we can break here since the results are sorted by similarity anyways
                break
            filtered_results.append(res)
        return filtered_results

    @staticmethod
    def get_content_value(results, key: str):
        """Return the lines following ``<key>: `` of the first content entry starting with that prefix
        multiple sites have a categorisation which SauceNao utilizes to provide it in the content section

        The key is compared literally, so characters with a special meaning in
        regular expressions are safe to use.

        :param results: result entries, entries without ``data.content`` are skipped
        :param key: prefix to look for, e.g. CONTENT_CATEGORY_KEY
        :type results: list|tuple|Generator
        :type key: str
        :return: list of lines of the first match or an empty string if nothing matched
        :rtype: list|str
        """
        prefix = '{0:s}: '.format(key)
        for result in results:
            if 'content' not in result['data']:
                continue
            for content in result['data']['content']:
                if content.startswith(prefix):
                    # drop every occurrence of the prefix, like the split based original did
                    value = ''.join(content.split(prefix)[1:])
                    return value.rstrip("\n").split('\n')
        return ''

    @staticmethod
    def merge_two_dicts(x: dict, y: dict) -> dict:
        """Take x dictionary and insert/overwrite y dictionary values

        Neither of the passed dictionaries is modified.

        :type x: dict
        :type y: dict
        :return: shallow copy of x updated with the items of y
        :rtype: dict
        """
        z = x.copy()
        z.update(y)
        return z

    def merge_results(self, result: list, additional_result: list) -> list:
        """Merge two result arrays

        Entries are paired by position up to the length of the shorter list.
        For every key of an entry of ``result`` the dictionary stored there is
        updated with the dictionary stored under the same key in the paired
        entry; keys missing in the paired entry are left untouched.

        :param result: entries to extend, the list is modified in place
        :param additional_result: entries providing the additional values
        :type result: list
        :type additional_result: list
        :return: the passed ``result`` list
        :rtype: list
        """
        # NOTE(review): pairing by position assumes both lists describe the same
        # matches in the same order - confirm for the combined API types
        for entry, additional_entry in zip(result, additional_result):
            for key in entry:
                addition = additional_entry.get(key)
                if isinstance(entry[key], dict) and isinstance(addition, dict):
                    entry[key] = self.merge_two_dicts(entry[key], addition)

        return result
274