Completed
Push — main ( 6da681...4fccdc )
by Switcheolytics
12s queued 11s
created

NetworkCrawlerClient.validator_status_request()   B

Complexity

Conditions 6

Size

Total Lines 34
Code Lines 28

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 6
eloc 28
nop 2
dl 0
loc 34
rs 8.2746
c 0
b 0
f 0
1
import multiprocessing as mp
2
from tradehub.utils import Request
3
import random
4
from requests.exceptions import ConnectionError, HTTPError, Timeout
5
import threading
6
7
8
class NetworkCrawlerClient(object):
9
10
    def __init__(self, network: str = "testnet"):
11
        if network.lower() not in ["main", "mainnet", "test", "testnet"]:
12
            raise ValueError("Parameter network - {} - is not valid, requires main, mainnent, test, or testnet.".format(network))
13
14
        self.seed_peers_list = {
15
            "main": ["54.255.5.46", "175.41.151.35"],
16
            "mainnet": ["54.255.5.46", "175.41.151.35"],
17
            "test": ["54.255.42.175", "52.220.152.108"],
18
            "testnet": ["54.255.42.175", "52.220.152.108"],
19
        }
20
        self.tradescan_node_url = {
21
            "main": "https://switcheo.org/nodes?net=main",
22
            "mainnet": "https://switcheo.org/nodes?net=main",
23
            "test": "https://switcheo.org/nodes?net=test",
24
            "testnet": "https://switcheo.org/nodes?net=test",
25
        }
26
27
        self.all_peers_list = self.seed_peers_list[network.lower()]
28
        self.active_validator_list = []
29
        self.active_sentry_api_list = []
30
        self.validator_crawler_mp()
31
        self.sentry_status_request()
32
        self.active_sentry_api_ip = self.active_sentry_api_list[random.randint(a=0, b=len(self.active_sentry_api_list)-1)]
33
34
    def validator_crawler_mp(self):
35
        checked_peers_list = []
36
        unchecked_peers_list = list(set(self.all_peers_list) - set(checked_peers_list))
37
38
        while unchecked_peers_list:
39
40
            pool = mp.Pool(processes=10)
41
            validator_outputs = pool.map(self.validator_status_request, unchecked_peers_list)
42
            pool.close()
43
            pool.join()
44
45
            for validator in validator_outputs:
46
                self.all_peers_list.append(validator["ip"])
47
                checked_peers_list.append(validator["ip"])
48
                if validator["validator_status"] == "Active" and not validator["catching_up"]:
49
                    self.active_validator_list.append(validator["ip"])
50
                for connected_node in validator["connected_nodes"]:
51
                    self.all_peers_list.append(connected_node["node_ip"])
52
53
            self.all_peers_list = list(dict.fromkeys(self.all_peers_list))
54
            checked_peers_list = list(dict.fromkeys(checked_peers_list))
55
            self.active_validator_list = list(dict.fromkeys(self.active_validator_list))
56
            unchecked_peers_list = list(set(self.all_peers_list) - set(checked_peers_list))
57
58
            # If the initial peers do not return any reults, query Tradescan API.
59
            # if not self.active_peers_list:
60
            #     validators = Request(api_url=self.tradescan_node_url, timeout=30).get()
61
            #     for validator in validators:
62
            #         unchecked_peers_list.append(validator["ip"])
63
64
    def validator_status_request(self, validator_ip):
65
        validator_status = {}
66
        try:
67
            process_peer = True
68
            validator_status["ip"] = validator_ip
69
            i = Request(api_url="http://{}:26657".format(validator_ip), timeout=1).get(path='/net_info')
70
        except (ValueError, ConnectionError, HTTPError, Timeout) as e:
71
            validator_status["validator_status"] = "Unknown - Cannot Connect to Retrieve Validator INFO - {}".format(e)
72
            validator_status["connected_nodes"] = []
73
            process_peer = False
74
75
        if process_peer:
76
            connected_nodes = []
77
78
            for connected_peer in i["result"]["peers"]:
79
                connected_nodes.append({
80
                    "node_id": connected_peer["node_info"]["id"],
81
                    "node_ip": connected_peer["remote_ip"],
82
                    "node_full": "{}@{}".format(connected_peer["node_info"]["id"], connected_peer["remote_ip"])
83
                })
84
85
            try:
86
                s = Request(api_url="http://{}:26657".format(validator_ip), timeout=1).get(path='/status')
87
            except (ValueError, ConnectionError, HTTPError, Timeout) as e:
88
                validator_status["validator_status"] = "Unknown - Cannot Connect to Retrieve Status end point - {}".format(e)
89
                validator_status["connected_nodes"] = []
90
                process_peer = False
91
92
            if process_peer:
93
                validator_status = self.parse_validator_status(request_json=s, validator_ip=validator_ip)
94
                validator_status["validator_status"] = "Active"
95
                validator_status["connected_nodes"] = connected_nodes
96
97
        return validator_status
98
99
    def parse_validator_status(self, request_json, validator_ip):
100
        return {
101
            "moniker": request_json["result"]["node_info"]["moniker"],
102
            "id": request_json["result"]["node_info"]["id"],
103
            "ip": validator_ip,
104
            "version": request_json["result"]["node_info"]["version"],
105
            "network": request_json["result"]["node_info"]["network"],
106
            "latest_block_hash": request_json["result"]["sync_info"]["latest_block_hash"],
107
            "latest_block_height": request_json["result"]["sync_info"]["latest_block_height"],
108
            "latest_block_time": request_json["result"]["sync_info"]["latest_block_time"],
109
            "earliest_block_height": request_json["result"]["sync_info"]["earliest_block_height"],
110
            "earliest_block_time": request_json["result"]["sync_info"]["earliest_block_time"],
111
            "catching_up": request_json["result"]["sync_info"]["catching_up"],
112
            "validator_address": request_json["result"]["validator_info"]["address"],
113
            "validator_pub_key_type": request_json["result"]["validator_info"]["pub_key"]["type"],
114
            "validator_pub_key": request_json["result"]["validator_info"]["pub_key"]["value"],
115
            "validator_voting_power": request_json["result"]["validator_info"]["voting_power"]
116
        }
117
118
    def sentry_status_request(self):
119
        for active_validator in self.active_validator_list:
120
            try:
121
                Request(api_url="http://{}:5001".format(active_validator), timeout=1).get(path='/get_status')
122
                self.active_sentry_api_list.append(active_validator)
123
            except (ValueError, ConnectionError, HTTPError, Timeout):
124
                pass
125
        self.active_sentry_api_list = list(dict.fromkeys(self.active_sentry_api_list))
126
127
    def update_validators_and_sentries(self):
128
        threading.Timer(5.0, self.update_validators_and_sentries).start()
129
        self.validator_crawler_mp()
130
        self.sentry_status_request()
131
        self.active_sentry_api_ip = self.active_sentry_api_list[random.randint(a=0, b=len(self.active_sentry_api_list)-1)]
132
133
    def tradehub_get_request(self, path: str, params=None):
134
        try:
135
            req = Request(api_url="http://{}:5001".format(self.active_sentry_api_ip), timeout=2).get(path=path, params=params)
136
            return req
137
        except (ValueError, ConnectionError, HTTPError, Timeout):
138
            self.active_sentry_api_list.remove(self.active_sentry_api_ip)
139
            if not self.active_sentry_api_list:
140
                self.validator_crawler_mp()
141
                self.sentry_status_request()
142
            self.active_sentry_api_ip = self.active_sentry_api_list[random.randint(a=0, b=len(self.active_sentry_api_list)-1)]
143
            return self.tradehub_get_request(path=path, params=params)
144
145
    def tradehub_post_request(self, path: str, data=None, json_data=None, params=None):
146
        try:
147
            req = Request(api_url="http://{}:5001".format(self.active_sentry_api_ip), timeout=2).post(self, path=path, data=data, json_data=json_data, params=params)
148
            return req
149
        except (ValueError, ConnectionError, HTTPError, Timeout):
150
            self.active_sentry_api_list.remove(self.active_sentry_api_ip)
151
            if not self.active_sentry_api_list:
152
                self.validator_crawler_mp()
153
                self.sentry_status_request()
154
            self.active_sentry_api_ip = self.active_sentry_api_list[random.randint(a=0, b=len(self.active_sentry_api_list)-1)]
155
            return self.tradehub_post_request(path=path, data=data, json_data=json_data, params=params)
156