Passed
Branch master (7ffd9c)
by Ian
04:15
created

build.rsudp.c_write.Write.set_sps()   A

Complexity

Conditions 1

Size

Total Lines 5
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 2
nop 1
dl 0
loc 5
rs 10
c 0
b 0
f 0
1
import sys, os
2
import time
3
from datetime import datetime, timedelta
4
from obspy import UTCDateTime
5
from rsudp.raspberryshake import ConsumerThread
6
import rsudp.raspberryshake as rs
7
from rsudp import printM, printW, printE
8
import rsudp
9
10
class Write(rs.ConsumerThread):
	"""
	A simple routine to write daily miniSEED data to :code:`output_dir/data`.

	Packets are pulled from a queue, accumulated in an in-memory stream, and
	periodically appended to one miniSEED file per channel per UTC day.

	:param queue.Queue q: queue of data and messages sent by :class:`rsudp.c_consumer.Consumer`
	:param bool debug: whether or not to display messages when writing data to disk.
	:param cha: channel(s) to forward. others will be ignored.
	:type cha: str or list
	"""
	def __init__(self, q, debug=False, cha='all'):
		"""
		Initialize the process.

		Stores the queue, creates an empty stream, and caches the output
		directory, channel count, sample interval and inventory from the
		:mod:`rsudp` / :mod:`rsudp.raspberryshake` modules.
		"""
		super().__init__()
		self.sender = 'Write'
		self.alive = True

		self.queue = q

		self.stream = rs.Stream()
		self.outdir = rsudp.data_dir
		self.debug = debug

		# NOTE(review): rs.set_channels presumably fills self.chans from cha -- confirm
		self.chans = []
		rs.set_channels(self, cha)

		printM('Writing channels: %s' % self.chans, self.sender)
		self.numchns = rs.numchns
		self.stime = 1/rs.sps
		self.inv = rs.inv

		printM('Starting.', self.sender)


	def getq(self):
		'''
		Reads data from the queue and updates the stream.

		Blocks until an item is available. A message containing ``TERM``
		marks the thread as no longer alive and exits via :func:`sys.exit`.
		A message containing ``ALARM`` is ignored.

		:rtype: bool
		:return: Returns ``True`` if stream is updated, otherwise ``False``.
		'''
		d = self.queue.get(True, timeout=None)
		self.queue.task_done()
		msg = str(d)
		if 'TERM' in msg:
			self.alive = False
			printM('Exiting.', self.sender)
			sys.exit()
		if 'ALARM' in msg:
			# alarm messages carry no data for the stream; report "not updated"
			# explicitly instead of falling through to an implicit None
			return False
		if rs.getCHN(d) in self.chans:
			self.stream = rs.update_stream(
				stream=self.stream, d=d, fill_value=None)
			return True
		return False

	def set_sps(self):
		'''
		Sets samples per second from the first trace in the stream.

		:raises IndexError: if the stream does not contain any trace yet.
		'''
		self.sps = self.stream[0].stats.sampling_rate

	def elapse(self, new=False):
		'''
		Ticks self variables over into a new day for file naming purposes.

		Records the current UTC time, its year/month/day and julian day string,
		and computes ``self.newday`` as the UTC midnight following the current
		time.

		:param bool new: If ``False``, the program is starting. If ``True``, the UTC date just elapsed.
		'''
		self.st = UTCDateTime.now()
		self.y, self.m, self.d = self.st.year, self.st.month, self.st.day
		self.j = self.st.strftime('%j')
		# today's midnight plus a bit more than one day, truncated back to
		# midnight on the next line, yields the start of the following UTC day
		self.newday = UTCDateTime(self.y, self.m, self.d, 0, 0) + timedelta(days=1.1)
		self.newday = UTCDateTime(self.newday.year, self.newday.month, self.newday.day, 0, 0)
		if new:
			self.last = self.newday
		else:
			self.last = self.st

	def slicestream(self):
		'''
		Causes the stream to slice down to the time the last write operation was made.
		'''
		# slice() returns a new stream rather than trimming in place, so the
		# result has to be stored for this method to have any effect
		self.stream = self.stream.slice(starttime=self.last)

	def write(self, stream=False):
		'''
		Writes a segment of the stream to disk as miniSEED, and appends it to the
		file in question. If there is no file (i.e. if the program is just starting
		or a new UTC day has just started, then this function writes to a new file).

		When no segment is supplied, ``self.last`` is set to five seconds before
		the end of the first trace and everything up to that time is written.

		:type stream: obspy.core.stream.Stream or bool
		:param stream: The stream segment to write. If ``False`` (or any other falsy value), the segment is taken from ``self.stream``.
		'''
		if not stream:
			self.last = self.stream[0].stats.endtime - timedelta(seconds=5)
			stream = self.stream.copy().slice(
						endtime=self.last, nearest_sample=False)

		enc = 'STEIM2'	# encoding
		for t in stream:
			if isinstance(t.data, rs.np.ma.masked_array):
				t.data = t.data.filled(fill_value=0) # fill array (to avoid obspy write error)
			# file name is built from the trace ids plus the year and julian day
			# stored by elapse()
			outfile = self.outdir + '/%s.%s.00.%s.D.%s.%s' % (t.stats.network,
								t.stats.station, t.stats.channel, self.y, self.j)
			if os.path.exists(os.path.abspath(outfile)):
				# existing file for this day: append the new records
				with open(outfile, 'ab') as fh:
					if self.debug:
						printM('Writing %s records to %s'
								% (len(t.data), outfile), self.sender)
					t.write(fh, format='MSEED', encoding=enc)
			else:
				if self.debug:
					printM('Writing %s new file %s'
							% (len(t.data), outfile), self.sender)
				t.write(outfile, format='MSEED', encoding=enc)

	def run(self):
		"""
		Reads packets and coordinates write operations.

		After start-up, loops forever: drains the queue, and once enough
		packets have been counted, writes the accumulated data to disk and
		trims the in-memory stream. The loop ends when :meth:`getq` receives a
		``TERM`` message and exits.
		"""
		self.elapse()

		# wait until a packet for a selected channel has actually populated the
		# stream; otherwise set_sps() would index an empty stream
		while not self.getq():
			pass
		self.set_sps()
		self.getq()
		printM('miniSEED output directory: %s' % (self.outdir), self.sender)
		if self.inv:
			inv_path = '%s/%s.%s.00.xml' % (self.outdir,
					self.stream[0].stats.network,
					self.stream[0].stats.station)
			printM('Writing inventory file: %s' % (inv_path), self.sender)
			self.inv.write(inv_path, format='STATIONXML')
		printM('Beginning miniSEED output.', self.sender)
		wait_pkts = (self.numchns * 10) / (rs.tf / 1000) 	# comes out to 10 seconds (tf is in ms)

		n = 0
		while True:
			while True:
				if self.queue.qsize() > 0:
					self.getq()
					time.sleep(0.01)		# wait a few ms to see if another packet will arrive
					n += 1
				else:
					# queue looks empty: block for the next item, then go check
					# whether it is time to write
					self.getq()
					n += 1
					break
			if n >= wait_pkts:
				if self.newday < UTCDateTime.now(): # end of previous day and start of new day
					# flush what belongs to the previous day (file name still uses
					# the old date), keep the rest, then roll the date over
					self.write(self.stream.slice(
								endtime=self.newday, nearest_sample=False))
					self.stream = self.stream.slice(
								starttime=self.newday, nearest_sample=False)
					self.elapse(new=True)
				else:
					self.write()
					self.stream = self.stream.slice(
								starttime=self.last, nearest_sample=False)
				self.stream = rs.copy(self.stream)
				n = 0

				self.getq()
				time.sleep(0.01)		# wait a few ms to see if another packet will arrive
			sys.stdout.flush()
			sys.stderr.flush()
177