Passed
Push — master (5b5e86...33232c)
by Steffen, created 01:28

SauceNao.merge_results() (rating: A)

Complexity    Conditions 4
Size          Total Lines 17
Duplication   Lines 0, Ratio 0 %
Importance    Changes 3, Bugs 0, Features 1

Metric   Value
cc       4
c        3
b        0
f        1
dl       0
loc      17
rs       9.2
#!/usr/bin/python
# -*- coding: utf-8 -*-
import json
import logging
import os
import os.path
import re
import time
from mimetypes import MimeTypes
from typing import Generator

import requests
from bs4 import BeautifulSoup as Soup
from bs4 import element

from saucenao import http
from saucenao.exceptions import *


class SauceNao(object):
    """
    small script to work with SauceNao locally
    """

    SEARCH_POST_URL = 'http://saucenao.com/search.php'

    # a basic account currently allows 20 images within 30 seconds
    # you can increase this value if you have a premium account
    LIMIT_30_SECONDS = 20

    # 0=html, 2=json; the json type omits some important data but includes more data about the authors
    # taken from the API documentation (requires login): https://saucenao.com/user.php?page=search-api
    API_HTML_TYPE = 0
    API_JSON_TYPE = 2

    CONTENT_CATEGORY_KEY = 'Material'
    CONTENT_CHARACTERS_KEY = 'Characters'

    mime = None
    logger = None

    def __init__(self, directory, databases=999, minimum_similarity=65, combine_api_types=False, api_key=None,
                 exclude_categories='', move_to_categories=False, output_type=API_HTML_TYPE, start_file=None,
                 log_level=logging.ERROR, title_minimum_similarity=90):
        """Initializing function

        :type directory: str
        :type databases: int
        :type minimum_similarity: float
        :type combine_api_types: bool
        :type api_key: str
        :type exclude_categories: str
        :type move_to_categories: bool
        :type output_type: int
        :type start_file: str
        :type log_level: int
        :type title_minimum_similarity: float
        """
        self._directory = directory
        self._databases = databases
        self._minimum_similarity = minimum_similarity
        self._combine_api_types = combine_api_types
        self._api_key = api_key
        self._exclude_categories = exclude_categories
        self._move_to_categories = move_to_categories
        self._output_type = output_type
        self._start_file = start_file
        self._title_minimum_similarity = title_minimum_similarity

        self._previous_status_code = None

        self.mime = MimeTypes()
        logging.basicConfig(level=log_level)
        self.logger = logging.getLogger("saucenao_logger")

    def check_file(self, file_name: str) -> list:
        """Check the given file for results on SauceNAO

        :type file_name: str
        :return:
        """
        self.logger.info("checking file: {0:s}".format(file_name))
        if self._combine_api_types:
            result = self.check_image(file_name, self.API_HTML_TYPE)
            sorted_results = self.parse_results_json(result)

            additional_result = self.check_image(file_name, self.API_JSON_TYPE)
            additional_sorted_results = self.parse_results_json(additional_result)
            sorted_results = self.merge_results(sorted_results, additional_sorted_results)
        else:
            result = self.check_image(file_name, self._output_type)
            sorted_results = self.parse_results_json(result)

        filtered_results = self.filter_results(sorted_results)
        return filtered_results

    def check_image(self, file_name: str, output_type: int) -> str:
        """Check the possible sources for the given file

        :type output_type: int
        :type file_name: str
        :return:
        """
        file_path = os.path.join(self._directory, file_name)

        with open(file_path, 'rb') as file_object:
            files = {'file': file_object.read()}

        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) '
                          'Chrome/63.0.3239.84 Safari/537.36',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8',
            'Accept-Language': 'en-DE,en-US;q=0.9,en;q=0.8',
            'Accept-Encoding': 'gzip, deflate, br',
            'DNT': '1',
            'Connection': 'keep-alive'
        }

        params = {
            'file': file_path,
            'Content-Type': self.mime.guess_type(file_path),
            # parameters taken from the form on the main page: https://saucenao.com/
            'url': None,
            'frame': 1,
            'hide': 0,
            # parameters taken from the API documentation: https://saucenao.com/user.php?page=search-api
            'output_type': output_type,
            'db': self._databases,
        }

        if self._api_key:
            params['api_key'] = self._api_key

        link = requests.post(url=self.SEARCH_POST_URL, files=files, params=params, headers=headers)

        code, msg = http.verify_status_code(link, file_name)

        if code == http.STATUS_CODE_SKIP:
            self.logger.error(msg)
            return json.dumps({'results': []})
        elif code == http.STATUS_CODE_REPEAT:
            if not self._previous_status_code:
                self._previous_status_code = code
                self.logger.info("Received an unexpected status code, repeating after 10 seconds...")
                time.sleep(10)
                return self.check_image(file_name, output_type)
            else:
                raise UnknownStatusCodeException(msg)
        else:
            self._previous_status_code = None

        if output_type == self.API_HTML_TYPE:
            return self.parse_results_html_to_json(link.text)

        return link.text

    @staticmethod
    def parse_results_html_to_json(html: str) -> str:
        """Parse the HTML results into the same structure as the JSON API response

        :type html: str
        :return:
        """
        soup = Soup(html, 'html.parser')
        # basic format of the json API response
        results = {'header': {}, 'results': []}

        for res in soup.find_all('td', attrs={"class": "resulttablecontent"}):  # type: element.Tag
            # optional field in SauceNao
            title_tag = res.find_next('div', attrs={"class": "resulttitle"})
            if title_tag:
                title = title_tag.text
            else:
                title = ''

            # mandatory field in SauceNao
            similarity = res.find_next('div', attrs={"class": "resultsimilarityinfo"}).text.replace('%', '')
            alternate_links = [a_tag['href'] for a_tag in
                               res.find_next('div', attrs={"class": "resultmiscinfo"}).find_all('a', href=True)]
            content_column = []
            content_column_tags = res.find_all('div', attrs={"class": "resultcontentcolumn"})
            for content_column_tag in content_column_tags:
                for br in content_column_tag.find_all('br'):
                    br.replace_with('\n')
                content_column.append(content_column_tag.text)

            result = {
                'header': {
                    'similarity': similarity
                },
                'data': {
                    'title': title,
                    'content': content_column,
                    'ext_urls': alternate_links
                }
            }
            results['results'].append(result)

        return json.dumps(results)

    @staticmethod
    def parse_results_json(text: str) -> list:
        """Parse the results and sort them descending by similarity

        :type text: str
        :return:
        """
        result = json.loads(text)
        results = [res for res in result['results']]
        return sorted(results, key=lambda k: float(k['header']['similarity']), reverse=True)

    def filter_results(self, sorted_results) -> list:
        """Return the results with a similarity greater than or equal to the minimum similarity given in the arguments (default: 65%)

        :type sorted_results: list|tuple|Generator
        :return:
        """
        filtered_results = []
        for res in sorted_results:
            if float(res['header']['similarity']) >= float(self._minimum_similarity):
                filtered_results.append(res)
            else:
                # we can break here since the results are sorted by similarity anyway
                break
        return filtered_results

    @staticmethod
    def get_content_value(results, key: str):
        """Return the first match of the given key in the content section
        multiple sites have a categorisation which SauceNao passes through in the content section

        :type results: list|tuple|Generator
        :type key: str
        :return:
        """
        for result in results:
            if 'content' in list(result['data'].keys()):
                for content in result['data']['content']:
                    if re.match('{0:s}: .*'.format(key), content):
                        return ''.join(re.split(r'{0:s}: '.format(key), content)[1:]).rstrip("\n").split('\n')
        return ''

    @staticmethod
    def merge_two_dicts(x: dict, y: dict) -> dict:
        """Copy dictionary x and insert/overwrite its values with the values of dictionary y

        :type x: dict
        :type y: dict
        :return:
        """
        z = x.copy()
        z.update(y)
        return z

    def merge_results(self, result: list, additional_result: list) -> list:
        """Merge two result lists

        :type result: list
        :type additional_result: list
        :return:
        """
        # merge only as many entries as the shorter result list contains
        if len(result) <= len(additional_result):
            length = len(result)
        else:
            length = len(additional_result)

        for i in range(length):
            # merge the sub dictionaries ('header', 'data') of each result pair,
            # letting the additional result overwrite duplicate keys
            for key in list(result[i].keys()):
                result[i][key] = self.merge_two_dicts(result[i][key], additional_result[i][key])

        return result
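
A minimal usage sketch based on the constructor and check_file() shown above; the import path saucenao.SauceNao, the directory path and the file name are assumptions, and logging is only imported to pass a log level:

import logging

from saucenao import SauceNao  # assumed import path for the class defined above

# combine_api_types=True makes check_file() query both the HTML and the JSON API
# output and merge the two result lists via merge_results()
searcher = SauceNao(directory='/home/user/pictures',
                    minimum_similarity=65,
                    combine_api_types=True,
                    log_level=logging.INFO)

# returns only results whose similarity is at least minimum_similarity,
# each shaped like {'header': {'similarity': ...}, 'data': {...}}
results = searcher.check_file('example.jpg')
for res in results:
    print(res['header']['similarity'], res['data']['title'], res['data']['ext_urls'])

With combine_api_types left at False (the default) only a single request is sent per file, which is relevant given the rate limit of 20 images per 30 seconds for a basic account (LIMIT_30_SECONDS).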