1 | # -*- coding: utf-8 -*- |
||
2 | # vim:fileencoding=utf-8 |
||
3 | # |
||
4 | # Copyright (c) 2018 Stefan Bender |
||
5 | # |
||
6 | # This module is part of sciapy. |
||
7 | # sciapy is free software: you can redistribute it or modify |
||
8 | # it under the terms of the GNU General Public License as published |
||
9 | # by the Free Software Foundation, version 2. |
||
10 | # See accompanying LICENSE file or http://www.gnu.org/licenses/gpl-2.0.html. |
||
11 | """SCIAMACHY level 2 tests |
||
12 | """ |
||
13 | import os |
||
14 | |||
15 | import numpy as np |
||
16 | import pytest |
||
17 | import xarray as xr |
||
18 | |||
19 | import sciapy.level2 |
||
20 | |||
21 | |||
22 | def test_module_structure(): |
||
23 | assert sciapy.level2 |
||
24 | assert sciapy.level2.binning |
||
25 | assert sciapy.level2.binning.bin_lat_timeavg |
||
26 | assert sciapy.level2.density |
||
27 | assert sciapy.level2.density.scia_densities |
||
28 | |||
29 | |||
30 | @pytest.fixture(scope="module") |
||
31 | def ds(): |
||
32 | alts = np.r_[80:121:20.] |
||
33 | lats = np.r_[-85:86:10.] |
||
34 | lons = np.r_[15:186:10.] |
||
35 | ts = np.linspace(3686, 3686.5, 2) |
||
36 | lons_2d = np.vstack([lons + i * 180. for i in range(ts.size)]) |
||
37 | data = np.arange(ts.size * lats.size * alts.size).reshape( |
||
38 | (ts.size, lats.size, alts.size) |
||
39 | ) |
||
40 | ds = xr.Dataset( |
||
41 | { |
||
42 | "data": ( |
||
43 | ["time", "latitude", "altitude"], |
||
44 | data, |
||
45 | {"units": "fir mfur μftn$^{-2}$ fth"} |
||
46 | ), |
||
47 | "longitude": ( |
||
48 | ["time", "latitude"], |
||
49 | lons_2d, |
||
50 | {"units": "degrees_east"} |
||
51 | ), |
||
52 | }, |
||
53 | coords={ |
||
54 | "latitude": lats, |
||
55 | "altitude": alts, |
||
56 | "time": ( |
||
57 | ["time"], ts, {"units": "days since 2000-01-01 00:00:00+00:00"} |
||
58 | ), |
||
59 | }, |
||
60 | ) |
||
61 | return ds |
||
62 | |||
63 | |||
64 | def test__bin_stats(ds): |
||
65 | from sciapy.level2.binning import _bin_stats |
||
66 | _ds = ds.copy() |
||
67 | _ds["latitude"] = xr.zeros_like(_ds.latitude) |
||
68 | # binning result |
||
69 | avg_aw = _bin_stats( |
||
70 | _ds, |
||
71 | binvar="latitude", tvar="time", |
||
72 | area_weighted=True, |
||
73 | ) |
||
74 | avg_nw = _bin_stats( |
||
75 | _ds, |
||
76 | binvar="latitude", tvar="time", |
||
77 | area_weighted=False, |
||
78 | ) |
||
79 | xr.testing.assert_allclose(avg_nw, avg_aw) |
||
80 | |||
81 | # non-weighted mean using standard functions |
||
82 | dims = ("latitude", "time") |
||
83 | stacked = "__stacked__" |
||
84 | _ds = _ds.stack(**{stacked: dims}) |
||
85 | ds_avg = _ds.mean(dim=stacked) |
||
86 | ds_cnt = _ds.count(dim=stacked) |
||
87 | ds_std = _ds.std(dim=stacked, ddof=1) |
||
88 | ds_std = ds_std.rename({v: v + "_std" for v in ds_std.data_vars}) |
||
89 | ds_cnt = ds_cnt.rename({v: v + "_cnt" for v in ds_cnt.data_vars}) |
||
90 | avg_ds = xr.merge([ds_avg, ds_std, ds_cnt]) |
||
91 | # Re-create the sum of squared weights |
||
92 | _ws = xr.ones_like(_ds.latitude, dtype=float) |
||
93 | _ws /= _ws.sum(dim=stacked) |
||
94 | avg_ds["wsqsum"] = (_ws**2).sum(dim=stacked) |
||
95 | xr.testing.assert_allclose(avg_nw, avg_ds) |
||
96 | |||
97 | |||
98 | def test_binning_aw(ds): |
||
99 | from sciapy.level2.binning import bin_lat_timeavg |
||
100 | data = ds.data.values |
||
101 | lons = ds.longitude.values |
||
102 | nts = ds.time.size |
||
103 | wgts = np.cos(np.radians(ds.latitude.values)) |
||
104 | wgtbs = wgts[::2] + wgts[1::2] |
||
105 | # divide each by the bin sum |
||
106 | wgts[::2] /= wgtbs |
||
107 | wgts[1::2] /= wgtbs |
||
108 | data = data * wgts[None, :, None] |
||
109 | lons = lons * wgts[None, :] |
||
110 | # "manual" result |
||
111 | data_avg = 1.0 / nts * (data[:, ::2, :] + data[:, 1::2, :]).sum(axis=0) |
||
112 | lons_avg = 1.0 / nts * (lons[:, ::2] + lons[:, 1::2]).sum(axis=0) |
||
113 | # binning function result |
||
114 | dst_nw = bin_lat_timeavg(ds, bins=np.r_[-90:91:20]) |
||
115 | np.testing.assert_allclose(dst_nw.data.values, data_avg) |
||
116 | np.testing.assert_allclose(dst_nw.longitude.values, lons_avg) |
||
117 | assert dst_nw.data.attrs["units"] == "fir mfur μftn$^{-2}$ fth" |
||
118 | assert dst_nw.longitude.attrs["units"] == "degrees_east" |
||
119 | |||
120 | |||
121 | def test_binning_nw(ds): |
||
122 | from sciapy.level2.binning import bin_lat_timeavg |
||
123 | data = ds.data.values |
||
124 | lons = ds.longitude.values |
||
125 | nts = ds.time.size |
||
126 | # "manual" result |
||
127 | data_avg = 0.5 / nts * (data[:, ::2, :] + data[:, 1::2, :]).sum(axis=0) |
||
128 | lons_avg = 0.5 / nts * (lons[:, ::2] + lons[:, 1::2]).sum(axis=0) |
||
129 | # binning function result |
||
130 | dst_nw = bin_lat_timeavg(ds, bins=np.r_[-90:91:20], area_weighted=False) |
||
131 | np.testing.assert_allclose(dst_nw.data.values, data_avg) |
||
132 | np.testing.assert_allclose(dst_nw.longitude.values, lons_avg) |
||
133 | assert dst_nw.data.attrs["units"] == "fir mfur μftn$^{-2}$ fth" |
||
134 | assert dst_nw.longitude.attrs["units"] == "degrees_east" |
||
135 | |||
136 | |||
137 | DATADIR = os.path.join(".", "tests", "data", "l2") |
||
138 | IFILE = os.path.join( |
||
139 | DATADIR, |
||
140 | "000NO_orbit_41454_20100203_Dichten.txt", |
||
141 | ) |
||
142 | |||
143 | |||
144 | def _assert_class_equal(l, r): |
||
145 | for _k, _l in l.__dict__.items(): |
||
146 | _r = r.__dict__[_k] |
||
147 | assert np.all(_l == _r), (_k, _l, _r) |
||
148 | |||
149 | |||
150 | View Code Duplication | @pytest.mark.parametrize( |
|
0 ignored issues
–
show
Duplication
introduced
by
![]() |
|||
151 | "dirname, version", |
||
152 | [["level2_v1.2.3", "1.2.3"], ["foo", None]], |
||
153 | ) |
||
154 | def test_level2_round_trip_nc(tmpdir, dirname, version): |
||
155 | from sciapy.level2.density import scia_densities |
||
156 | odir = os.path.join(tmpdir, dirname) |
||
157 | if not os.path.exists(odir): |
||
158 | os.makedirs(odir) |
||
159 | obase = os.path.join(odir, os.path.basename(IFILE)) |
||
160 | ofnc = obase + ".nc" |
||
161 | l2_o = scia_densities(data_ver=version) |
||
162 | l2_o.read_from_file(IFILE) |
||
163 | l2_o.write_to_netcdf(ofnc) |
||
164 | l2_t = scia_densities() |
||
165 | l2_t.read_from_netcdf(ofnc) |
||
166 | _assert_class_equal(l2_o, l2_t) |
||
167 | |||
168 | |||
169 | View Code Duplication | @pytest.mark.parametrize( |
|
0 ignored issues
–
show
|
|||
170 | "dirname, version", |
||
171 | [["level2_v1.2.3", "1.2.3"], ["foo", None]], |
||
172 | ) |
||
173 | def test_level2_round_trip_txt(tmpdir, dirname, version): |
||
174 | from sciapy.level2.density import scia_densities |
||
175 | odir = os.path.join(tmpdir, dirname) |
||
176 | if not os.path.exists(odir): |
||
177 | os.makedirs(odir) |
||
178 | obase = os.path.join(odir, os.path.basename(IFILE)) |
||
179 | oftxt = obase + ".txt" |
||
180 | l2_o = scia_densities(data_ver=version) |
||
181 | l2_o.read_from_file(IFILE) |
||
182 | l2_o.write_to_textfile(oftxt) |
||
183 | l2_t = scia_densities() |
||
184 | l2_t.read_from_textfile(oftxt) |
||
185 | _assert_class_equal(l2_o, l2_t) |
||
186 | |||
187 | |||
188 | View Code Duplication | @pytest.mark.parametrize( |
|
0 ignored issues
–
show
|
|||
189 | "version", |
||
190 | ["0.1", "0.8", "0.9"], |
||
191 | ) |
||
192 | def test_oldver_round_trip_txt(tmpdir, version): |
||
193 | from sciapy.level2.density import scia_densities |
||
194 | odir = os.path.join(tmpdir, "level2_v{0}".format(version)) |
||
195 | if not os.path.exists(odir): |
||
196 | os.makedirs(odir) |
||
197 | obase = os.path.join(odir, os.path.basename(IFILE)) |
||
198 | oftxt = obase + ".txt" |
||
199 | l2_o = scia_densities(data_ver=version) |
||
200 | l2_o.read_from_file(IFILE[:-4] + "_v{0}.txt".format(version)) |
||
201 | l2_o.write_to_textfile(oftxt) |
||
202 | l2_t = scia_densities() |
||
203 | l2_t.read_from_textfile(oftxt) |
||
204 | _assert_class_equal(l2_o, l2_t) |
||
205 | |||
206 | |||
207 | View Code Duplication | @pytest.mark.parametrize( |
|
0 ignored issues
–
show
|
|||
208 | "version", |
||
209 | ["0.1", "0.8", "0.9"], |
||
210 | ) |
||
211 | def test_oldver_round_trip_nc(tmpdir, version): |
||
212 | from sciapy.level2.density import scia_densities |
||
213 | odir = os.path.join(tmpdir, "level2_v{0}".format(version)) |
||
214 | if not os.path.exists(odir): |
||
215 | os.makedirs(odir) |
||
216 | obase = os.path.join(odir, os.path.basename(IFILE)) |
||
217 | ofnc = obase + ".nc" |
||
218 | l2_o = scia_densities(data_ver=version) |
||
219 | l2_o.read_from_file(IFILE[:-4] + "_v{0}.txt".format(version)) |
||
220 | l2_o.write_to_netcdf(ofnc) |
||
221 | l2_t = scia_densities() |
||
222 | l2_t.read_from_netcdf(ofnc) |
||
223 | _assert_class_equal(l2_o, l2_t) |
||
224 |