Passed
Push — master ( 4b6ab6...72d98f )
by Randy
02:02
created

libs.analytics.Analytics.analyze()   F

Complexity

Conditions 15

Size

Total Lines 70
Code Lines 44

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 44
dl 0
loc 70
rs 2.9998
c 0
b 0
f 0
cc 15
nop 2

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

Complexity

Complex classes like libs.analytics.Analytics.analyze() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

import ipaddress
import json
import sys
import time
from configparser import ConfigParser
from hashlib import sha256
from urllib.parse import urlparse

import requests
import validators
from url_normalize import url_normalize

from .callback import WebServer
from .cron import Cron
from .data import Data
from .initialize import Initialize
from .survey import View, GoogleSafeBrowsing, PhishTank
from .tools import Tools

"""
20
    Copyright (c) 2020 Star Inc.(https://starinc.xyz)
21
22
    This Source Code Form is subject to the terms of the Mozilla Public
23
    License, v. 2.0. If a copy of the MPL was not distributed with this
24
    file, You can obtain one at http://mozilla.org/MPL/2.0/.
25
"""
26
27
28
class Analytics:
    # Loading Configs
    # NOTE(review): `cfg` and `config` are CLASS attributes, so every
    # Analytics instance shares the same dict and ConfigParser — a second
    # instance would reuse/overwrite the first one's configuration.
    # Confirm single-instance use is intended before relying on this.
    cfg = {}
    config = ConfigParser()
    def __init__(self, config: str = "config.ini"):
        """
        Build the analytics service and wire up its collaborators.

        :param config: path of the INI configuration file to load
        """
        # Read Config
        self.config.read(config)
        # Initialization
        # NOTE(review): order matters — Initialize(self) presumably fills
        # self.cfg, which the API clients constructed below read; confirm.
        Initialize(self)
        self.data_control = Data(self)
        self.view_survey = View(self)
        self.cron_job = Cron(self)
        self.safe_browsing = GoogleSafeBrowsing(
            self.cfg["Google Safe Browsing"]["google_api_key"]
        )
        self.phishtank = PhishTank(
            self.cfg["PhishTank"]["username"],
            self.cfg["PhishTank"]["api_key"]
        )
        # Service starts not-ready; start() polls Tools.check_ready()
        # before listening.
        Tools.set_ready(False)
    def start(self, port: int = 2020):
        """
        Start web service.

        Waits until the service reports ready, then listens for
        connections until interrupted (Ctrl-C calls stop()).

        :param port: integer of port to listen online
        :return:
        """
        try:
            server = WebServer(self)
            self.cron_job.start()
            # Poll the ready flag with a short sleep instead of a bare
            # `pass` busy-wait, which burned a full CPU core while waiting.
            while not Tools.check_ready():
                time.sleep(0.05)
            print(
                Tools.get_time(),
                "[Start] Listening WebServer on port {}".format(port)
            )
            server.listen(port)
        except KeyboardInterrupt:
            self.stop()
    def stop(self):
71
        """
72
        Shutdown web service
73
74
        :return:
75
        """
76
        self.cron_job.stop()
77
        sys.exit(0)
78
79
    async def server_response(self, message: str):
80
        """
81
        Check responses from web service
82
83
        :param message: string of JSON format
84
        :return: dict to response
85
        """
86
        try:
87
            req_res = json.loads(message)
88
        except json.decoder.JSONDecodeError:
89
            return {"status": 401}
90
        if req_res.get("version") is not None:
91
            try:
92
                return await self._server_response(req_res)
93
            except:
94
                error_report = Tools.error_report()
95
                Tools.logger(error_report)
96
                return {"status": 500}
97
        return {"status": 400}
98
99
    async def _server_response(self, data: dict):
100
        """
101
        Handle responses from web service
102
103
        :param data: dict from message decoded
104
        :return: dict to response
105
        """
106
        if data.get("version") < 1:
107
            return {
108
                "status": 505
109
            }
110
111
        if "url" in data and validators.url(data["url"]):
112
            return await self.analyze(data)
113
114
        return {
115
            "status": 401
116
        }
117
118
    async def analyze(self, data: dict):
        """
        Do analysis from URL sent by message with databases.

        :param data: dict from message decoded
        :return: dict to response
        """
        url = url_normalize(data.get("url"))
        url_hash = sha256(url.encode("utf-8")).hexdigest()

        # NOTE(review): the URL is fetched BEFORE the private-address
        # check below, so a hostname resolving to an internal IP is still
        # requested once (possible SSRF) — confirm this is acceptable.
        try:
            response = requests.get(url)
        except requests.exceptions.ConnectionError as e:
            return {
                "status": 403,
                "reason": str(e)
            }

        rejection = self._check_response(response)
        if rejection is not None:
            return rejection

        # Score the final URL after any redirects, not the submitted one.
        url = response.url

        hostname = urlparse(url).hostname
        host = hostname if hostname != "localhost" else "127.0.0.1"
        if (validators.ipv4(host) or validators.ipv6(host)) and ipaddress.ip_address(host).is_private:
            return {
                "status": 403,
                "reason": "forbidden"
            }

        cache = self.data_control.find_result_cache_by_url_hash(url_hash)
        if cache is not None:
            score = cache
        else:
            score = await self._compute_score(url, host)
            self.data_control.upload_result_cache(url_hash, score)

        return {
            "status": 200,
            "url": url,
            "trust_score": score
        }

    @staticmethod
    def _check_response(response):
        """
        Reject HTTP responses the analyzer cannot score.

        :param response: `requests` response for the submitted URL
        :return: error dict to send back, or None if the response is usable
        """
        if response.status_code != 200:
            return {
                "status": 404,
                "http_code": response.status_code
            }
        if "text/html" not in response.headers["content-type"]:
            return {
                "status": 405
            }
        return None

    async def _compute_score(self, url: str, host: str):
        """
        Look the URL up in the local lists and external services.

        Checked in priority order: trustlist, trusted domains, blacklist,
        warnlist, Google Safe Browsing, then the deep PageView analysis.

        :param url: normalized final URL
        :param host: hostname (or IP literal) of the URL
        :return: trust score between 0 and 1
        """
        if self.data_control.check_trustlist(url):
            return 1
        if self.data_control.check_trust_domain(host):
            return 1
        if self.data_control.check_blacklist(url):
            return 0
        if self.data_control.check_warnlist(url):
            return 0.5
        if self.safe_browsing.lookup([url]):
            # Google flagged it: persist the verdict locally.
            self.data_control.mark_as_blacklist(url)
            return 0
        return await self._deep_analyze(url)
    async def _deep_analyze(self, url: str):
191
        """
192
        Analyze URL with PageView
193
194
        :param url: URL that latest get via `requests`
195
        :return: float of the-trust-score between 0 to 1
196
        """
197
        origin_urls = []
198
        async for origin_url in self.view_survey.analyze(url):
199
            if origin_url:
200
                origin_urls.append(origin_url)
201
202
        if origin_urls:
203
            origin_urls_json = json.dumps(origin_urls)
204
            self.data_control.mark_as_warnlist(url, origin_urls_json)
205
            return 0.5
206
        return 1
207
208
    async def gen_sample(self):
209
        """
210
        Generate PageView samples with trustlist
211
212
        :return:
213
        """
214
        await self.view_survey.generate()
215
216
    def update_blacklist_from_phishtank(self):
217
        """
218
        Update database for blacklist from PhishTank
219
220
        :return:
221
        """
222
        try:
223
            blacklist = self.phishtank.get_database()
224
        except OSError:
225
            print(Tools.get_time(), "[Notice] PhishTank forbidden temporary.")
226
            return
227
228
        self.data_control.mark_as_blacklist_mass(
229
            [target.get("url") for target in blacklist]
230
        )
231