Test Failed
Push — coverage ( 00371a...390bfe )
by Ian
06:05
created

build.rsudp.c_alertsound.AlertSound._load_sound()   A

Complexity

Conditions 3

Size

Total Lines 20
Code Lines 16

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 16
dl 0
loc 20
rs 9.6
c 0
b 0
f 0
cc 3
nop 1
1
import sys, os
2
from rsudp.raspberryshake import ConsumerThread
3
from rsudp import printM, printW, printE
4
from rsudp.test import TEST
5
import subprocess
6
try:
7
	from pydub.playback import play, PLAYER
8
	from pydub import AudioSegment
9
	pydub_exists = True
10
	TEST['d_pydub'][1] = True
11
except ImportError:
12
	pydub_exists = False
13
14
15
class AlertSound(ConsumerThread):
16
	"""
17
	.. _pydub.AudioSegment: https://github.com/jiaaro/pydub/blob/master/API.markdown#audiosegment
18
19
	A consumer class that plays an alert sound when an ``ALARM`` message arrives on the queue.
20
	``rsudp.c_alertsound.AlertSound.sound`` is a pydub.AudioSegment_ object and is passed from the client.
21
22
	:param sta: short term average (STA) duration in seconds.
23
	:type sta: bool or pydub.AudioSegment_ 
24
	:param queue.Queue q: queue of data and messages sent by :class:`rsudp.c_consumer.Consumer`.
25
26
	"""
27
28
	def _load_sound(self):
29
		'''
30
		Loads MP3 sound if possible, then writes to wav.
31
		Catches a ``FileNotFoundError`` when no player can be loaded.
32
		'''
33
		try:
34
			soundloc = self.sound
35
			self.sound = AudioSegment.from_file(self.sound, format="mp3")
36
			printM('Loaded %.2f sec alert sound from %s' % (len(self.sound)/1000., soundloc), sender=self.sender)
37
			self.wavloc = '%s.wav' % os.path.splitext(soundloc)[0]
38
			if 'ffplay' in PLAYER:
39
				self._write_wav()
40
		except FileNotFoundError as e:
41
			printE('Error loading player - %s' % (e), sender=self.sender)
42
			printW("You have chosen to play a sound, but don't have ffmpeg or libav installed.", sender=self.sender)
43
			printW('Sound playback requires one of these dependencies.', sender=self.sender, spaces=True)
44
			printW("To install either dependency, follow the instructions at:", sender=self.sender, spaces=True)
45
			printW('https://github.com/jiaaro/pydub#playback', sender=self.sender, spaces=True)
46
			printW('The program will now continue without sound playback.', sender=self.sender, spaces=True)
47
			self.sound = False
48
49
	def _init_sound(self):
50
		if pydub_exists:
51
			if os.path.exists(self.sound):
52
				self._load_sound()
53
			else:
54
				printW("The file %s could not be found." % (self.sound), sender=self.sender)
55
				printW('The program will now continue without sound playback.', sender=self.sender, spaces=True)
56
				self.sound = False
57
		else:
58
			printW("You don't have pydub installed, so no sound will play.", sender=self.sender)
59
			printW('To install pydub, follow the instructions at:', sender=self.sender, spaces=True)
60
			printW('https://github.com/jiaaro/pydub#installation', sender=self.sender, spaces=True)
61
			printW('Sound playback also requires you to install either ffmpeg or libav.', sender=self.sender, spaces=True)
62
63
	def _write_wav(self):
64
		'''
65
		FFPlay can only play raw wav sounds without verbosity, so to support
66
		non-verbose mode we must export to .wav prior to playing a sound.
67
		This function checks for an existing wav file and if it does not
68
		exist, writes a new one.
69
		'''
70
		if not os.path.isfile(self.wavloc):
71
			self.sound.export(self.wavloc, format="wav")
72
			printM('Wrote wav version of sound file %s' % (self.wavloc), self.sender)
73
74
75
	def __init__(self, testing=False, soundloc=False, q=False):
76
		"""
77
		.. _pydub.AudioSegment: https://github.com/jiaaro/pydub/blob/master/API.markdown#audiosegment
78
79
		Initializes the alert sound listener thread.
80
		Needs a pydub.AudioSegment_ to play and a :class:`queue.Queue` to listen on.
81
82
		"""
83
		super().__init__()
84
		self.sender = 'AlertSound'
85
		self.alive = True
86
		self.testing = testing
87
88
		self.sound = soundloc
89
		self.devnull = open(os.devnull, 'w')
90
		
91
		self._init_sound()
92
93
		if q:
94
			self.queue = q
95
		else:
96
			printE('no queue passed to the consumer thread! We will exit now!',
97
				   self.sender)
98
			sys.stdout.flush()
99
			self.alive = False
100
			sys.exit()
101
102
		printM('Starting.', self.sender)
103
104
	def _play_quiet(self):
105
		'''
106
		if FFPlay is the player, suppress printed output.
107
		'''
108
		self._write_wav()
109
		subprocess.call([PLAYER,"-nodisp", "-autoexit", "-hide_banner",
110
						self.wavloc], stdout=self.devnull, stderr=self.devnull)
111
112
	def _play(self):
113
		printM('Playing alert sound...', sender=self.sender)
114
		if 'ffplay' in PLAYER:
115
			self._play_quiet()
116
		else:
117
			play(self.sound)
118
		if self.testing:
119
			TEST['c_play'][1] = True
120
121
	def run(self):
122
		"""
123
		Reads data from the queue and plays self.sound if it sees an ``ALARM`` message.
124
		Quits if it sees a ``TERM`` message.
125
		"""
126
		while True:
127
			d = self.queue.get()
128
			self.queue.task_done()
129
			if 'TERM' in str(d):
130
				self.alive = False
131
				self.devnull.close()
132
				printM('Exiting.', self.sender)
133
				sys.exit()
134
			elif 'ALARM' in str(d):
135
				if self.sound and pydub_exists:
136
					self._play()
137