Passed
Push — master ( 1fde47...705ffc )
by manny
01:51
created

rtgg_obs.RacetimeObs.fill_entrant_list()   A

Complexity

Conditions 3

Size

Total Lines 7
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 7
dl 0
loc 7
rs 10
c 0
b 0
f 0
cc 3
nop 2
1
import json
2
import websockets
3
from websockets.client import WebSocketClientProtocol
4
import obspython as obs
5
from datetime import datetime, timedelta, timezone
6
import racetime_client
7
from models.race import Race, race_from_dict
8
import asyncio
9
import websockets
10
import dateutil
11
import logging
12
from helpers.LogFormatter import LogFormatter
13
from helpers.obs_context_manager import source_ar, source_list_ar, data_ar
14
from gadgets.timer import Timer
15
from gadgets.coop import Coop
16
from gadgets.qualifier import Qualifier
17
18
def script_description():
    """Return the HTML notice telling the user the wrong script was loaded.

    The text instructs the user to remove this file and add
    'racetime_obs.py' instead.
    """
    notice = (
        "<p>You've loaded the incorrect script.<br><br>"
        "Please remove this file and add 'racetime_obs.py' instead</p>"
    )
    return notice
20
21
class RacetimeObs():
    """Keep OBS text sources in sync with a racetime.gg race.

    The object holds the currently selected race, follows its websocket
    feed for updates, and writes the text produced by the timer, coop and
    qualifier gadgets into the OBS sources those gadgets name.
    """

    # NOTE: created directly instead of via logging.getLogger(), so the
    # logger's own level stays NOTSET; update_logger() filters records by
    # setting the level on the handler it installs.
    logger = logging.Logger("racetime-obs")
    race: Race = None
    selected_race = ""
    check_race_updates = False
    race_changed = False
    full_name = ""
    category = ""
    # The gadgets are class attributes, i.e. shared by every instance.
    timer = Timer()
    coop = Coop()
    qualifier = Qualifier()

    def __init__(self):
        """Hand the shared logger to every gadget."""
        self.timer.logger = self.coop.logger = self.qualifier.logger = self.logger

    def update_sources(self):
        """Write the text of every enabled gadget to its OBS source.

        Does nothing while no race is loaded.
        """
        if self.race is None:
            return
        if self.timer.enabled:
            color, time = self.timer.get_timer_text(self.race, self.full_name)
            self.set_source_text(self.timer.source_name, time, color)
        if self.coop.enabled:
            self.set_source_text(self.coop.source_name, self.coop.text, None)
            self.set_source_text(self.coop.label_source_name, self.coop.label_text, None)
        if self.qualifier.enabled:
            self.set_source_text(self.qualifier.qualifier_par_source, self.qualifier.qualifier_par_text, None)
            self.set_source_text(self.qualifier.qualifier_score_source, self.qualifier.entrant_score, None)

    def race_update_thread(self):
        """Run race_updater() on a fresh event loop.

        The name suggests this is used as a thread target (assumption; the
        caller is not visible here).
        """
        self.logger.debug("starting race update")
        race_event_loop = asyncio.new_event_loop()
        # race_updater() loops forever, so this call only returns by raising.
        race_event_loop.run_until_complete(self.race_updater())
        race_event_loop.run_forever()

    async def race_updater(self):
        """Poll for a selected race and follow its websocket while it lasts.

        Every pass ends with a 5 second sleep. While the timer gadget is
        disabled nothing is fetched or connected.
        """
        headers = {
            'User-Agent': "oro-obs-bot_alpha"
        }
        host = "racetime.gg"

        while True:
            if not self.timer.enabled:
                # NOTE: together with the sleep at the end of the loop this
                # gives a 10 second idle interval while the timer is off.
                await asyncio.sleep(5.0)
            else:
                if self.race is None and self.selected_race != "":
                    self.race = racetime_client.get_race(self.selected_race)
                if self.race is not None and self.race.websocket_url != "":
                    async with websockets.connect("wss://racetime.gg" + self.race.websocket_url, host=host, extra_headers=headers) as ws:
                        self.race_changed = False
                        self.logger.info(
                            f"connected to websocket: {self.race.websocket_url}")
                        await self.process_messages(ws)
            await asyncio.sleep(5.0)

    async def process_messages(self, ws: WebSocketClientProtocol):
        """Consume websocket messages until the race changes or the socket closes.

        Waits up to 5 seconds for each message. When a wait times out and the
        last pong is more than 20 seconds old, a ping action is sent. A closed
        connection clears self.race and ends the loop.
        """
        last_pong = datetime.now(timezone.utc)
        while True:
            if self.race_changed:
                self.logger.info("new race selected")
                self.race_changed = False
                break
            try:
                try:
                    message = await asyncio.wait_for(ws.recv(), 5.0)
                except asyncio.TimeoutError:
                    # The ping is sent inside the outer try so that a
                    # connection closing during send() is handled below
                    # instead of escaping from this exception handler.
                    if datetime.now(timezone.utc) - last_pong > timedelta(seconds=20):
                        await ws.send(json.dumps({"action": "ping"}))
                    continue
            except websockets.ConnectionClosed:
                self.logger.error("websocket connection closed")
                self.race = None
                break
            self.logger.info(f"received message from websocket: {message}")
            data = json.loads(message)
            last_pong = self.process_ws_message(data, last_pong)

    def process_ws_message(self, data, last_pong):
        """Apply one decoded websocket message.

        A "race.data" message replaces self.race when it carries a higher
        version than the current race, and refreshes the coop and qualifier
        texts. A "pong" message yields a new last-pong timestamp.

        Returns the (possibly updated) last-pong timestamp.
        """
        message_type = data.get("type")
        if message_type == "race.data":
            r = race_from_dict(data.get("race"))
            # Snapshot: self.race can be reset to None elsewhere (e.g. when
            # the connection closes), so never dereference it twice.
            current = self.race
            self.logger.debug(f"race data parsed: {r}")
            self.logger.debug(f"current race is {current}")
            if r is not None and current is not None and r.version > current.version:
                self.race = r
                self.logger.debug(f"self.race is {self.race}")
                self.coop.update_coop_text(self.race, self.full_name)
                self.qualifier.update_qualifier_text(self.race, self.full_name)
        elif message_type == "pong":
            # "import dateutil" alone does not load the parser submodule on
            # older python-dateutil releases, so import it explicitly.
            from dateutil import parser as date_parser
            last_pong = date_parser.parse(data.get("date"))
        return last_pong

    def update_logger(self, enabled: bool, log_to_file: bool, log_file: str, level: str):
        """Reconfigure the shared logger from the user's settings.

        enabled: when False the logger is disabled.
        log_to_file: log to log_file instead of the default stream.
        log_file: path of the log file; if it cannot be opened, logging
            falls back to the default stream and an error is logged.
        level: "Debug" or "Info"; any other value selects ERROR.
        """
        self.logger.disabled = not enabled
        # Close the handlers being replaced so file handles are not leaked.
        for old_handler in list(self.logger.handlers):
            self.logger.removeHandler(old_handler)
            old_handler.close()

        handler = None
        open_failed = False
        if log_to_file:
            try:
                handler = logging.FileHandler(log_file)
            except (OSError, ValueError):
                open_failed = True
        if handler is None:
            handler = logging.StreamHandler()

        # The level applies to file and stream handlers alike.
        levels = {"Debug": logging.DEBUG, "Info": logging.INFO}
        handler.setLevel(levels.get(level, logging.ERROR))
        handler.setFormatter(LogFormatter())
        self.logger.addHandler(handler)
        if open_failed:
            # Reported only now, once there is a handler to receive it.
            self.logger.error(f"Unable to open {log_file}")

    @staticmethod
    def fill_source_list(p):
        """Fill list property p with an empty entry plus every text source name."""
        obs.obs_property_list_clear(p)
        obs.obs_property_list_add_string(p, "", "")
        with source_list_ar() as sources:
            if sources is None:
                return
            for source in sources:
                source_id = obs.obs_source_get_unversioned_id(source)
                if source_id in ("text_gdiplus", "text_ft2_source"):
                    name = obs.obs_source_get_name(source)
                    obs.obs_property_list_add_string(p, name, name)

    def fill_race_list(self, race_list, category_list):
        """Rebuild the race and category list properties.

        category_list gets "All" plus each distinct category name, in order
        of first appearance. race_list gets an empty entry plus the races
        matching self.category ("" and "All" match every race).
        """
        obs.obs_property_list_clear(race_list)
        obs.obs_property_list_clear(category_list)
        obs.obs_property_list_add_string(category_list, "All", "All")

        obs.obs_property_list_add_string(race_list, "", "")
        races = racetime_client.get_races()
        if races is None:
            return
        show_all = self.category in ("", "All")
        seen_categories = set()
        for race in races:
            category_name = race.category.name
            if show_all or category_name == self.category:
                obs.obs_property_list_add_string(
                    race_list, race.name, race.name)
            if category_name not in seen_categories:
                seen_categories.add(category_name)
                obs.obs_property_list_add_string(
                    category_list, category_name, category_name)

    def fill_coop_entrant_lists(self, props):
        """Fill the coop partner and opponent properties with the entrants."""
        for prop_name in ("coop_partner", "coop_opponent1", "coop_opponent2"):
            self.fill_entrant_list(obs.obs_properties_get(props, prop_name))

    def fill_entrant_list(self, entrant_list):
        """Fill entrant_list with an empty entry plus each entrant's full name."""
        obs.obs_property_list_clear(entrant_list)
        obs.obs_property_list_add_string(entrant_list, "", "")
        if self.race is None:
            return
        for entrant in self.race.entrants:
            full_name = entrant.user.full_name
            obs.obs_property_list_add_string(entrant_list, full_name, full_name)

    # copied and modified from scripted-text.py by UpgradeQ

    @staticmethod
    def set_source_text(source_name: str, text: str, color: int):
        """Set the text, and optionally the colour, of an OBS text source.

        source_name: name of the source to update.
        text: new value for the source's "text" setting.
        color: integer colour, or None to leave the colour untouched.
        """
        with source_ar(source_name) as source, data_ar() as settings:
            obs.obs_data_set_string(settings, "text", text)
            source_id = obs.obs_source_get_unversioned_id(source)
            if color is not None:
                if source_id == "text_gdiplus":
                    obs.obs_data_set_int(settings, "color", color)  # colored text
                else:
                    # Other text sources (FreeType 2) read "color1". Give
                    # the value a 0xff top byte when it has none. Done
                    # arithmetically: building the number from hex(color)
                    # text dropped leading zero digits and misplaced the
                    # prefix (0x0000ff became 0xffff).
                    opaque = color & 0xFFFFFFFF
                    if opaque <= 0xFFFFFF:
                        opaque |= 0xFF000000
                    obs.obs_data_set_int(settings, "color1", opaque)

            obs.obs_source_update(source, settings)