|
1
|
|
|
# -*- coding: utf-8 -*- |
|
2
|
|
|
# vim:fileencoding=utf-8 |
|
3
|
|
|
# |
|
4
|
|
|
# Copyright (c) 2022 Stefan Bender |
|
5
|
|
|
# |
|
6
|
|
|
# This module is part of pyspaceweather. |
|
7
|
|
|
# pyspaceweather is free software: you can redistribute it or modify |
|
8
|
|
|
# it under the terms of the GNU General Public License as published |
|
9
|
|
|
# by the Free Software Foundation, version 2. |
|
10
|
|
|
# See accompanying COPYING.GPLv2 file or http://www.gnu.org/licenses/gpl-2.0.html. |
|
11
|
|
|
"""OMNI data read tests |
|
12
|
|
|
""" |
|
13
|
|
|
import os |
|
14
|
|
|
import requests |
|
15
|
|
|
|
|
16
|
|
|
import numpy as np |
|
17
|
|
|
import pandas as pd |
|
18
|
|
|
|
|
19
|
|
|
import pytest |
|
20
|
|
|
|
|
21
|
|
|
from spaceweather import cache_omnie, omnie_hourly, omnie_mask_missing, sw_daily |
|
22
|
|
|
from spaceweather.omni import OMNI_URL_BASE, OMNI_PREFIX, OMNI_EXT |
|
23
|
|
|
|
|
24
|
|
|
# Year used to build the expected OMNI file name and download URL.
_TEST_YEAR = 2012
# Expected file name: "<prefix>_<4-digit year>.<ext>".
_TEST_FILE = "{0}_{1:04d}.{2}".format(
    OMNI_PREFIX,
    _TEST_YEAR,
    OMNI_EXT,
)
# URL that the (mocked) `requests.get` is expected to be called with.
_TEST_URL = os.path.join(OMNI_URL_BASE, _TEST_FILE)
# Directory passed as `local_path` by the tests that read the "omni2t" data.
_TEST_PATH = os.path.join(".", "tests")
|
28
|
|
|
|
|
29
|
|
|
|
|
30
|
|
|
def test_cache(mocker, tmpdir):
    """Check that `cache_omnie` requests the expected URL.

    `requests.get` is replaced by a mock, so no network access happens.
    The function is called once with the temporary directory itself and
    once with a sub-directory of it that has not been created.
    """
    get_mock = mocker.patch("requests.get")
    base_dir = str(tmpdir)

    # Target file does not exist yet in the temporary directory.
    cache_omnie(year=_TEST_YEAR, local_path=base_dir)
    get_mock.assert_called_once_with(_TEST_URL, stream=True)

    # Target (sub)directory does not exist yet either.
    sub_dir = os.path.join(base_dir, "data")
    cache_omnie(year=_TEST_YEAR, local_path=sub_dir)
    # Second call overall, so only the most recent call is checked here.
    get_mock.assert_called_with(_TEST_URL, stream=True)
|
40
|
|
|
|
|
41
|
|
|
|
|
42
|
|
|
def test_auto_update(mocker, tmpdir):
    """Check `omnie_hourly` with `cache=True` and no local file.

    With `requests.get` mocked, the call is expected to emit a
    `UserWarning`, raise `IOError`, and to have requested the expected
    URL exactly once.
    """
    get_mock = mocker.patch("requests.get")
    empty_dir = str(tmpdir)

    # Equivalent to nesting `pytest.warns` inside `pytest.raises`.
    with pytest.raises(IOError), pytest.warns(UserWarning):
        omnie_hourly(year=_TEST_YEAR, cache=True, local_path=empty_dir)

    get_mock.assert_called_once_with(_TEST_URL, stream=True)
|
50
|
|
|
|
|
51
|
|
|
|
|
52
|
|
|
def test_not_avail(mocker, tmpdir):
    """Check `omnie_hourly` with `cache=False` and no local file.

    The call is expected to emit a `UserWarning` and raise `IOError`.
    """
    empty_dir = str(tmpdir)

    # Equivalent to nesting `pytest.warns` inside `pytest.raises`.
    with pytest.raises(IOError), pytest.warns(UserWarning):
        omnie_hourly(year=_TEST_YEAR, cache=False, local_path=empty_dir)
|
58
|
|
|
|
|
59
|
|
|
|
|
60
|
|
|
@pytest.mark.parametrize("hour", range(0, 24, 3))
@pytest.mark.parametrize("index", ["Ap", "Kp"])
def test_hourly(hour, index):
    """Compare an OMNI index column at one hour with the daily table.

    The `index` column of the hourly frame, restricted to rows whose
    "hour" equals `hour`, must equal the "<index><hour>" column of the
    frame returned by `sw_daily()` for the same dates.
    """
    omni = omnie_hourly(2000, local_path=_TEST_PATH, prefix="omni2t")
    # The last row is for the missing value test.
    omni = omni.iloc[:-1]
    daily = sw_daily()

    daily_column = "{0}{1}".format(index, hour)

    rows_at_hour = omni[omni["hour"] == hour]
    expected = rows_at_hour[index]

    # Look the daily values up by calendar date, then give the series the
    # same name and index as the hourly one so they compare equal.
    actual = daily[daily_column].loc[expected.index.date]
    actual = actual.rename(index)
    actual.index = expected.index

    pd.testing.assert_series_equal(expected, actual)
|
72
|
|
|
|
|
73
|
|
|
|
|
74
|
|
|
@pytest.mark.parametrize(
    "name, result",
    [
        ("Ap", np.array([56, 39, 27, 18, 32, 15, 32, 22])),
        ("Kp", np.array([5.3, 4.7, 4.0, 3.3, 4.3, 3.0, 4.3, 3.7])),
    ]
)
def test_3hourly_index(name, result):
    """Check the column `name` on 2000-01-01 at 3-hour steps.

    The values read from the "omni2t" test file must match `result`
    to a relative tolerance of 1e-12.
    """
    frame = omnie_hourly(2000, local_path=_TEST_PATH, prefix="omni2t")
    timestamps = pd.date_range(
        "2000-01-01 00:00", "2000-01-01 23:00", freq="3h"
    )
    observed = frame.loc[timestamps][name].values
    np.testing.assert_allclose(observed, result, rtol=1e-12)
|
92
|
|
|
|
|
93
|
|
|
|
|
94
|
|
|
def test_mask_missing():
    """Check that `omnie_mask_missing` turns the last row into NaNs.

    Every entry of the last row must be NaN, apart from the "year",
    "doy" and "hour" entries, which are not checked.
    """
    raw = omnie_hourly(2000, local_path=_TEST_PATH, prefix="omni2t")
    masked = omnie_mask_missing(raw)
    # The last row should contain all NaNs.
    last_row = masked.iloc[-1]
    for column in last_row.index:
        if column in ["year", "doy", "hour"]:
            continue
        assert np.isnan(last_row[column])
|
104
|
|
|
|