Passed
Push — master ( b1826e...729ce0 )
by manny
01:49
created

rtgg_obs.RacetimeObs.ping_ws()   A

Complexity

Conditions 2

Size

Total Lines 3
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 3
dl 0
loc 3
rs 10
c 0
b 0
f 0
cc 2
nop 3
1
import asyncio
2
import json
3
import logging
4
from datetime import datetime, timedelta, timezone
5
from typing import Any
6
7
import dateutil
8
import websockets
9
from websockets.client import WebSocketClientProtocol
10
11
import racetime_client
12
from gadgets.coop import Coop
13
from gadgets.qualifier import Qualifier
14
from gadgets.timer import Timer
15
from helpers.LogFormatter import LogFormatter
16
from models.race import Race, race_from_dict
17
18
19
def script_description():
20
    return (
21
        "<p>You've loaded the incorrect script.<br><br>Please remove this file"
22
        "and add 'racetime_obs.py' instead</p>"
23
    )
24
25
26
class RacetimeObs():
27
    logger = logging.Logger("racetime-obs")
28
    race: Race = None
29
    selected_race = ""
30
    check_race_updates = False
31
    race_changed = False
32
    full_name = ""
33
    category = ""
34
    timer = Timer()
35
    coop = Coop()
36
    qualifier = Qualifier()
37
38
    def __init__(self):
39
        self.timer.logger = self.logger
40
        self.coop.logger = self.logger
41
        self.qualifier.logger = self.logger
42
43
    def race_update_thread(self):
44
        self.logger.debug("starting race update")
45
        race_event_loop = asyncio.new_event_loop()
46
        race_event_loop.run_until_complete(self.race_updater())
47
        race_event_loop.run_forever()
48
49
    async def race_updater(self):
50
        headers = {
51
            'User-Agent': "oro-obs-bot_alpha"
52
        }
53
        host = "racetime.gg"
54
55
        while True:
56
            if not self.timer.is_enabled():
57
                await asyncio.sleep(5.0)
58
            else:
59
                if self.race is None and self.selected_race != "":
60
                    self.race = (
61
                        racetime_client.get_race_by_name(self.selected_race)
62
                    )
63
                if self.race is not None and self.race.websocket_url != "":
64
                    async with websockets.connect(
65
                        "wss://racetime.gg" + self.race.websocket_url,
66
                        host=host, extra_headers=headers
67
                    ) as ws:
68
                        self.race_changed = False
69
                        self.logger.info(
70
                            "connected to websocket:"
71
                            " {self.race.websocket_url}"
72
                        )
73
                        await self.process_messages(ws)
74
            await asyncio.sleep(5.0)
75
76
    async def process_messages(self, ws: WebSocketClientProtocol):
77
        last_pong = datetime.now(timezone.utc)
78
        while True:
79
            try:
80
                if self.race_changed:
81
                    self.logger.info("new race selected")
82
                    self.race_changed = False
83
                    break
84
                message = await asyncio.wait_for(ws.recv(), 5.0)
85
                self.logger.info(f"received message from websocket: {message}")
86
                data = json.loads(message)
87
                last_pong = self.process_ws_message(data, last_pong)
88
            except asyncio.TimeoutError:
89
                await self.ping_ws(ws, last_pong)
90
            except websockets.ConnectionClosed:
91
                self.logger.error("websocket connection closed")
92
                self.race = None
93
                break
94
95
    async def ping_ws(self, ws, last_pong):
96
        if datetime.now(timezone.utc) - last_pong > timedelta(seconds=20):
97
            await ws.send(json.dumps({"action": "ping"}))
98
99
    def process_ws_message(self, data, last_pong):
100
        if data.get("type") == "race.data":
101
            r = race_from_dict(data.get("race"))
102
            self.logger.debug(f"race data parsed: {r}")
103
            self.logger.debug(f"current race is {self.race}")
104
            if r is not None and r.version > self.race.version:
105
                self.race = r
106
                self.logger.debug(f"self.race is {self.race}")
107
                self.coop.update_coop_text(self.race, self.full_name)
108
                self.qualifier.update_qualifier_text(self.race, self.full_name)
109
        elif data.get("type") == "pong":
110
            last_pong = dateutil.parser.parse(data.get("date"))
111
            pass
112
        return last_pong
113
114
    def update_logger(
115
        self, enabled: bool, log_to_file: bool, log_file: str, level: str
116
    ):
117
        self.logger.disabled = not enabled
118
        self.logger.handlers = []
119
        handler = logging.StreamHandler()
120
        if log_to_file:
121
            try:
122
                handler = logging.FileHandler(log_file)
123
            except Any:
124
                self.logger.error(f"Unable to open {log_file}")
125
        elif level == "Debug":
126
            handler.setLevel(logging.DEBUG)
127
        elif level == "Info":
128
            handler.setLevel(logging.INFO)
129
        else:
130
            handler.setLevel(logging.ERROR)
131
        handler.setFormatter(LogFormatter())
132
        self.logger.addHandler(handler)
133