test_level2.test_oldver_round_trip_txt()   A
last analyzed

Complexity

Conditions 2

Size

Total Lines 17
Code Lines 16

Duplication

Lines 17
Ratio 100 %

Importance

Changes 0
Metric Value
cc 2
eloc 16
nop 2
dl 17
loc 17
rs 9.6
c 0
b 0
f 0
1
# -*- coding: utf-8 -*-
2
# vim:fileencoding=utf-8
3
#
4
# Copyright (c) 2018 Stefan Bender
5
#
6
# This module is part of sciapy.
7
# sciapy is free software: you can redistribute it or modify
8
# it under the terms of the GNU General Public License as published
9
# by the Free Software Foundation, version 2.
10
# See accompanying LICENSE file or http://www.gnu.org/licenses/gpl-2.0.html.
11
"""SCIAMACHY level 2 tests
12
"""
13
import os
14
15
import numpy as np
16
import pytest
17
import xarray as xr
18
19
import sciapy.level2
20
21
22
def test_module_structure():
23
	assert sciapy.level2
24
	assert sciapy.level2.binning
25
	assert sciapy.level2.binning.bin_lat_timeavg
26
	assert sciapy.level2.density
27
	assert sciapy.level2.density.scia_densities
28
29
30
@pytest.fixture(scope="module")
31
def ds():
32
	alts = np.r_[80:121:20.]
33
	lats = np.r_[-85:86:10.]
34
	lons = np.r_[15:186:10.]
35
	ts = np.linspace(3686, 3686.5, 2)
36
	lons_2d = np.vstack([lons + i * 180. for i in range(ts.size)])
37
	data = np.arange(ts.size * lats.size * alts.size).reshape(
38
		(ts.size, lats.size, alts.size)
39
	)
40
	ds = xr.Dataset(
41
		{
42
			"data": (
43
				["time", "latitude", "altitude"],
44
				data,
45
				{"units": "fir mfur μftn$^{-2}$ fth"}
46
			),
47
			"longitude": (
48
				["time", "latitude"],
49
				lons_2d,
50
				{"units": "degrees_east"}
51
			),
52
		},
53
		coords={
54
			"latitude": lats,
55
			"altitude": alts,
56
			"time": (
57
				["time"], ts, {"units": "days since 2000-01-01 00:00:00+00:00"}
58
			),
59
		},
60
	)
61
	return ds
62
63
64
def test__bin_stats(ds):
65
	from sciapy.level2.binning import _bin_stats
66
	_ds = ds.copy()
67
	_ds["latitude"] = xr.zeros_like(_ds.latitude)
68
	# binning result
69
	avg_aw = _bin_stats(
70
		_ds,
71
		binvar="latitude", tvar="time",
72
		area_weighted=True,
73
	)
74
	avg_nw = _bin_stats(
75
		_ds,
76
		binvar="latitude", tvar="time",
77
		area_weighted=False,
78
	)
79
	xr.testing.assert_allclose(avg_nw, avg_aw)
80
81
	# non-weighted mean using standard functions
82
	dims = ("latitude", "time")
83
	stacked = "__stacked__"
84
	_ds = _ds.stack(**{stacked: dims})
85
	ds_avg = _ds.mean(dim=stacked)
86
	ds_cnt = _ds.count(dim=stacked)
87
	ds_std = _ds.std(dim=stacked, ddof=1)
88
	ds_std = ds_std.rename({v: v + "_std" for v in ds_std.data_vars})
89
	ds_cnt = ds_cnt.rename({v: v + "_cnt" for v in ds_cnt.data_vars})
90
	avg_ds = xr.merge([ds_avg, ds_std, ds_cnt])
91
	# Re-create the sum of squared weights
92
	_ws = xr.ones_like(_ds.latitude, dtype=float)
93
	_ws /= _ws.sum(dim=stacked)
94
	avg_ds["wsqsum"] = (_ws**2).sum(dim=stacked)
95
	xr.testing.assert_allclose(avg_nw, avg_ds)
96
97
98
def test_binning_aw(ds):
99
	from sciapy.level2.binning import bin_lat_timeavg
100
	data = ds.data.values
101
	lons = ds.longitude.values
102
	nts = ds.time.size
103
	wgts = np.cos(np.radians(ds.latitude.values))
104
	wgtbs = wgts[::2] + wgts[1::2]
105
	# divide each by the bin sum
106
	wgts[::2] /= wgtbs
107
	wgts[1::2] /= wgtbs
108
	data = data * wgts[None, :, None]
109
	lons = lons * wgts[None, :]
110
	# "manual" result
111
	data_avg = 1.0 / nts * (data[:, ::2, :] + data[:, 1::2, :]).sum(axis=0)
112
	lons_avg = 1.0 / nts * (lons[:, ::2] + lons[:, 1::2]).sum(axis=0)
113
	# binning function result
114
	dst_nw = bin_lat_timeavg(ds, bins=np.r_[-90:91:20])
115
	np.testing.assert_allclose(dst_nw.data.values, data_avg)
116
	np.testing.assert_allclose(dst_nw.longitude.values, lons_avg)
117
	assert dst_nw.data.attrs["units"] == "fir mfur μftn$^{-2}$ fth"
118
	assert dst_nw.longitude.attrs["units"] == "degrees_east"
119
120
121
def test_binning_nw(ds):
122
	from sciapy.level2.binning import bin_lat_timeavg
123
	data = ds.data.values
124
	lons = ds.longitude.values
125
	nts = ds.time.size
126
	# "manual" result
127
	data_avg = 0.5 / nts * (data[:, ::2, :] + data[:, 1::2, :]).sum(axis=0)
128
	lons_avg = 0.5 / nts * (lons[:, ::2] + lons[:, 1::2]).sum(axis=0)
129
	# binning function result
130
	dst_nw = bin_lat_timeavg(ds, bins=np.r_[-90:91:20], area_weighted=False)
131
	np.testing.assert_allclose(dst_nw.data.values, data_avg)
132
	np.testing.assert_allclose(dst_nw.longitude.values, lons_avg)
133
	assert dst_nw.data.attrs["units"] == "fir mfur μftn$^{-2}$ fth"
134
	assert dst_nw.longitude.attrs["units"] == "degrees_east"
135
136
137
DATADIR = os.path.join(".", "tests", "data", "l2")
138
IFILE = os.path.join(
139
	DATADIR,
140
	"000NO_orbit_41454_20100203_Dichten.txt",
141
)
142
143
144
def _assert_class_equal(l, r):
145
	for _k, _l in l.__dict__.items():
146
		_r = r.__dict__[_k]
147
		assert np.all(_l == _r), (_k, _l, _r)
148
149
150 View Code Duplication
@pytest.mark.parametrize(
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
151
	"dirname, version",
152
	[["level2_v1.2.3", "1.2.3"], ["foo", None]],
153
)
154
def test_level2_round_trip_nc(tmpdir, dirname, version):
155
	from sciapy.level2.density import scia_densities
156
	odir = os.path.join(tmpdir, dirname)
157
	if not os.path.exists(odir):
158
		os.makedirs(odir)
159
	obase = os.path.join(odir, os.path.basename(IFILE))
160
	ofnc = obase + ".nc"
161
	l2_o = scia_densities(data_ver=version)
162
	l2_o.read_from_file(IFILE)
163
	l2_o.write_to_netcdf(ofnc)
164
	l2_t = scia_densities()
165
	l2_t.read_from_netcdf(ofnc)
166
	_assert_class_equal(l2_o, l2_t)
167
168
169 View Code Duplication
@pytest.mark.parametrize(
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
170
	"dirname, version",
171
	[["level2_v1.2.3", "1.2.3"], ["foo", None]],
172
)
173
def test_level2_round_trip_txt(tmpdir, dirname, version):
174
	from sciapy.level2.density import scia_densities
175
	odir = os.path.join(tmpdir, dirname)
176
	if not os.path.exists(odir):
177
		os.makedirs(odir)
178
	obase = os.path.join(odir, os.path.basename(IFILE))
179
	oftxt = obase + ".txt"
180
	l2_o = scia_densities(data_ver=version)
181
	l2_o.read_from_file(IFILE)
182
	l2_o.write_to_textfile(oftxt)
183
	l2_t = scia_densities()
184
	l2_t.read_from_textfile(oftxt)
185
	_assert_class_equal(l2_o, l2_t)
186
187
188 View Code Duplication
@pytest.mark.parametrize(
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
189
	"version",
190
	["0.1", "0.8", "0.9"],
191
)
192
def test_oldver_round_trip_txt(tmpdir, version):
193
	from sciapy.level2.density import scia_densities
194
	odir = os.path.join(tmpdir, "level2_v{0}".format(version))
195
	if not os.path.exists(odir):
196
		os.makedirs(odir)
197
	obase = os.path.join(odir, os.path.basename(IFILE))
198
	oftxt = obase + ".txt"
199
	l2_o = scia_densities(data_ver=version)
200
	l2_o.read_from_file(IFILE[:-4] + "_v{0}.txt".format(version))
201
	l2_o.write_to_textfile(oftxt)
202
	l2_t = scia_densities()
203
	l2_t.read_from_textfile(oftxt)
204
	_assert_class_equal(l2_o, l2_t)
205
206
207 View Code Duplication
@pytest.mark.parametrize(
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
208
	"version",
209
	["0.1", "0.8", "0.9"],
210
)
211
def test_oldver_round_trip_nc(tmpdir, version):
212
	from sciapy.level2.density import scia_densities
213
	odir = os.path.join(tmpdir, "level2_v{0}".format(version))
214
	if not os.path.exists(odir):
215
		os.makedirs(odir)
216
	obase = os.path.join(odir, os.path.basename(IFILE))
217
	ofnc = obase + ".nc"
218
	l2_o = scia_densities(data_ver=version)
219
	l2_o.read_from_file(IFILE[:-4] + "_v{0}.txt".format(version))
220
	l2_o.write_to_netcdf(ofnc)
221
	l2_t = scia_densities()
222
	l2_t.read_from_netcdf(ofnc)
223
	_assert_class_equal(l2_o, l2_t)
224