Passed
Push — master ( e44069...5b9119 )
by manny
01:44
created

rtgg_obs.RacetimeObs.process_ws_message()   A

Complexity

Conditions 5

Size

Total Lines 14
Code Lines 14

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 14
dl 0
loc 14
rs 9.2333
c 0
b 0
f 0
cc 5
nop 3
1
import json
2
import websockets
3
from websockets.client import WebSocketClientProtocol
4
import obspython as obs
5
from datetime import datetime, timedelta, timezone
6
import racetime_client
7
from models.race import Race, race_from_dict
8
import asyncio
9
import websockets
10
import dateutil
11
import logging
12
from helpers.LogFormatter import LogFormatter
13
from helpers.obs_context_manager import source_ar, source_list_ar, data_ar
14
from gadgets.timer import Timer
15
from gadgets.coop import Coop
16
from gadgets.qualifier import Qualifier
17
18
def script_description():
19
    return "<p>You've loaded the incorrect script.<br><br>Please remove this file and add 'racetime_obs.py' instead</p>"
20
21
class RacetimeObs():
22
    logger = logging.Logger("racetime-obs")
23
    race: Race = None
24
    selected_race = ""
25
    check_race_updates = False
26
    race_changed = False
27
    full_name = ""
28
    category = ""
29
    timer = Timer()
30
    coop = Coop()
31
    qualifier = Qualifier()
32
33
    def update_sources(self):
34
        if self.race is not None:
35
            if self.timer.enabled:
36
                color, time = self.timer.get_timer_text(self.race, self.full_name)
37
                self.set_source_text(self.timer.source_name, time, color)
38
            if self.coop.enabled:
39
                self.set_source_text(self.coop.source_name, self.coop.text, None)
40
                self.set_source_text(self.coop.label_source_name, self.coop.label_text, None)
41
            if self.qualifier.enabled:
42
                self.set_source_text(self.qualifier.qualifier_par_source, self.qualifier.qualifier_par_text, None)
43
                self.set_source_text(self.qualifier.qualifier_score_source, self.qualifier.entrant_score, None)
44
            pass
45
46
    def race_update_thread(self):
47
        self.logger.debug("starting race update")
48
        race_event_loop = asyncio.new_event_loop()
49
        race_event_loop.run_until_complete(self.race_updater())
50
        race_event_loop.run_forever()
51
52
53
    async def race_updater(self):
54
        headers = {
55
            'User-Agent': "oro-obs-bot_alpha"
56
        }
57
        host = "racetime.gg"
58
59
        while True:
60
            if not self.timer.enabled:
61
                await asyncio.sleep(5.0)
62
            else:
63
                if self.race is None and self.selected_race != "":
64
                    self.race = racetime_client.get_race(self.selected_race)
65
                if self.race is not None and self.race.websocket_url != "":
66
                    async with websockets.connect("wss://racetime.gg" + self.race.websocket_url, host=host, extra_headers=headers) as ws:
67
                        self.race_changed = False
68
                        self.logger.info(
69
                            f"connected to websocket: {self.race.websocket_url}")
70
                        await self.process_messages(ws)
71
            await asyncio.sleep(5.0)
72
73
74
    async def process_messages(self, ws: WebSocketClientProtocol):
75
        last_pong = datetime.now(timezone.utc)
76
        while True:
77
            try:
78
                if self.race_changed:
79
                    self.logger.info("new race selected")
80
                    self.race_changed = False
81
                    break
82
                message = await asyncio.wait_for(ws.recv(), 5.0)
83
                self.logger.info(f"received message from websocket: {message}")
84
                data = json.loads(message)
85
                last_pong = self.process_ws_message(data, last_pong)
86
            except asyncio.TimeoutError:
87
                if datetime.now(timezone.utc) - last_pong > timedelta(seconds=20):
88
                    await ws.send(json.dumps({"action": "ping"}))
89
            except websockets.ConnectionClosed:
90
                self.logger.error(f"websocket connection closed")
91
                self.race = None
92
                break
93
94
    def process_ws_message(self, data, last_pong):
95
        if data.get("type") == "race.data":
96
            r = race_from_dict(data.get("race"))
97
            self.logger.debug(f"race data parsed: {r}")
98
            self.logger.debug(f"current race is {self.race}")
99
            if r is not None and r.version > self.race.version:
100
                self.race = r
101
                self.logger.debug(f"self.race is {self.race}")
102
                self.coop.update_coop_text(self.race, self.full_name)
103
                self.qualifier.update_qualifier_text(self.race, self.full_name)
104
        elif data.get("type") == "pong":
105
            last_pong = dateutil.parser.parse(data.get("date"))
106
            pass
107
        return last_pong
108
109
110
    def update_logger(self, enabled: bool, log_to_file: bool, log_file: str, level: str):
111
        self.logger.disabled = not enabled
112
        self.logger.handlers = []
113
        handler = logging.StreamHandler()
114
        if log_to_file:
115
            try:
116
                handler = logging.FileHandler(log_file)
117
            except:
118
                self.logger.error(f"Unable to open {log_file}")
119
        elif level == "Debug":
120
            handler.setLevel(logging.DEBUG)
121
        elif level == "Info":
122
            handler.setLevel(logging.INFO)
123
        else:
124
            handler.setLevel(logging.ERROR)
125
        handler.setFormatter(LogFormatter())
126
        self.logger.addHandler(handler)
127
128
    @staticmethod
129
    def fill_source_list(p):
130
        obs.obs_property_list_clear(p)
131
        obs.obs_property_list_add_string(p, "", "")
132
        with source_list_ar() as sources:
133
            if sources is not None:
134
                for source in sources:
135
                    source_id = obs.obs_source_get_unversioned_id(source)
136
                    if source_id == "text_gdiplus" or source_id == "text_ft2_source":
137
                        name = obs.obs_source_get_name(source)
138
                        obs.obs_property_list_add_string(p, name, name)
139
140
    def fill_race_list(self, race_list, category_list):
141
        obs.obs_property_list_clear(race_list)
142
        obs.obs_property_list_clear(category_list)
143
        obs.obs_property_list_add_string(category_list, "All", "All")
144
145
        obs.obs_property_list_add_string(race_list, "", "")
146
        races = racetime_client.get_races()
147
        if races is not None:
148
            categories = []
149
            for race in races:
150
                if self.category == "" or self.category == "All" or race.category.name == self.category:
151
                    obs.obs_property_list_add_string(
152
                        race_list, race.name, race.name)
153
                if not race.category.name in categories:
154
                    categories.append(race.category.name)
155
                    obs.obs_property_list_add_string(
156
                        category_list, race.category.name, race.category.name)
157
158
159
    def fill_coop_entrant_lists(self, props):
160
        self.fill_entrant_list(obs.obs_properties_get(props, "coop_partner"))
161
        self.fill_entrant_list(obs.obs_properties_get(props, "coop_opponent1"))
162
        self.fill_entrant_list(obs.obs_properties_get(props, "coop_opponent2"))
163
164
165
    def fill_entrant_list(self, entrant_list):
166
        obs.obs_property_list_clear(entrant_list)
167
        obs.obs_property_list_add_string(entrant_list, "", "")
168
        if self.race is not None:
169
            for entrant in self.race.entrants:
170
                obs.obs_property_list_add_string(
171
                    entrant_list, entrant.user.full_name, entrant.user.full_name)
172
    
173
    
174
    # copied and modified from scripted-text.py by UpgradeQ
175
176
    @staticmethod
177
    def set_source_text(source_name: str, text: str, color: int):
178
        with source_ar(source_name) as source, data_ar() as settings:
179
            obs.obs_data_set_string(settings, "text", text)
180
            source_id = obs.obs_source_get_unversioned_id(source)
181
            if color is not None:
182
                if source_id == "text_gdiplus":
183
                    obs.obs_data_set_int(settings, "color", color)  # colored text
184
185
                else:  # freetype2,if taken from user input it should be reversed for getting correct color
186
                    number = "".join(hex(color)[2:])
187
                    color = int("0xff" f"{number}", base=16)
188
                    obs.obs_data_set_int(settings, "color1", color)
189
            
190
            obs.obs_source_update(source, settings)