ClientBaseManager.broadcast()   F
last analyzed

Complexity

Conditions 15

Size

Total Lines 51
Code Lines 32

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 15
eloc 32
nop 2
dl 0
loc 51
rs 2.9998
c 0
b 0
f 0

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include Extract Method, which moves a cohesive fragment of the body into its own well-named method.

Complexity

Complex methods like ui.clientmanager.basemanager.ClientBaseManager.broadcast() often do a lot of different things, which is usually a sign that the enclosing class does too. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
#!/usr/bin/env python
2
# -*- coding: UTF-8 -*-
3
4
# Isomer - The distributed application framework
5
# ==============================================
6
# Copyright (C) 2011-2020 Heiko 'riot' Weinen <[email protected]> and others.
7
#
8
# This program is free software: you can redistribute it and/or modify
9
# it under the terms of the GNU Affero General Public License as published by
10
# the Free Software Foundation, either version 3 of the License, or
11
# (at your option) any later version.
12
#
13
# This program is distributed in the hope that it will be useful,
14
# but WITHOUT ANY WARRANTY; without even the implied warranty of
15
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16
# GNU Affero General Public License for more details.
17
#
18
# You should have received a copy of the GNU Affero General Public License
19
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
20
21
"""
22
23
24
Module clientmanager.basemanager
25
================================
26
27
Basic client management functionality and component set up.
28
29
30
"""
31
32
import json
33
from base64 import b64decode
34
from time import time
35
from uuid import uuid4
36
from socket import socket
37
38
from circuits.net.events import write
39
from isomer.component import ConfigurableComponent, handler
40
from isomer.database import objectmodels
41
from isomer.events.client import clientdisconnect, userlogout, send
42
43
from isomer.logger import debug, critical, verbose, error, warn, network
44
from isomer.ui.clientobjects import Socket, Client, User
45
46
from isomer.ui.clientmanager.encoder import ComplexEncoder
47
48
49
from isomer.events.objectmanager import search, get
50
51
52
class ClientBaseManager(ConfigurableComponent):
53
    """
54
    Handles client connections and requests as well as client-outbound
55
    communication.
56
    """
57
58
    channel = "isomer-web"
59
60
    def __init__(self, *args, **kwargs):
        """Register the component as "CM" and create its empty registries.

        Positional and keyword arguments are passed through unchanged to
        the parent component's constructor.
        """
        super(ClientBaseManager, self).__init__("CM", *args, **kwargs)

        # Guest access switch (checked when forwarding authorized events)
        # and the event classes stored alongside it.
        self._public_access_events = [search, get]
        self._public_access = True

        # Connection registries: sockets are keyed by socket object,
        # clients by client uuid and users by user uuid.
        self._sockets = {}
        self._clients = {}
        self._users = {}

        # Additional bookkeeping; not read by the methods visible here.
        self._user_mapping = {}
        self._count = 0

        # Per-IP counters of undecodable messages and per-IP ban timestamps.
        self._bans = {}
        self._erroneous_clients = {}
75
76
    @handler("disconnect", channel="wsserver")
77
    def disconnect(self, sock):
        """Handle the disconnection of a socket.

        For a registered socket this fires a ``clientdisconnect`` event,
        logs out the client's user (if one is set) and removes the client
        and the socket from the registries. Sockets that are not registered
        are ignored. Errors are logged and never raised to the caller.

        :param sock: the socket that was disconnected
        """

        self.log("Disconnect ", sock, lvl=debug)

        try:
            if sock not in self._sockets:
                return

            self.log("Getting socket", lvl=debug)
            socket_object = self._sockets[sock]
            self.log("Getting clientuuid", lvl=debug)
            clientuuid = socket_object.clientuuid
            self.log("getting useruuid", lvl=debug)
            useruuid = self._clients[clientuuid].useruuid

            self.log("Firing disconnect event", lvl=debug)
            self.fireEvent(clientdisconnect(clientuuid, useruuid))

            self.log("Logging out relevant client", lvl=debug)
            if useruuid is not None:
                self.log("Client was logged in", lvl=debug)
                try:
                    self._logout_client(useruuid, clientuuid)
                    self.log("Client logged out", useruuid, clientuuid)
                except Exception as e:
                    self.log(
                        "Couldn't clean up logged in user! ",
                        # .get() instead of indexing: the user entry may
                        # already be gone, and a KeyError raised here would
                        # skip the client/socket removal below.
                        self._users.get(useruuid),
                        e,
                        type(e),
                        lvl=critical,
                    )

            # Log the actual keys, not the bound ``keys`` method object.
            self.log("Deleting Client (", list(self._clients.keys()), ")", lvl=debug)
            del self._clients[clientuuid]
            self.log("Deleting Socket", lvl=debug)
            del self._sockets[sock]
        except Exception as e:
            self.log("Error during disconnect handling: ", e, type(e), lvl=critical)
116
117
    def _logout_client(self, useruuid, clientuuid):
        """Detach a client from its user and drop the user when it was the last.

        Removes ``clientuuid`` from the user's client collection. If no
        client remains, a ``userlogout`` event is fired and the user is
        removed from the registry. Finally the client's ``useruuid`` is
        reset to ``None``. Any error is logged and swallowed.

        :param useruuid: uuid of the logged in user
        :param clientuuid: uuid of the client that is being logged out
        """

        self.log("Cleaning up client of logged in user.", lvl=debug)
        try:
            user = self._users[useruuid]
            user.clients.remove(clientuuid)

            if not user.clients:
                self.log("Last client of user disconnected.", lvl=verbose)

                self.fireEvent(userlogout(useruuid, clientuuid))
                del self._users[useruuid]

            self._clients[clientuuid].useruuid = None
        except Exception as e:
            self.log(
                "Error during client logout: ", e, type(e), clientuuid, useruuid,
                lvl=error, exc=True,
            )
140
141
    @handler("connect", channel="wsserver")
142
    def connect(self, *args):
        """Register a newly connected socket and allocate a client uuid for it.

        ``args`` is expected to carry the socket first and the peer's ip
        second. A socket that is already registered only produces a
        warning. Errors are logged and never raised to the caller.
        """

        self.log("Connect ", args, lvl=verbose)

        try:
            sock, ip = args[0], args[1]

            if sock in self._sockets:
                # Already known socket: keep the existing registration.
                self.log("Old IP reconnected!", lvl=warn)
                return

            self.log("New client connected:", ip, lvl=debug)
            clientuuid = str(uuid4())
            self._sockets[sock] = Socket(ip, clientuuid)

            # Key uuid is temporary, until signin, will then be replaced
            # with account uuid
            self._clients[clientuuid] = Client(sock=sock, ip=ip, clientuuid=clientuuid)

            self.log("Client connected:", clientuuid, lvl=debug)
        except Exception as e:
            self.log("Error during connect: ", e, type(e), lvl=critical)
170
171
    def send(self, event):
        """Send a packet to all clients of a user or to a single client.

        With ``event.sendtype == "user"`` the user is looked up in the
        ``user`` object model (by ``event.uuid``, or by ``event.username``
        when the uuid is ``None``) and the packet is written to every client
        registered for that user. Any other send type treats ``event.uuid``
        as a client uuid. The packet is written JSON encoded unless
        ``event.raw`` is set, in which case ``event.packet`` is written
        unchanged. Errors are logged and never raised to the caller.

        :param event: send event providing ``packet``, ``sendtype``,
            ``uuid``, ``username``, ``raw`` and ``fail_quiet``
        """

        try:
            # NOTE(review): the packet is JSON encoded even for raw sends, so
            # a raw payload the encoder rejects aborts the send - confirm
            # that ComplexEncoder copes with raw payloads.
            jsonpacket = json.dumps(event.packet, cls=ComplexEncoder)

            if event.sendtype == "user":
                # TODO: I think, caching a user name <-> uuid table would
                # make sense instead of looking this up all the time.
                if event.uuid is None:
                    userobject = objectmodels["user"].find_one({"name": event.username})
                else:
                    userobject = objectmodels["user"].find_one({"uuid": event.uuid})

                if userobject is None:
                    self.log("No user by that name known.", lvl=warn)
                    return

                user_uuid = userobject.uuid

                self.log(
                    "Broadcasting to all of users clients: '%s': '%s'"
                    % (user_uuid, str(event.packet)[:20]),
                    lvl=network,
                )
                if user_uuid not in self._users:
                    self.log("User not connected!", event, lvl=critical)
                    return

                for clientuuid in self._users[user_uuid].clients:
                    sock = self._clients[clientuuid].sock

                    if not event.raw:
                        self.log("Sending json to client", jsonpacket[:50], lvl=network)
                        self.fireEvent(write(sock, jsonpacket), "wsserver")
                    else:
                        self.log("Sending raw data to client")
                        self.fireEvent(write(sock, event.packet), "wsserver")
            else:  # only to client
                self.log(
                    "Sending to user's client: '%s': '%s'"
                    % (event.uuid, jsonpacket[:50]),
                    lvl=network,
                )
                if event.uuid not in self._clients:
                    if not event.fail_quiet:
                        self.log("Unknown client!", event.uuid, lvl=critical)
                        self.log("Clients:", self._clients, lvl=debug)
                    return

                sock = self._clients[event.uuid].sock
                if not event.raw:
                    self.fireEvent(write(sock, jsonpacket), "wsserver")
                else:
                    self.log("Sending raw data to client", lvl=network)
                    # Write the complete payload; truncation is only meant
                    # for log output, not for the data on the wire.
                    self.fireEvent(write(sock, event.packet), "wsserver")

        except Exception as e:
            self.log(
                "Exception during sending: %s (%s)" % (e, type(e)),
                lvl=critical,
                exc=True,
            )
237
238
    def broadcast(self, event):
        """Relay ``event.content`` to the audience named by ``event.broadcasttype``.

        * ``"users"``: one ``send`` event (send type ``"user"``) per
          registered user
        * ``"clients"``: the content is written to every client's socket
        * ``"usergroup"`` / ``"clientgroup"``: one ``send`` event per
          distinct member of ``event.group``
        * ``"socks"``: the content is written to every registered socket

        Nothing happens for an empty audience or any other broadcast type.
        Errors are logged and never raised to the caller.
        """
        try:
            kind = event.broadcasttype

            if kind == "users" and self._users:
                self.log("Broadcasting to all users:", event.content, lvl=network)
                for useruuid in self._users:
                    self.fireEvent(send(useruuid, event.content, sendtype="user"))

            elif kind == "clients" and self._clients:
                self.log("Broadcasting to all clients: ", event.content, lvl=network)
                for client in self._clients.values():
                    self.fireEvent(write(client.sock, event.content), "wsserver")

            elif kind in ("usergroup", "clientgroup") and len(event.group) > 0:
                self.log(
                    "Broadcasting to group: ", event.content, event.group, lvl=network
                )
                # The send type is the same for every member of the group.
                target_type = "user" if kind == "usergroup" else "client"
                for participant in set(event.group):
                    self.fireEvent(
                        send(participant, event.content, sendtype=target_type)
                    )

            elif kind == "socks" and self._sockets:
                self.log("Emergency?! Broadcasting to all sockets: ", event.content)
                for sock in self._sockets:
                    self.fireEvent(write(sock, event.content), "wsserver")

        except Exception as e:
            self.log("Error during broadcast: ", e, type(e), lvl=critical)
289
290
    @handler("read", channel="wsserver")
291
    def read(self, *args):
        """Handle a raw client request and hand it to the responsible handler.

        ``args`` is expected to carry the socket first and the raw message
        second. The message is dropped when the peer's ip is banned, has
        sent more than five undecodable messages (which also bans it), the
        client is marked as flooding, or the message is not a JSON object
        with ``component`` and ``action`` keys. A ``raw`` entry in a dict
        payload is base64 decoded. Requests for the ``auth`` component go to
        the authentication handler, everything else is forwarded.
        """

        self.log("Beginning new transaction: ", args, lvl=network)

        try:
            sock = args[0]  # type: socket
            msg = args[1]

            # TODO: Harmonize this with flood protection and other still to do
            #  protections and optimize administrative
            ip = sock.getpeername()[0]
            if ip in self._bans:
                return

            if self._erroneous_clients.get(ip, 0) > 5:
                self.log('Ignoring erroneous client that sent too much garbage before', lvl=warn)
                self._bans[ip] = time()
                return

            client_uuid = self._sockets[sock].clientuuid
        except Exception as e:
            self.log("Receiving error: ", e, type(e), lvl=error, exc=True)
            return

        if sock is None or msg is None:
            self.log("Socket or message are invalid!", lvl=error)
            return

        # NOTE(review): ``_flooding`` is not created in the constructor
        # visible here - presumably it is set up elsewhere; confirm.
        if client_uuid in self._flooding:
            return

        try:
            msg = json.loads(msg)
            self.log("Message from client received: ", msg, lvl=network)
        except Exception as e:
            self.log("JSON Decoding failed! %s (%s of %s)" % (msg, e, type(e)))
            # Count against the address resolved above; asking the socket
            # again could raise outside of any handler.
            self._erroneous_clients[ip] = self._erroneous_clients.get(ip, 0) + 1
            return

        try:
            request_component = msg["component"]
            request_action = msg["action"]
        except (KeyError, AttributeError, TypeError) as e:
            # TypeError: valid JSON that is not an object (list, number, ...).
            self.log("Unpacking error: ", msg, e, type(e), lvl=error)
            return

        # NOTE(review): a request that triggers flood protection is still
        # processed; only later requests are dropped - confirm intent.
        if self._check_flood_protection(request_component, request_action, client_uuid):
            self.log("Flood protection triggered")
            self._flooding[client_uuid] = time()

        try:
            # TODO: Do not unpickle or decode anything from unsafe events
            request_data = msg["data"]
        except (KeyError, AttributeError):
            self.log("No payload.", lvl=network)
            request_data = None
        else:
            # Only dicts can carry a "raw" entry; a list containing the
            # string "raw" must not be indexed by that string.
            if isinstance(request_data, dict) and "raw" in request_data:
                try:
                    request_data["raw"] = b64decode(request_data["raw"])
                except (TypeError, ValueError) as e:
                    # binascii.Error (malformed base64) is a ValueError.
                    self.log("Invalid raw payload: ", e, type(e), lvl=error)
                    return

        if request_component == "auth":
            self._handle_authentication_events(
                request_data, request_action, client_uuid, sock
            )
            return

        self._forward_event(
            client_uuid, request_component, request_action, request_data
        )
373
374
    def _forward_event(
        self, client_uuid, request_component, request_action, request_data
    ):
        """Decide how a decoded request is handled and pass it on.

        Requests listed in ``anonymous_events`` go to the anonymous handler.
        Requests for a component in ``authorized_events`` go to the
        authorized handler, using the client's logged in user or - when
        public access is enabled - a temporary guest user. Anything else is
        logged as invalid. Handler failures are logged, not raised.
        """

        try:
            client = self._clients[client_uuid]
        except KeyError as e:
            self.log("Could not get client for request!", e, type(e), lvl=warn)
            return

        is_anonymous = (
            request_component in self.anonymous_events
            and request_action in self.anonymous_events[request_component]
        )
        if is_anonymous:
            self.log("Executing anonymous event:", request_component, request_action)
            try:
                self._handle_anonymous_events(
                    request_component, request_action, request_data, client
                )
            except Exception as e:
                self.log("Anonymous request failed:", e, type(e), lvl=warn, exc=True)
            return

        if request_component not in self.authorized_events:
            self.log(
                "Invalid event received:", request_component, request_action, lvl=warn
            )
            return

        try:
            user_uuid = client.useruuid
            self.log(
                "Authenticated operation requested by ",
                user_uuid,
                client.config,
                lvl=network,
            )
        except Exception as e:
            self.log("No user_uuid!", e, type(e), lvl=critical)
            return

        self.log("Checking if user is logged in", lvl=verbose)

        try:
            user = self._users[user_uuid]
        except KeyError:
            # Latency pings of clients without a login are not worth a warning.
            is_latency_ping = (
                request_action == "ping"
                and request_component == "isomer.ui.clientmanager.latency"
            )
            if not is_latency_ping:
                self.log("User not logged in.", lvl=warn)
            user = None

        if user is None and self._public_access is True:
            self.log("Setting anonymous guest user")
            guest_account = objectmodels['user']({
                "name": "Guest",
                "uuid": str(uuid4()),
                "roles": ["public"]
            })
            user = User(guest_account, {}, uuid4())

        if user is None:
            self.log("Public access is deactivated and user is not logged in")
            return

        self.log("Handling event:", request_component, request_action, lvl=verbose)
        try:
            self._handle_authorized_events(
                request_component, request_action, request_data, user, client
            )
        except Exception as e:
            self.log("User request failed: ", e, type(e), lvl=warn, exc=True)
449