libs.analytics.Analytics.check_from_database()   C
last analyzed

Complexity

Conditions 9

Size

Total Lines 38
Code Lines 21

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 21
dl 0
loc 38
rs 6.6666
c 0
b 0
f 0
cc 9
nop 3
1
import ipaddress
2
import json
3
import sys
4
from configparser import ConfigParser
5
from hashlib import sha256
6
from urllib.parse import urlparse
7
8
import requests
9
import validators
10
from url_normalize import url_normalize
11
12
from .callback import WebServer
13
from .cron import Cron
14
from .data import Data
15
from .initialize import Initialize
16
from .survey import View, GoogleSafeBrowsing, PhishTank
17
from .tools import Tools
18
19
"""
    Copyright (c) 2020 Star Inc.(https://starinc.xyz)

    This Source Code Form is subject to the terms of the Mozilla Public
    License, v. 2.0. If a copy of the MPL was not distributed with this
    file, You can obtain one at http://mozilla.org/MPL/2.0/.
"""
26
27
28
class Analytics:
    """
    Entry point that ties together the data store, the survey back-ends,
    the cron job and the web server, and scores URLs for trustworthiness.
    """

    # Loading Configs
    # NOTE(review): class-level dict, so it is shared by every instance;
    # presumably filled in by ``Initialize`` - confirm before instantiating
    # more than one ``Analytics``.
    cfg = {}

    def __init__(self, config: str):
        """
        Load the configuration and build the collaborating components

        :param config: the literal string "ENV" (stored as-is), or a path
                       handed to ``ConfigParser.read``
        """
        # Read Config
        if config == "ENV":
            self.config = config
        else:
            self.config = ConfigParser()
            self.config.read(config)
        # Initialization
        Initialize(self)
        self.data_control = Data(self)
        self.view_survey = View(self)
        self.cron_job = Cron(self)
        self.safe_browsing = GoogleSafeBrowsing(
            self.cfg["SafeBrowsing"]["google_api_key"]
        )
        self.phishtank = PhishTank(
            self.cfg["PhishTank"]["username"],
            self.cfg["PhishTank"]["api_key"]
        )
        Tools.set_ready(False)

    def start(self, port: int = 2020):
        """
        Start web service

        Starts the cron job, waits until ``Tools.check_ready()`` reports
        true, then listens on ``port``. A ``KeyboardInterrupt`` raised at
        any point triggers ``stop()``.

        :param port: integer of port to listen online
        :return:
        """
        # Imported here so the method does not depend on a module-level
        # import that the rest of the file never needed.
        import time

        try:
            server = WebServer(self)
            self.cron_job.start()
            # Poll with a short sleep instead of spinning: a bare
            # ``pass`` loop burns a full core for the whole wait.
            while not Tools.check_ready():
                time.sleep(0.1)
            print(
                Tools.get_time(),
                "[Start] Listening WebServer on port {}".format(port)
            )
            server.listen(port)
        except KeyboardInterrupt:
            self.stop()

    def stop(self):
        """
        Shutdown web service

        Stops the cron job and exits the process with status 0.

        :return:
        """
        self.cron_job.stop()
        sys.exit(0)

    @staticmethod
    def _is_private_host(host) -> bool:
        """
        Tell whether a URL host must never be fetched or scored

        :param host: hostname as returned by ``urlparse(...).hostname``
                     (may be ``None``)
        :return: True for "localhost" and for IP literals inside a
                 private range, False for anything else
        """
        if not host:
            return False
        if host == "localhost":
            return True
        try:
            return ipaddress.ip_address(host).is_private
        except ValueError:
            # Not an IP literal, i.e. an ordinary domain name.
            return False

    async def analyze(self, data: dict):
        """
        Do analysis from URL sent by message with databases

        The URL is looked up in the databases first; on a miss the page is
        fetched, the final (post-redirect) URL is looked up again, and only
        then is the deep analysis run.

        :param data: dict from message decoded, the URL is read from
                     its "url" key
        :return: dict to response; "status" is 200 with "url" and
                 "trust_score" on success, 403 when the target is forbidden
                 or unreachable, 404 when the page does not answer HTTP 200,
                 405 when the page is not HTML
        """
        url = url_normalize(data.get("url"))

        known_score = await self.check_from_database(url)
        if known_score is not None:
            return {
                "status": 200,
                "url": url,
                "trust_score": known_score
            }

        # Refuse private targets *before* any request leaves this machine,
        # otherwise the service can be used to probe internal hosts.
        if self._is_private_host(urlparse(url).hostname):
            return {
                "status": 403,
                "reason": "forbidden"
            }

        # NOTE(review): ``requests.get`` blocks inside a coroutine and is
        # called without a timeout - consider a timeout and an executor.
        try:
            response = requests.get(url)
        except requests.exceptions.ConnectionError as e:
            return {
                "status": 403,
                "reason": str(e)
            }

        if response.status_code != 200:
            return {
                "status": 404,
                "http_code": response.status_code
            }

        # ``.get`` so that a response without Content-Type is rejected
        # as non-HTML instead of raising KeyError.
        if "text/html" not in response.headers.get("content-type", ""):
            return {
                "status": 405
            }

        # From here on work with the URL the redirects ended at.
        url = response.url
        host = urlparse(url).hostname

        # Checked again because a public URL may redirect to a private one.
        if self._is_private_host(host):
            return {
                "status": 403,
                "reason": "forbidden"
            }

        known_score = await self.check_from_database(url, host)
        if known_score is not None:
            return {
                "status": 200,
                "url": url,
                "trust_score": known_score
            }

        return {
            "status": 200,
            "url": url,
            "trust_score": await self._deep_analyze(url)
        }

    async def check_from_database(self, url: str, host: str = None):
        """
        Check URL whether existed in database

        Lookup order: result cache, trustlist / trusted domain (1),
        warnlist (0.5), blacklist (0), Google Safe Browsing (0, and the URL
        is marked as blacklisted). A score found outside the cache is
        written to the cache under the SHA-256 hash of the URL.

        :param url: URL from request
        :param host: host from URL decoded, parsed from ``url`` when omitted
        :return: trust_score or NoneType
        """
        if host is None:
            host = urlparse(url).hostname
        url_hash = sha256(url.encode("utf-8")).hexdigest()
        store = self.data_control

        cached = store.find_result_cache_by_url_hash(url_hash)
        if cached is not None:
            # ``is not None`` on purpose: a cached score of 0 is valid.
            return cached

        if store.check_trustlist(url) or store.check_trust_domain(host):
            score = 1
        elif store.check_warnlist(url):
            score = 0.5
        elif store.check_blacklist(url):
            score = 0
        elif self.safe_browsing.lookup([url]):
            store.mark_as_blacklist(url)
            score = 0
        else:
            return None

        store.upload_result_cache(url_hash, score)
        return score

    async def _deep_analyze(self, url: str):
        """
        Analyze URL with PageView

        Every truthy item yielded by ``view_survey.analyze`` is collected;
        if there is at least one, the URL is marked as warnlisted together
        with the JSON-encoded list.

        :param url: URL that latest get via `requests`
        :return: float of the-trust-score between 0 to 1
                 (0.5 when something was collected, otherwise 1)
        """
        origin_urls = []
        async for origin_url in self.view_survey.analyze(url):
            if origin_url:
                origin_urls.append(origin_url)

        if not origin_urls:
            return 1

        self.data_control.mark_as_warnlist(url, json.dumps(origin_urls))
        return 0.5

    async def gen_sample(self):
        """
        Generate PageView samples with trustlist

        :return:
        """
        await self.view_survey.generate()

    def update_blacklist_from_phishtank(self):
        """
        Update database for blacklist from PhishTank

        An ``OSError`` while downloading is reported on stdout and the
        update is skipped.

        :return:
        """
        try:
            blacklist = self.phishtank.get_database()
        except OSError:
            print(Tools.get_time(), "[Notice] PhishTank forbidden temporary.")
            return

        self.data_control.mark_as_blacklist_mass(
            [target.get("url") for target in blacklist]
        )
225