Passed
Push — master ( 80a6f5...e155f9 )
by manny
47s queued 10s
created

gadgets.media_player.MediaTrigger.__init__()   A

Complexity

Conditions 1

Size

Total Lines 7
Code Lines 6

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 6
nop 4
dl 0
loc 7
rs 10
c 0
b 0
f 0
1
import asyncio
2
from asyncio.locks import Lock
3
from datetime import datetime, timedelta
4
import logging
5
from threading import Timer
6
from typing import List, Optional
7
from asyncio import Event
8
from helpers import timer_to_str
9
from models.race import Entrant, Race
10
from models.chat_message import ChatMessage
11
12
13
class MediaTrigger:
14
    place: Optional[int] = None
15
    entrant_count_trigger: Optional[int] = None
16
    media_file_path: str = ""
17
    triggered: bool = False
18
    race_started_at: datetime = None
19
20
    def __init__(
21
        self, media_file_path: str, place_trigger: int = None,
22
        entrant_count_trigger: int = None
23
    ):
24
        self.media_file_path = media_file_path
25
        self.place = place_trigger
26
        self.entrant_count_trigger = entrant_count_trigger
27
28
    def check_trigger(self, race: Race, entrant: Entrant):
29
        if (
30
            self.triggered or self.media_file_path == "" or
31
            race is None or entrant is None
32
        ):
33
            return False
34
        else:
35
            if (
36
                self.check_finish_place_entrant_count(race, entrant) or
37
                self.check_finish_place(entrant)
38
            ):
39
                self.triggered = True
40
                return True
41
        return False
42
43
    def check_finish_place(self, entrant: Entrant):
44
        if (
45
            self.entrant_count_trigger is not None or entrant.user is None or
46
            entrant.place is None
47
        ):
48
            return False
49
        else:
50
            return (
51
                entrant.place <= self.place
52
            )
53
54
    def check_finish_place_entrant_count(self, race: Race, entrant: Entrant):
55
        if None in [
56
            self.entrant_count_trigger, entrant.user, entrant.place,
57
            race.entrants_count
58
        ]:
59
            return False
60
        else:
61
            return (
62
                entrant.place <= self.place and
63
                race.entrants_count <= self.entrant_count_trigger
64
            )
65
66
67
class ChatTrigger:
68
    media_file_path: str = ""
69
    highlight_trigger: bool = False
70
    bot_trigger: bool = False
71
    system_trigger: bool = False
72
    message_plain_trigger: str = ""
73
    trigger_once: bool = False
74
    triggered: bool = False
75
76
    def __init__(self, media_file_path, highlight, bot, system, message_plain):
77
        self.media_file_path = media_file_path
78
        self.highlight_trigger = highlight
79
        self.bot_trigger = bot
80
        self.system_trigger = system
81
        self.message_plain_trigger = message_plain
82
83
    def check_trigger(self, chat_message: ChatMessage):
84
        def trigger_on_bot(bot_trigger: bool, chat_message: ChatMessage):
85
            return bot_trigger and chat_message.is_bot
86
87
        def trigger_on_highlight(
88
                highlight_trigger: bool, chat_message: ChatMessage):
89
            return highlight_trigger and chat_message.highlight
90
91
        def trigger_on_system(system_trigger: bool, chat_message: ChatMessage):
92
            return system_trigger and chat_message.is_system
93
94
        if self.trigger_once and self.triggered:
95
            return False
96
        self.triggered = (
97
            trigger_on_bot(self.bot_trigger, chat_message) or
98
            trigger_on_highlight(self.highlight_trigger, chat_message) or
99
            trigger_on_system(self.system_trigger, chat_message)
100
        )
101
        return self.triggered
102
103
104
class MediaPlayer:
105
    logger: logging.Logger = logging.Logger("racetime-obs")
106
    enabled: bool = False
107
    triggers: List[MediaTrigger] = []
108
    chat_triggers: List[ChatTrigger] = []
109
    timers: List[Timer] = []
110
    triggers_lock: Lock = Lock()
111
    race_update_event: Event()
112
    play_media_callback = None
113
    started_at: datetime = None
114
    ping_chat_messages: bool = False
115
    chat_media_file: str = None
116
    last_session_race: str = ""
117
    monitoring_type: int = 0
118
119
    def race_updated(self, race: Race, entrant_name: str):
120
        # so the sound doesn't play when the user starts obs next time
121
        if self.last_session_race == race.name:
122
            return
123
        self.started_at = race.started_at
124
        for trigger in self.triggers:
125
            self.logger.debug(trigger)
126
            if trigger.check_trigger(
127
                race, race.get_entrant_by_name(entrant_name)
128
            ):
129
                self.play_media_callback(
130
                    trigger.media_file_path, self.monitoring_type)
131
                self.logger.debug("trigger fired")
132
133
    def chat_updated(self, message: ChatMessage):
134
        for trigger in self.chat_triggers:
135
            self.logger.debug(trigger)
136
            if trigger.check_trigger(message):
137
                self.play_media_callback(
138
                    trigger.media_file_path, self.monitoring_type)
139
140
    def add_trigger(
141
        self, media_file_path: str, place_trigger: int = None,
142
        entrant_count_trigger: int = None
143
    ):
144
        async def add(
145
            media_file_path: str, place_trigger: int = None,
146
            entrant_count_trigger: int = None
147
        ):
148
            async with self.triggers_lock:
149
                self.triggers.append(MediaTrigger(
150
                    media_file_path, place_trigger=place_trigger,
151
                    entrant_count_trigger=entrant_count_trigger
152
                ))
153
154
        asyncio.ensure_future(add(
155
                media_file_path, place_trigger,
156
                entrant_count_trigger
157
        ))
158
159
    def add_chat_trigger(
160
            self, media_file_path: str, highlight: bool = False,
161
            is_bot: bool = False, is_system: bool = False,
162
            message_plain_text: str = None
163
            ):
164
        async def add(
165
            media_file_path, highlight, is_bot, is_system, message_plain_text
166
        ):
167
            async with self.triggers_lock:
168
                self.chat_triggers.append(
169
                    ChatTrigger(media_file_path, highlight, is_bot, is_system,
170
                                message_plain_text))
171
            pass
172
        asyncio.ensure_future(add(
173
            media_file_path, highlight, is_bot, is_system, message_plain_text))
174
175
    def add_timer(self, media_file_path: str, race_time: timedelta):
176
        # try to wake up a little early and get ready
177
        timer = Timer(
178
            self.time_to_start_play(race_time),
179
            self.timer_wake_up, media_file_path, race_time
180
        )
181
        self.timers.append(timer)
182
        timer.start()
183
184
    def time_to_start_play(self, race_time: timedelta) -> float:
185
        time_to_start_play = 10000000000000.0
186
        return time_to_start_play
187
188
    async def timer_wake_up(self, media_file_path: str, race_time: timedelta):
189
        asyncio.sleep(self.time_to_start_play(race_time))
190
        self.logger.debug(
191
            f"attempting to play {media_file_path} at "
192
            f"{timer_to_str(race_time)}"
193
        )
194
        asyncio.ensure_future(
195
            self.play_media_callback(
196
                media_file_path, self.monitoring_type))
197
198
    def remove_trigger(self, index: int):
199
        async def remove(index: int):
200
            async with self.triggers_lock:
201
                if len(self.triggers) > index:
202
                    self.triggers.remove(self.triggers[index])
203
        asyncio.ensure_future(remove(index))
204