scia_daily_zonal_mean (rating: A)

Complexity
    Total Complexity     0

Size/Duplication
    Total Lines          215
    Duplicated Lines     0 %

Importance
    Changes              0

Metric    Value
eloc      151
dl        0
loc       215
rs        10
c         0
b         0
f         0
wmc       0
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# vim:fileencoding=utf-8
#
# Copyright (c) 2017-2018 Stefan Bender
#
# This file is part of sciapy.
# sciapy is free software: you can redistribute it or modify it
# under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 2.
# See accompanying LICENSE file or http://www.gnu.org/licenses/gpl-2.0.html.
"""SCIAMACHY level 2 binning

Command line interface to SCIAMACHY level 2 data binning,
both in time (daily) and (geomagnetic) latitude.
"""

import argparse as ap
import datetime as dt
import logging
from os import path

import numpy as np
import pandas as pd
import xarray as xr
try:
	from dask import compute, delayed
	from dask.distributed import Client
except ImportError:
	delayed = None

from sciapy.level2.binning import bin_lat_timeavg

# non-sensible variables to drop
_drop_vars = ["NO_RSTD_cnt", "NO_RSTD_std"]


if __name__ == "__main__":
	logging.basicConfig(level=logging.WARNING,
		format="[%(levelname)-8s] (%(asctime)s) %(filename)s:%(lineno)d %(message)s",
		datefmt="%Y-%m-%d %H:%M:%S %z")

	parser = ap.ArgumentParser()
	parser.add_argument("file", nargs="?", default="SCIA_NO.nc",
			help="the filename of the input netcdf file")
	parser.add_argument("-a", "--area_weighted",
			action="store_true", default=True,
			help="calculate the area-weighted mean within the bins")
	parser.add_argument("-u", "--unweighted",
			dest="area_weighted", action="store_false",
			help="calculate the equally weighted mean within the bins")
	parser.add_argument("-g", "--geomagnetic",
			dest="geomag", action="store_true", default=False,
			help="bin according to geomagnetic latitude instead of "
			"geographic latitude (turns off area weighting). "
			"(default: %(default)s)")
	parser.add_argument("-G", "--bin_var", type=str, default=None,
			help="bin according to the variable given instead of "
				"geographic latitude (turns off area weighting).")
	parser.add_argument("-b", "--bins", metavar="START:END:SIZE",
			default="-90:90:5",
			help="bins from START to END (both inclusive) in steps of SIZE "
			"(default: %(default)s)")
	parser.add_argument("-B", "--binn", metavar="START:END:NUM",
			default=None,
			help="bins from START to END (both inclusive) into NUM bins "
			"(default: not used)")
	parser.add_argument("-m", "--mlt", action="store_true", default=False,
			help="indicate whether to deal with nominal or MLT data "
			"(default: %(default)s)")
	parser.add_argument("-o", "--output", help="filename of the output file")
	parser.add_argument("-t", "--akm_threshold", type=float, default=0.002,
			help="the averaging kernel diagonal element threshold "
			"for the mask calculation "
			"(default: %(default)s)")
	parser.add_argument("-j", "--jobs", metavar="N", type=int, default=1,
			help="use N parallel threads for binning "
			"(default: %(default)s)")
	loglevels = parser.add_mutually_exclusive_group()
	loglevels.add_argument("-l", "--loglevel", default="WARNING",
			choices=['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'],
			help="change the loglevel "
			"(default: %(default)s)")
	loglevels.add_argument("-q", "--quiet", action="store_true", default=False,
			help="less output, same as --loglevel=ERROR "
			"(default: %(default)s)")
	loglevels.add_argument("-v", "--verbose", action="store_true", default=False,
			help="verbose output, same as --loglevel=INFO "
			"(default: %(default)s)")
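	# parse the arguments; --quiet and --verbose override --loglevel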
	args = parser.parse_args()
	if args.quiet:
		logging.getLogger().setLevel(logging.ERROR)
	elif args.verbose:
		logging.getLogger().setLevel(logging.INFO)
	else:
		logging.getLogger().setLevel(args.loglevel)

	orbit_filename = args.file

	# geomagnetic/geographic setup
	if args.geomag:
		logging.debug("using default geomagnetic latitudes")
		binvar = "gm_lats"
		args.area_weighted = False
	elif args.bin_var is not None:
		logging.debug("using custom latitude variable")
		binvar = args.bin_var
		args.area_weighted = False
	else:
		logging.debug("using default geographic latitudes")
		binvar = "latitude"
	logging.info("binning according to \"%s\"", binvar)
	lats_rename_dict = {"{0}_bins".format(binvar): "latitude"}

	if args.area_weighted:
		logging.info("area weighted bins")
	else:
		logging.info("equally weighted bins")
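
	# construct the latitude bin edges, either in fixed steps
	# (--bins START:END:SIZE) or as a fixed number of bins
	# (--binn START:END:NUM); both ends are included, so the default
	# -90:90:5 yields the 37 edges -90, -85, ..., 90, i.e. 36 bins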
	if args.binn is None:
		bin0, bin1, binstep = list(map(float, args.bins.split(':')))
		bins = np.r_[bin0:bin1 + 0.5 * binstep:binstep]
	else:
		bin0, bin1, binnum = list(map(float, args.binn.split(':')))
		bins = np.linspace(bin0, bin1, int(binnum) + 1)
		binstep = np.diff(bins)[0]
	logging.debug("using %s deg sized bins: %s", binstep, bins)

	if args.output is None:
		output = ("scia_dzm_{0}_akm{1:.3f}_{2}{3:.0f}_{4}.nc"
				.format("".join(c if c.isalnum() else '_'
								for c in path.basename(orbit_filename[:-3])),
					args.akm_threshold,
					"geomag" if args.geomag else "geogra",
					binstep,
					"aw" if args.area_weighted else "nw"))
	else:
		output = args.output
	logging.info("saving to: %s", output)
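
	# open all input files lazily as a single dataset (the chunk sizes
	# are tuned to this data set) and wrap the longitudes to [0, 360)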
	ds = xr.open_mfdataset(orbit_filename, decode_times=False,
			chunks={"time": 820, "latitude": 18, "altitude": 17})
	ds["longitude"].values = ds.longitude.values % 360.

	if args.mlt:
		# construct the time (day) bin edges from jumps in the time variable
		# works reliably only for the MLT data
		time_rename_dict = {"time_bins": "time"}
		tbin_edges = np.concatenate([[ds.time.values[0] - 0.5],
			ds.time.values[np.where(np.diff(ds.time) > 1)] + 0.01,
			[ds.time.values[-1] + 0.5]])
		tbin_labels = ds.time.groupby_bins("time", tbin_edges).mean("time").values
		ds_bins_daily_gb = ds.groupby_bins("time", tbin_edges, labels=tbin_labels)
	else:
		time_rename_dict = {"date": "time"}
		ds["time"] = xr.conventions.decode_cf_variable("time", ds.time)
		# ds.groupby("time.date") does not work anymore :(
		ds_bins_daily_gb = ds.groupby(
				xr.DataArray(
					pd.to_datetime(pd.DatetimeIndex(ds.time.data).date),
					coords=[ds.time], dims=["time"], name="date"))
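
	# bin each daily group in latitude, in parallel if requested;
	# when dask is not installed, "delayed" is None and the serial
	# branch below is used regardless of --jobs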
	if args.jobs > 1 and delayed is not None:
		# use dask.delayed and dask.compute to distribute the binning
		logging.info("multi-threaded binning with dask using %s threads",
					args.jobs)
		binned = (delayed(bin_lat_timeavg)(
						ds, binvar=binvar,
						bins=bins, area_weighted=args.area_weighted)
					for _, ds in iter(ds_bins_daily_gb))
		client = Client()
		logging.info("dask.distributed client: %s", client)
		ds_bins_daily = (ds_bins_daily_gb
				._combine(compute(*binned, num_workers=args.jobs))
				.rename(lats_rename_dict)
				.drop(_drop_vars))
	else:
		# serial binning with .apply()
		logging.info("single-threaded binning")
		ds_bins_daily = (ds_bins_daily_gb
				.apply(bin_lat_timeavg,
					binvar=binvar, bins=bins,
					area_weighted=args.area_weighted)
				.rename(lats_rename_dict)
				.drop(_drop_vars))
	logging.info("finished binning.")
	del ds_bins_daily_gb
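
	# rename the binned dimension back to "time" and restore its attributes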
	ds_bins_daily = ds_bins_daily.rename(time_rename_dict)
	ds_bins_daily["time"].attrs = ds["time"].attrs
	ds_bins_daily["time"].attrs.update(
		{"axis": "T", "long_name": "measurement date"})

	# construct the mask from the averaging kernel diagonal elements
	ds_bins_daily["NO_MASK"] = (ds_bins_daily.NO_AKDIAG < args.akm_threshold)
	ds_bins_daily["NO_MASK"].attrs = {"long_name": "density mask", "units": "1"}

	# copy coordinate attributes
	# "time" was already set above
	for var in filter(lambda c: c != "time", ds.coords):
		logging.debug("copying coordinate attributes for: %s", var)
		ds_bins_daily[var].attrs = ds[var].attrs
	if args.geomag:
		ds_bins_daily["latitude"].attrs.update(
			{"long_name": "geomagnetic_latitude"})
	# copy global attributes
	ds_bins_daily.attrs = ds.attrs
	# note binning time
	ds_bins_daily.attrs["binned_on"] = (dt.datetime.utcnow()
			.replace(tzinfo=dt.timezone.utc)
			.strftime("%a %b %d %Y %H:%M:%S %Z"))
	ds_bins_daily.attrs["latitude_bin_type"] = \
		"geomagnetic" if args.geomag else "geographic"

	ds_bins_daily.to_netcdf(output, unlimited_dims=["time"])