Passed
Pull Request — main (#27)
by Switcheolytics
52s
created

NetworkCrawlerClient.sentry_status_request()   C

Complexity

Conditions 10

Size

Total Lines 36
Code Lines 30

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 10
eloc 30
nop 2
dl 0
loc 36
rs 5.9999
c 0
b 0
f 0

How to fix   Complexity   

Complexity

Complex classes like tradehub.decentralized_client.NetworkCrawlerClient.sentry_status_request() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
import multiprocessing as mp
2
from tradehub.utils import Request
3
import random
4
from requests.exceptions import ConnectionError, HTTPError, Timeout
5
import socket
6
import threading
7
8
9
class NetworkCrawlerClient(object):
10
11
    def __init__(self,
12
                 network: str = "testnet",
13
                 trusted_ip_list: list = None,
14
                 trusted_uri_list: list = None,
15
                 is_secure: bool = False,
16
                 is_websocket_client: bool = False):
17
        if network.lower() not in ["main", "mainnet", "test", "testnet"]:
18
            raise ValueError("Parameter network - {} - is not valid, requires main, mainnent, test, or testnet.".format(network))
19
20
        if trusted_ip_list and trusted_uri_list:
21
            raise ValueError("Can't use both IP and URI list, only pass one option.")
22
23
        if trusted_ip_list or trusted_uri_list:
24
            BYPASS_NETWORK_CRAWLER = True
25
        else:
26
            BYPASS_NETWORK_CRAWLER = False
27
28
        self.is_secure = is_secure
29
        if self.is_secure:
30
            self.http_string = 'https'
31
            self.ws_string = 'wss'
32
        else:
33
            self.http_string = 'http'
34
            self.ws_string = 'ws'
35
        self.is_websocket_client = is_websocket_client
36
        self.active_ws_uri_list = []
37
38
        if not BYPASS_NETWORK_CRAWLER:
39
            self.seed_peers_list = {
40
                "main": ["54.255.5.46", "175.41.151.35"],
41
                "mainnet": ["54.255.5.46", "175.41.151.35"],
42
                "test": ["54.255.42.175", "52.220.152.108"],
43
                "testnet": ["54.255.42.175", "52.220.152.108"],
44
            }
45
            self.tradescan_node_url = {
46
                "main": "https://switcheo.org/nodes?net=main",
47
                "mainnet": "https://switcheo.org/nodes?net=main",
48
                "test": "https://switcheo.org/nodes?net=test",
49
                "testnet": "https://switcheo.org/nodes?net=test",
50
            }
51
52
            self.all_peers_list = self.seed_peers_list[network.lower()]
53
            self.active_validator_list = []
54
            self.active_sentry_api_list = []
55
            self.validator_crawler_mp()
56
            self.sentry_status_request(uri=False)
57
        elif trusted_ip_list:
58
            self.all_peers_list = trusted_ip_list
59
            self.active_validator_list = trusted_ip_list
60
            self.active_sentry_api_list = []
61
            self.sentry_status_request(uri=False)
62
        elif trusted_uri_list:
63
            self.all_peers_list = trusted_uri_list
64
            self.active_validator_list = trusted_uri_list
65
            self.active_sentry_api_list = []
66
            self.sentry_status_request(uri=True)
67
        self.active_sentry_uri = self.active_sentry_api_list[random.randint(a=0, b=len(self.active_sentry_api_list)-1)]
68
        self.active_sentry_api_ip = self.active_sentry_uri.split(':')[1][2:]
69
        if self.is_websocket_client:
70
            self.active_ws_uri = self.active_ws_uri_list[random.randint(a=0, b=len(self.active_ws_uri_list)-1)]
71
            self.active_ws_ip = self.active_ws_uri.split(':')[1][2:]
72
73
    def validator_crawler_mp(self):
74
        checked_peers_list = []
75
        unchecked_peers_list = list(set(self.all_peers_list) - set(checked_peers_list))
76
77
        while unchecked_peers_list:
78
79
            pool = mp.Pool(processes=10)
80
            validator_outputs = pool.map(self.validator_status_request, unchecked_peers_list)
81
            pool.close()
82
            pool.join()
83
84
            for validator in validator_outputs:
85
                self.all_peers_list.append(validator["ip"])
86
                checked_peers_list.append(validator["ip"])
87
                if validator["validator_status"] == "Active" and not validator["catching_up"]:
88
                    self.active_validator_list.append(validator["ip"])
89
                for connected_node in validator["connected_nodes"]:
90
                    self.all_peers_list.append(connected_node["node_ip"])
91
92
            self.all_peers_list = list(dict.fromkeys(self.all_peers_list))
93
            checked_peers_list = list(dict.fromkeys(checked_peers_list))
94
            self.active_validator_list = list(dict.fromkeys(self.active_validator_list))
95
            unchecked_peers_list = list(set(self.all_peers_list) - set(checked_peers_list))
96
97
            # If the initial peers do not return any reults, query Tradescan API.
98
            # if not self.active_peers_list:
99
            #     validators = Request(api_url=self.tradescan_node_url, timeout=30).get()
100
            #     for validator in validators:
101
            #         unchecked_peers_list.append(validator["ip"])
102
103
    def validator_status_request(self, validator_ip):
        """Query one node on port 26657 and summarise what it reports.

        Fetches ``/net_info`` for the node's peers and ``/status`` for its own
        details. If either request fails, the returned dict holds only "ip", an
        "Unknown - ..." ``validator_status`` message and an empty
        ``connected_nodes`` list. Otherwise it is the parsed status plus
        ``validator_status == "Active"`` and the peer list.

        :param validator_ip: Host to query.
        :return: Dict describing the node (see above).
        """
        node_url = "{}://{}:26657".format(self.http_string, validator_ip)
        report = {"ip": validator_ip}

        try:
            net_info = Request(api_url=node_url, timeout=1).get(path='/net_info')
        except (ValueError, ConnectionError, HTTPError, Timeout) as e:
            report["validator_status"] = "Unknown - Cannot Connect to Retrieve Validator INFO - {}".format(e)
            report["connected_nodes"] = []
            return report

        peers = [
            {
                "node_id": peer["node_info"]["id"],
                "node_ip": peer["remote_ip"],
                "node_full": "{}@{}".format(peer["node_info"]["id"], peer["remote_ip"]),
            }
            for peer in net_info["result"]["peers"]
        ]

        try:
            status_json = Request(api_url=node_url, timeout=1).get(path='/status')
        except (ValueError, ConnectionError, HTTPError, Timeout) as e:
            report["validator_status"] = "Unknown - Cannot Connect to Retrieve Status end point - {}".format(e)
            report["connected_nodes"] = []
            return report

        report = self.parse_validator_status(request_json=status_json, validator_ip=validator_ip)
        report["validator_status"] = "Active"
        report["connected_nodes"] = peers
        return report
137
138
    def parse_validator_status(self, request_json, validator_ip):
        """Flatten a ``/status`` response into a single-level dict.

        :param request_json: Decoded response; must contain ``result`` with
            ``node_info``, ``sync_info`` and ``validator_info`` sections.
        :param validator_ip: IP stored under the "ip" key of the result.
        :return: Dict of node, sync and validator fields.
        :raises KeyError: If an expected field is missing.
        """
        result = request_json["result"]

        node_info = result["node_info"]
        parsed = {
            "moniker": node_info["moniker"],
            "id": node_info["id"],
            "ip": validator_ip,
            "version": node_info["version"],
            "network": node_info["network"],
        }

        sync_info = result["sync_info"]
        for field in ("latest_block_hash", "latest_block_height", "latest_block_time",
                      "earliest_block_height", "earliest_block_time", "catching_up"):
            parsed[field] = sync_info[field]

        validator_info = result["validator_info"]
        parsed["validator_address"] = validator_info["address"]
        pub_key = validator_info["pub_key"]
        parsed["validator_pub_key_type"] = pub_key["type"]
        parsed["validator_pub_key"] = pub_key["value"]
        parsed["validator_voting_power"] = validator_info["voting_power"]
        return parsed
156
157
    def sentry_status_request(self, uri: bool = False):
158
        for active_validator in self.active_validator_list:
159
            if uri:
160
                try:
161
                    # Have to check the "/get_status" endpoint because the port could be open and the validator fully synced but have the persistence service inactive, shutdown, stopped, or non-repsonsive.
162
                    Request(api_url=active_validator, timeout=1).get(path='/get_status')
163
                    self.active_sentry_api_list.append(active_validator)
164
                    if self.is_websocket_client:
165
                        self.websocket_status_check(ip=active_validator)
166
                except (ValueError, ConnectionError, HTTPError, Timeout):
167
                    pass
168
            else:
169
                # 1318 - Cosmos REST; 5001 - Demex REST; 5002 - Reverse Proxy for Demex and Cosmos REST; Recommended to not use proxy
170
                for port in ["5001"]:
171
                    try:
172
                        # Have to check the "/get_status" endpoint because the port could be open and the validator fully synced but have the persistence service inactive, shutdown, stopped, or non-repsonsive.
173
                        Request(api_url="{}://{}:{}".format(self.http_string, active_validator, port), timeout=1).get(path='/get_status')
174
                        self.active_sentry_api_list.append('{}://{}:{}'.format(self.http_string, active_validator, port))
175
                        if self.is_websocket_client:
176
                            self.websocket_status_check(ip=active_validator)
177
                    except (ValueError, ConnectionError, HTTPError, Timeout):
178
                        pass
179
180
        self.active_sentry_api_list = list(dict.fromkeys(self.active_sentry_api_list))
181
        self.active_ws_uri_list = list(dict.fromkeys(self.active_ws_uri_list))
182
183
        def websocket_status_check(self, ip: str, port: int = 5000):
184
            try:
185
                s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
186
                location = (active_validator, port)
0 ignored issues
show
introduced by
The variable active_validator does not seem to be defined in case the for loop on line 158 is not entered. Are you sure this can never be the case?
Loading history...
187
                result_of_check = s.connect_ex(location)
188
                if result_of_check == 0:
189
                    self.active_ws_uri_list.append('{}://{}:{}/ws'.format(self.ws_string, active_validator, port))
190
                s.close()
191
            except socket.error:
192
                pass
193
194
    def update_validators_and_sentries(self):
        """Re-crawl the network, refresh the sentry list and re-select an endpoint.

        Schedules itself to run again in 5 seconds via ``threading.Timer``
        before doing the work, so it keeps repeating once started.
        """
        threading.Timer(5.0, self.update_validators_and_sentries).start()
        self.validator_crawler_mp()
        # NOTE(review): this probes in bare-IP mode (uri=False); confirm that is
        # intended for clients that were created from a trusted URI list.
        self.sentry_status_request()
        # Keep the previous selection when nothing responded; otherwise refresh
        # the URI and derive the IP from it ("scheme://host:port" -> "host"),
        # rather than storing a full URI in active_sentry_api_ip.
        if self.active_sentry_api_list:
            self.active_sentry_uri = random.choice(self.active_sentry_api_list)
            self.active_sentry_api_ip = self.active_sentry_uri.split(':')[1][2:]
199
200
    def tradehub_get_request(self, path: str, params=None):
        """GET ``path`` from the active sentry, failing over to another on error.

        On a request error the active sentry is dropped from
        ``self.active_sentry_api_list``, another one is picked at random and
        the request is retried. When the list runs empty, the network is
        re-crawled unless the client was built from a trusted list.

        :param path: Path passed to ``Request.get``.
        :param params: Optional query parameters passed to ``Request.get``.
        :return: Whatever ``Request.get`` returns.
        :raises ValueError: If no sentry endpoint is left to try.
        """
        try:
            return Request(api_url=self.active_sentry_uri, timeout=2).get(path=path, params=params)
        except (ValueError, ConnectionError, HTTPError, Timeout):
            # The list may have been rebuilt since this URI was selected.
            if self.active_sentry_uri in self.active_sentry_api_list:
                self.active_sentry_api_list.remove(self.active_sentry_uri)

            if not self.active_sentry_api_list:
                # Fall back to inferring the mode when the flag was never
                # stored on the instance: seed_peers_list only exists on
                # clients that crawl the network.
                bypass = getattr(self, "BYPASS_NETWORK_CRAWLER", not hasattr(self, "seed_peers_list"))
                if bypass:
                    raise ValueError("Provided Sentry API IP addresses are not responding.")
                self.validator_crawler_mp()
                self.sentry_status_request()
                if not self.active_sentry_api_list:
                    raise ValueError("No responsive sentry API endpoints were found after re-crawling the network.")

            self.active_sentry_uri = random.choice(self.active_sentry_api_list)
            # Keep the derived IP in step with the newly selected URI.
            self.active_sentry_api_ip = self.active_sentry_uri.split(':')[1][2:]
            return self.tradehub_get_request(path=path, params=params)
213
214
    def tradehub_post_request(self, path: str, data=None, json_data=None, params=None):
        """POST to ``path`` on the active sentry, failing over to another on error.

        On a request error the active sentry is dropped from
        ``self.active_sentry_api_list``, another one is picked at random and
        the request is retried. When the list runs empty, the network is
        re-crawled unless the client was built from a trusted list.

        :param path: Path passed to ``Request.post``.
        :param data: Optional body passed to ``Request.post``.
        :param json_data: Optional JSON body passed to ``Request.post``.
        :param params: Optional query parameters passed to ``Request.post``.
        :return: Whatever ``Request.post`` returns.
        :raises ValueError: If no sentry endpoint is left to try.
        """
        try:
            return Request(api_url=self.active_sentry_uri, timeout=2).post(path=path, data=data, json_data=json_data, params=params)
        except (ValueError, ConnectionError, HTTPError, Timeout):
            # The list may have been rebuilt since this URI was selected.
            if self.active_sentry_uri in self.active_sentry_api_list:
                self.active_sentry_api_list.remove(self.active_sentry_uri)

            if not self.active_sentry_api_list:
                # Fall back to inferring the mode when the flag was never
                # stored on the instance: seed_peers_list only exists on
                # clients that crawl the network.
                bypass = getattr(self, "BYPASS_NETWORK_CRAWLER", not hasattr(self, "seed_peers_list"))
                if bypass:
                    raise ValueError("Provided Sentry API IP addresses are not responding.")
                self.validator_crawler_mp()
                self.sentry_status_request()
                if not self.active_sentry_api_list:
                    raise ValueError("No responsive sentry API endpoints were found after re-crawling the network.")

            self.active_sentry_uri = random.choice(self.active_sentry_api_list)
            # Keep the derived IP in step with the newly selected URI.
            self.active_sentry_api_ip = self.active_sentry_uri.split(':')[1][2:]
            return self.tradehub_post_request(path=path, data=data, json_data=json_data, params=params)
227