Test Setup Failed
Push — master ( 977420...615522 )
by Jace
05:21
created

SpeechRecognizer.listen()   A

Complexity

Conditions 4

Size

Total Lines 11

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 4
dl 0
loc 11
rs 9.2
c 1
b 0
f 0
1
#!env/bin/python
2
3
import time
4
import queue
5
import threading
6
import logging
7
import tkinter as tk
8
from tkinter import ttk
9
10
import click
11
from PIL import Image, ImageTk
12
try:
13
    import speech_recognition  # pylint: disable=import-error
14
except ImportError:
15
    speech_recognition = None
16
import requests
17
18
from . import __version__
19
20
21
log = logging.getLogger(__name__)
22
23
24
class Application:  # pylint: disable=too-many-instance-attributes
25
26
    def __init__(self):
27
        log.info("Starting the application...")
28
29
        self.label = None
30
        self.text = None
31
        self._image = None
32
        self._update_event = None
33
        self._clear_event = None
34
35
        # Configure root window
36
        self.root = tk.Tk()
37
        self.root.title(f"Meme Complete Desktop (v{__version__})")
38
        self.root.minsize(500, 500)
39
40
        # Configure speech recognition
41
        self._queue = queue.Queue()
42
        self._event = threading.Event()
43
        self._speech_recognizer = SpeechRecognizer(self._queue, self._event)
44
        self._speech_recognizer.start()
45
        self.process_speech()
46
47
        # Initialize the GUI
48
        self.label = None
49
        frame = self.init(self.root)
50
        frame.pack(fill=tk.BOTH, expand=1)
51
52
        # Start the event loop
53
        self.restart()
54
        self.root.protocol("WM_DELETE_WINDOW", self.close)
55
        self.root.mainloop()
56
57
    def init(self, root):
58
        padded = {'padding': 5}
59
        sticky = {'sticky': tk.NSEW}
60
61
        # Configure grid
62
        frame = ttk.Frame(root, **padded)
63
        frame.rowconfigure(0, weight=1)
64
        frame.rowconfigure(1, weight=0)
65
        frame.columnconfigure(0, weight=1)
66
67
        def frame_image(root):
68
            frame = ttk.Frame(root, **padded)
69
70
            # Configure grid
71
            frame.rowconfigure(0, weight=1)
72
            frame.columnconfigure(0, weight=1)
73
74
            # Place widgets
75
            self.label = ttk.Label(frame)
76
            self.label.grid(row=0, column=0)
77
78
            return frame
79
80
        def frame_text(root):
81
            frame = ttk.Frame(root, **padded)
82
83
            # Configure grid
84
            frame.rowconfigure(0, weight=1)
85
            frame.rowconfigure(1, weight=1)
86
            frame.columnconfigure(0, weight=1)
87
88
            # Place widgets
89
            self.text = ttk.Entry(frame)
90
            self.text.bind("<Key>", self.restart)
91
            self.text.grid(row=0, column=0, **sticky)
92
            self.text.focus_set()
93
94
            return frame
95
96
        def separator(root):
97
            return ttk.Separator(root)
98
99
        # Place widgets
100
        frame_image(frame).grid(row=0, **sticky)
101
        separator(frame).grid(row=1, padx=10, pady=5, **sticky)
102
        frame_text(frame).grid(row=2, **sticky)
103
104
        return frame
105
106
    def process_speech(self):
107
        try:
108
            speech = self._queue.get(0)
109
        except queue.Empty:
110
            pass
111
        else:
112
            self.update(speech)
113
        finally:
114
            self.root.after(10, self.process_speech)
115
116
    def update(self, value=None):
117
        if value:
118
            self.clear()
119
            self.text.insert(0, value)
120
121
        text = self.text.get()
122
        if not text:
123
            return
124
125
        log.info("Input text: %s", text)
126
        matches = self._get_matches(text)
127
128
        if matches:
129
            image = Image.open(self._get_image(matches))
130
            old_size = image.size
131
            max_size = self.root.winfo_width(), self.root.winfo_height()
132
            ratio = min(max_size[0] / old_size[0], max_size[1] / old_size[1])
133
            new_size = [int(s * ratio * 0.9) for s in old_size]
134
            image = image.resize(new_size, Image.ANTIALIAS)
135
136
            self._image = ImageTk.PhotoImage(image)
137
            self.label.configure(image=self._image)
138
            self.clear()
139
140
        self.restart(update=True, clear=False)
141
142
    @staticmethod
143
    def _get_matches(text):
144
        url = "https://memecomplete.com/api/memes/"
145
        data = dict(
146
            text=text,
147
            source='memecomplete-desktop',
148
        )
149
150
        log.info("Finding matches: %s", url)
151
        response = requests.get(url, params=data)
152
153
        return response.json()
154
155
    @staticmethod
156
    def _get_image(matches):
157
        url = matches[0]['image_url']
158
159
        log.info("Getting image: %s", url)
160
        response = requests.get(url, stream=True)
161
162
        return response.raw
163
164
    def clear(self, *_):
165
        self.text.delete(0, tk.END)
166
        self.restart()
167
168
    def restart(self, *_, update=True, clear=True):
169
        if update:
170
            if self._update_event:
171
                self.root.after_cancel(self._update_event)
172
            self._update_event = self.root.after(1000, self.update)
173
        if clear:
174
            if self._clear_event:
175
                self.root.after_cancel(self._clear_event)
176
            self._clear_event = self.root.after(5000, self.clear)
177
178
    def close(self):
179
        log.info("Closing the application...")
180
        self._event.set()
181
        time.sleep(0.1)
182
        self._speech_recognizer.join()
183
        self.root.destroy()
184
185
186
class SpeechRecognizer(threading.Thread):
187
188
    def __init__(self, queue, event):  # pylint: disable=redefined-outer-name
189
        super().__init__()
190
        self.queue = queue
191
        self.event = event
192
        self.microphone = None
193
        self.recognizer = None
194
195
    def run(self):
196
        if speech_recognition:
197
            self.configure()
198
            self.loop()
199
        else:
200
            log.info("Speech recognition disabled")
201
            self.event.wait()
202
203
    def configure(self):
204
        log.info("Configuring speech recognition...")
205
        self.recognizer = speech_recognition.Recognizer()
206
        self.recognizer.energy_threshold = 1500
207
        self.recognizer.dynamic_energy_threshold = True
208
        self.recognizer.dynamic_energy_adjustment_ratio = 3
209
        self.microphone = speech_recognition.Microphone()
210
        with self.microphone as source:
211
            log.info("Adjusting for ambient noise...")
212
            self.recognizer.adjust_for_ambient_noise(source, duration=3)
213
            log.info("Energy threshold: %s", self.recognizer.energy_threshold)
214
215
    def loop(self):
216
        log.info("Starting speech recognition loop...")
217
        while not self.event.is_set():
218
            audio = self.listen()
219
            if self.event.is_set():
220
                break
221
222
            if audio:
223
                result = self.recognize(audio)
224
                if self.event.is_set():
225
                    break
226
227
                if result:
228
                    self.queue.put(result)
229
230
    def listen(self):
231
        log.info("Listening for audio...")
232
        audio = None
233
        with self.microphone as source:
234
            try:
235
                audio = self.recognizer.listen(source, timeout=0.5)
236
            except speech_recognition.WaitTimeoutError:
237
                log.debug("No audio detected")
238
            else:
239
                log.debug("Audio detected: %s", audio)
240
        return audio
241
242
    def recognize(self, audio):
243
        log.info("Recognizing speech...")
244
        speech = None
245
        try:
246
            speech = self.recognizer.recognize_google(audio)
247
        except speech_recognition.UnknownValueError:
248
            log.warning("No speech detected")
249
        else:
250
            log.debug("Detected speech: %s", speech)
251
        return speech
252
253
254
@click.command()
255
@click.option('--speech/--no-speech', default=True)
256
def main(speech=True):
257
    global speech_recognition
258
259
    logging.basicConfig(level=logging.INFO,
260
                        format="%(levelname)s: %(message)s")
261
    logging.getLogger('requests').setLevel(logging.WARNING)
262
263
    if not speech:
264
        log.info("Disabling speech recognition...")
265
        speech_recognition = None
266
267
    Application()
268
269
270
if __name__ == '__main__':
271
    main()
272