Passed
Push — master ( a48078...55ce0d )
by Steffen
01:44
created

GameServers.process_packet()   B

Complexity

Conditions 7

Size

Total Lines 35
Code Lines 19

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 14
CRAP Score 7.5375

Importance

Changes 0
Metric Value
cc 7
eloc 19
nop 3
dl 0
loc 35
ccs 14
cts 18
cp 0.7778
crap 7.5375
rs 8
c 0
b 0
f 0
1
#!/usr/bin/python
2
# -*- coding: utf-8 -*-
3 1
import logging
4 1
import socket
5 1
from collections import deque
6 1
from time import sleep
7 1
from typing import List
8
9 1
from tw_serverinfo import Network
10 1
from tw_serverinfo.models.game_server import GameServer
11
12
13 1
class GameServers(object):
    """Request server information from game servers and parse their responses.

    ``fill_server_info`` sends the info requests over a single non-blocking
    UDP socket and hands every received packet to ``process_packet``, which
    validates the echoed token and dispatches to one of the ``parse_*``
    methods depending on the packet identifier.
    """
    # number of servers that are contacted before the responses are collected
    SERVER_CHUNK_SIZE = 50
    # not referenced inside this class; kept for external users of the constant
    SERVER_CHUNK_SLEEP_MS = 1000

    # placeholder only; rebound per instance in fill_server_info()
    _game_servers = []

    def fill_server_info(self, game_servers: List[GameServer]) -> None:
        """Send the SERVERBROWSE_GETINFO and SERVERBROWSE_GETINFO_64_LEGACY packet to each game server in the
        passed game_servers list, parse the responses and update the GameServer objects in place

        After every ``SERVER_CHUNK_SIZE`` servers (and after the last server) the
        pending responses are collected before further requests are sent.

        :type game_servers: list
        :return:
        """
        self._game_servers = game_servers

        # the context manager closes the UDP socket even if sending or receiving raises
        with socket.socket(family=Network.PROTOCOL_FAMILY, type=socket.SOCK_DGRAM) as sock:
            # set the socket to non blocking to allow parallel requests
            sock.setblocking(False)

            for position, game_server in enumerate(self._game_servers, start=1):  # type: int, GameServer
                Network.send_packet(sock=sock, data=Network.PACKETS['SERVERBROWSE_GETINFO'], extra_data=b'xe',
                                    server=game_server)
                Network.send_packet(sock=sock, data=Network.PACKETS['SERVERBROWSE_GETINFO_64_LEGACY'],
                                    extra_data=b'xe', server=game_server)

                if position % self.SERVER_CHUNK_SIZE == 0 or position >= len(self._game_servers):
                    self._collect_responses(sock)

    def _collect_responses(self, sock: socket.socket) -> None:
        """Poll the socket until no packet arrived for Network.CONNECTION_TIMEOUT

        :type sock: socket.socket
        :return:
        """
        duration_without_response = Network.CONNECTION_SLEEP_DURATION
        sleep(Network.CONNECTION_SLEEP_DURATION / 1000.0)

        while duration_without_response < Network.CONNECTION_TIMEOUT:
            if Network.receive_packet(sock, self._game_servers, self.process_packet):
                # if we got a response reset the duration in case we receive multiple packets
                duration_without_response = 0
            else:
                # increase the measured duration without a response and sleep for the set duration
                duration_without_response += Network.CONNECTION_SLEEP_DURATION
                sleep(Network.CONNECTION_SLEEP_DURATION / 1000.0)

    def process_packet(self, data: bytes, server: GameServer) -> None:
        """Process packet function for
         - SERVERBROWSE_INFO
         - SERVERBROWSE_INFO_64_LEGACY
         - SERVERBROWSE_INFO_EXTENDED
         - SERVERBROWSE_INFO_EXTENDED_MORE
        packets

        Bytes 6 to 13 of ``data`` are compared against the packet identifiers, the
        payload starting at byte 14 is split at NUL bytes and its first field is
        the token (decimal text). Responses with a missing, non numeric or
        mismatching token are logged and ignored.

        :type data: bytes
        :type server: GameServer
        :return:
        """
        server.response = True
        slots = deque(data[14:].split(b"\x00"))

        # validate the server token
        try:
            token = int(slots.popleft().decode('utf-8'))
        except ValueError:
            # UnicodeDecodeError is a ValueError subclass, so undecodable bytes end up here as well;
            # the payload comes from the network and must not abort the whole scan
            logging.log(logging.INFO, 'malformed token in GameServer response. GameServer: ' + str(server))
            return

        if int.from_bytes(server.token, byteorder='big') != (token & 0xff):
            logging.log(logging.INFO, 'token validation failed for GameServer response. GameServer: ' + str(server))
            return

        packet_type = data[6:6 + 8]
        if packet_type == Network.PACKETS['SERVERBROWSE_INFO']:
            # vanilla
            self.parse_vanilla_response(slots, server)
        elif packet_type == Network.PACKETS['SERVERBROWSE_INFO_64_LEGACY']:
            # 64 legacy
            self.parse_64_legacy_response(slots, server)
        elif packet_type == Network.PACKETS['SERVERBROWSE_INFO_EXTENDED']:
            # bits 8..23 of the token have to match the first two bytes of the request token
            if (token & 0xffff00) >> 8 != (server.request_token[0] << 8) + server.request_token[1]:
                # additional server token validation failed
                logging.log(logging.INFO, 'request token validation failed for GameServer response. GameServer: '
                            + str(server))
                return
            # extended response, current default of DDNet
            self.parse_extended_response(slots, server)
        elif packet_type == Network.PACKETS['SERVERBROWSE_INFO_EXTENDED_MORE']:
            self.parse_extended_more_response(slots, server)

    @staticmethod
    def _pop_str(slots: deque) -> str:
        """Remove the leftmost slot and return it decoded as UTF-8 text

        :type slots: deque
        :return:
        """
        return slots.popleft().decode('utf-8')

    @staticmethod
    def _pop_int(slots: deque) -> int:
        """Remove the leftmost slot and return it parsed as a decimal integer

        :type slots: deque
        :return:
        """
        return int(slots.popleft().decode('utf-8'))

    @staticmethod
    def _parse_header(slots: deque, server: GameServer, with_map_details: bool = False) -> None:
        """Consume the general server fields from slots and store them on the server

        :type slots: deque
        :type server: GameServer
        :param with_map_details: also read map_crc and map_size (sent between map name and game type)
        :type with_map_details: bool
        :return:
        """
        pop_str, pop_int = GameServers._pop_str, GameServers._pop_int

        server.version = pop_str(slots)
        server.name = pop_str(slots)
        server.map_name = pop_str(slots)
        if with_map_details:
            server.map_crc = pop_int(slots)
            server.map_size = pop_int(slots)
        server.game_type = pop_str(slots)
        server.flags = pop_int(slots)
        server.num_players = pop_int(slots)
        server.max_players = pop_int(slots)
        server.num_clients = pop_int(slots)
        server.max_clients = pop_int(slots)

    @staticmethod
    def _parse_players(slots: deque, server: GameServer, skip_leading_slot: bool = False) -> None:
        """Consume all complete player records from slots and append them to the server

        A record consists of name, clan, country, score and ingame. Name and clan
        are passed on as raw bytes, the other fields as integers. Slots left over
        after the last complete record stay untouched.

        :type slots: deque
        :type server: GameServer
        :param skip_leading_slot: discard one slot in front of every record
        :type skip_leading_slot: bool
        :return:
        """
        pop_int = GameServers._pop_int
        record_size = 6 if skip_leading_slot else 5

        while len(slots) >= record_size:
            if skip_leading_slot:
                # the original author found this slot to be always empty; its meaning is unknown
                slots.popleft()
            server.append_player({
                'name': slots.popleft(),
                'clan': slots.popleft(),
                'country': pop_int(slots),
                'score': pop_int(slots),
                'ingame': pop_int(slots)
            })

    @staticmethod
    def parse_vanilla_response(slots: deque, server: GameServer) -> None:
        """Parse the default response of the vanilla client

        :type slots: deque
        :type server: GameServer
        :return:
        """
        server.server_type = 'vanilla'
        GameServers._parse_header(slots, server)
        GameServers._parse_players(slots, server)

    @staticmethod
    def parse_64_legacy_response(slots: deque, server: GameServer) -> None:
        """Parse the 64 slot legacy response

        :type slots: deque
        :type server: GameServer
        :return:
        """
        server.server_type = '64_legacy'
        GameServers._parse_header(slots, server)

        # an empty slot may follow the header (meaning unknown); guard against a
        # payload that ends right after the header to avoid an IndexError
        if slots and slots[0] == b'':
            slots.popleft()

        GameServers._parse_players(slots, server)

    @staticmethod
    def parse_extended_response(slots: deque, server: GameServer) -> None:
        """Parse the extended server info response(default for DDNet)

        :type slots: deque
        :type server: GameServer
        :return:
        """
        server.server_type = 'ext'
        GameServers._parse_header(slots, server, with_map_details=True)
        GameServers._parse_players(slots, server, skip_leading_slot=True)

    @staticmethod
    def parse_extended_more_response(slots: deque, server: GameServer) -> None:
        """Parse a follow-up packet of the extended server info response which only carries player records

        :type slots: deque
        :type server: GameServer
        :return:
        """
        # the first slot is discarded unread; presumably a packet counter - TODO confirm against the protocol
        slots.popleft()

        server.server_type = 'ext+'
        GameServers._parse_players(slots, server, skip_leading_slot=True)
207