rtgg_obs.RacetimeObs.process_ws_message()   A
last analyzed

Complexity

Conditions 4

Size

Total Lines 9
Code Lines 9

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 4
eloc 9
nop 3
dl 0
loc 9
rs 9.95
c 0
b 0
f 0
1
import asyncio
2
from asyncio.events import AbstractEventLoop
3
import json
4
import logging
5
from datetime import datetime, timedelta, timezone
6
import gettext
7
8
import dateutil
9
import websockets
10
from websockets.client import WebSocketClientProtocol
11
from websockets.exceptions import ConnectionClosedError
12
13
import clients.racetime_client as racetime_client
14
from gadgets.coop import Coop
15
from gadgets.qualifier import Qualifier
16
from gadgets.timer import Timer
17
from gadgets.media_player import MediaPlayer
18
from helpers.LogFormatter import LogFormatter
19
from models.race import Race, race_from_dict
20
from models.chat_message import chat_message_from_dict
21
22
# Short alias for gettext.gettext, used to wrap user-facing strings.
_ = gettext.gettext
23
24
25
def script_description():
    """Return the HTML notice saying that the wrong script was loaded.

    The text is passed through the module-level gettext alias ``_``.
    """
    # The fragments are joined by implicit string concatenation, so the
    # space between "file" and "and" must be written out explicitly.
    return _(
        "<p>You've loaded the incorrect script.<br><br>Please remove this file"
        " and add 'racetime_obs.py' instead</p>"
    )
30
31
32
class RacetimeObs():
    """Follow a racetime.gg race over its websocket and feed the gadgets.

    The updater coroutine looks up the selected race, connects to the
    race's websocket and dispatches every incoming message: race data
    refreshes the coop/qualifier gadgets and the media player, chat
    messages may trigger a sound, and pongs refresh the keep-alive
    timestamp.
    """

    # Instantiated directly (not via logging.getLogger); handlers are
    # attached later by update_logger().
    logger = logging.Logger("racetime-obs")
    # Race currently being followed, or None when no race is loaded.
    race: Race = None
    # Name of the race to follow; the string "None" means nothing selected.
    selected_race = "None"
    check_race_updates = False
    # Set to True to make process_messages() drop the current connection.
    race_changed = False
    full_name = ""
    category = ""
    # NOTE: these gadgets are class attributes and therefore shared by
    # every instance of RacetimeObs.
    timer = Timer()
    coop = Coop()
    qualifier = Qualifier()
    media_player: MediaPlayer = None
    event_loop: AbstractEventLoop = None
    preview_mode = False
    timer_decimals: bool = True

    def __init__(self):
        """Grab the event loop and hand the shared logger to each gadget."""
        self.event_loop = asyncio.get_event_loop()
        self.timer.logger = self.logger
        self.coop.logger = self.logger
        self.qualifier.logger = self.logger
        self.media_player = MediaPlayer()
        self.media_player.logger = self.logger

    def race_update_thread(self):
        """Run race_updater() on the event loop, then keep the loop alive.

        Blocking call; judging by the name it is meant to be used as a
        thread target.
        """
        self.logger.debug("starting race update")
        self.event_loop.run_until_complete(self.race_updater())
        self.event_loop.run_forever()

    async def race_updater(self):
        """Keep a websocket connection to the selected race open, forever.

        While the timer gadget is disabled nothing is fetched. Otherwise
        the selected race is looked up (if not loaded yet) and its
        websocket is processed until it closes or another race is chosen.
        """
        headers = {
            'User-Agent': "oro-obs-bot_alpha"
        }
        host = "racetime.gg"

        while True:
            if not self.timer.is_enabled():
                await asyncio.sleep(5.0)
            else:
                if self.race is None and self.selected_race != "None":
                    self.race = (
                        racetime_client.get_race_by_name(self.selected_race)
                    )
                    self.logger.debug("got race, trying to connect to ws")
                if self.race is not None and self.race.websocket_url != "":
                    self.logger.debug("received race, trying to connect to ws")
                    try:
                        async with websockets.connect(
                            "wss://racetime.gg" + self.race.websocket_url,
                            host=host, extra_headers=headers
                        ) as ws:
                            self.race_changed = False
                            # f-string so the URL is actually interpolated
                            # instead of logging the literal placeholder.
                            self.logger.info(
                                "connected to websocket:"
                                f" {self.race.websocket_url}"
                            )
                            await self.process_messages(ws)
                    except (ConnectionRefusedError, ConnectionClosedError):
                        # Fall through to the sleep below rather than
                        # retrying at once: an unreachable server would
                        # otherwise be hit in a tight loop.
                        self.logger.error("websocket closed unexpectedly.")
            await asyncio.sleep(5.0)

    async def process_messages(self, ws: WebSocketClientProtocol):
        """Receive and dispatch websocket messages until told to stop.

        Returns when ``race_changed`` is set or the connection closes; in
        the latter case ``self.race`` is reset to None.
        """
        last_pong = datetime.now(timezone.utc)
        while True:
            try:
                if self.race_changed:
                    self.logger.info("new race selected")
                    self.race_changed = False
                    break
                # Time out regularly so race_changed is re-checked and a
                # keep-alive ping can be sent on a quiet connection.
                message = await asyncio.wait_for(ws.recv(), 5.0)
                self.logger.info(f"received message from websocket: {message}")
                data = json.loads(message)
                last_pong = self.process_ws_message(data, last_pong)
            except asyncio.TimeoutError:
                await self.ping_ws(ws, last_pong)
            except websockets.ConnectionClosed:
                self.logger.error("websocket connection closed")
                self.race = None
                break

    async def ping_ws(self, ws: WebSocketClientProtocol, last_pong: datetime):
        """Send a ping action if the last pong is more than 20 seconds old."""
        if datetime.now(timezone.utc) - last_pong > timedelta(seconds=20):
            await ws.send(json.dumps({"action": "ping"}))

    def process_ws_message(self, data: dict, last_pong: datetime):
        """Dispatch one decoded websocket message by its ``type`` field.

        Returns the timestamp of the latest pong: the parsed ``date`` of
        a "pong" message, otherwise ``last_pong`` unchanged.
        """
        message_type = data.get("type")
        if message_type == "race.data":
            self.update_race(data)
        elif message_type == "chat.message":
            self.process_chat_message(data.get("message"))
        elif message_type == "pong":
            # A bare `import dateutil` does not load the parser submodule,
            # so import it explicitly before use.
            from dateutil import parser as date_parser
            last_pong = date_parser.parse(data.get("date"))
        return last_pong

    def process_chat_message(self, data: dict):
        """Play the chat sound for bot or highlighted messages, if enabled."""
        message = chat_message_from_dict(data)
        self.logger.debug(
                f"received chat message. chat sounds enabled is "
                f"{self.media_player.ping_chat_messages}"
            )
        # Parenthesised on purpose: `a and b or c` binds as `(a and b) or c`
        # and would play the sound for highlights even when disabled.
        if (
                self.media_player.ping_chat_messages and
                (message.is_bot or message.highlight)
        ):
            self.logger.debug(
                    f"trying to play {self.media_player.chat_media_file}")
            self.event_loop.call_soon_threadsafe(
                    self.media_player.play_media_callback,
                    self.media_player.chat_media_file,
                    self.media_player.monitoring_type
                )

    def update_race(self, data: dict):
        """Adopt the race in ``data`` when it is newer than the current one.

        A newer version replaces ``self.race``, refreshes the coop and
        qualifier texts and schedules the media player's race_updated
        callback on the event loop.
        """
        r = race_from_dict(data.get("race"))
        self.logger.debug(f"race data parsed: {r}")
        self.logger.debug(f"current race is {self.race}")
        if r is not None and r.version > self.race.version:
            self.race = r
            self.logger.debug(f"self.race is {self.race}")
            self.coop.update_coop_text(self.race, self.full_name)
            self.qualifier.update_qualifier_text(self.race, self.full_name)
            self.event_loop.call_soon_threadsafe(
                    self.media_player.race_updated,
                    self.race, self.full_name)

    def update_logger(
        self, enabled: bool, log_to_file: bool, log_file: str, level: str
    ):
        """Reconfigure the logger with a single fresh handler.

        Args:
            enabled: when False the logger is disabled.
            log_to_file: log to ``log_file`` instead of the default stream.
            log_file: path of the log file; only used if ``log_to_file``.
            level: "Debug" or "Info"; any other value selects ERROR.
        """
        self.logger.disabled = not enabled
        # Close the handlers being replaced so a previous log file is not
        # left open on every reconfiguration.
        for old_handler in list(self.logger.handlers):
            old_handler.close()
        self.logger.handlers = []
        handler = logging.StreamHandler()
        if log_to_file:
            try:
                handler = logging.FileHandler(log_file)
            except IOError as e:
                # Keep the stream handler created above as the fallback.
                self.logger.error(f"IOError while opening open {log_file}:")
                self.logger.error(f"{e.errno} {e.strerror}")
        # Deliberately not chained to the branch above, so the level is
        # applied to file handlers as well.
        if level == "Debug":
            handler.setLevel(logging.DEBUG)
        elif level == "Info":
            handler.setLevel(logging.INFO)
        else:
            handler.setLevel(logging.ERROR)
        handler.setFormatter(LogFormatter())
        self.logger.addHandler(handler)
179