build.rsudp.c_write.Write.run()   B
last analyzed

Complexity

Conditions 7

Size

Total Lines 50
Code Lines 42

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 31
CRAP Score 7.0336

Importance

Changes 0
Metric Value
eloc 42
dl 0
loc 50
ccs 31
cts 34
cp 0.9118
rs 7.472
c 0
b 0
f 0
cc 7
nop 1
crap 7.0336
1 1
import sys, os
2 1
import time
3 1
from datetime import timedelta
4 1
from obspy import UTCDateTime
5 1
import rsudp.raspberryshake as rs
6 1
from rsudp import printM, printW, printE, helpers
7 1
from rsudp.test import TEST
8
9 1
class Write(rs.ConsumerThread):
	"""
	A simple routine to write daily miniSEED data to :code:`output_dir/data`.

	:param queue.Queue q: queue of data and messages sent by :class:`rsudp.c_consumer.Consumer`
	:param data_dir: base output directory; files are written to its ``data`` subdirectory.
	:param bool testing: if ``True``, debug output is forced on and the ``c_write`` entry of :py:data:`rsudp.test.TEST` is flagged after each write.
	:param bool debug: whether or not to display messages when writing data to disk.
	:param cha: channel(s) to forward. others will be ignored.
	:type cha: str or list
	"""
	def __init__(self, q, data_dir, testing=False, debug=False, cha='all'):
		"""
		Initialize the process
		"""
		super().__init__()
		self.sender = 'Write'
		self.alive = True
		self.testing = testing
		self.debug = debug
		if self.testing:
			# testing always runs verbosely
			self.debug = True

		self.queue = q

		self.stream = rs.Stream()
		self.outdir = os.path.join(data_dir, 'data')
		self.outfiles = []		# every file path written to so far

		self.chans = []
		helpers.set_channels(self, cha)

		printM('Writing channels: %s' % self.chans, self.sender)
		self.numchns = rs.numchns
		self.stime = 1/rs.sps
		self.inv = rs.inv

		printM('Starting.', self.sender)


	def getq(self):
		'''
		Reads data from the queue and updates the stream.

		Blocks until an item is available. An item containing ``TERM`` makes
		this method call :py:func:`sys.exit`. ``ALARM``, ``RESET`` and
		``IMGPATH`` messages are ignored.

		:rtype: bool
		:return: Returns ``True`` if stream is updated, otherwise ``False``.
		'''
		d = self.queue.get(True, timeout=None)
		self.queue.task_done()
		if 'TERM' in str(d):
			self.alive = False
			printM('Exiting.', self.sender)
			sys.exit()
		if str(d.decode('UTF-8')).split(' ')[0] in ['ALARM', 'RESET', 'IMGPATH']:
			# these messages do not update the stream; report that explicitly
			# instead of falling through and returning None
			return False
		if rs.getCHN(d) in self.chans:
			self.stream = rs.update_stream(
				stream=self.stream, d=d, fill_value=None)
			return True
		return False

	def set_sps(self):
		'''
		Sets samples per second from the first trace in the stream.
		'''
		self.sps = self.stream[0].stats.sampling_rate

	def elapse(self, new=False):
		'''
		Ticks self variables over into a new day for file naming purposes.

		:param bool new: If ``False``, the program is starting. If ``True``, the UTC date just elapsed.
		'''
		self.st = UTCDateTime.now()
		self.y, self.m, self.d = self.st.year, self.st.month, self.st.day
		self.j = self.st.strftime('%j')
		# step a bit more than one day ahead, then truncate to midnight,
		# which yields the start of the next UTC day
		self.newday = UTCDateTime(self.y, self.m, self.d, 0, 0) + timedelta(days=1.1)
		self.newday = UTCDateTime(self.newday.year, self.newday.month, self.newday.day, 0, 0)
		if new:
			# NOTE(review): this is the *next* midnight, not the one that just
			# elapsed -- confirm intent. write() resets self.last before run()
			# reads it again, so behaviour is left unchanged here.
			self.last = self.newday
		else:
			self.last = self.st

	def slicestream(self):
		'''
		Causes the stream to slice down to the time the last write operation was made.
		'''
		# Stream.slice() returns a new stream rather than trimming in place,
		# so the result must be stored for this call to have any effect
		self.stream = self.stream.slice(starttime=self.last)

	def _tracewrite(self, t):
		'''
		Processing for the :py:func:`rsudp.c_write.Write.write` function.
		Writes an input trace to disk, appending if the day file already exists.

		:type t: obspy.core.trace.Trace
		:param t: The trace segment to write to disk.

		'''
		enc = 'STEIM2'	# encoding
		if isinstance(t.data, rs.np.ma.masked_array):
			t.data = t.data.filled(fill_value=0) # fill array (to avoid obspy write error)
		outfile = self.outdir + '/%s.%s.00.%s.D.%s.%s' % (t.stats.network,
							t.stats.station, t.stats.channel, self.y, self.j)
		if outfile not in self.outfiles:
			self.outfiles.append(outfile)
		if os.path.exists(os.path.abspath(outfile)):
			# the day file exists already: append the new records to it
			with open(outfile, 'ab') as fh:
				t.write(fh, format='MSEED', encoding=enc)
				if self.debug:
					printM('%s records to %s'
							% (len(t.data), outfile), self.sender)
		else:
			t.write(outfile, format='MSEED', encoding=enc)
			if self.debug:
				printM('%s records to new file %s'
						% (len(t.data), outfile), self.sender)


	def write(self, stream=False):
		'''
		Writes a segment of the stream to disk as miniSEED, and appends it to the
		file in question. If there is no file (i.e. if the program is just starting
		or a new UTC day has just started, then this function writes to a new file).

		:type stream: obspy.core.stream.Stream or bool
		:param stream: The stream segment to write. If ``False`` (or ``None``),
			everything in the internal stream up to five seconds before the end
			of its first trace is written.
		'''
		# compare by identity: an explicitly passed but empty Stream is falsy
		# and must not be mistaken for "no stream given"
		if stream is False or stream is None:
			if len(self.stream) == 0:
				# nothing buffered yet, so there is nothing to write
				return
			self.last = self.stream[0].stats.endtime - timedelta(seconds=5)
			stream = self.stream.copy().slice(
						endtime=self.last, nearest_sample=False)

		for t in stream:
			self._tracewrite(t)
		if self.testing:
			TEST['c_write'][1] = True

	def run(self):
		"""
		Reads packets and coordinates write operations.
		"""
		self.elapse()

		# wait for the first packet that actually lands in the stream;
		# control messages and packets for ignored channels leave it empty,
		# and set_sps() needs at least one trace
		while not self.getq():
			pass
		self.set_sps()
		self.getq()
		printM('miniSEED output directory: %s' % (self.outdir), self.sender)
		if self.inv:
			inv_path = '%s/%s.%s.00.xml' % (self.outdir,
					self.stream[0].stats.network,
					self.stream[0].stats.station)
			printM('Writing inventory file: %s' % inv_path, self.sender)
			self.inv.write(inv_path, format='STATIONXML')
		printM('Beginning miniSEED output.', self.sender)
		wait_pkts = (self.numchns * 10) / (rs.tf / 1000) 	# comes out to 10 seconds (tf is in ms)

		n = 0
		while True:
			# drain the queue: keep reading while more items are waiting,
			# and always read (blocking) at least once
			while True:
				more_waiting = self.queue.qsize() > 0
				self.getq()
				n += 1
				if not more_waiting:
					break
				time.sleep(0.01)		# wait a few ms to see if another packet will arrive
			if n >= wait_pkts:
				if self.newday < UTCDateTime.now(): # end of previous day and start of new day
					self.write(self.stream.slice(
								endtime=self.newday, nearest_sample=False))
					self.stream = self.stream.slice(
								starttime=self.newday, nearest_sample=False)
					self.elapse(new=True)
				else:
					self.write()
					self.stream = self.stream.slice(
								starttime=self.last, nearest_sample=False)
				self.stream = rs.copy(self.stream)
				n = 0

				self.getq()
				time.sleep(0.01)		# wait a few ms to see if another packet will arrive
			sys.stdout.flush()
			sys.stderr.flush()
196