Completed
Push — main ( 01939e...df8969 )
by
unknown
27s queued 12s
created

NetworkCrawlerClient.tradehub_get_request()   B

Complexity

Conditions 6

Size

Total Lines 13
Code Lines 13

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 6
eloc 13
nop 3
dl 0
loc 13
rs 8.6666
c 0
b 0
f 0
1
import multiprocessing as mp
2
from tradehub.utils import Request
3
import random
4
from requests.exceptions import ConnectionError, HTTPError, Timeout
5
import threading
6
7
8
class NetworkCrawlerClient(object):
    """Discover responsive Sentry API endpoints and route requests through one of them.

    Two modes of operation exist:

    * Crawler mode (default): starting from hard-coded seed peers, every peer is
      queried on port 26657 (``/net_info`` and ``/status``) and the peers it
      reports are crawled in turn. Validators that are active and not catching
      up are then probed on port 5001 (``/get_status``).
    * Bypass mode: the caller supplies ``trusted_ip_list`` or ``trusted_uri_list``
      and only those endpoints are probed; no crawling takes place.

    Attributes set by the constructor:
        BYPASS_NETWORK_CRAWLER: True when a trusted list was supplied.
        all_peers_list: every peer known so far.
        active_validator_list: peers considered usable as sentry candidates.
        active_sentry_api_list: sentry base URIs that answered ``/get_status``.
        active_sentry_uri: the sentry URI currently used for requests.
        active_sentry_api_ip: host part of ``active_sentry_uri``.
    """

    def __init__(self, network: str = "testnet", trusted_ip_list: list = None, trusted_uri_list: list = None):
        """Validate the arguments, discover sentries and pick one at random.

        Args:
            network: one of ``main``, ``mainnet``, ``test`` or ``testnet`` (case-insensitive).
            trusted_ip_list: optional IPs to use instead of crawling the network.
            trusted_uri_list: optional full base URIs to use instead of crawling.

        Raises:
            ValueError: if ``network`` is invalid, if both trusted lists are
                given, or if no sentry endpoint responds.
        """
        if network.lower() not in ["main", "mainnet", "test", "testnet"]:
            raise ValueError("Parameter network - {} - is not valid, requires main, mainnet, test, or testnet.".format(network))

        if trusted_ip_list and trusted_uri_list:
            raise ValueError("Can't use both IP and URI list, only pass one option.")

        # Stored on the instance: the request methods consult this flag when
        # every sentry has failed (it used to be a local and was lost).
        self.BYPASS_NETWORK_CRAWLER = bool(trusted_ip_list or trusted_uri_list)
        # Remember whether trusted entries are full URIs so refreshes probe them correctly.
        self._use_trusted_uris = bool(trusted_uri_list)
        self.active_sentry_api_list = []

        if not self.BYPASS_NETWORK_CRAWLER:
            self.seed_peers_list = {
                "main": ["54.255.5.46", "175.41.151.35"],
                "mainnet": ["54.255.5.46", "175.41.151.35"],
                "test": ["54.255.42.175", "52.220.152.108"],
                "testnet": ["54.255.42.175", "52.220.152.108"],
            }
            # NOTE(review): not used by any visible code path; looks reserved for
            # a fallback lookup when the seed peers return nothing - confirm.
            self.tradescan_node_url = {
                "main": "https://switcheo.org/nodes?net=main",
                "mainnet": "https://switcheo.org/nodes?net=main",
                "test": "https://switcheo.org/nodes?net=test",
                "testnet": "https://switcheo.org/nodes?net=test",
            }

            # Copy so that crawling never mutates the seed table itself.
            self.all_peers_list = list(self.seed_peers_list[network.lower()])
            self.active_validator_list = []
            self.validator_crawler_mp()
            self.sentry_status_request()
        else:
            # Copy so that the caller's list is never mutated.
            trusted = list(trusted_uri_list if self._use_trusted_uris else trusted_ip_list)
            self.all_peers_list = trusted
            self.active_validator_list = list(trusted)
            self.sentry_status_request(uri=self._use_trusted_uris)

        self._select_active_sentry()

    def _select_active_sentry(self):
        """Pick a random entry of ``active_sentry_api_list`` as the active sentry.

        Sets both ``active_sentry_uri`` and ``active_sentry_api_ip``.

        Raises:
            ValueError: if ``active_sentry_api_list`` is empty.
        """
        from urllib.parse import urlparse

        if not self.active_sentry_api_list:
            raise ValueError("No active Sentry API endpoints are available.")

        self.active_sentry_uri = random.choice(self.active_sentry_api_list)
        # Fall back to the raw value when the entry has no scheme to parse a host from.
        hostname = urlparse(self.active_sentry_uri).hostname
        self.active_sentry_api_ip = hostname if hostname else self.active_sentry_uri

    def validator_crawler_mp(self):
        """Crawl the peer network breadth-first with a pool of 10 processes.

        Each round queries all peers that have not been checked yet, records the
        ones that are active and not catching up in ``active_validator_list``,
        and adds the peers they report to ``all_peers_list``. The crawl ends
        when a round discovers no unchecked peer.
        """
        checked_peers = set()
        unchecked_peers_list = list(dict.fromkeys(self.all_peers_list))

        while unchecked_peers_list:
            pool = mp.Pool(processes=10)
            try:
                validator_outputs = pool.map(self.validator_status_request, unchecked_peers_list)
            finally:
                # Always release the worker processes, even if a worker raised.
                pool.close()
                pool.join()

            discovered_peers = []
            for validator in validator_outputs:
                checked_peers.add(validator["ip"])
                # "catching_up" only exists on fully parsed results; the status
                # comparison short-circuits before it is read otherwise.
                if validator["validator_status"] == "Active" and not validator["catching_up"]:
                    self.active_validator_list.append(validator["ip"])
                discovered_peers.extend(node["node_ip"] for node in validator["connected_nodes"])

            # dict.fromkeys de-duplicates while keeping first-seen order.
            self.all_peers_list = list(dict.fromkeys(self.all_peers_list + discovered_peers))
            self.active_validator_list = list(dict.fromkeys(self.active_validator_list))
            unchecked_peers_list = [peer for peer in self.all_peers_list if peer not in checked_peers]

    def validator_status_request(self, validator_ip):
        """Query one peer on port 26657 and summarise what it reports.

        Args:
            validator_ip: IP address of the peer to query.

        Returns:
            dict: always contains ``ip``, ``validator_status`` and
            ``connected_nodes``. When both ``/net_info`` and ``/status`` succeed
            the status is ``"Active"`` and the fields of
            ``parse_validator_status`` are included; otherwise the status is an
            ``"Unknown - ..."`` message and ``connected_nodes`` is empty.
        """
        rpc_url = "http://{}:26657".format(validator_ip)
        validator_status = {"ip": validator_ip, "connected_nodes": []}

        try:
            net_info = Request(api_url=rpc_url, timeout=1).get(path='/net_info')
        except (ValueError, ConnectionError, HTTPError, Timeout) as e:
            validator_status["validator_status"] = "Unknown - Cannot Connect to Retrieve Validator INFO - {}".format(e)
            return validator_status

        try:
            status = Request(api_url=rpc_url, timeout=1).get(path='/status')
        except (ValueError, ConnectionError, HTTPError, Timeout) as e:
            validator_status["validator_status"] = "Unknown - Cannot Connect to Retrieve Status end point - {}".format(e)
            return validator_status

        try:
            connected_nodes = [
                {
                    "node_id": peer["node_info"]["id"],
                    "node_ip": peer["remote_ip"],
                    "node_full": "{}@{}".format(peer["node_info"]["id"], peer["remote_ip"]),
                }
                for peer in net_info["result"]["peers"]
            ]
            parsed_status = self.parse_validator_status(request_json=status, validator_ip=validator_ip)
        except (KeyError, TypeError) as e:
            # A single peer answering with an unexpected payload must not abort the whole crawl.
            validator_status["validator_status"] = "Unknown - Unexpected response format - {!r}".format(e)
            return validator_status

        parsed_status["validator_status"] = "Active"
        parsed_status["connected_nodes"] = connected_nodes
        return parsed_status

    def parse_validator_status(self, request_json, validator_ip):
        """Flatten a ``/status`` response into a single-level dict.

        Args:
            request_json: decoded response with ``result.node_info``,
                ``result.sync_info`` and ``result.validator_info`` sections.
            validator_ip: IP the response was fetched from; stored under ``ip``.

        Returns:
            dict: node, sync and validator fields keyed by flat names.

        Raises:
            KeyError: if an expected key is missing from ``request_json``.
        """
        result = request_json["result"]
        node_info = result["node_info"]
        sync_info = result["sync_info"]
        validator_info = result["validator_info"]
        return {
            "moniker": node_info["moniker"],
            "id": node_info["id"],
            "ip": validator_ip,
            "version": node_info["version"],
            "network": node_info["network"],
            "latest_block_hash": sync_info["latest_block_hash"],
            "latest_block_height": sync_info["latest_block_height"],
            "latest_block_time": sync_info["latest_block_time"],
            "earliest_block_height": sync_info["earliest_block_height"],
            "earliest_block_time": sync_info["earliest_block_time"],
            "catching_up": sync_info["catching_up"],
            "validator_address": validator_info["address"],
            "validator_pub_key_type": validator_info["pub_key"]["type"],
            "validator_pub_key": validator_info["pub_key"]["value"],
            "validator_voting_power": validator_info["voting_power"],
        }

    def sentry_status_request(self, uri: bool = False):
        """Probe every entry of ``active_validator_list`` for a working sentry API.

        Entries that answer ``/get_status`` are appended to
        ``active_sentry_api_list``, which is de-duplicated afterwards.

        Args:
            uri: when True the entries are already full base URIs; otherwise
                they are treated as IPs and probed at ``http://<ip>:5001``.
        """
        for active_validator in self.active_validator_list:
            sentry_uri = active_validator if uri else "http://{}:5001".format(active_validator)
            try:
                Request(api_url=sentry_uri, timeout=1).get(path='/get_status')
            except (ValueError, ConnectionError, HTTPError, Timeout):
                # Best effort: an unreachable sentry is simply left out.
                continue
            self.active_sentry_api_list.append(sentry_uri)

        self.active_sentry_api_list = list(dict.fromkeys(self.active_sentry_api_list))

    def update_validators_and_sentries(self):
        """Refresh validators and sentries now and re-schedule itself in 5 seconds.

        In bypass mode only the trusted endpoints are probed again; the network
        is not crawled. The active sentry is re-selected only when at least one
        sentry is known, so a failed refresh keeps the previous selection.
        """
        # NOTE: the timer thread is not a daemon, so once started this keeps
        # re-scheduling itself for the lifetime of the process.
        threading.Timer(5.0, self.update_validators_and_sentries).start()
        if not self.BYPASS_NETWORK_CRAWLER:
            self.validator_crawler_mp()
        self.sentry_status_request(uri=self._use_trusted_uris)
        if self.active_sentry_api_list:
            self._select_active_sentry()

    def _request_with_failover(self, method: str, path: str, **request_kwargs):
        """Send a request through the active sentry, switching sentries on failure.

        A failing sentry is dropped from ``active_sentry_api_list`` and another
        one is selected. When the list runs empty in crawler mode the network
        is re-crawled at most once per call.

        Args:
            method: name of the ``Request`` method to call (``"get"`` or ``"post"``).
            path: request path passed through to ``Request``.
            **request_kwargs: further keyword arguments for the ``Request`` method.

        Raises:
            ValueError: if no sentry is left to serve the request.
        """
        recrawled = False
        while True:
            try:
                client = Request(api_url=self.active_sentry_uri, timeout=2)
                return getattr(client, method)(path=path, **request_kwargs)
            except (ValueError, ConnectionError, HTTPError, Timeout):
                # The list may have been refreshed meanwhile, so the URI can already be gone.
                if self.active_sentry_uri in self.active_sentry_api_list:
                    self.active_sentry_api_list.remove(self.active_sentry_uri)

                if not self.active_sentry_api_list:
                    if self.BYPASS_NETWORK_CRAWLER:
                        raise ValueError("Provided Sentry API IP addresses are not responding.")
                    if recrawled:
                        # Every freshly discovered sentry failed as well; stop instead of looping forever.
                        raise ValueError("No Sentry API endpoint could serve the request after re-crawling the network.")
                    self.validator_crawler_mp()
                    self.sentry_status_request()
                    recrawled = True

                self._select_active_sentry()

    def tradehub_get_request(self, path: str, params=None):
        """Issue a GET request through the active sentry with automatic failover.

        Args:
            path: request path.
            params: optional query parameters.

        Returns:
            Whatever ``Request.get`` returns for the first sentry that answers.

        Raises:
            ValueError: if no sentry is left to serve the request.
        """
        return self._request_with_failover("get", path, params=params)

    def tradehub_post_request(self, path: str, data=None, json_data=None, params=None):
        """Issue a POST request through the active sentry with automatic failover.

        Args:
            path: request path.
            data: optional request body.
            json_data: optional JSON request body.
            params: optional query parameters.

        Returns:
            Whatever ``Request.post`` returns for the first sentry that answers.

        Raises:
            ValueError: if no sentry is left to serve the request.
        """
        return self._request_with_failover("post", path, data=data, json_data=json_data, params=params)
193