Completed
Branch master (66dac0)
by Steffen
06:20 queued 03:07
created

SauceNao.get_title_value()   A

Complexity

Conditions 4

Size

Total Lines 15

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 4
c 0
b 0
f 0
dl 0
loc 15
rs 9.2
1
#!/usr/bin/python
2
# -*- coding: utf-8 -*-
3
import json
4
import logging
5
import os
6
import re
7
import time
8
from mimetypes import MimeTypes
9
from typing import Generator
10
11
import requests
12
from bs4 import BeautifulSoup as Soup
13
from bs4 import element
14
15
from saucenao import http
16
from saucenao.exceptions import *
17
18
19
class SauceNao(object):
    """
    Small script to work with SauceNao locally.

    Uploads local image files to saucenao.com's search form, parses the
    HTML or JSON response and returns results sorted/filtered by similarity.
    """

    SEARCH_POST_URL = 'http://saucenao.com/search.php'

    # basic account allows currently 20 images within 30 seconds
    # you can increase this value if you have a premium account
    LIMIT_30_SECONDS = 20

    # 0=html, 2=json but json is omitting important data but includes more data about authors
    # taken from the API documentation(requires login): https://saucenao.com/user.php?page=search-api
    API_HTML_TYPE = 0
    API_JSON_TYPE = 2

    # keys SauceNao uses in the result content section
    CONTENT_CATEGORY_KEY = 'Material'
    CONTENT_AUTHOR_KEY = 'Creator'
    CONTENT_CHARACTERS_KEY = 'Characters'

    # both are assigned in __init__; declared here for discoverability
    mime = None
    logger = None

    def __init__(self, directory, databases=999, minimum_similarity=65, combine_api_types=False, api_key=None,
                 exclude_categories='', move_to_categories=False, output_type=API_HTML_TYPE, start_file=None,
                 log_level=logging.ERROR, title_minimum_similarity=90, use_author_as_category=False):
        """Initializing function

        :type directory: str
        :type databases: int
        :type minimum_similarity: float
        :type combine_api_types: bool
        :type api_key: str
        :type exclude_categories: str
        :type move_to_categories: bool
        :type output_type: int
        :type start_file: str
        :type log_level: int
        :type title_minimum_similarity: float
        :type use_author_as_category: bool
        """
        self._directory = directory
        self._databases = databases
        self._minimum_similarity = minimum_similarity
        self._combine_api_types = combine_api_types
        self._api_key = api_key
        self._exclude_categories = exclude_categories
        self._move_to_categories = move_to_categories
        self._output_type = output_type
        self._start_file = start_file
        self._title_minimum_similarity = title_minimum_similarity
        self._use_author_as_category = use_author_as_category

        # tracks whether the previous request already hit an unexpected
        # status code, so check_image() retries at most once
        self._previous_status_code = None

        self.mime = MimeTypes()
        logging.basicConfig(level=log_level)
        self.logger = logging.getLogger("saucenao_logger")

    def check_file(self, file_name: str) -> list:
        """Check the given file for results on SauceNAO

        :type file_name: str
        :return: results with at least the configured minimum similarity
        """
        self.logger.info("checking file: {0:s}".format(file_name))
        if self._combine_api_types:
            # query both output types and merge them, since the JSON API
            # omits data the HTML response contains (and vice versa)
            result = self.check_image(file_name, self.API_HTML_TYPE)
            sorted_results = self.parse_results_json(result)

            additional_result = self.check_image(file_name, self.API_JSON_TYPE)
            additional_sorted_results = self.parse_results_json(additional_result)
            sorted_results = self.merge_results(sorted_results, additional_sorted_results)
        else:
            result = self.check_image(file_name, self._output_type)
            sorted_results = self.parse_results_json(result)

        filtered_results = self.filter_results(sorted_results)
        return filtered_results

    def get_http_data(self, file_path: str, output_type: int):
        """Prepare the http relevant data(files, headers, params) for the given file path and output type

        :param file_path: full path of the image file to upload
        :param output_type: API_HTML_TYPE or API_JSON_TYPE
        :return: tuple of (files, params, headers) for requests.post
        """
        # read the whole file up front so the handle is closed before the request
        with open(file_path, 'rb') as file_object:
            files = {'file': file_object.read()}

        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) '
                          'Chrome/63.0.3239.84 Safari/537.36',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8',
            'Accept-Language': 'en-DE,en-US;q=0.9,en;q=0.8',
            'Accept-Encoding': 'gzip, deflate, br',
            'DNT': '1',
            'Connection': 'keep-alive'
        }

        params = {
            'file': file_path,
            'Content-Type': self.mime.guess_type(file_path),
            # parameters taken from form on main page: https://saucenao.com/
            'url': None,
            'frame': 1,
            'hide': 0,
            # parameters taken from API documentation: https://saucenao.com/user.php?page=search-api
            'output_type': output_type,
            'db': self._databases,
        }

        if self._api_key:
            params['api_key'] = self._api_key

        return files, params, headers

    def check_image(self, file_name: str, output_type: int) -> str:
        """Check the possible sources for the given file

        :type output_type: int
        :type file_name: str
        :return: JSON string in the API's response format
        """
        file_path = os.path.join(self._directory, file_name)

        files, params, headers = self.get_http_data(file_path=file_path, output_type=output_type)
        link = requests.post(url=self.SEARCH_POST_URL, files=files, params=params, headers=headers)

        code, msg = http.verify_status_code(link, file_name)

        if code == http.STATUS_CODE_SKIP:
            self.logger.error(msg)
            return json.dumps({'results': []})
        elif code == http.STATUS_CODE_REPEAT:
            if not self._previous_status_code:
                # first unexpected status code: wait and retry once
                self._previous_status_code = code
                self.logger.info("Received an unexpected status code, repeating after 10 seconds...")
                time.sleep(10)
                return self.check_image(file_name, output_type)
            else:
                # second failure in a row: give up
                raise UnknownStatusCodeException(msg)
        else:
            self._previous_status_code = None

        if output_type == self.API_HTML_TYPE:
            # normalize the HTML response into the JSON API's format
            return self.parse_results_html_to_json(link.text)

        return link.text

    @staticmethod
    def parse_results_html_to_json(html: str) -> str:
        """Parse the HTML response into the JSON API's response format

        :type html: str
        :return: JSON string with 'header' and 'results' keys
        """
        soup = Soup(html, 'html.parser')
        # basic format of json API response
        results = {'header': {}, 'results': []}

        for res in soup.find_all('td', attrs={"class": "resulttablecontent"}):  # type: element.Tag
            # optional field in SauceNao
            title_tag = res.find_next('div', attrs={"class": "resulttitle"})
            if title_tag:
                title = title_tag.text
            else:
                title = ''

            # mandatory field in SauceNao
            similarity = res.find_next('div', attrs={"class": "resultsimilarityinfo"}).text.replace('%', '')
            alternate_links = [a_tag['href'] for a_tag in
                               res.find_next('div', attrs={"class": "resultmiscinfo"}).find_all('a', href=True)]
            content_column = []
            content_column_tags = res.find_all('div', attrs={"class": "resultcontentcolumn"})
            for content_column_tag in content_column_tags:
                # keep line structure so get_content_value can split on '\n'
                for br in content_column_tag.find_all('br'):
                    br.replace_with('\n')
                content_column.append(content_column_tag.text)

            result = {
                'header': {
                    'similarity': similarity
                },
                'data': {
                    'title': title,
                    'content': content_column,
                    'ext_urls': alternate_links
                }
            }
            results['results'].append(result)

        return json.dumps(results)

    @staticmethod
    def parse_results_json(text: str) -> list:
        """Parse the results and sort them descending by similarity

        :type text: str
        :return: list of result dicts, highest similarity first
        """
        result = json.loads(text)
        return sorted(result['results'], key=lambda k: float(k['header']['similarity']), reverse=True)

    def filter_results(self, sorted_results) -> list:
        """Return results with a similarity bigger or the same as the defined similarity from the arguments (default 65%)

        :type sorted_results: list|tuple|Generator
        :return: results meeting the minimum similarity threshold
        """
        filtered_results = []
        for res in sorted_results:
            if float(res['header']['similarity']) >= float(self._minimum_similarity):
                filtered_results.append(res)
            else:
                # we can break here since the results are sorted by similarity anyways
                break
        return filtered_results

    @staticmethod
    def get_content_value(results, key: str):
        """Return the first match of Material in content
        multiple sites have a categorisation which SauceNao utilizes to provide it in the content section

        :type results: list|tuple|Generator
        :type key: str
        :return: list of values for the first matching result, or '' if none match
        """
        # escape the key so regex metacharacters in it are matched literally
        pattern = re.escape(key)
        for result in results:
            if 'content' in result['data']:
                for content in result['data']['content']:
                    if re.match('{0:s}: .*'.format(pattern), content):
                        return ''.join(re.split('{0:s}: '.format(pattern), content)[1:]).rstrip("\n").split('\n')
        return ''

    @staticmethod
    def get_title_value(results, key: str):
        """Return the first match of Material in the title section
        SauceNAO provides the authors name in the title section f.e. if provided by the indexed entry

        :type results: list|tuple|Generator
        :type key: str
        :return: list of values for the first matching result, or '' if none match
        """
        # escape the key so regex metacharacters in it are matched literally
        pattern = re.escape(key)
        for result in results:
            if 'title' in result['data']:
                if re.match('{0:s}: .*'.format(pattern), result['data']['title']):
                    return ''.join(re.split('{0:s}: '.format(pattern), result['data']['title'])[1:]).rstrip("\n") \
                        .split('\n')
        return ''

    @staticmethod
    def merge_two_dicts(x: dict, y: dict) -> dict:
        """Take x dictionary and insert/overwrite y dictionary values

        :type x: dict
        :type y: dict
        :return: new dict with y's entries overriding x's
        """
        z = x.copy()
        z.update(y)
        return z

    def merge_results(self, result: list, additional_result: list) -> list:
        """Merge two result arrays

        Pairs up results positionally and merges each sub-dict ('header',
        'data'); the additional result's values win on key conflicts.

        :type result: list
        :type additional_result: list
        :return: the (mutated) first result list
        """
        # only merge the overlapping prefix of both lists
        length = min(len(result), len(additional_result))

        for i in range(length):
            for key in list(result[i].keys()):
                result[i][key] = self.merge_two_dicts(result[i][key], additional_result[i][key])

        return result