Passed
Pull Request — main (#27)
by Switcheolytics
01:11
created

NetworkCrawlerClient.parse_validator_status()   A

Complexity

Conditions 1

Size

Total Lines 17
Code Lines 17

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 17
nop 3
dl 0
loc 17
rs 9.55
c 0
b 0
f 0
1
import multiprocessing as mp
2
from tradehub.utils import Request
3
import random
4
from requests.exceptions import ConnectionError, HTTPError, Timeout
5
import socket
6
import threading
7
8
9
class NetworkCrawlerClient(object):
10
11
    def __init__(self, network: str = "testnet", trusted_ip_list: list = None, trusted_uri_list: list = None, is_websocket_client: bool = False):
12
        if network.lower() not in ["main", "mainnet", "test", "testnet"]:
13
            raise ValueError("Parameter network - {} - is not valid, requires main, mainnent, test, or testnet.".format(network))
14
15
        if trusted_ip_list and trusted_uri_list:
16
            raise ValueError("Can't use both IP and URI list, only pass one option.")
17
18
        if trusted_ip_list or trusted_uri_list:
19
            BYPASS_NETWORK_CRAWLER = True
20
        else:
21
            BYPASS_NETWORK_CRAWLER = False
22
23
        self.is_websocket_client = is_websocket_client
24
        self.active_ws_uri_list = []
25
26
        if not BYPASS_NETWORK_CRAWLER:
27
            self.seed_peers_list = {
28
                "main": ["54.255.5.46", "175.41.151.35"],
29
                "mainnet": ["54.255.5.46", "175.41.151.35"],
30
                "test": ["54.255.42.175", "52.220.152.108"],
31
                "testnet": ["54.255.42.175", "52.220.152.108"],
32
            }
33
            self.tradescan_node_url = {
34
                "main": "https://switcheo.org/nodes?net=main",
35
                "mainnet": "https://switcheo.org/nodes?net=main",
36
                "test": "https://switcheo.org/nodes?net=test",
37
                "testnet": "https://switcheo.org/nodes?net=test",
38
            }
39
40
            self.all_peers_list = self.seed_peers_list[network.lower()]
41
            self.active_validator_list = []
42
            self.active_sentry_api_list = []
43
            self.validator_crawler_mp()
44
            self.sentry_status_request(uri=False)
45
        elif trusted_ip_list:
46
            self.all_peers_list = trusted_ip_list
47
            self.active_validator_list = trusted_ip_list
48
            self.active_sentry_api_list = []
49
            self.sentry_status_request(uri=False)
50
        elif trusted_uri_list:
51
            self.all_peers_list = trusted_uri_list
52
            self.active_validator_list = trusted_uri_list
53
            self.active_sentry_api_list = []
54
            self.sentry_status_request(uri=True)
55
        self.active_sentry_uri = self.active_sentry_api_list[random.randint(a=0, b=len(self.active_sentry_api_list)-1)]
56
        self.active_sentry_api_ip = self.active_sentry_uri.split(':')[1][2:]
57
        if self.is_websocket_client:
58
            self.active_ws_uri = self.active_ws_uri_list[random.randint(a=0, b=len(self.active_ws_uri_list)-1)]
59
            self.active_ws_ip = self.active_ws_uri.split(':')[1][2:]
60
61
    def validator_crawler_mp(self):
62
        checked_peers_list = []
63
        unchecked_peers_list = list(set(self.all_peers_list) - set(checked_peers_list))
64
65
        while unchecked_peers_list:
66
67
            pool = mp.Pool(processes=10)
68
            validator_outputs = pool.map(self.validator_status_request, unchecked_peers_list)
69
            pool.close()
70
            pool.join()
71
72
            for validator in validator_outputs:
73
                self.all_peers_list.append(validator["ip"])
74
                checked_peers_list.append(validator["ip"])
75
                if validator["validator_status"] == "Active" and not validator["catching_up"]:
76
                    self.active_validator_list.append(validator["ip"])
77
                for connected_node in validator["connected_nodes"]:
78
                    self.all_peers_list.append(connected_node["node_ip"])
79
80
            self.all_peers_list = list(dict.fromkeys(self.all_peers_list))
81
            checked_peers_list = list(dict.fromkeys(checked_peers_list))
82
            self.active_validator_list = list(dict.fromkeys(self.active_validator_list))
83
            unchecked_peers_list = list(set(self.all_peers_list) - set(checked_peers_list))
84
85
            # If the initial peers do not return any reults, query Tradescan API.
86
            # if not self.active_peers_list:
87
            #     validators = Request(api_url=self.tradescan_node_url, timeout=30).get()
88
            #     for validator in validators:
89
            #         unchecked_peers_list.append(validator["ip"])
90
91
    def validator_status_request(self, validator_ip):
92
        validator_status = {}
93
        try:
94
            process_peer = True
95
            validator_status["ip"] = validator_ip
96
            i = Request(api_url="http://{}:26657".format(validator_ip), timeout=1).get(path='/net_info')
97
        except (ValueError, ConnectionError, HTTPError, Timeout) as e:
98
            validator_status["validator_status"] = "Unknown - Cannot Connect to Retrieve Validator INFO - {}".format(e)
99
            validator_status["connected_nodes"] = []
100
            process_peer = False
101
102
        if process_peer:
103
            connected_nodes = []
104
105
            for connected_peer in i["result"]["peers"]:
106
                connected_nodes.append({
107
                    "node_id": connected_peer["node_info"]["id"],
108
                    "node_ip": connected_peer["remote_ip"],
109
                    "node_full": "{}@{}".format(connected_peer["node_info"]["id"], connected_peer["remote_ip"])
110
                })
111
112
            try:
113
                s = Request(api_url="http://{}:26657".format(validator_ip), timeout=1).get(path='/status')
114
            except (ValueError, ConnectionError, HTTPError, Timeout) as e:
115
                validator_status["validator_status"] = "Unknown - Cannot Connect to Retrieve Status end point - {}".format(e)
116
                validator_status["connected_nodes"] = []
117
                process_peer = False
118
119
            if process_peer:
120
                validator_status = self.parse_validator_status(request_json=s, validator_ip=validator_ip)
121
                validator_status["validator_status"] = "Active"
122
                validator_status["connected_nodes"] = connected_nodes
123
124
        return validator_status
125
126
    def parse_validator_status(self, request_json, validator_ip):
127
        return {
128
            "moniker": request_json["result"]["node_info"]["moniker"],
129
            "id": request_json["result"]["node_info"]["id"],
130
            "ip": validator_ip,
131
            "version": request_json["result"]["node_info"]["version"],
132
            "network": request_json["result"]["node_info"]["network"],
133
            "latest_block_hash": request_json["result"]["sync_info"]["latest_block_hash"],
134
            "latest_block_height": request_json["result"]["sync_info"]["latest_block_height"],
135
            "latest_block_time": request_json["result"]["sync_info"]["latest_block_time"],
136
            "earliest_block_height": request_json["result"]["sync_info"]["earliest_block_height"],
137
            "earliest_block_time": request_json["result"]["sync_info"]["earliest_block_time"],
138
            "catching_up": request_json["result"]["sync_info"]["catching_up"],
139
            "validator_address": request_json["result"]["validator_info"]["address"],
140
            "validator_pub_key_type": request_json["result"]["validator_info"]["pub_key"]["type"],
141
            "validator_pub_key": request_json["result"]["validator_info"]["pub_key"]["value"],
142
            "validator_voting_power": request_json["result"]["validator_info"]["voting_power"]
143
        }
144
145
    def sentry_status_request(self, uri: bool = False):
146
        for active_validator in self.active_validator_list:
147
            if uri:
148
                try:
149
                    # Have to check the "/get_status" endpoint because the port could be open and the validator fully synced but have the persistence service inactive, shutdown, stopped, or non-repsonsive.
150
                    Request(api_url=active_validator, timeout=1).get(path='/get_status')
151
                    self.active_sentry_api_list.append(active_validator)
152
                except (ValueError, ConnectionError, HTTPError, Timeout):
153
                    pass
154
            else:
155
                # 1318 - Cosmos REST; 5001 - Demex REST; 5002 - Reverse Proxy for Demex and Cosmos REST; Recommended to not use proxy
156
                for port in ["5001"]:
157
                    try:
158
                        # Have to check the "/get_status" endpoint because the port could be open and the validator fully synced but have the persistence service inactive, shutdown, stopped, or non-repsonsive.
159
                        Request(api_url="http://{}:{}".format(active_validator, port), timeout=1).get(path='/get_status')
160
                        self.active_sentry_api_list.append('http://{}:{}'.format(active_validator, port))
161
                    except (ValueError, ConnectionError, HTTPError, Timeout):
162
                        pass
163
            if self.is_websocket_client:
164
                port = 5000
165
                try:
166
                    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
167
                    location = (active_validator, port)
168
                    result_of_check = s.connect_ex(location)
169
                    if result_of_check == 0:
170
                        self.active_ws_uri_list.append('ws://{}:{}/ws'.format(active_validator, port))
171
                    s.close()
172
                except socket.error:
173
                    pass
174
175
        self.active_sentry_api_list = list(dict.fromkeys(self.active_sentry_api_list))
176
        self.active_ws_uri_list = list(dict.fromkeys(self.active_ws_uri_list))
177
178
    def update_validators_and_sentries(self):
179
        threading.Timer(5.0, self.update_validators_and_sentries).start()
180
        self.validator_crawler_mp()
181
        self.sentry_status_request()
182
        self.active_sentry_api_ip = self.active_sentry_api_list[random.randint(a=0, b=len(self.active_sentry_api_list)-1)]
183
184
    def tradehub_get_request(self, path: str, params=None):
185
        try:
186
            req = Request(api_url=self.active_sentry_uri, timeout=2).get(path=path, params=params)
187
            return req
188
        except (ValueError, ConnectionError, HTTPError, Timeout):
189
            self.active_sentry_api_list.remove(self.active_sentry_uri)
190
            if not self.active_sentry_api_list and not self.BYPASS_NETWORK_CRAWLER:
191
                self.validator_crawler_mp()
192
                self.sentry_status_request()
193
            elif not self.active_sentry_api_list and self.BYPASS_NETWORK_CRAWLER:
194
                raise ValueError("Provided Sentry API IP addresses are not responding.")
195
            self.active_sentry_uri = self.active_sentry_api_list[random.randint(a=0, b=len(self.active_sentry_api_list)-1)]
196
            return self.tradehub_get_request(path=path, params=params)
197
198
    def tradehub_post_request(self, path: str, data=None, json_data=None, params=None):
199
        try:
200
            req = Request(api_url=self.active_sentry_uri, timeout=2).post(path=path, data=data, json_data=json_data, params=params)
201
            return req
202
        except (ValueError, ConnectionError, HTTPError, Timeout):
203
            self.active_sentry_api_list.remove(self.active_sentry_uri)
204
            if not self.active_sentry_api_list and not self.BYPASS_NETWORK_CRAWLER:
205
                self.validator_crawler_mp()
206
                self.sentry_status_request()
207
            elif not self.active_sentry_api_list and self.BYPASS_NETWORK_CRAWLER:
208
                raise ValueError("Provided Sentry API IP addresses are not responding.")
209
            self.active_sentry_uri = self.active_sentry_api_list[random.randint(a=0, b=len(self.active_sentry_api_list)-1)]
210
            return self.tradehub_post_request(path=path, data=data, json_data=json_data, params=params)
211