Passed
Push — main ( b56a82...4c7d8c )
by Yohann
01:58
created

__main__.main()   A

Complexity

Conditions 3

Size

Total Lines 17
Code Lines 13

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 3
eloc 13
nop 0
dl 0
loc 17
rs 9.75
c 0
b 0
f 0
1
import json
2
import logging
3
import sys
4
5
import dotenv
6
from websocket_server import WebsocketServer
7
8
users = {}
9
10
11
def on_client(client, _srv):
12
    print(
13
        '-> Accepted new connection from',
14
        ':'.join(map(str, client['address'])),
15
        'connected'
16
    )
17
18
19
def on_client_left(client, server):
20
    print('-> Client left:', client['address'])
21
    username = users.pop(client['address'])
22
    server.send_message_to_all(
23
        msg=json.dumps(
24
            {
25
                'type': 'logout',
26
                'username': username
27
            }
28
        )
29
    )
30
31
32
def on_message_received(_client, server, data):
33
    print('-> Received message from', _client['address'], ':', data)
34
35
    data = json.JSONDecoder().decode(data)
36
37
    if data['type'] == 'login':
38
        users[_client['address']] = data['username']
39
        server.send_message_to_all(
40
            msg=json.dumps(
41
                {
42
                    'type': 'login',
43
                    'username': data['username']
44
                }
45
            )
46
        )
47
48
        server.send_message(
49
            client=_client,
50
            msg=json.dumps(
51
                {
52
                    'type': 'users',
53
                    'users': list(users.values())
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable users does not seem to be defined.
Loading history...
54
                }
55
            )
56
        )
57
58
    elif data['type'] == 'message':
59
        print(users[_client['address']])
60
        server.send_message_to_all(
61
            msg=json.dumps(
62
                {
63
                    'type': 'message',
64
                    'username': users[_client['address']],
65
                    'message': data['content']
66
                }
67
            )
68
        )
69
70
71
def main():
72
    port = sys.argv[1] if (len(sys.argv) > 1) else '8080'
73
74
    if not port.isdigit():
75
        print("Port must be a number.")
76
        return
77
78
    server = WebsocketServer(
79
        host='0.0.0.0',
80
        port=int(port),
81
        loglevel=logging.DEBUG
82
    )
83
84
    server.set_fn_new_client(on_client)
85
    server.set_fn_message_received(on_message_received)
86
    server.set_fn_client_left(on_client_left)
87
    server.run_forever()
88
89
90
if __name__ == '__main__':
91
    main()
92