Passed
Push — master ( c00388...cd152f )
by Ian
04:16
created

build.rsudp.c_write.Write.__init__()   A

Complexity

Conditions 1

Size

Total Lines 23
Code Lines 15

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 15
nop 4
dl 0
loc 23
rs 9.65
c 0
b 0
f 0
1
import sys, os
2
import time
3
from datetime import datetime, timedelta
4
from obspy import UTCDateTime
5
from rsudp.raspberryshake import ConsumerThread
6
import rsudp.raspberryshake as rs
7
from rsudp import printM, printW, printE
8
import rsudp
9
10
class Write(rs.ConsumerThread):
11
	"""
12
	A simple routine to write daily miniSEED data to :code:`output_dir/data`.
13
14
	:param cha: channel(s) to forward. others will be ignored.
15
	:type cha: str or list
16
	:param queue.Queue q: queue of data and messages sent by :class:`rsudp.c_consumer.Consumer`
17
	:param bool debug: whether or not to display messages when writing data to disk.
18
	"""
19 View Code Duplication
	def _set_channels(self, cha):
		'''
		This function sets the channels that will be written to disk by
		appending every matching entry of ``rs.chns`` to ``self.chans``
		(which must already exist). Allowed values are as follows:

		- ``["SHZ", "EHZ", "EHN", "EHE"]`` - velocity channels
		- ``["ENZ", "ENN", "ENE"]`` - acceleration channels
		- ``["HDF"]`` - pressure transducer channel
		- ``["all"]`` - all available channels

		So for example, if you wanted to write the two vertical channels of a Shake 4D,
		(geophone and vertical accelerometer) you could specify:

		``["EHZ", "ENZ"]``

		You can also specify partial channel names.
		So for example, the following will write at least one channel from any
		Raspberry Shake instrument:

		``["HZ", "HDF"]``

		Or if you wanted to write only vertical channels from a RS4D,
		you could specify

		``["Z"]``

		which would match both ``"EHZ"`` and ``"ENZ"``.

		If nothing matches, all channels in ``rs.chns`` are used.

		:param cha: the channel or list of channels to write
		:type cha: list or str
		'''
		# A bare string is ONE channel pattern. Wrapping it in a list (rather than
		# calling list() on it) keeps "EHZ" from being split into "E", "H", "Z",
		# which would match almost every channel.
		if isinstance(cha, str):
			cha = [cha]
		if 'all' in cha:
			cha = rs.chns
		# iterate over rs.chns so self.chans keeps the instrument's channel order
		for c in rs.chns:
			if c in self.chans:
				continue
			if any(uch.upper() in c for uch in cha):
				self.chans.append(c)
		if len(self.chans) < 1:
			# copy so that later changes to self.chans cannot alter rs.chns
			self.chans = list(rs.chns)
59
60
61
	def __init__(self, q, debug=False, cha='all'):
		"""
		Set up the writer.

		Stores the queue and debug flag, creates an empty stream, resolves the
		channels to write via :py:func:`_set_channels`, and copies the channel
		count, sample interval and inventory from :py:mod:`rsudp.raspberryshake`.
		"""
		super().__init__()
		self.sender = 'Write'
		self.alive = True

		# caller-supplied settings
		self.queue = q
		self.debug = debug

		# working buffer and output location
		self.stream = rs.Stream()
		self.outdir = rsudp.data_dir

		# channel selection (self.chans is filled in place)
		self.chans = []
		self._set_channels(cha)
		printM('Writing channels: %s' % self.chans, self.sender)

		# values read from the raspberryshake module
		self.numchns = rs.numchns
		self.stime = 1 / rs.sps
		self.inv = rs.inv

		printM('Starting.', self.sender)
84
85
86
	def getq(self):
		'''
		Reads data from the queue and updates the stream.

		Blocks until an item is available. An item containing ``TERM`` marks the
		consumer as no longer alive and exits via :py:func:`sys.exit`. An item
		containing ``ALARM`` is ignored. Anything else is treated as a data
		packet and is added to the stream only if its channel is in ``self.chans``.

		:rtype: bool
		:return: Returns ``True`` if stream is updated, otherwise ``False``.
		'''
		d = self.queue.get(True, timeout=None)
		self.queue.task_done()
		msg = str(d)	# convert once; checked against both keywords below
		if 'TERM' in msg:
			self.alive = False
			printM('Exiting.', self.sender)
			sys.exit()
		if 'ALARM' in msg:
			# not a data packet; return an explicit bool as documented
			return False
		if rs.getCHN(d) in self.chans:
			self.stream = rs.update_stream(
				stream=self.stream, d=d, fill_value=None)
			return True
		return False
108
	
109
	def set_sps(self):
		'''
		Stores the sampling rate of the first trace in the stream as ``self.sps``.
		'''
		first_trace = self.stream[0]
		self.sps = first_trace.stats.sampling_rate
114
115
	def elapse(self, new=False):
		'''
		Refreshes the date attributes used for file naming (``self.y``, ``self.m``,
		``self.d``, ``self.j``) from the current time and computes ``self.newday``,
		the midnight that follows it.

		:param bool new: If ``True``, ``self.last`` is set to ``self.newday``. If ``False``, it is set to the current time.
		'''
		now = UTCDateTime.now()
		self.st = now
		self.y = now.year
		self.m = now.month
		self.d = now.day
		self.j = now.strftime('%j')		# day of year, used in the file name
		# step a little more than one day past today's midnight, then truncate
		# back to 00:00 of whatever date that lands on
		overshoot = UTCDateTime(self.y, self.m, self.d, 0, 0) + timedelta(days=1.1)
		self.newday = UTCDateTime(overshoot.year, overshoot.month, overshoot.day, 0, 0)
		self.last = self.newday if new else self.st
130
131
	def slicestream(self):
		'''
		Causes the stream to slice down to the time the last write operation was made.
		'''
		# slice() returns a new Stream instead of trimming in place, so the
		# result has to be stored for this call to have any effect
		self.stream = self.stream.slice(starttime=self.last)
136
137
	def write(self, stream=False):
		'''
		Writes a segment of the stream to disk as miniSEED, and appends it to the
		file in question. If there is no file (i.e. if the program is just starting
		or a new UTC day has just started, then this function writes to a new file).

		File names are built from each trace's network, station and channel plus
		the year and day-of-year currently held in ``self.y`` and ``self.j``.

		:type stream: obspy.core.stream.Stream or bool
		:param stream: The stream segment to write. If ``False`` (or ``None``),
			everything buffered in ``self.stream`` except its last 5 seconds is
			written and ``self.last`` is moved to that cut-off time. An empty
			stream passed explicitly results in no output.
		'''
		# Test for the sentinel explicitly: a Stream with no traces is falsy, and
		# an empty slice handed in by the caller must not fall back to the buffer.
		if stream is False or stream is None:
			if len(self.stream) < 1:
				return		# nothing buffered yet, so there is nothing to write
			self.last = self.stream[0].stats.endtime - timedelta(seconds=5)
			stream = self.stream.copy().slice(
						endtime=self.last, nearest_sample=False)

		enc = 'STEIM2'	# encoding
		for t in stream:
			if isinstance(t.data, rs.np.ma.masked_array):
				t.data = t.data.filled(fill_value=0) # fill array (to avoid obspy write error)
			outfile = self.outdir + '/%s.%s.00.%s.D.%s.%s' % (t.stats.network,
								t.stats.station, t.stats.channel, self.y, self.j)
			if self.debug:
				if os.path.exists(os.path.abspath(outfile)):
					printM('Writing %s records to %s'
							% (len(t.data), outfile), self.sender)
				else:
					printM('Writing %s new file %s'
							% (len(t.data), outfile), self.sender)
			# append mode creates the file when it does not exist yet, so one
			# code path covers both the new-file and the existing-file case
			with open(outfile, 'ab') as fh:
				t.write(fh, format='MSEED', encoding=enc)
168
169
	def run(self):
		"""
		Reads packets and coordinates write operations.

		After the first usable packet arrives, the sampling rate is recorded, the
		inventory (if any) is written as StationXML, and the method loops forever:
		it drains the queue, and once enough packets have been counted it writes
		the buffered data to disk, starting a new file when the UTC day changes.
		The loop ends only when :py:func:`getq` exits on a ``TERM`` message.
		"""
		self.elapse()

		# set_sps() reads self.stream[0], so wait until a packet on one of the
		# selected channels has actually been added to the stream
		while not self.getq():
			pass
		self.set_sps()
		self.getq()
		printM('miniSEED output directory: %s' % (self.outdir), self.sender)
		if self.inv:
			inv_path = '%s/%s.%s.00.xml' % (self.outdir,
					self.stream[0].stats.network,
					self.stream[0].stats.station)
			printM('Writing inventory file: %s' % inv_path, self.sender)
			self.inv.write(inv_path, format='STATIONXML')
		printM('Beginning miniSEED output.', self.sender)
		wait_pkts = (self.numchns * 10) / (rs.tf / 1000) 	# comes out to 10 seconds (tf is in ms)

		n = 0	# packets read since the last write
		while True:
			while True:
				if self.queue.qsize() > 0:
					self.getq()
					time.sleep(0.01)		# wait a few ms to see if another packet will arrive
					n += 1
				else:
					# queue is empty: block for the next item, then go check n
					self.getq()
					n += 1
					break
			if n >= wait_pkts:
				if self.newday < UTCDateTime.now(): # end of previous day and start of new day
					# finish the old day's file, keep only the new day's data,
					# then roll the file-naming date forward
					self.write(self.stream.slice(
								endtime=self.newday, nearest_sample=False))
					self.stream = self.stream.slice(
								starttime=self.newday, nearest_sample=False)
					self.elapse(new=True)
				else:
					self.write()
					# drop what was just written (write() updated self.last)
					self.stream = self.stream.slice(
								starttime=self.last, nearest_sample=False)
				self.stream = rs.copy(self.stream)
				n = 0

				self.getq()
				time.sleep(0.01)		# wait a few ms to see if another packet will arrive
			sys.stdout.flush()
			sys.stderr.flush()
219