Passed
Pull Request — dev (#1375)
by
unknown
02:18
created

household_prognosis_per_year()   A

Complexity

Conditions 3

Size

Total Lines 38
Code Lines 25

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 3
eloc 25
nop 3
dl 0
loc 38
rs 9.28
c 0
b 0
f 0
1
"""The central module containing all code dealing with processing and
2
forecasting Zensus data.
3
"""
4
5
from sqlalchemy import Column, Float, Integer
6
from sqlalchemy.ext.declarative import declarative_base
7
import numpy as np
8
import pandas as pd
9
10
from egon.data import db
11
from egon.data.datasets import Dataset
12
import egon.data.config
13
14
from egon_validation import(
15
    RowCountValidation,
16
    DataTypeValidation,
17
    NotNullAndNotNaNValidation,
18
    WholeTableNotNullAndNotNaNValidation
19
)
20
21
# TODO: Base will later be imported from another file
22
Base = declarative_base()
23
24
25
def _prognosis_table_validations(table, expected_count, column_types):
    """Build the four data-quality checks applied to one prognosis table.

    Parameters
    ----------
    table : str
        Table name inside the ``society`` schema; it is also used as the
        suffix of every rule id.
    expected_count : int
        Expected number of rows for the row-count check.
    column_types : dict
        Mapping of column name to expected database type. Its keys, in
        order, are also the columns checked for NULL / NaN values.

    Returns
    -------
    list
        Row-count, data-type, per-column not-null and whole-table not-null
        validations, in that order.
    """
    qualified_name = f"society.{table}"
    return [
        RowCountValidation(
            table=qualified_name,
            rule_id=f"TEST_ROW_COUNT.{table}",
            expected_count={"Everything": expected_count},
        ),
        DataTypeValidation(
            table=qualified_name,
            rule_id=f"TEST_DATA_MULTIPLE_TYPES.{table}",
            column_types=column_types,
        ),
        NotNullAndNotNaNValidation(
            table=qualified_name,
            rule_id=f"TEST_NOT_NAN.{table}",
            columns=list(column_types),
        ),
        WholeTableNotNullAndNotNaNValidation(
            table=qualified_name,
            rule_id=f"TEST_WHOLE_TABLE_NOT_NAN.{table}",
        ),
    ]


class SocietyPrognosis(Dataset):
    """Dataset distributing DemandRegio prognoses onto the Zensus grid.

    ``create_tables`` runs first; ``zensus_population`` and
    ``zensus_household`` are passed together as a set (presumably tasks the
    egon-data ``Dataset`` machinery may run independently of each other --
    confirm). Row-count, column-type and NULL/NaN checks for both prognosis
    tables are registered under the "data-quality" key, with
    ``on_validation_failure`` set to "continue".
    """

    def __init__(self, dependencies):
        household_checks = _prognosis_table_validations(
            "egon_household_prognosis",
            5319490,
            {
                "zensus_population_id": "integer",
                "year": "integer",
                "households": "double precision",
            },
        )
        population_checks = _prognosis_table_validations(
            "egon_population_prognosis",
            6355446,
            {
                "zensus_population_id": "integer",
                "year": "integer",
                "population": "double precision",
            },
        )

        super().__init__(
            name="SocietyPrognosis",
            version="0.0.1",
            dependencies=dependencies,
            tasks=(create_tables, {zensus_population, zensus_household}),
            validation={"data-quality": household_checks + population_checks},
            on_validation_failure="continue",
        )
77
78
79
class EgonPopulationPrognosis(Base):
    """ORM model of ``society.egon_population_prognosis``.

    One row per Zensus cell and prognosis year (composite primary key of
    ``zensus_population_id`` and ``year``). ``zensus_population()`` fills it
    and writes -1 for cells whose prognosis value is 0.
    """

    __tablename__ = "egon_population_prognosis"
    __table_args__ = {"schema": "society"}
    # Composite primary key, part 1: Zensus cell id
    zensus_population_id = Column(Integer, primary_key=True)
    # Composite primary key, part 2: prognosis year
    year = Column(Integer, primary_key=True)
    # Scaled (fractional) population of the cell in that year
    population = Column(Float)
85
86
87
class EgonHouseholdPrognosis(Base):
    """ORM model of ``society.egon_household_prognosis``.

    One row per Zensus cell and prognosis year (composite primary key of
    ``zensus_population_id`` and ``year``), filled by ``zensus_household()``.
    """

    __tablename__ = "egon_household_prognosis"
    __table_args__ = {"schema": "society"}
    # Composite primary key, part 1: Zensus cell id
    zensus_population_id = Column(Integer, primary_key=True)
    # Composite primary key, part 2: prognosis year
    year = Column(Integer, primary_key=True)
    # Stored as Float, although household_prognosis_per_year() produces
    # whole-number household counts
    households = Column(Float)
93
94
95
def create_tables():
    """Create the ``society`` schema and both prognosis tables.

    Creates the tables of ``EgonPopulationPrognosis`` and
    ``EgonHouseholdPrognosis`` (in that order). The schema is created with
    ``IF NOT EXISTS`` and the tables with ``checkfirst=True``, so existing
    objects are left untouched.
    """
    engine = db.engine()
    db.execute_sql("CREATE SCHEMA IF NOT EXISTS society;")
    for model in (EgonPopulationPrognosis, EgonHouseholdPrognosis):
        model.__table__.create(bind=engine, checkfirst=True)
101
102
103
def zensus_population():
    """Bring population prognosis from DemandRegio to Zensus grid.

    Each inhabited Zensus 2011 cell (``population > 0``) gets its share of
    the population of its NUTS3 region. For 2035 and 2050 the DemandRegio
    population prognosis of every region is multiplied by these shares and
    appended to the ``population_prognosis`` target table, whose previous
    contents are deleted first. Cells whose resulting value is 0 are stored
    as -1.
    """
    cfg = egon.data.config.datasets()["society_prognosis"]
    # NOTE: "soucres" (sic) is the key this module reads from the dataset
    # config; it must stay in sync with the config file.
    sources = cfg["soucres"]
    target = cfg["target"]["population_prognosis"]

    local_engine = db.engine()

    # Input: Zensus2011 population data including the NUTS3-Code
    zensus_district = db.select_dataframe(
        f"""SELECT zensus_population_id, vg250_nuts3
        FROM {sources['map_zensus_vg250']['schema']}.
        {sources['map_zensus_vg250']['table']}
        WHERE zensus_population_id IN (
            SELECT id
        FROM {sources['zensus_population']['schema']}.
        {sources['zensus_population']['table']})""",
        index_col="zensus_population_id",
    )

    zensus = db.select_dataframe(
        f"""SELECT id, population
        FROM {sources['zensus_population']['schema']}.
        {sources['zensus_population']['table']}
        WHERE population > 0""",
        index_col="id",
    )

    zensus["nuts3"] = zensus_district.vg250_nuts3

    # Rename index
    zensus.index = zensus.index.rename("zensus_population_id")

    # Replace population value of uninhabited cells for calculation
    zensus.population = zensus.population.replace(-1, 0)

    # Share of each cell in the population of its NUTS3 region. transform()
    # returns a result aligned with zensus' index; the former
    # apply(...).values only lined up while pandas treated the apply as a
    # transform (no longer the case with group_keys=True in pandas >= 2.0).
    # Regions summing to 0 give 0 / 0 = NaN, which is mapped to a share of 0.
    region_population = zensus.groupby("nuts3").population.transform("sum")
    zensus["share"] = (zensus.population / region_population).fillna(0)

    db.execute_sql(
        f"""DELETE FROM {target['schema']}.
        {target['table']}"""
    )
    # Scale to prognosis values from demandregio
    for year in [2035, 2050]:
        # Input: dataset on population prognosis on district-level (NUTS3)
        prognosis = db.select_dataframe(
            f"""SELECT nuts3, population
            FROM {sources['demandregio_population']['schema']}.
            {sources['demandregio_population']['table']}
            WHERE year={year}""",
            index_col="nuts3",
        )

        df = pd.DataFrame(
            zensus["share"]
            .mul(prognosis.population[zensus["nuts3"]].values)
            .replace(0, -1)
        ).rename({"share": "population"}, axis=1)

        df["year"] = year

        # Insert to database
        df.to_sql(
            target["table"],
            schema=target["schema"],
            con=local_engine,
            if_exists="append",
        )
175
176
177
def household_prognosis_per_year(prognosis_nuts3, zensus, year, seed=None):
    """Calculate household prognosis for a specific year.

    The household total of every NUTS3 region (all rows of
    ``prognosis_nuts3`` with the same index label are summed) is split
    across the region's Zensus cells proportionally to ``zensus["share"]``.
    The fractional quantities are truncated to integers, and the missing
    households are then added one at a time to the cell with the largest
    remainder (ties broken randomly) until the region's integer sum reaches
    the DemandRegio total.

    Parameters
    ----------
    prognosis_nuts3 : pandas.DataFrame
        Household prognosis indexed by NUTS3 code with a ``households``
        column.
    zensus : pandas.DataFrame
        Zensus cells with columns ``nuts3`` and ``share``; the result keeps
        this index.
    year : int
        Value written to the ``year`` column of the result.
    seed : int, optional
        Seed for the random tie-breaking. Defaults to the egon-data
        ``--random-seed`` setting.

    Returns
    -------
    pandas.DataFrame
        Indexed like ``zensus`` with columns ``households`` (integer) and
        ``year``.
    """
    prognosis_total = prognosis_nuts3.groupby(
        prognosis_nuts3.index
    ).households.sum()

    prognosis = pd.DataFrame(index=zensus.index)
    prognosis["nuts3"] = zensus.nuts3
    prognosis["quantity"] = zensus["share"].mul(
        prognosis_total[zensus["nuts3"]].values
    )
    prognosis["rounded"] = prognosis["quantity"].astype(int)
    prognosis["rest"] = prognosis["quantity"] - prognosis["rounded"]

    # Set seed for reproducibility
    if seed is None:
        seed = egon.data.config.settings()["egon-data"]["--random-seed"]
    np.random.seed(seed=seed)

    # Rounding process to meet exact values from demandregio on nuts3-level
    for name, group in prognosis.groupby(prognosis.nuts3):
        print(f"start progosis nuts3 {name}")
        # Work on plain copies of the region's columns instead of mutating
        # the groupby sub-frame cell by cell.
        rounded = group["rounded"].to_numpy(copy=True)
        rest = group["rest"].to_numpy(copy=True)
        while prognosis_total[name] > rounded.sum():
            # Draw uniformly among all cells sharing the largest remainder;
            # the candidate array has one entry per tied cell, so the random
            # draw matches choosing among the tied cell ids directly.
            position = np.random.choice(np.flatnonzero(rest == rest.max()))
            rounded[position] += 1
            rest[position] = 0
        print(f"finished progosis nuts3 {name}")
        # Only "rounded" is used below, so write back just that column for
        # this region's rows instead of masking the whole frame.
        prognosis.loc[group.index, "rounded"] = rounded

    prognosis = prognosis.drop(["nuts3", "quantity", "rest"], axis=1).rename(
        {"rounded": "households"}, axis=1
    )
    prognosis["year"] = year

    return prognosis
215
216
217
def zensus_household():
    """Bring household prognosis from DemandRegio to Zensus grid.

    Zensus household counts are summed over all household types per cell
    and turned into each cell's share of its NUTS3 region. For 2035 and 2050
    the DemandRegio household prognosis is distributed onto the cells with
    ``household_prognosis_per_year`` and appended to the
    ``household_prognosis`` target table, whose previous contents are
    deleted first.
    """
    cfg = egon.data.config.datasets()["society_prognosis"]
    # NOTE: "soucres" (sic) is the key this module reads from the dataset
    # config; it must stay in sync with the config file.
    sources = cfg["soucres"]
    target = cfg["target"]["household_prognosis"]

    local_engine = db.engine()

    # Input: Zensus2011 household data including the NUTS3-Code
    district = db.select_dataframe(
        f"""SELECT zensus_population_id, vg250_nuts3
        FROM {sources['map_zensus_vg250']['schema']}.
        {sources['map_zensus_vg250']['table']}""",
        index_col="zensus_population_id",
    )

    zensus = db.select_dataframe(
        f"""SELECT zensus_population_id, quantity
        FROM {sources['zensus_households']['schema']}.
        {sources['zensus_households']['table']}""",
        index_col="zensus_population_id",
    )

    # Group all household types
    zensus = zensus.groupby(zensus.index).sum()

    zensus["nuts3"] = district.loc[zensus.index, "vg250_nuts3"]

    # Share of households per nuts3 region in each zensus cell. transform()
    # keeps the result aligned with zensus' index; the former
    # apply(...).values only lined up while pandas treated the apply as a
    # transform (no longer the case with group_keys=True in pandas >= 2.0).
    # Regions summing to 0 give 0 / 0 = NaN, which is mapped to a share of 0.
    region_households = zensus.groupby("nuts3").quantity.transform("sum")
    zensus["share"] = (zensus.quantity / region_households).fillna(0)

    db.execute_sql(
        f"""DELETE FROM {target['schema']}.
        {target['table']}"""
    )

    # Apply prognosis function
    for year in [2035, 2050]:
        print(f"start prognosis for year {year}")
        # Input: dataset on household prognosis on district-level (NUTS3)
        prognosis_nuts3 = db.select_dataframe(
            f"""SELECT nuts3, hh_size, households
            FROM {sources['demandregio_households']['schema']}.
            {sources['demandregio_households']['table']}
            WHERE year={year}""",
            index_col="nuts3",
        )

        # Insert into database
        household_prognosis_per_year(prognosis_nuts3, zensus, year).to_sql(
            target["table"],
            schema=target["schema"],
            con=local_engine,
            if_exists="append",
        )
        print(f"finished prognosis for year {year}")
276