Passed
Push — master ( 021d22...fccee2 )
by Stefan
04:11
created

scia_solar.write_to_textfile()   B

Complexity

Conditions 6

Size

Total Lines 46
Code Lines 24

Duplication

Lines 46
Ratio 100 %

Code Coverage

Tests 14
CRAP Score 6.0702

Importance

Changes 0
Metric Value
cc 6
eloc 24
nop 2
dl 46
loc 46
ccs 14
cts 16
cp 0.875
crap 6.0702
rs 8.3706
c 0
b 0
f 0
1
# -*- coding: utf-8 -*-
2
# vim:fileencoding=utf-8
3
#
4
# Copyright (c) 2014-2017 Stefan Bender
5
#
6
# This module is part of sciapy.
7
# sciapy is free software: you can redistribute it or modify
8
# it under the terms of the GNU General Public License as published
9
# by the Free Software Foundation, version 2.
10
# See accompanying LICENSE file or http://www.gnu.org/licenses/gpl-2.0.html.
11 1
"""SCIAMACHY level 1c solar spectra module
12
13
This module contains the python class for SCIAMACHY level 1c solar spectra.
14
It include some simple conversion routines, from and to ascii and from and to netcdf.
15
16
A simple import from SRON nadc_tools (https://github.com/rmvanhees/nadc_tools)
17
produced HDF5 is also supported.
18
"""
19 1
from __future__ import absolute_import, division, print_function
20
21 1
__all__ = ["scia_solar", "__doc__"]
22
23 1
import datetime
24 1
import logging
25
26 1
import numpy as np
27 1
try:
28 1
	from netCDF4 import Dataset as netcdf_file
29 1
	_fmtargs = {"format": "NETCDF4"}
30
except ImportError:
31
	try:
32
		from scipy.io.netcdf import netcdf_file
33
		_fmtargs = {"version": 1}
34
	except ImportError:
35
		from pupynere import netcdf_file
36
		_fmtargs = {"version": 1}
37
38 1
logging.basicConfig(level=logging.INFO,
39
		format="[%(levelname)-8s] (%(asctime)s) "
40
		"%(filename)s:%(lineno)d %(message)s",
41
		datefmt="%Y-%m-%d %H:%M:%S %z")
42
43 1 View Code Duplication
class scia_solar(object):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
44
	"""SCIAMACHY solar reference spectrum class
45
46
	Contains the SCIAMACHY level 1c solar reference spectrum.
47
	The format is inspired by the SCIAMACHY ascii data format.
48
49
	Attributes
50
	----------
51
	textheader_length : int
52
		The number of lines of the text header.
53
	textheader : str
54
		The header containing the solar spectrum meta data.
55
	npix : int
56
		The number of spectral points.
57
	solar_id : str
58
		The solar reference spectrum ID,
59
		choices: "D0", "D1", "D2", "E0", "E1", "A0", "A1",
60
		"N1", "N2", "N3", "N4", "N5".
61
	orbit : int
62
		The SCIAMACHY/Envisat orbit number.
63
	time : :class:`datetime.datetime`
64
		The sensing start time of the (semi-)orbit.
65
	wls : (M,) array_like
66
		The spectral wavelengths.
67
	rads : (M,) array_like
68
		The radiances at the spectral points, M = len(wls).
69
	errs : (M,) array_like
70
		The relative radiance uncertainties at the tangent points, M = len(wls).
71
	"""
72
73 1
	def __init__(self):
74 1
		self.textheader_length = 0
75 1
		self.textheader = ""
76 1
		self.npix = 0
77 1
		self.solar_id = ""
78 1
		self.orbit = -1
79 1
		self.time = None
80 1
		self.wls = np.array([])
81 1
		self.rads = np.array([])
82 1
		self.errs = None
83
84 1
	def read_from_netcdf(self, filename):
85
		"""SCIAMACHY level 1c solar reference netcdf import
86
87
		Parameters
88
		----------
89
		filename : str
90
			The netcdf filename to read the data from.
91
92
		Returns
93
		-------
94
		nothing
95
		"""
96 1
		ncf = netcdf_file(filename, 'r')
97 1
		self.textheader_length = ncf.textheader_length
98 1
		self.textheader = ncf.textheader
99 1
		if self.textheader_length > 6:
100 1
			self.solar_id = ncf.solar_id
101 1
			self.orbit = ncf.orbit
102 1
			self.time = datetime.datetime.strptime(ncf.time, '%Y-%m-%d %H:%M:%S%z')
103 1
		self.npix = len(ncf.dimensions['wavelength'])
104 1
		self.wls = ncf.variables['wavelength'][:].copy()
105 1
		self.rads = ncf.variables['radiance'][:].copy()
106 1
		try:
107 1
			self.errs = ncf.variables['radiance errors'][:].copy()
108 1
		except KeyError:
109 1
			self.errs = None
110 1
		ncf.close()
111
112 1
	def read_from_textfile(self, filename):
113
		"""SCIAMACHY level 1c solar reference text import
114
115
		Parameters
116
		----------
117
		filename : str
118
			The (plain) ascii table filename to read the data from.
119
120
		Returns
121
		-------
122
		nothing
123
		"""
124 1
		if hasattr(filename, 'seek'):
125
			f = filename
126
		else:
127 1
			f = open(filename, 'r')
128 1
		h_list = []
129 1
		try:
130 1
			nh = int(f.readline())
131
		except:
132
			nh = 6
133
			f.seek(0)
134 1
		for i in range(0, nh):
135 1
			h_list.append(f.readline().rstrip())
136 1
		self.textheader_length = nh
137 1
		self.textheader = '\n'.join(h_list)
138 1
		self.npix = int(f.readline())
139 1
		if nh > 6:
140 1
			self.solar_id = f.readline().rstrip()
141 1
			self.orbit = int(f.readline())
142 1
			self.time = datetime.datetime.strptime(f.readline() + "+0000", "%Y %m %d %H %M %S %z")
143 1
			self.wls, self.rads = np.genfromtxt(filename, skip_header=nh + 5, unpack=True)
144 1
			self.errs = None
145
		else:
146
			self.wls, self.rads, self.errs = np.genfromtxt(filename, skip_header=7, unpack=True)
147
148 1
	def read_from_hdf5(self, hf, ref="D0"):
149
		"""SCIAMACHY level 1c solar reference HDF5 import
150
151
		Parameters
152
		----------
153
		hf : opened file
154
			Pointer to the opened level 1c HDF5 file
155
		ref : str
156
			The solar reference spectra id name,
157
			choose from: "D0", "D1", "D2", "E0", "E1", "A0", "A1",
158
			"N1", "N2", "N3", "N4", "N5". Defaults to "D0".
159
160
		Returns
161
		-------
162
		success : int
163
			0 on success, 1 if an error occured.
164
		"""
165
		product = hf.get("/MPH")["product_name"][0].decode()
166
		soft_ver = hf.get("/MPH")["software_version"][0].decode()
167
		key_ver = hf.get("/SPH")["key_data_version"][0].decode()
168
		mf_ver = hf.get("/SPH")["m_factor_version"][0].decode()
169
		init_version = hf.get("/SPH")["init_version"][0].decode().strip()
170
		init_ver, decont = init_version.split(' ')
171
		decont = decont.lstrip("DECONT=")
172
		start_date = hf.get("/MPH")["sensing_start"][0].decode().rstrip('"')
173
		# fill some class variables
174
		self.time = (datetime.datetime.strptime(start_date, "%d-%b-%Y %H:%M:%S.%f")
175
					.replace(tzinfo=datetime.timezone.utc))
176
		self.orbit = hf.get("/MPH")["abs_orbit"][0]
177
		self.solar_id = ref
178
179
		logging.debug("product: %s, orbit: %s", product, self.orbit)
180
		logging.debug("soft_ver: %s, key_ver: %s, mf_ver: %s, init_ver: %s, "
181
				"decont_ver: %s", soft_ver, key_ver, mf_ver, init_ver, decont)
182
183
		# Prepare the header
184
		datatype_txt = "SCIAMACHY solar mean ref."
185
		n_header = 10
186
		line = n_header + 2
187
		header = ("#Data type          : {0}\n".format(datatype_txt))
188
		header += ("#L1b product        : {0}\n".format(product))
189
		header += ("#Orbit nr.          : {0:05d}\n".format(self.orbit))
190
		header += ("#Ver. Proc/Key/M/I/D: {0}  {1}  {2}  {3}  {4}\n"
191
				.format(soft_ver, key_ver, mf_ver, init_ver, decont))
192
		header += ("#Starttime          : {0}\n".format(start_date))
193
		header += ("#L.{0:2d} : Number_of_pixels\n".format(line))
194
		line += 1
195
		header += ("#L.{0:2d} : Solar ID\n".format(line))
196
		line += 1
197
		header += ("#L.{0:2d} : Orbit\n".format(line))
198
		line += 1
199
		header += ("#L.{0:2d} : Date Time : yyyy mm dd hh mm ss\n".format(line))
200
		line += 1
201
		header += ("#L.{0:2d} : Npix lines : wavelangth  irradiance  accuracy".format(line))
202
203
		sun_ref = hf.get("/GADS/SUN_REFERENCE")
204
		this_ref = sun_ref[sun_ref["sun_spec_id"] == ref]
205
206
		# fill the remaining class variables
207
		self.textheader_length = n_header
208
		self.textheader = header
209
		self.wls = this_ref["wvlen_sun"][:]
210
		self.npix = len(self.wls)
211
		self.rads = this_ref["mean_sun"][:]
212
		self.errs = this_ref["accuracy_sun"][:]
213
		return 0
214
215 1
	def read_from_file(self, filename):
216
		"""SCIAMACHY level 1c solar reference data import
217
218
		Convenience function to read the reference spectrum.
219
		Tries to detect the file format automatically trying netcdf first,
220
		and if that fails falls back to the text reader.
221
		Currently no HDF5 support.
222
223
		Parameters
224
		----------
225
		filename : str
226
			The filename to read from.
227
228
		Returns
229
		-------
230
		nothing
231
		"""
232 1
		try:
233
			# try netcdf first
234 1
			self.read_from_netcdf(filename)
235 1
		except:
236
			# fall back to text file
237 1
			self.read_from_textfile(filename)
238
239 1
	def write_to_netcdf(self, filename):
240
		"""SCIAMACHY level 1c solar reference netcdf export
241
242
		Parameters
243
		----------
244
		filename : str
245
			The netcdf filename to write the data to.
246
247
		Returns
248
		-------
249
		nothing
250
		"""
251 1
		ncf = netcdf_file(filename, 'w', **_fmtargs)
252 1
		ncf.textheader_length = self.textheader_length
253 1
		ncf.textheader = self.textheader
254 1
		ncf.solar_id = self.solar_id
255 1
		ncf.orbit = self.orbit
256 1
		ncf.time = self.time.strftime('%Y-%m-%d %H:%M:%S+0000')
257
258 1
		ncf.createDimension('wavelength', self.npix)
259 1
		wavs = ncf.createVariable('wavelength', np.dtype('float64').char, ('wavelength',))
260 1
		wavs.units = 'nm'
261 1
		wavs[:] = self.wls
262
263 1
		rads = ncf.createVariable('radiance', np.dtype('float64').char, ('wavelength',))
264 1
		rads.units = 'ph / s / cm^2 / nm'
265 1
		rads[:] = self.rads
266
267 1
		if self.errs is not None:
268
			errs = ncf.createVariable('radiance errors', np.dtype('float64').char, ('wavelength',))
269
			errs.units = 'ph / s / cm^2 / nm'
270
			errs[:] = self.errs
271
272 1
		ncf.close()
273
274 1
	def write_to_textfile(self, filename):
275
		"""SCIAMACHY level 1c solar reference text export
276
277
		Parameters
278
		----------
279
		filename : str
280
			The (plain) ascii table filename to write the data to.
281
			Passing sys.STDOUT writes to the console.
282
283
		Returns
284
		-------
285
		nothing
286
		"""
287 1
		if hasattr(filename, 'seek'):
288
			f = filename
289
		else:
290 1
			f = open(filename, 'w')
291 1
		if self.textheader_length > 6:
292 1
			print("{0:2d}".format(self.textheader_length), file=f)
293 1
		print(self.textheader, file=f)
294 1
		print(self.npix, file=f)
295 1
		if self.textheader_length > 6:
296 1
			print(self.solar_id, file=f)
297 1
			print(self.orbit, file=f)
298 1
			print("%4d %2d %2d %2d %2d %2d" % (self.time.year, self.time.month,
299
					self.time.day, self.time.hour, self.time.minute, self.time.second),
300
				file=f)
301 1
		for i in range(self.npix):
302
			#output = []
303
			#output.append(self.wls[i])
304
			#output.append(self.rads[i])
305
			#output.append(self.errs[i])
306
			#print('\t'.join(map(str, output)), file=f)
307 1
			if self.errs is not None:
308
				print(
309
					"{0:9.4f}  {1:12.5e}  {2:12.5e}".format(
310
						self.wls[i], self.rads[i], self.errs[i],
311
					),
312
					file=f,
313
				)
314
			else:
315 1
				print(
316
					"{0:9.4f}  {1:12.5e}".format(
317
						self.wls[i], self.rads[i],
318
					),
319
					file=f,
320
				)
321