Passed: push to master (95920f...059214) by Steffen, created 04:34

saucenao.saucenao.SauceNao.merge_two_dicts()   Rating: A

Complexity:    Conditions 1
Size:          Total Lines 11, Code Lines 5
Duplication:   Lines 0, Ratio 0 %
Code Coverage: Tests 2, CRAP Score 1.216
Importance:    Changes 0
Metric Value
eloc 5
dl 0
loc 11
ccs 2
cts 5
cp 0.4
rs 10
c 0
b 0
f 0
cc 1
nop 2
crap 1.216
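
For reference, the reported CRAP score is consistent with the usual CRAP formula, complexity^2 * (1 - coverage)^3 + complexity (assuming Scrutinizer applies the standard definition): with cc = 1 and cp = 0.4 this gives 1 * 0.6^3 + 1 = 1.216. A small sketch to reproduce the number; crap_score is a hypothetical helper, not part of the saucenao code base:

def crap_score(complexity: float, coverage: float) -> float:
    # standard CRAP metric: complexity^2 * (1 - coverage)^3 + complexity
    return complexity ** 2 * (1 - coverage) ** 3 + complexity

print(round(crap_score(complexity=1, coverage=0.4), 3))  # 1.216
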
#!/usr/bin/python
# -*- coding: utf-8 -*-
import json
import logging
import os
import re
import time
from mimetypes import MimeTypes
from typing import Generator

import requests
from bs4 import BeautifulSoup as Soup
from bs4 import element

from saucenao import http
from saucenao.exceptions import *


class SauceNao(object):
    """
    small script to work with SauceNao locally
    """

    SEARCH_POST_URL = 'http://saucenao.com/search.php'

    ACCOUNT_TYPE_UNREGISTERED = ""
    ACCOUNT_TYPE_REGISTERED = "basic"

    # individual search usage limitations
    LIMIT_30_SECONDS = {
        ACCOUNT_TYPE_UNREGISTERED: 4,
        ACCOUNT_TYPE_REGISTERED: 15,
    }

    # 0=html, 2=json; the json output omits some important data but includes more data about authors
    # taken from the API documentation (requires login): https://saucenao.com/user.php?page=search-api
    API_HTML_TYPE = 0
    API_JSON_TYPE = 2

    CONTENT_CATEGORY_KEY = 'Material'
    CONTENT_AUTHOR_KEY = 'Creator'
    CONTENT_CHARACTERS_KEY = 'Characters'

    mime = None
    logger = None

    def __init__(self, directory, databases=999, minimum_similarity=65, combine_api_types=False, api_key=None,
                 exclude_categories='', move_to_categories=False, use_author_as_category=False,
                 output_type=API_HTML_TYPE, start_file=None, log_level=logging.ERROR, title_minimum_similarity=90):
        """Initializing function

        :type directory: str
        :type databases: int
        :type minimum_similarity: float
        :type combine_api_types: bool
        :type api_key: str
        :type exclude_categories: str
        :type move_to_categories: bool
        :type use_author_as_category: bool
        :type output_type: int
        :type start_file: str
        :type log_level: int
        :type title_minimum_similarity: float
        """
        self.directory = directory
        self.databases = databases
        self.minimum_similarity = minimum_similarity
        self.combine_api_types = combine_api_types
        self.api_key = api_key
        self.exclude_categories = exclude_categories
        self.move_to_categories = move_to_categories
        self.use_author_as_category = use_author_as_category
        self.output_type = output_type
        self.start_file = start_file
        self.title_minimum_similarity = title_minimum_similarity

        if self.api_key:
            self.search_limit_30s = self.LIMIT_30_SECONDS[self.ACCOUNT_TYPE_REGISTERED]
        else:
            self.search_limit_30s = self.LIMIT_30_SECONDS[self.ACCOUNT_TYPE_UNREGISTERED]

        if self.combine_api_types:
            # if we combine the API types we require twice as many API requests, so halve the limit per 30 seconds
            self.search_limit_30s /= 2

        self.previous_status_code = None

        self.mime = MimeTypes()
        logging.basicConfig(level=log_level)
        self.logger = logging.getLogger("saucenao_logger")

    def check_file(self, file_name: str) -> list:
        """Check the given file for results on SauceNAO

        :type file_name: str
        :return:
        """
        self.logger.info("checking file: {0:s}".format(file_name))
        if self.combine_api_types:
            result = self.__check_image(file_name, self.API_HTML_TYPE)
            sorted_results = self.parse_results_json(result)

            additional_result = self.__check_image(file_name, self.API_JSON_TYPE)
            additional_sorted_results = self.parse_results_json(additional_result)
            sorted_results = self.__merge_results(sorted_results, additional_sorted_results)
        else:
            result = self.__check_image(file_name, self.output_type)
            sorted_results = self.parse_results_json(result)

        filtered_results = self.__filter_results(sorted_results)
        return filtered_results

    def __get_http_data(self, file_path: str, output_type: int):
        """Prepare the HTTP-relevant data (files, headers, params) for the given file path and output type

        :param file_path:
        :param output_type:
        :return:
        """
        with open(file_path, 'rb') as file_object:
            files = {'file': file_object.read()}

        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) '
                          'Chrome/63.0.3239.84 Safari/537.36',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8',
            'Accept-Language': 'en-DE,en-US;q=0.9,en;q=0.8',
            'Accept-Encoding': 'gzip, deflate, br',
            'DNT': '1',
            'Connection': 'keep-alive'
        }

        params = {
            'file': file_path,
            'Content-Type': self.mime.guess_type(file_path),
            # parameters taken from the form on the main page: https://saucenao.com/
            'url': None,
            'frame': 1,
            'hide': 0,
            # parameters taken from the API documentation: https://saucenao.com/user.php?page=search-api
            'output_type': output_type,
            'db': self.databases,
        }

        if self.api_key:
            params['api_key'] = self.api_key

        return files, params, headers

    def __check_image(self, file_name: str, output_type: int) -> str:
        """Check the possible sources for the given file

        :type output_type: int
        :type file_name: str
        :return:
        """
        file_path = os.path.join(self.directory, file_name)

        files, params, headers = self.__get_http_data(file_path=file_path, output_type=output_type)
        link = requests.post(url=self.SEARCH_POST_URL, files=files, params=params, headers=headers)

        code, msg = http.verify_status_code(link, file_name)

        if code == http.STATUS_CODE_SKIP:
            self.logger.error(msg)
            return json.dumps({'results': []})
        elif code == http.STATUS_CODE_REPEAT:
            if not self.previous_status_code:
                self.previous_status_code = code
                self.logger.info(
                    "Received an unexpected status code (message: {msg}), repeating after 10 seconds...".format(msg=msg)
                )
                time.sleep(10)
                return self.__check_image(file_name, output_type)
            else:
                raise UnknownStatusCodeException(msg)
        else:
            self.previous_status_code = None

        if output_type == self.API_HTML_TYPE:
            return self.parse_results_html_to_json(link.text)

        return link.text

    @staticmethod
    def parse_results_html_to_json(html: str) -> str:
        """Parse the html results into the same format as the json API response

        :type html: str
        :return:
        """
        soup = Soup(html, 'html.parser')
        # basic format of the json API response
        results = {'header': {}, 'results': []}

        for res in soup.find_all('td', attrs={"class": "resulttablecontent"}):  # type: element.Tag
            # optional field in SauceNao
            title_tag = res.find_next('div', attrs={"class": "resulttitle"})
            if title_tag:
                title = title_tag.text
            else:
                title = ''

            # mandatory field in SauceNao
            similarity = res.find_next('div', attrs={"class": "resultsimilarityinfo"}).text.replace('%', '')
            alternate_links = [a_tag['href'] for a_tag in
                               res.find_next('div', attrs={"class": "resultmiscinfo"}).find_all('a', href=True)]
            content_column = []
            content_column_tags = res.find_all('div', attrs={"class": "resultcontentcolumn"})
            for content_column_tag in content_column_tags:
                for br in content_column_tag.find_all('br'):
                    br.replace_with('\n')
                content_column.append(content_column_tag.text)

            result = {
                'header': {
                    'similarity': similarity
                },
                'data': {
                    'title': title,
                    'content': content_column,
                    'ext_urls': alternate_links
                }
            }
            results['results'].append(result)

        return json.dumps(results)

    @staticmethod
    def parse_results_json(text: str) -> list:
        """Parse the results and sort them descending by similarity

        :type text: str
        :return:
        """
        result = json.loads(text)
        results = [res for res in result['results']]
        return sorted(results, key=lambda k: float(k['header']['similarity']), reverse=True)

    def __filter_results(self, sorted_results) -> list:
        """Return results with a similarity greater than or equal to the minimum similarity from the arguments (default 65%)

        :type sorted_results: list|tuple|Generator
        :return:
        """
        filtered_results = []
        for res in sorted_results:
            if float(res['header']['similarity']) >= float(self.minimum_similarity):
                filtered_results.append(res)
            else:
                # we can break here since the results are sorted by similarity anyway
                break
        return filtered_results

    @staticmethod
    def get_content_value(results, key: str):
        """Return the first match of the given key in the content section
        multiple sites have a categorisation which SauceNao utilizes to provide it in the content section

        :type results: list|tuple|Generator
        :type key: str
        :return:
        """
        for result in results:
            if 'content' in list(result['data'].keys()):
                for content in result['data']['content']:
                    if re.search(r'{0:s}: .*'.format(key), content):
                        return ''.join(re.split(r'{0:s}: '.format(key), content)[1:]).rstrip("\n").split('\n')
        return ''

    @staticmethod
    def get_title_value(results, key: str):
        """Return the first match of the given key in the title section
        SauceNAO provides the author's name in the title section, e.g. if provided by the indexed entry

        :type results: list|tuple|Generator
        :type key: str
        :return:
        """
        for result in results:
            if 'title' in list(result['data'].keys()):
                if re.match('{0:s}: .*'.format(key), result['data']['title']):
                    return ''.join(re.split(r'{0:s}: '.format(key), result['data']['title'])[1:]).rstrip("\n") \
                        .split('\n')
        return ''

    @staticmethod
    def merge_dicts(x: dict, y: dict) -> dict:
        """Take x dictionary and insert/overwrite y dictionary values

        :type x: dict
        :type y: dict
        :return:
        """
        z = x.copy()
        z.update(y)
        return z

    def __merge_results(self, result: list, additional_result: list) -> list:
        """Merge two result arrays

        :type result: list
        :type additional_result: list
        :return:
        """
        if len(result) <= len(additional_result):
            length = len(result)
        else:
            length = len(additional_result)

        for i in range(length):
            for key in list(result[i].keys()):
                result[i][key] = self.merge_dicts(result[i][key], additional_result[i][key])

        return result
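
For context, a minimal usage sketch of the class above (a sketch only, assuming the saucenao.saucenao module path from the report header; the directory 'images' and the file name 'example.jpg' are placeholder values):

# minimal usage sketch; directory and file name are placeholders
from saucenao.saucenao import SauceNao

saucenao = SauceNao(directory='images', minimum_similarity=65)
matches = saucenao.check_file(file_name='example.jpg')
for match in matches:
    print(match['header']['similarity'], match['data'].get('title', ''))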