Passed
Pull Request — master (#25)
by Ian
06:01
created

build.rsudp.c_write.Write.slicestream()   A

Complexity

Conditions 1

Size

Total Lines 5
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 1
CRAP Score 1.125

Importance

Changes 0
Metric Value
eloc 2
dl 0
loc 5
ccs 1
cts 2
cp 0.5
rs 10
c 0
b 0
f 0
cc 1
nop 1
crap 1.125
1 1
import sys, os
2 1
import time
3 1
from datetime import datetime, timedelta
4 1
from obspy import UTCDateTime
5 1
from rsudp.raspberryshake import ConsumerThread
6 1
import rsudp.raspberryshake as rs
7 1
from rsudp import printM, printW, printE, helpers
8 1
from rsudp.test import TEST
9
10 1
class Write(rs.ConsumerThread):
11
	"""
12
	A simple routine to write daily miniSEED data to :code:`output_dir/data`.
13
14
	:param cha: channel(s) to forward. others will be ignored.
15
	:type cha: str or list
16
	:param queue.Queue q: queue of data and messages sent by :class:`rsudp.c_consumer.Consumer`
17
	:param bool debug: whether or not to display messages when writing data to disk.
18
	"""
19 1
	def __init__(self, q, data_dir, testing=False, debug=False, cha='all'):
20
		"""
21
		Initialize the process
22
		"""
23 1
		super().__init__()
24 1
		self.sender = 'Write'
25 1
		self.alive = True
26 1
		self.testing = testing
27 1
		self.debug = debug
28 1
		if self.testing:
29 1
			self.debug = True
30
31 1
		self.queue = q
32
33 1
		self.stream = rs.Stream()
34 1
		self.outdir = os.path.join(data_dir, 'data')
35 1
		self.outfiles = []
36
37 1
		self.chans = []
38 1
		helpers.set_channels(self, cha)
39
40 1
		printM('Writing channels: %s' % self.chans, self.sender)
41 1
		self.numchns = rs.numchns
42 1
		self.stime = 1/rs.sps
43 1
		self.inv = rs.inv
44
45 1
		printM('Starting.', self.sender)
46
47
48 1
	def getq(self):
49
		'''
50
		Reads data from the queue and updates the stream.
51
52
		:rtype: bool
53
		:return: Returns ``True`` if stream is updated, otherwise ``False``.
54
		'''
55 1
		d = self.queue.get(True, timeout=None)
56 1
		self.queue.task_done()
57 1
		if 'TERM' in str(d):
58 1
			self.alive = False
59 1
			printM('Exiting.', self.sender)
60 1
			sys.exit()
61 1
		elif str(d.decode('UTF-8')).split(' ')[0] in ['ALARM', 'RESET', 'IMGPATH']:
62 1
			pass
63
		else:
64 1
			if rs.getCHN(d) in self.chans:
65 1
				self.stream = rs.update_stream(
66
					stream=self.stream, d=d, fill_value=None)
67 1
				return True
68
			else:
69
				return False
70
	
71 1
	def set_sps(self):
72
		'''
73
		Sets samples per second.
74
		'''
75 1
		self.sps = self.stream[0].stats.sampling_rate
76
77 1
	def elapse(self, new=False):
78
		'''
79
		Ticks self variables over into a new day for file naming purposes.
80
81
		:param bool new: If ``False``, the program is starting. If ``True``, the UTC date just elapsed.
82
		'''
83 1
		self.st = UTCDateTime.now()
84 1
		self.y, self.m, self.d = self.st.year, self.st.month, self.st.day
85 1
		self.j = self.st.strftime('%j')
86 1
		self.newday = UTCDateTime(self.y, self.m, self.d, 0, 0) + timedelta(days=1.1)
87 1
		self.newday = UTCDateTime(self.newday.year, self.newday.month, self.newday.day, 0, 0)
88 1
		if new:
89
			self.last = self.newday
90
		else:
91 1
			self.last = self.st
92
93 1
	def slicestream(self):
94
		'''
95
		Causes the stream to slice down to the time the last write operation was made.
96
		'''
97
		self.stream.slice(starttime=self.last)
98
99 1
	def _tracewrite(self, t):
100
		'''
101
		Processing for the :py:func:`rsudp.c_write.Write.write` function.
102
		Writes an input trace to disk.
103
104
		:type t: obspy.core.trace.Trace
105
		:param t: The trace segment to write to disk.
106
107
		'''
108 1
		enc = 'STEIM2'	# encoding
109 1
		if isinstance(t.data, rs.np.ma.masked_array):
110
			t.data = t.data.filled(fill_value=0) # fill array (to avoid obspy write error)
111 1
		outfile = self.outdir + '/%s.%s.00.%s.D.%s.%s' % (t.stats.network,
112
							t.stats.station, t.stats.channel, self.y, self.j)
113 1
		if not outfile in self.outfiles:
114 1
			self.outfiles.append(outfile)
115 1
		if os.path.exists(os.path.abspath(outfile)):
116 1
			with open(outfile, 'ab') as fh:
117 1
				t.write(fh, format='MSEED', encoding=enc)
118
				if self.debug:
119
					printM('%s records to %s'
120
							% (len(t.data), outfile), self.sender)
121
		else:
122 1
			t.write(outfile, format='MSEED', encoding=enc)
123
			if self.debug:
124
				printM('%s records to new file %s'
125
						% (len(t.data), outfile), self.sender)
126
127
128 1
	def write(self, stream=False):
129
		'''
130
		Writes a segment of the stream to disk as miniSEED, and appends it to the
131
		file in question. If there is no file (i.e. if the program is just starting
132
		or a new UTC day has just started, then this function writes to a new file).
133
134
		:type stream: obspy.core.stream.Stream or bool
135
		:param stream: The stream segment to write. If ``False``, the program has just started.
136
		'''
137 1
		if not stream:
138 1
			self.last = self.stream[0].stats.endtime - timedelta(seconds=5)
139 1
			stream = self.stream.copy().slice(
140
						endtime=self.last, nearest_sample=False)
141
142 1
		for t in stream:
143 1
			self._tracewrite(t)
144 1
		if self.testing:
145 1
			TEST['c_write'][1] = True
146
147 1
	def run(self):
148
		"""
149
		Reads packets and coordinates write operations.
150
		"""
151 1
		self.elapse()
152
153 1
		self.getq()
154 1
		self.set_sps()
155 1
		self.getq()
156 1
		printM('miniSEED output directory: %s' % (self.outdir), self.sender)
157 1
		if self.inv:
158 1
			printM('Writing inventory file: %s/%s.%s.00.xml' % (self.outdir,
159
					self.stream[0].stats.network,
160
					self.stream[0].stats.station), self.sender)
161 1
			self.inv.write('%s/%s.%s.00.xml' % (self.outdir,
162
					self.stream[0].stats.network,
163
					self.stream[0].stats.station),
164
					format='STATIONXML')
165 1
		printM('Beginning miniSEED output.', self.sender)
166 1
		wait_pkts = (self.numchns * 10) / (rs.tf / 1000) 	# comes out to 10 seconds (tf is in ms)
167
168 1
		n = 0
169 1
		while True:
170 1
			while True:
171 1
				if self.queue.qsize() > 0:
172 1
					self.getq()
173 1
					time.sleep(0.01)		# wait a few ms to see if another packet will arrive
174 1
					n += 1
175
				else:
176 1
					self.getq()
177 1
					n += 1
178 1
					break
179 1
			if n >= wait_pkts:
180 1
				if self.newday < UTCDateTime.now(): # end of previous day and start of new day
181
					self.write(self.stream.slice(
182
								endtime=self.newday, nearest_sample=False))
183
					self.stream = self.stream.slice(
184
								starttime=self.newday, nearest_sample=False)
185
					self.elapse(new=True)
186
				else:
187 1
					self.write()
188 1
					self.stream = self.stream.slice(
189
								starttime=self.last, nearest_sample=False)
190 1
				self.stream = rs.copy(self.stream)
191 1
				n = 0
192
193 1
				self.getq()
194 1
				time.sleep(0.01)		# wait a few ms to see if another packet will arrive
195 1
			sys.stdout.flush()
196
			sys.stderr.flush()
197