Passed
Push — master ( 518ae3...863c37 )
by manny
46s queued 12s
created

rtgg_obs.RacetimeObs.race_updater()   C

Complexity

Conditions 9

Size

Total Lines 32
Code Lines 26

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 9
eloc 26
nop 1
dl 0
loc 32
rs 6.6666
c 0
b 0
f 0
1
import asyncio
2
import json
3
import logging
4
from datetime import datetime, timedelta, timezone
5
6
import dateutil
7
import websockets
8
from websockets.client import WebSocketClientProtocol
9
from websockets.exceptions import ConnectionClosedError
10
11
import racetime_client
12
from gadgets.coop import Coop
13
from gadgets.qualifier import Qualifier
14
from gadgets.timer import Timer
15
from gadgets.media_player import MediaPlayer
16
from helpers.LogFormatter import LogFormatter
17
from models.race import Race, race_from_dict
18
19
20
def script_description():
21
    return (
22
        "<p>You've loaded the incorrect script.<br><br>Please remove this file"
23
        "and add 'racetime_obs.py' instead</p>"
24
    )
25
26
27
class RacetimeObs():
28
    logger = logging.Logger("racetime-obs")
29
    race: Race = None
30
    selected_race = "None"
31
    check_race_updates = False
32
    race_changed = False
33
    full_name = ""
34
    category = ""
35
    timer = Timer()
36
    coop = Coop()
37
    qualifier = Qualifier()
38
    media_player: MediaPlayer = None
39
    event_loop = asyncio.get_event_loop()
40
41
    def __init__(self):
42
        self.timer.logger = self.logger
43
        self.coop.logger = self.logger
44
        self.qualifier.logger = self.logger
45
        self.media_player = MediaPlayer()
46
        self.media_player.logger = self.logger
47
48
    def race_update_thread(self):
49
        self.logger.debug("starting race update")
50
        self.event_loop.run_until_complete(self.race_updater())
51
        self.event_loop.run_forever()
52
53
    async def race_updater(self):
54
        headers = {
55
            'User-Agent': "oro-obs-bot_alpha"
56
        }
57
        host = "racetime.gg"
58
59
        while True:
60
            if not self.timer.is_enabled():
61
                await asyncio.sleep(5.0)
62
            else:
63
                if self.race is None and self.selected_race != "None":
64
                    self.race = (
65
                        racetime_client.get_race_by_name(self.selected_race)
66
                    )
67
                    self.logger.debug("got race, trying to connect to ws")
68
                if self.race is not None and self.race.websocket_url != "":
69
                    self.logger.debug("received race, trying to connect to ws")
70
                    try:
71
                        async with websockets.connect(
72
                            "wss://racetime.gg" + self.race.websocket_url,
73
                            host=host, extra_headers=headers
74
                        ) as ws:
75
                            self.race_changed = False
76
                            self.logger.info(
77
                                "connected to websocket:"
78
                                " {self.race.websocket_url}"
79
                            )
80
                            await self.process_messages(ws)
81
                    except (ConnectionRefusedError, ConnectionClosedError):
82
                        self.logger.error("websocket closed unexpectedly.")
83
                        continue
84
            await asyncio.sleep(5.0)
85
86
    async def process_messages(self, ws: WebSocketClientProtocol):
87
        last_pong = datetime.now(timezone.utc)
88
        while True:
89
            try:
90
                if self.race_changed:
91
                    self.logger.info("new race selected")
92
                    self.race_changed = False
93
                    break
94
                message = await asyncio.wait_for(ws.recv(), 5.0)
95
                self.logger.info(f"received message from websocket: {message}")
96
                data = json.loads(message)
97
                last_pong = self.process_ws_message(data, last_pong)
98
            except asyncio.TimeoutError:
99
                await self.ping_ws(ws, last_pong)
100
                continue
101
            except websockets.ConnectionClosed:
102
                self.logger.error("websocket connection closed")
103
                self.race = None
104
                break
105
106
    async def ping_ws(self, ws, last_pong):
107
        if datetime.now(timezone.utc) - last_pong > timedelta(seconds=20):
108
            await ws.send(json.dumps({"action": "ping"}))
109
110
    def process_ws_message(self, data: dict, last_pong: datetime):
111
        if data.get("type") == "race.data":
112
            self.update_race(data)
113
        elif data.get("type") == "chat.message":
114
            self.process_chat_message(data.get("message"))
115
        elif data.get("type") == "pong":
116
            last_pong = dateutil.parser.parse(data.get("date"))
117
            pass
118
        return last_pong
119
120
    def process_chat_message(self, data: dict):
121
        self.logger.debug(
122
                f"received chat message. chat sounds enabled is "
123
                f"{self.media_player.ping_chat_messages}"
124
            )
125
        if (
126
                self.media_player.ping_chat_messages and
127
                data.get("is_bot") or data.get("highlight")
128
        ):
129
            self.logger.debug(
130
                    f"trying to play {self.media_player.chat_media_file}")
131
            self.event_loop.call_soon_threadsafe(
132
                    self.media_player.play_media_callback,
133
                    self.media_player.chat_media_file,
134
                    self.media_player.monitoring_type
135
                )
136
137
    def update_race(self, data: dict):
138
        r = race_from_dict(data.get("race"))
139
        self.logger.debug(f"race data parsed: {r}")
140
        self.logger.debug(f"current race is {self.race}")
141
        if r is not None and r.version > self.race.version:
142
            self.race = r
143
            self.logger.debug(f"self.race is {self.race}")
144
            self.coop.update_coop_text(self.race, self.full_name)
145
            self.qualifier.update_qualifier_text(self.race, self.full_name)
146
            self.event_loop.call_soon_threadsafe(
147
                    self.media_player.race_updated,
148
                    self.race, self.full_name)
149
150
    def update_logger(
151
        self, enabled: bool, log_to_file: bool, log_file: str, level: str
152
    ):
153
        self.logger.disabled = not enabled
154
        self.logger.handlers = []
155
        handler = logging.StreamHandler()
156
        if log_to_file:
157
            try:
158
                handler = logging.FileHandler(log_file)
159
            except IOError as e:
160
                self.logger.error(f"IOError while opening open {log_file}:")
161
                self.logger.error(f"{e.errno} {e.strerror}")
162
        elif level == "Debug":
163
            handler.setLevel(logging.DEBUG)
164
        elif level == "Info":
165
            handler.setLevel(logging.INFO)
166
        else:
167
            handler.setLevel(logging.ERROR)
168
        handler.setFormatter(LogFormatter())
169
        self.logger.addHandler(handler)
170