Completed
Push — main ( cff36f...bdd9b2 )
by
unknown
22s queued 12s
created

NetworkCrawlerClient.websocket_status_check()   A

Complexity

Conditions 3

Size

Total Lines 10
Code Lines 10

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 3
eloc 10
nop 3
dl 0
loc 10
rs 9.9
c 0
b 0
f 0
1
import multiprocessing as mp
2
from tradehub.utils import Request
3
import random
4
from requests.exceptions import ConnectionError, HTTPError, Timeout
5
import socket
6
import threading
7
8
9
class NetworkCrawlerClient(object):
10
11
    def __init__(self,
12
                 network: str = "testnet",
13
                 trusted_ip_list: list = None,
14
                 trusted_uri_list: list = None,
15
                 is_secure: bool = False,
16
                 is_websocket_client: bool = False):
17
        if network.lower() not in ["main", "mainnet", "test", "testnet"]:
18
            raise ValueError("Parameter network - {} - is not valid, requires main, mainnent, test, or testnet.".format(network))
19
20
        if trusted_ip_list and trusted_uri_list:
21
            raise ValueError("Can't use both IP and URI list, only pass one option.")
22
23
        if trusted_ip_list or trusted_uri_list:
24
            BYPASS_NETWORK_CRAWLER = True
25
        else:
26
            BYPASS_NETWORK_CRAWLER = False
27
28
        self.is_secure = is_secure
29
        if self.is_secure:
30
            self.http_string = 'https'
31
            self.ws_string = 'wss'
32
        else:
33
            self.http_string = 'http'
34
            self.ws_string = 'ws'
35
        self.is_websocket_client = is_websocket_client
36
        self.active_ws_uri_list = []
37
38
        if not BYPASS_NETWORK_CRAWLER:
39
            self.seed_peers_list = {
40
                "main": ["54.255.5.46", "175.41.151.35"],
41
                "mainnet": ["54.255.5.46", "175.41.151.35"],
42
                "test": ["54.255.42.175", "52.220.152.108"],
43
                "testnet": ["54.255.42.175", "52.220.152.108"],
44
            }
45
            self.tradescan_node_url = {
46
                "main": "https://switcheo.org/nodes?net=main",
47
                "mainnet": "https://switcheo.org/nodes?net=main",
48
                "test": "https://switcheo.org/nodes?net=test",
49
                "testnet": "https://switcheo.org/nodes?net=test",
50
            }
51
52
            self.all_peers_list = self.seed_peers_list[network.lower()]
53
            self.active_validator_list = []
54
            self.active_sentry_api_list = []
55
            self.validator_crawler_mp()
56
            self.sentry_status_request(uri=False)
57
        elif trusted_ip_list:
58
            self.all_peers_list = trusted_ip_list
59
            self.active_validator_list = trusted_ip_list
60
            self.active_sentry_api_list = []
61
            self.sentry_status_request(uri=False)
62
        elif trusted_uri_list:
63
            self.all_peers_list = trusted_uri_list
64
            self.active_validator_list = trusted_uri_list
65
            self.active_sentry_api_list = []
66
            self.sentry_status_request(uri=True)
67
        self.active_sentry_uri = self.active_sentry_api_list[random.randint(a=0, b=len(self.active_sentry_api_list)-1)]
68
        self.active_sentry_api_ip = self.active_sentry_uri.split(':')[1][2:]
69
        if self.is_websocket_client:
70
            self.active_ws_uri = self.active_ws_uri_list[random.randint(a=0, b=len(self.active_ws_uri_list)-1)]
71
            self.active_ws_ip = self.active_ws_uri.split(':')[1][2:]
72
73
    def validator_crawler_mp(self):
74
        checked_peers_list = []
75
        unchecked_peers_list = list(set(self.all_peers_list) - set(checked_peers_list))
76
77
        while unchecked_peers_list:
78
79
            pool = mp.Pool(processes=10)
80
            validator_outputs = pool.map(self.validator_status_request, unchecked_peers_list)
81
            pool.close()
82
            pool.join()
83
84
            for validator in validator_outputs:
85
                self.all_peers_list.append(validator["ip"])
86
                checked_peers_list.append(validator["ip"])
87
                if validator["validator_status"] == "Active" and not validator["catching_up"]:
88
                    self.active_validator_list.append(validator["ip"])
89
                for connected_node in validator["connected_nodes"]:
90
                    self.all_peers_list.append(connected_node["node_ip"])
91
92
            self.all_peers_list = list(dict.fromkeys(self.all_peers_list))
93
            checked_peers_list = list(dict.fromkeys(checked_peers_list))
94
            self.active_validator_list = list(dict.fromkeys(self.active_validator_list))
95
            unchecked_peers_list = list(set(self.all_peers_list) - set(checked_peers_list))
96
97
            # If the initial peers do not return any reults, query Tradescan API.
98
            # if not self.active_peers_list:
99
            #     validators = Request(api_url=self.tradescan_node_url, timeout=30).get()
100
            #     for validator in validators:
101
            #         unchecked_peers_list.append(validator["ip"])
102
103
    def validator_status_request(self, validator_ip):
104
        validator_status = {}
105
        try:
106
            process_peer = True
107
            validator_status["ip"] = validator_ip
108
            i = Request(api_url="{}://{}:26657".format(self.http_string, validator_ip), timeout=1).get(path='/net_info')
109
        except (ValueError, ConnectionError, HTTPError, Timeout) as e:
110
            validator_status["validator_status"] = "Unknown - Cannot Connect to Retrieve Validator INFO - {}".format(e)
111
            validator_status["connected_nodes"] = []
112
            process_peer = False
113
114
        if process_peer:
115
            connected_nodes = []
116
117
            for connected_peer in i["result"]["peers"]:
118
                connected_nodes.append({
119
                    "node_id": connected_peer["node_info"]["id"],
120
                    "node_ip": connected_peer["remote_ip"],
121
                    "node_full": "{}@{}".format(connected_peer["node_info"]["id"], connected_peer["remote_ip"])
122
                })
123
124
            try:
125
                s = Request(api_url="{}://{}:26657".format(self.http_string, validator_ip), timeout=1).get(path='/status')
126
            except (ValueError, ConnectionError, HTTPError, Timeout) as e:
127
                validator_status["validator_status"] = "Unknown - Cannot Connect to Retrieve Status end point - {}".format(e)
128
                validator_status["connected_nodes"] = []
129
                process_peer = False
130
131
            if process_peer:
132
                validator_status = self.parse_validator_status(request_json=s, validator_ip=validator_ip)
133
                validator_status["validator_status"] = "Active"
134
                validator_status["connected_nodes"] = connected_nodes
135
136
        return validator_status
137
138
    def parse_validator_status(self, request_json, validator_ip):
139
        return {
140
            "moniker": request_json["result"]["node_info"]["moniker"],
141
            "id": request_json["result"]["node_info"]["id"],
142
            "ip": validator_ip,
143
            "version": request_json["result"]["node_info"]["version"],
144
            "network": request_json["result"]["node_info"]["network"],
145
            "latest_block_hash": request_json["result"]["sync_info"]["latest_block_hash"],
146
            "latest_block_height": request_json["result"]["sync_info"]["latest_block_height"],
147
            "latest_block_time": request_json["result"]["sync_info"]["latest_block_time"],
148
            "earliest_block_height": request_json["result"]["sync_info"]["earliest_block_height"],
149
            "earliest_block_time": request_json["result"]["sync_info"]["earliest_block_time"],
150
            "catching_up": request_json["result"]["sync_info"]["catching_up"],
151
            "validator_address": request_json["result"]["validator_info"]["address"],
152
            "validator_pub_key_type": request_json["result"]["validator_info"]["pub_key"]["type"],
153
            "validator_pub_key": request_json["result"]["validator_info"]["pub_key"]["value"],
154
            "validator_voting_power": request_json["result"]["validator_info"]["voting_power"]
155
        }
156
157
    def sentry_status_request(self, uri: bool = False):
158
        for active_validator in self.active_validator_list:
159
            if uri:
160
                try:
161
                    # Have to check the "/get_status" endpoint because the port could be open and the validator fully synced but have the persistence service inactive, shutdown, stopped, or non-repsonsive.
162
                    Request(api_url=active_validator, timeout=1).get(path='/get_status')
163
                    self.active_sentry_api_list.append(active_validator)
164
                    if self.is_websocket_client:
165
                        self.websocket_status_check(ip=active_validator)
166
                except (ValueError, ConnectionError, HTTPError, Timeout):
167
                    pass
168
            else:
169
                # 1318 - Cosmos REST; 5001 - Demex REST; 5002 - Reverse Proxy for Demex and Cosmos REST; Recommended to not use proxy
170
                for port in ["5001"]:
171
                    try:
172
                        # Have to check the "/get_status" endpoint because the port could be open and the validator fully synced but have the persistence service inactive, shutdown, stopped, or non-repsonsive.
173
                        Request(api_url="{}://{}:{}".format(self.http_string, active_validator, port), timeout=1).get(path='/get_status')
174
                        self.active_sentry_api_list.append('{}://{}:{}'.format(self.http_string, active_validator, port))
175
                        if self.is_websocket_client:
176
                            self.websocket_status_check(ip=active_validator)
177
                    except (ValueError, ConnectionError, HTTPError, Timeout):
178
                        pass
179
180
        self.active_sentry_api_list = list(dict.fromkeys(self.active_sentry_api_list))
181
        self.active_ws_uri_list = list(dict.fromkeys(self.active_ws_uri_list))
182
183
    def websocket_status_check(self, ip: str, port: int = 5000):
184
        try:
185
            s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
186
            location = (ip, port)
187
            result_of_check = s.connect_ex(location)
188
            if result_of_check == 0:
189
                self.active_ws_uri_list.append('{}://{}:{}/ws'.format(self.ws_string, ip, port))
190
            s.close()
191
        except socket.error:
192
            pass
193
194
    def update_validators_and_sentries(self):
195
        threading.Timer(5.0, self.update_validators_and_sentries).start()
196
        self.validator_crawler_mp()
197
        self.sentry_status_request()
198
        self.active_sentry_api_ip = self.active_sentry_api_list[random.randint(a=0, b=len(self.active_sentry_api_list)-1)]
199
200
    def tradehub_get_request(self, path: str, params=None):
201
        try:
202
            req = Request(api_url=self.active_sentry_uri, timeout=2).get(path=path, params=params)
203
            return req
204
        except (ValueError, ConnectionError, HTTPError, Timeout):
205
            self.active_sentry_api_list.remove(self.active_sentry_uri)
206
            if not self.active_sentry_api_list and not self.BYPASS_NETWORK_CRAWLER:
207
                self.validator_crawler_mp()
208
                self.sentry_status_request()
209
            elif not self.active_sentry_api_list and self.BYPASS_NETWORK_CRAWLER:
210
                raise ValueError("Provided Sentry API IP addresses are not responding.")
211
            self.active_sentry_uri = self.active_sentry_api_list[random.randint(a=0, b=len(self.active_sentry_api_list)-1)]
212
            return self.tradehub_get_request(path=path, params=params)
213
214
    def tradehub_post_request(self, path: str, data=None, json_data=None, params=None):
215
        try:
216
            req = Request(api_url=self.active_sentry_uri, timeout=2).post(path=path, data=data, json_data=json_data, params=params)
217
            return req
218
        except (ValueError, ConnectionError, HTTPError, Timeout):
219
            self.active_sentry_api_list.remove(self.active_sentry_uri)
220
            if not self.active_sentry_api_list and not self.BYPASS_NETWORK_CRAWLER:
221
                self.validator_crawler_mp()
222
                self.sentry_status_request()
223
            elif not self.active_sentry_api_list and self.BYPASS_NETWORK_CRAWLER:
224
                raise ValueError("Provided Sentry API IP addresses are not responding.")
225
            self.active_sentry_uri = self.active_sentry_api_list[random.randint(a=0, b=len(self.active_sentry_api_list)-1)]
226
            return self.tradehub_post_request(path=path, data=data, json_data=json_data, params=params)
227