Passed
Push — master ( 0ed23a...45db06 )
by Ian
04:37 queued 12s
created

build.rsudp.c_write.Write.write()   C

Complexity

Conditions 9

Size

Total Lines 33
Code Lines 23

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 23
dl 0
loc 33
rs 6.6666
c 0
b 0
f 0
cc 9
nop 2
1
import sys, os
2
import time
3
from datetime import datetime, timedelta
4
from obspy import UTCDateTime
5
from rsudp.raspberryshake import ConsumerThread
6
import rsudp.raspberryshake as rs
7
from rsudp import printM, printW, printE, helpers
8
from rsudp import ms_path
9
10
class Write(rs.ConsumerThread):
11
	"""
12
	A simple routine to write daily miniSEED data to :code:`output_dir/data`.
13
14
	:param cha: channel(s) to forward. others will be ignored.
15
	:type cha: str or list
16
	:param queue.Queue q: queue of data and messages sent by :class:`rsudp.c_consumer.Consumer`
17
	:param bool debug: whether or not to display messages when writing data to disk.
18
	"""
19
	def __init__(self, q, data_dir, debug=False, cha='all'):
20
		"""
21
		Initialize the process
22
		"""
23
		super().__init__()
24
		self.sender = 'Write'
25
		self.alive = True
26
		self.debug = debug
27
28
		self.queue = q
29
30
		self.stream = rs.Stream()
31
		self.outdir = os.path.join(data_dir, 'data')
32
		self.outfiles = []
33
34
		self.chans = []
35
		helpers.set_channels(self, cha)
36
37
		printM('Writing channels: %s' % self.chans, self.sender)
38
		self.numchns = rs.numchns
39
		self.stime = 1/rs.sps
40
		self.inv = rs.inv
41
42
		printM('Starting.', self.sender)
43
44
45
	def getq(self):
46
		'''
47
		Reads data from the queue and updates the stream.
48
49
		:rtype: bool
50
		:return: Returns ``True`` if stream is updated, otherwise ``False``.
51
		'''
52
		d = self.queue.get(True, timeout=None)
53
		self.queue.task_done()
54
		if 'TERM' in str(d):
55
			self.alive = False
56
			printM('Exiting.', self.sender)
57
			sys.exit()
58
		elif str(d.decode('UTF-8')).split(' ')[0] in ['ALARM', 'RESET', 'IMGPATH']:
59
			pass
60
		else:
61
			if rs.getCHN(d) in self.chans:
62
				self.stream = rs.update_stream(
63
					stream=self.stream, d=d, fill_value=None)
64
				return True
65
			else:
66
				return False
67
	
68
	def set_sps(self):
69
		'''
70
		Sets samples per second.
71
		'''
72
		self.sps = self.stream[0].stats.sampling_rate
73
74
	def elapse(self, new=False):
75
		'''
76
		Ticks self variables over into a new day for file naming purposes.
77
78
		:param bool new: If ``False``, the program is starting. If ``True``, the UTC date just elapsed.
79
		'''
80
		self.st = UTCDateTime.now()
81
		self.y, self.m, self.d = self.st.year, self.st.month, self.st.day
82
		self.j = self.st.strftime('%j')
83
		self.newday = UTCDateTime(self.y, self.m, self.d, 0, 0) + timedelta(days=1.1)
84
		self.newday = UTCDateTime(self.newday.year, self.newday.month, self.newday.day, 0, 0)
85
		if new:
86
			self.last = self.newday
87
		else:
88
			self.last = self.st
89
90
	def slicestream(self):
91
		'''
92
		Causes the stream to slice down to the time the last write operation was made.
93
		'''
94
		self.stream.slice(starttime=self.last)
95
96
	def write(self, stream=False):
97
		'''
98
		Writes a segment of the stream to disk as miniSEED, and appends it to the
99
		file in question. If there is no file (i.e. if the program is just starting
100
		or a new UTC day has just started, then this function writes to a new file).
101
102
		:type stream: obspy.core.stream.Stream or bool
103
		:param stream: The stream segment to write. If ``False``, the program has just started.
104
		'''
105
		if not stream:
106
			self.last = self.stream[0].stats.endtime - timedelta(seconds=5)
107
			stream = self.stream.copy().slice(
108
						endtime=self.last, nearest_sample=False)
109
110
		for t in stream:
111
			enc = 'STEIM2'	# encoding
112
			if isinstance(t.data, rs.np.ma.masked_array):
113
				t.data = t.data.filled(fill_value=0) # fill array (to avoid obspy write error)
114
			outfile = self.outdir + '/%s.%s.00.%s.D.%s.%s' % (t.stats.network,
115
								t.stats.station, t.stats.channel, self.y, self.j)
116
			if not outfile in self.outfiles:
117
				self.outfiles.append(outfile)
118
			if os.path.exists(os.path.abspath(outfile)):
119
				with open(outfile, 'ab') as fh:
120
					if self.debug:
121
						printM('Writing %s records to %s'
122
								% (len(t.data), outfile), self.sender)
123
					t.write(fh, format='MSEED', encoding=enc)
124
			else:
125
				if self.debug:
126
					printM('Writing %s new file %s'
127
							% (len(t.data), outfile), self.sender)
128
				t.write(outfile, format='MSEED', encoding=enc)
129
130
	def run(self):
131
		"""
132
		Reads packets and coordinates write operations.
133
		"""
134
		self.elapse()
135
136
		self.getq()
137
		self.set_sps()
138
		self.getq()
139
		printM('miniSEED output directory: %s' % (self.outdir), self.sender)
140
		if self.inv:
141
			printM('Writing inventory file: %s/%s.%s.00.xml' % (self.outdir,
142
					self.stream[0].stats.network,
143
					self.stream[0].stats.station), self.sender)
144
			self.inv.write('%s/%s.%s.00.xml' % (self.outdir,
145
					self.stream[0].stats.network,
146
					self.stream[0].stats.station),
147
					format='STATIONXML')
148
		printM('Beginning miniSEED output.', self.sender)
149
		wait_pkts = (self.numchns * 10) / (rs.tf / 1000) 	# comes out to 10 seconds (tf is in ms)
150
151
		n = 0
152
		while True:
153
			while True:
154
				if self.queue.qsize() > 0:
155
					self.getq()
156
					time.sleep(0.01)		# wait a few ms to see if another packet will arrive
157
					n += 1
158
				else:
159
					self.getq()
160
					n += 1
161
					break
162
			if n >= wait_pkts:
163
				if self.newday < UTCDateTime.now(): # end of previous day and start of new day
164
					self.write(self.stream.slice(
165
								endtime=self.newday, nearest_sample=False))
166
					self.stream = self.stream.slice(
167
								starttime=self.newday, nearest_sample=False)
168
					self.elapse(new=True)
169
				else:
170
					self.write()
171
					self.stream = self.stream.slice(
172
								starttime=self.last, nearest_sample=False)
173
				self.stream = rs.copy(self.stream)
174
				n = 0
175
176
				self.getq()
177
				time.sleep(0.01)		# wait a few ms to see if another packet will arrive
178
			sys.stdout.flush()
179
			sys.stderr.flush()
180