Passed
Push — master ( 7ffd9c...1783f0 )
by Ian
04:46
created

build.rsudp.helpers.deconv_rbm_inst()   A

Complexity

Conditions 1

Size

Total Lines 19
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 2
dl 0
loc 19
rs 10
c 0
b 0
f 0
cc 1
nop 3
1
import rsudp.raspberryshake as rs
2
3
4
def set_channels(self, cha):
5
	'''
6
	This function sets the channels available for plotting. Allowed units are as follows:
7
8
	- ``["SHZ", "EHZ", "EHN", "EHE"]`` - velocity channels
9
	- ``["ENZ", "ENN", "ENE"]`` - acceleration channels
10
	- ``["HDF"]`` - pressure transducer channel
11
	- ``["all"]`` - all available channels
12
13
	So for example, if you wanted to display the two vertical channels of a Shake 4D,
14
	(geophone and vertical accelerometer) you could specify:
15
16
	``["EHZ", "ENZ"]``
17
18
	You can also specify partial channel names.
19
	So for example, the following will display at least one channel from any
20
	Raspberry Shake instrument:
21
22
	``["HZ", "HDF"]``
23
24
	Or if you wanted to display only vertical channels from a RS4D,
25
	you could specify
26
27
	``["Z"]``
28
29
	which would match both ``"EHZ"`` and ``"ENZ"``.
30
31
	:param self self: self object of the class calling this function
32
	:param cha: the channel or list of channels to plot
33
	:type cha: list or str
34
	'''
35
	cha = rs.chns if ('all' in cha) else cha
36
	cha = list(cha) if isinstance(cha, str) else cha
37
	for c in rs.chns:
38
		n = 0
39
		for uch in cha:
40
			if (uch.upper() in c) and (c not in str(self.chans)):
41
				self.chans.append(c)
42
			n += 1
43
	if len(self.chans) < 1:
44
			self.chans = rs.chns
45
46
47
def fsec(ti):
48
	'''
49
	.. versionadded:: 0.4.3
50
51
	The Raspberry Shake records at hundredths-of-a-second precision.
52
	In order to report time at this precision, we need to do some time-fu.
53
54
	This function rounds the microsecond fraction of a
55
	:py:class:`obspy.core.utcdatetime.UTCDateTime`
56
	depending on its precision, so that it accurately reflects the Raspberry Shake's
57
	event measurement precision.
58
59
	This is necessary because datetime objects in Python are strange and confusing, and
60
	strftime doesn't support fractional returns, only the full integer microsecond field
61
	which is an integer right-padded with zeroes. This function uses the ``precision``
62
	of a datetime object.
63
64
	For example:
65
66
	.. code-block:: python
67
68
		>>> from obspy import UTCDateTime
69
		>>> ti = UTCDateTime(2020, 1, 1, 0, 0, 0, 599000, precision=3)
70
		>>> fsec(ti)
71
		UTCDateTime(2020, 1, 1, 0, 0, 0, 600000)
72
73
	:param ti: time object to convert microseconds for
74
	:type ti: obspy.core.utcdatetime.UTCDateTime
75
	:return: the hundredth-of-a-second rounded version of the time object passed (precision is 0.01 second)
76
	:rtype: obspy.core.utcdatetime.UTCDateTime
77
	'''
78
	# time in python is weird and confusing, but luckily obspy is better than Python
79
	# at dealing with datetimes. all we need to do is tell it what precision we want
80
	# and it handles the rounding for us.
81
	return rs.UTCDateTime(ti, precision=2)
82
83
84
def msg_alarm(event_time):
85
	'''
86
	This function constructs the ``ALARM`` message as a bytes object.
87
	Currently this is only used by :py:class:`rsudp.p_producer.Producer`
88
	to construct alarm queue messages.
89
90
	For example:
91
92
	.. code-block:: python
93
94
		>>> from obspy import UTCDateTime
95
		>>> ti = UTCDateTime(2020, 1, 1, 0, 0, 0, 599000, precision=3)
96
		>>> msg_alarm(ti)
97
		b'ALARM 2020-01-01T00:00:00.599Z'
98
99
	:param obspy.core.utcdatetime.UTCDateTime event_time: the datetime object to serialize and convert to bytes
100
	:rtype: bytes
101
	:return: the ``ALARM`` message, ready to be put on the queue
102
	'''
103
	return b'ALARM %s' % bytes(str(event_time), 'utf-8')
104
105
106
def msg_reset(reset_time):
107
	'''
108
	This function constructs the ``RESET`` message as a bytes object.
109
	Currently this is only used by :py:class:`rsudp.p_producer.Producer`
110
	to construct reset queue messages.
111
112
	For example:
113
114
	.. code-block:: python
115
116
		>>> from obspy import UTCDateTime
117
		>>> ti = UTCDateTime(2020, 1, 1, 0, 0, 0, 599000, precision=3)
118
		>>> msg_reset(ti)
119
		b'RESET 2020-01-01T00:00:00.599Z'
120
121
	:param obspy.core.utcdatetime.UTCDateTime reset_time: the datetime object to serialize and convert to bytes
122
	:rtype: bytes
123
	:return: the ``RESET`` message, ready to be put on the queue
124
	'''
125
	return b'RESET %s' % bytes(str(reset_time), 'utf-8')
126
127
128
def msg_imgpath(event_time, figname):
129
	'''
130
	This function constructs the ``IMGPATH`` message as a bytes object.
131
	Currently this is only used by :py:class:`rsudp.c_plot.Plot`
132
	to construct queue messages containing timestamp and saved image path.
133
134
	For example:
135
136
	.. code-block:: python
137
138
		>>> from obspy import UTCDateTime
139
		>>> ti = UTCDateTime(2020, 1, 1, 0, 0, 0, 599000, precision=3)
140
		>>> path = '/home/pi/rsudp/screenshots/test.png'
141
		>>> msg_imgpath(ti, path)
142
		b'IMGPATH 2020-01-01T00:00:00.599Z /home/pi/rsudp/screenshots/test.png'
143
144
	:param obspy.core.utcdatetime.UTCDateTime event_time: the datetime object to serialize and convert to bytes
145
	:param str figname: the figure path as a string
146
	:rtype: bytes
147
	:return: the ``IMGPATH`` message, ready to be put on the queue
148
	'''
149
	return b'IMGPATH %s %s' % (bytes(str(event_time), 'utf-8'), bytes(str(figname), 'utf-8'))
150
151
152
def msg_term():
153
	'''
154
	This function constructs the simple ``TERM`` message as a bytes object.
155
156
	.. code-block:: python
157
158
		>>> msg_term()
159
		b'TERM'
160
161
162
	:rtype: bytes
163
	:return: the ``TERM`` message
164
	'''
165
	return b'TERM'
166
167
168
def get_msg_time(msg):
169
	'''
170
	This function gets the time from ``ALARM``, ``RESET``,
171
	and ``IMGPATH`` messages as a UTCDateTime object.
172
173
	For example:
174
175
	.. code-block:: python
176
177
		>>> from obspy import UTCDateTime
178
		>>> ti = UTCDateTime(2020, 1, 1, 0, 0, 0, 599000, precision=3)
179
		>>> path = '/home/pi/rsudp/screenshots/test.png'
180
		>>> msg = msg_imgpath(ti, path)
181
		>>> msg
182
		b'IMGPATH 2020-01-01T00:00:00.599Z /home/pi/rsudp/screenshots/test.png'
183
		>>> get_msg_time(msg)
184
		UTCDateTime(2020, 1, 1, 0, 0, 0, 599000)
185
186
	:param bytes msg: the bytes-formatted queue message to decode
187
	:rtype: obspy.core.utcdatetime.UTCDateTime
188
	:return: the time embedded in the message
189
	'''
190
	return rs.UTCDateTime.strptime(msg.decode('utf-8').split(' ')[1], '%Y-%m-%dT%H:%M:%S.%fZ')
191
192
193
def get_msg_path(msg):
194
	'''
195
	This function gets the path from ``IMGPATH`` messages as a string.
196
197
	For example:
198
199
	.. code-block:: python
200
201
		>>> from obspy import UTCDateTime
202
		>>> ti = UTCDateTime(2020, 1, 1, 0, 0, 0, 599000, precision=3)
203
		>>> path = '/home/pi/rsudp/screenshots/test.png'
204
		>>> msg = msg_imgpath(ti, path)
205
		>>> msg
206
		b'IMGPATH 2020-01-01T00:00:00.599Z /home/pi/rsudp/screenshots/test.png'
207
		>>> get_msg_path(msg)
208
		'/home/pi/rsudp/screenshots/test.png'
209
210
	:param bytes msg: the bytes-formatted queue message to decode
211
	:rtype: str
212
	:return: the path embedded in the message
213
	'''
214
	return msg.decode('utf-8').split(' ')[2]
215
216
217
def deconv_vel_inst(self, trace, output):
218
	'''
219
	.. role:: pycode(code)
220
		:language: python
221
	
222
	A helper function for :py:func:`rsudp.raspberryshake.deconvolve`
223
	for velocity channels.
224
225
	:param self self: The self object of the sub-consumer class calling this function.
226
	:param osbpy.core.trace.Trace trace: the trace object instance to deconvolve
227
	'''
228
	if self.deconv not in 'CHAN':
229
		trace.remove_response(inventory=rs.inv, pre_filt=[0.1, 0.6, 0.95*self.sps, self.sps],
230
								output=output, water_level=4.5, taper=False)
231
	else:
232
		trace.remove_response(inventory=rs.inv, pre_filt=[0.1, 0.6, 0.95*self.sps, self.sps],
233
								output='VEL', water_level=4.5, taper=False)
234
	if 'ACC' in self.deconv:
235
		trace.data = np.gradient(trace.data, 1)
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable np does not seem to be defined.
Loading history...
236
	elif 'GRAV' in self.deconv:
237
		trace.data = np.gradient(trace.data, 1) / g
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable g does not seem to be defined.
Loading history...
238
		trace.stats.units = 'Earth gravity'
239
	elif 'DISP' in self.deconv:
240
		trace.data = np.cumsum(trace.data)
241
		trace.taper(max_percentage=0.1, side='left', max_length=1)
242
		trace.detrend(type='demean')
243
	else:
244
		trace.stats.units = 'Velocity'
245
246
247
def deconv_acc_inst(self, trace, output):
248
	'''
249
	.. role:: pycode(code)
250
		:language: python
251
	
252
	A helper function for :py:func:`rsudp.raspberryshake.deconvolve`
253
	for acceleration channels.
254
255
	:param self self: The self object of the sub-consumer class calling this function.
256
	:param osbpy.core.trace.Trace trace: the trace object instance to deconvolve
257
	'''
258
	if self.deconv not in 'CHAN':
259
		trace.remove_response(inventory=rs.inv, pre_filt=[0.1, 0.6, 0.95*self.sps, self.sps],
260
								output=output, water_level=4.5, taper=False)
261
	else:
262
		trace.remove_response(inventory=rs.inv, pre_filt=[0.1, 0.6, 0.95*self.sps, self.sps],
263
								output='ACC', water_level=4.5, taper=False)
264
	if 'VEL' in self.deconv:
265
		trace.data = np.cumsum(trace.data)
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable np does not seem to be defined.
Loading history...
266
		trace.detrend(type='demean')
267
	elif 'DISP' in self.deconv:
268
		trace.data = np.cumsum(np.cumsum(trace.data))
269
		trace.detrend(type='linear')
270
	elif 'GRAV' in self.deconv:
271
		trace.data = trace.data / g
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable g does not seem to be defined.
Loading history...
272
		trace.stats.units = 'Earth gravity'
273
	else:
274
		trace.stats.units = 'Acceleration'
275
	if ('ACC' not in self.deconv) and ('CHAN' not in self.deconv):
276
		trace.taper(max_percentage=0.1, side='left', max_length=1)
277
278
279
def deconv_rbm_inst(self, trace, output):
280
	'''
281
	.. role:: pycode(code)
282
		:language: python
283
	
284
	A helper function for :py:func:`rsudp.raspberryshake.deconvolve`
285
	for Raspberry Boom pressure transducer channels.
286
287
	.. note::
288
289
		The Raspberry Boom pressure transducer does not currently have a
290
		deconvolution function. The Raspberry Shake team is working on a
291
		calibration for the Boom, but until then Boom units are given in
292
		counts.
293
294
	:param self self: The self object of the sub-consumer class calling this function.
295
	:param osbpy.core.trace.Trace trace: the trace object instance to deconvolve
296
	'''
297
	trace.stats.units = ' counts'
298
299
300
def deconvolve(self):
301
	'''
302
	.. role:: pycode(code)
303
		:language: python
304
	
305
	A central helper function for sub-consumers (i.e. :py:class:`rsudp.c_plot.Plot` or :py:class:`rsudp.c_alert.Alert`)
306
	that need to deconvolve their raw data to metric units.
307
	Consumers with :py:class:`obspy.core.stream.Stream` objects in :pycode:`self.stream` can use this to deconvolve data
308
	if this library's :pycode:`rsudp.raspberryshake.inv` variable
309
	contains a valid :py:class:`obspy.core.inventory.inventory.Inventory` object.
310
311
	:param self self: The self object of the sub-consumer class calling this function. Must contain :pycode:`self.stream` as a :py:class:`obspy.core.stream.Stream` object.
312
	'''
313
	acc_channels = ['ENE', 'ENN', 'ENZ']
314
	vel_channels = ['EHE', 'EHN', 'EHZ', 'SHZ']
315
	rbm_channels = ['HDF']
316
317
	self.stream = self.raw.copy()
318
	for trace in self.stream:
319
		trace.stats.units = self.units
320
		output = 'ACC' if self.deconv == 'GRAV' else self.deconv	# if conversion is to gravity
321
		if self.deconv:
322
			if trace.stats.channel in vel_channels:
323
				deconv_vel_inst(self, trace, output)	# geophone channels
324
325
			elif trace.stats.channel in acc_channels:
326
				deconv_acc_inst(self, trace, output)	# accelerometer channels
327
328
			elif trace.stats.channel in rbm_channels:
329
				deconv_rbm_inst(self, trace, output)	# this is the Boom channel
330
331
			else:
332
				trace.stats.units = ' counts'	# this is a new one
333
334
		else:
335
			trace.stats.units = ' counts'		# this is not being deconvolved
336
337
338