Passed
Push — master ( 45db06...e21d39 )
by Ian
04:35 queued 12s
created

build.rsudp.c_write.Write._tracewrite()   B

Complexity

Conditions 7

Size

Total Lines 27
Code Lines 18

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 18
dl 0
loc 27
rs 8
c 0
b 0
f 0
cc 7
nop 2
1
import sys, os
2
import time
3
from datetime import datetime, timedelta
4
from obspy import UTCDateTime
5
from rsudp.raspberryshake import ConsumerThread
6
import rsudp.raspberryshake as rs
7
from rsudp import printM, printW, printE, helpers
8
from rsudp.test import TEST
9
10
class Write(rs.ConsumerThread):
	"""
	A simple routine to write daily miniSEED data to :code:`output_dir/data`.

	:param queue.Queue q: queue of data and messages sent by :class:`rsudp.c_consumer.Consumer`
	:param str data_dir: base output directory; files are written to its ``data`` subdirectory.
	:param bool testing: if ``True``, ``debug`` is forced on and ``TEST['c_write']`` is flagged after each write.
	:param bool debug: whether or not to display messages when writing data to disk.
	:param cha: channel(s) to write. others will be ignored.
	:type cha: str or list
	"""
	def __init__(self, q, data_dir, testing=False, debug=False, cha='all'):
		"""
		Initialize the process
		"""
		super().__init__()
		self.sender = 'Write'
		self.alive = True
		self.testing = testing
		self.debug = debug
		if self.testing:
			# testing always implies verbose output
			self.debug = True

		self.queue = q

		self.stream = rs.Stream()
		self.outdir = os.path.join(data_dir, 'data')
		self.outfiles = []		# every output file path written to so far

		self.chans = []
		helpers.set_channels(self, cha)

		printM('Writing channels: %s' % self.chans, self.sender)
		self.numchns = rs.numchns
		self.stime = 1/rs.sps
		self.inv = rs.inv

		printM('Starting.', self.sender)


	def getq(self):
		'''
		Reads data from the queue and updates the stream.

		A message containing ``TERM`` stops the thread via :py:func:`sys.exit`.
		``ALARM``, ``RESET`` and ``IMGPATH`` messages are ignored.

		:rtype: bool
		:return: Returns ``True`` if stream is updated, otherwise ``False``.
		'''
		d = self.queue.get(True, timeout=None)
		self.queue.task_done()
		if 'TERM' in str(d):
			self.alive = False
			printM('Exiting.', self.sender)
			sys.exit()

		if d.decode('UTF-8').split(' ')[0] in ('ALARM', 'RESET', 'IMGPATH'):
			# control message, not data: explicitly report "no update"
			return False

		if rs.getCHN(d) in self.chans:
			self.stream = rs.update_stream(
				stream=self.stream, d=d, fill_value=None)
			return True
		return False


	def set_sps(self):
		'''
		Sets samples per second from the first trace in the stream.
		The stream must already contain at least one trace.
		'''
		self.sps = self.stream[0].stats.sampling_rate


	def elapse(self, new=False):
		'''
		Ticks self variables over into a new day for file naming purposes.

		Sets ``self.y``, ``self.m``, ``self.d`` and ``self.j`` (day of year)
		from the current UTC time, and ``self.newday`` to the following midnight.

		:param bool new: If ``False``, the program is starting. If ``True``, the UTC date just elapsed.
		'''
		self.st = UTCDateTime.now()
		self.y, self.m, self.d = self.st.year, self.st.month, self.st.day
		self.j = self.st.strftime('%j')
		# step 1.1 days past today's midnight, then truncate back to 00:00 of
		# that date, which yields the next midnight
		# NOTE(review): the extra 0.1 day is presumably a safety margin - confirm
		self.newday = UTCDateTime(self.y, self.m, self.d, 0, 0) + timedelta(days=1.1)
		self.newday = UTCDateTime(self.newday.year, self.newday.month, self.newday.day, 0, 0)
		if new:
			self.last = self.newday
		else:
			self.last = self.st


	def slicestream(self):
		'''
		Causes the stream to slice down to the time the last write operation was made.
		'''
		# Stream.slice() returns a new Stream rather than modifying in place,
		# so the result has to be assigned back to take effect
		self.stream = self.stream.slice(starttime=self.last)


	def _tracewrite(self, t):
		'''
		Processing for the :py:func:`rsudp.c_write.Write.write` function.
		Writes an input trace to disk, appending if the day file already exists.

		:type t: obspy.core.trace.Trace
		:param t: The trace segment to write to disk.

		'''
		enc = 'STEIM2'	# encoding
		if isinstance(t.data, rs.np.ma.masked_array):
			t.data = t.data.filled(fill_value=0) # fill array (to avoid obspy write error)
		# one file per network.station.channel per day (year + day of year)
		outfile = os.path.join(self.outdir, '%s.%s.00.%s.D.%s.%s' % (
							t.stats.network, t.stats.station,
							t.stats.channel, self.y, self.j))
		if outfile not in self.outfiles:
			self.outfiles.append(outfile)
		if os.path.exists(os.path.abspath(outfile)):
			with open(outfile, 'ab') as fh:
				t.write(fh, format='MSEED', encoding=enc)
				if self.debug:
					printM('%s records to %s'
							% (len(t.data), outfile), self.sender)
		else:
			t.write(outfile, format='MSEED', encoding=enc)
			if self.debug:
				printM('%s records to new file %s'
						% (len(t.data), outfile), self.sender)


	def write(self, stream=False):
		'''
		Writes a segment of the stream to disk as miniSEED, and appends it to the
		file in question. If there is no file (i.e. if the program is just starting
		or a new UTC day has just started, then this function writes to a new file).

		:type stream: obspy.core.stream.Stream or bool
		:param stream: The stream segment to write. If ``False``, everything up to
			5 seconds before the end of the first trace is written and
			``self.last`` is set to that cutoff time.
		'''
		if not stream:
			self.last = self.stream[0].stats.endtime - timedelta(seconds=5)
			stream = self.stream.copy().slice(
						endtime=self.last, nearest_sample=False)

		for t in stream:
			self._tracewrite(t)
		if self.testing:
			TEST['c_write'][1] = True


	def run(self):
		"""
		Reads packets and coordinates write operations.
		"""
		self.elapse()

		# wait for the first packet that actually lands in the stream; a control
		# message or unselected channel would leave it empty and break set_sps()
		while not self.getq():
			pass
		self.set_sps()
		self.getq()
		printM('miniSEED output directory: %s' % (self.outdir), self.sender)
		if self.inv:
			inv_path = '%s/%s.%s.00.xml' % (self.outdir,
					self.stream[0].stats.network,
					self.stream[0].stats.station)
			printM('Writing inventory file: %s' % inv_path, self.sender)
			self.inv.write(inv_path, format='STATIONXML')
		printM('Beginning miniSEED output.', self.sender)
		wait_pkts = (self.numchns * 10) / (rs.tf / 1000) 	# comes out to 10 seconds (tf is in ms)

		n = 0
		while True:
			# drain whatever is queued, then block for one more message
			while True:
				if self.queue.qsize() > 0:
					self.getq()
					time.sleep(0.01)		# wait a few ms to see if another packet will arrive
					n += 1
				else:
					self.getq()
					n += 1
					break
			if n >= wait_pkts:
				if self.newday < UTCDateTime.now(): # end of previous day and start of new day
					# finish the old day's file before the date variables tick over
					self.write(self.stream.slice(
								endtime=self.newday, nearest_sample=False))
					self.stream = self.stream.slice(
								starttime=self.newday, nearest_sample=False)
					self.elapse(new=True)
				else:
					self.write()
					self.stream = self.stream.slice(
								starttime=self.last, nearest_sample=False)
				self.stream = rs.copy(self.stream)
				n = 0

				self.getq()
				time.sleep(0.01)		# wait a few ms to see if another packet will arrive
			sys.stdout.flush()
			sys.stderr.flush()
197