Passed
Pull Request — dev (#970)
by
unknown
02:05 queued 11s
created

data.datasets.zensus.adjust_zensus_misc()   A

Complexity

Conditions 2

Size

Total Lines 31
Code Lines 9

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 9
dl 0
loc 31
rs 9.95
c 0
b 0
f 0
cc 2
nop 0
1
"""The central module containing all code dealing with importing Zensus data.
2
"""
3
4
from pathlib import Path
5
from urllib.request import urlretrieve
6
import csv
7
import json
8
import os
9
import zipfile
10
11
from shapely.geometry import Point, shape
12
from shapely.prepared import prep
13
import pandas as pd
14
15
from egon.data import db, subprocess
16
from egon.data.config import settings
17
from egon.data.datasets import Dataset
18
import egon.data.config
19
20
21
class ZensusPopulation(Dataset):
    """Dataset wiring up download, table creation and import of the Zensus
    population data."""

    def __init__(self, dependencies):
        # The pipeline order: fetch the archive, create the target table,
        # then load the data into postgres.
        pipeline = (
            download_zensus_pop,
            create_zensus_pop_table,
            population_to_postgres,
        )
        super().__init__(
            name="ZensusPopulation",
            version="0.0.1",
            dependencies=dependencies,
            tasks=pipeline,
        )
33
34
35
class ZensusMiscellaneous(Dataset):
    """Dataset wiring up download, table creation and import of the Zensus
    data on households, buildings and apartments."""

    def __init__(self, dependencies):
        # Same three-step pipeline as for the population data, but for the
        # miscellaneous per-hectare files.
        pipeline = (
            download_zensus_misc,
            create_zensus_misc_tables,
            zensus_misc_to_postgres,
        )
        super().__init__(
            name="ZensusMiscellaneous",
            version="0.0.1",
            dependencies=dependencies,
            tasks=pipeline,
        )
47
48
49
def download_and_check(url, target_file, max_iteration=5):
    """Download file from url (http) if it doesn't exist and check afterwards.

    If the downloaded file is a bad zip, it is removed and re-downloaded.
    This repeats until the file is fine or `max_iteration` retries have
    failed, in which case a `StopIteration` is raised.
    """
    attempts = 0
    while True:
        # Only fetch when the file is not already present on disk.
        if not os.path.isfile(target_file):
            # Guard clause: anything that isn't an http(s) URL is rejected.
            if not url.lower().startswith("http"):
                raise ValueError("No http url")
            urlretrieve(url, target_file)

        # Opening the archive validates its central directory.
        try:
            with zipfile.ZipFile(target_file):
                print(f"Zip file {target_file} is good.")
            return
        except zipfile.BadZipFile as ex:
            # Corrupt download: discard and retry, up to the retry budget.
            os.remove(target_file)
            attempts += 1
            if attempts > max_iteration:
                raise StopIteration(
                    f"Max iteration of {max_iteration} is exceeded"
                ) from ex
77
78
79
def download_zensus_pop():
    """Download Zensus csv file on population per hectare grid cell.

    The archive is stored in ``./zensus_population`` under the file name
    configured in the ``zensus_population.original_data`` section of the
    data configuration.
    """
    data_config = egon.data.config.datasets()
    zensus_population_config = data_config["zensus_population"][
        "original_data"
    ]
    download_directory = Path(".") / "zensus_population"
    # Create the folder if it does not exist already. `exist_ok=True`
    # avoids the check-then-create race of the previous
    # `os.path.exists`/`os.mkdir` pair.
    download_directory.mkdir(parents=True, exist_ok=True)

    target_file = (
        download_directory / zensus_population_config["target"]["file"]
    )

    url = zensus_population_config["source"]["url"]
    download_and_check(url, target_file, max_iteration=5)
96
97
98
def download_zensus_misc():
    """Download Zensus csv files on data per hectare grid cell.

    Downloads the remaining zensus data sets on households, buildings and
    apartments into ``./zensus_population``. The source URLs are paired
    positionally with the target file names from the
    ``zensus_misc.processed.file_table_map`` configuration.
    """

    # Get data config
    data_config = egon.data.config.datasets()
    download_directory = Path(".") / "zensus_population"
    # Create the folder if it does not exist already. `exist_ok=True`
    # avoids the check-then-create race of the previous
    # `os.path.exists`/`os.mkdir` pair.
    download_directory.mkdir(parents=True, exist_ok=True)

    # Download remaining zensus data set on households, buildings, apartments
    zensus_config = data_config["zensus_misc"]["original_data"]
    zensus_misc_processed = data_config["zensus_misc"]["processed"]
    zensus_url = zensus_config["source"]["url"]
    zensus_files = zensus_misc_processed["file_table_map"].keys()

    # zip() already yields (url, file name) pairs lazily; no need to
    # materialize the pairing in a list first.
    for url, path in zip(zensus_url, zensus_files):
        download_and_check(url, download_directory / path, max_iteration=5)
119
120
121
def create_zensus_pop_table():
    """Create tables for zensus data in postgres database"""

    # Schema and table names come from the data configuration file.
    processed = egon.data.config.datasets()["zensus_population"]["processed"]

    # Make sure the target schema exists before creating the table.
    db.execute_sql(
        f"CREATE SCHEMA IF NOT EXISTS {processed['schema']};"
    )

    # Fully qualified name of the population table.
    population_table = f"{processed['schema']}.{processed['table']}"

    # Recreate from scratch; CASCADE drops dependent objects as well.
    db.execute_sql(f"DROP TABLE IF EXISTS {population_table} CASCADE;")

    db.execute_sql(
        f"CREATE TABLE {population_table}"
        f""" (id        SERIAL NOT NULL,
              grid_id    character varying(254) NOT NULL,
              x_mp       int,
              y_mp       int,
              population smallint,
              geom_point geometry(Point,3035),
              geom geometry (Polygon, 3035),
              CONSTRAINT {processed['table']}_pkey
              PRIMARY KEY (id)
        );
        """
    )
155
156
157
def create_zensus_misc_tables():
    """Create tables for zensus data in postgres database"""

    # Table names come from the data configuration file.
    processed = egon.data.config.datasets()["zensus_misc"]["processed"]

    # Make sure the target schema exists before creating the tables.
    db.execute_sql(
        f"CREATE SCHEMA IF NOT EXISTS {processed['schema']};"
    )

    # One table each for household, apartment and building data.
    for table in processed["file_table_map"].values():
        qualified_name = f"{processed['schema']}.{table}"

        # Recreate from scratch; CASCADE drops dependent objects as well.
        db.execute_sql(f"DROP TABLE IF EXISTS {qualified_name} CASCADE;")
        db.execute_sql(
            f"CREATE TABLE {qualified_name}"
            f""" (id                 SERIAL,
                  grid_id            VARCHAR(50),
                  grid_id_new        VARCHAR (50),
                  attribute          VARCHAR(50),
                  characteristics_code smallint,
                  characteristics_text text,
                  quantity           smallint,
                  quantity_q         smallint,
                  zensus_population_id int,
                  CONSTRAINT {table}_pkey PRIMARY KEY (id)
            );
            """
        )
189
190
191
def target(source, dataset):
    """Generate the target path corresponding to a source path.

    Parameters
    ----------
    source : Path
        Path to the source csv-file. Only its file name (stem and suffix)
        is used; the target always lives in ``./zensus_population``.
    dataset: str
        Toggles between production (`dataset='Everything'`) and test mode e.g.
        (`dataset='Schleswig-Holstein'`).
        In production mode, data covering entire Germany
        is used. In the test mode a subset of this data is used for testing the
        workflow.

    Returns
    -------
    Path
        Path to target csv-file

    """
    # Insert the dataset name between stem and suffix, e.g.
    # "data.csv" -> "zensus_population/data.Schleswig-Holstein.csv".
    return (
        Path(".")
        / "zensus_population"
        / f"{source.stem}.{dataset}{source.suffix}"
    )
214
215
216
def select_geom():
    """Select the union of the geometries of Schleswig-Holstein from the
    database, convert their projection to the one used in the CSV file,
    output the result to stdout as a GeoJSON string and read it into a
    prepared shape for filtering.

    """
    docker_db_config = db.credentials()

    # Connection string handed to ogr2ogr's PostgreSQL driver.
    pg_connection = (
        f"PG:host={docker_db_config['HOST']}"
        f" user='{docker_db_config['POSTGRES_USER']}'"
        f" password='{docker_db_config['POSTGRES_PASSWORD']}'"
        f" port={docker_db_config['PORT']}"
        f" dbname='{docker_db_config['POSTGRES_DB']}'"
    )

    # Reproject from WGS84 to ETRS89/LAEA (the CSV's CRS) and emit GeoJSON
    # on stdout so no temporary file is needed.
    geojson = subprocess.run(
        [
            "ogr2ogr",
            "-s_srs",
            "epsg:4326",
            "-t_srs",
            "epsg:3035",
            "-f",
            "GeoJSON",
            "/vsistdout/",
            pg_connection,
            "-sql",
            "SELECT ST_Union(geometry) FROM boundaries.vg250_lan",
        ],
        text=True,
    )
    features = json.loads(geojson.stdout)["features"]
    assert (
        len(features) == 1
    ), f"Found {len(features)} geometry features, expected exactly one."

    # prep() speeds up the many intersects() calls done by the caller.
    return prep(shape(features[0]["geometry"]))
247
248
249
def filter_zensus_population(filename, dataset):
    """Filter lines in the source CSV file and copy the appropriate ones
    to the destination based on geometry.


    Parameters
    ----------
    filename : str
        Path to input csv-file
    dataset: str, optional
        Toggles between production (`dataset='Everything'`) and test mode e.g.
        (`dataset='Schleswig-Holstein'`).
        In production mode, data covering entire Germany
        is used. In the test mode a subset of this data is used for testing the
        workflow.

    Returns
    -------
    Path
        Path to output csv-file

    """

    csv_file = Path(filename).resolve(strict=True)
    destination_file = target(csv_file, dataset)

    if not os.path.isfile(destination_file):
        # Fetching the boundary geometry spawns an ogr2ogr subprocess and
        # hits the database, so only do it when the filtered file still has
        # to be produced (previously it ran unconditionally on every call).
        boundary = select_geom()

        with open(csv_file, mode="r", newline="") as input_lines:
            rows = csv.DictReader(input_lines, delimiter=";")
            with open(destination_file, mode="w", newline="") as destination:
                output = csv.DictWriter(
                    destination, delimiter=";", fieldnames=rows.fieldnames
                )
                output.writeheader()
                # Keep only cells whose center point lies inside the
                # boundary geometry.
                output.writerows(
                    row
                    for row in rows
                    if boundary.intersects(
                        Point(float(row["x_mp_100m"]), float(row["y_mp_100m"]))
                    )
                )
    return destination_file
295
296
297
def filter_zensus_misc(filename, dataset):
    """Filter lines in the source CSV file and copy the appropriate ones
    to the destination based on grid_id values.


    Parameters
    ----------
    filename : str
        Path to input csv-file
    dataset: str, optional
        Toggles between production (`dataset='Everything'`) and test mode e.g.
        (`dataset='Schleswig-Holstein'`).
        In production mode, data covering entire Germany
        is used. In the test mode a subset of this data is used for testing the
        workflow.

    Returns
    -------
    Path
        Path to output csv-file

    """
    csv_file = Path(filename).resolve(strict=True)
    destination_file = target(csv_file, dataset)

    if not os.path.isfile(destination_file):
        # The grid_id lookup hits the database, so only run the query when
        # the filtered file still has to be produced (previously it ran
        # unconditionally on every call).
        gitter_ids = set(
            pd.read_sql(
                "SELECT grid_id from society.destatis_zensus_population_per_ha",
                con=db.engine(),
            ).grid_id.values
        )

        with open(
            csv_file, mode="r", newline="", encoding="iso-8859-1"
        ) as inputs:
            rows = csv.DictReader(inputs, delimiter=",")
            with open(
                destination_file,
                mode="w",
                newline="",
                encoding="iso-8859-1",
            ) as destination:
                output = csv.DictWriter(
                    destination, delimiter=",", fieldnames=rows.fieldnames
                )
                output.writeheader()
                # Keep only rows whose grid cell is known to be populated.
                output.writerows(
                    row for row in rows if row["Gitter_ID_100m"] in gitter_ids
                )
    return destination_file
346
347
348
def population_to_postgres():
    """Import Zensus population data to postgres database.

    Extracts every member of the downloaded zip archive, optionally filters
    it to the test-mode boundary, loads it via `psql \\copy` and then
    derives point/polygon geometries and spatial indexes.
    """
    # Get information from data configuration file
    data_config = egon.data.config.datasets()
    zensus_population_orig = data_config["zensus_population"]["original_data"]
    zensus_population_processed = data_config["zensus_population"]["processed"]
    input_file = (
        Path(".")
        / "zensus_population"
        / zensus_population_orig["target"]["file"]
    )
    dataset = settings()["egon-data"]["--dataset-boundary"]

    # Read database configuration from docker-compose.yml
    docker_db_config = db.credentials()

    population_table = (
        f"{zensus_population_processed['schema']}"
        f".{zensus_population_processed['table']}"
    )

    # The psql connection arguments are the same for every file, so build
    # them once outside the loop.
    psql_args = [
        "psql",
        "-h",
        f"{docker_db_config['HOST']}",
        "-p",
        f"{docker_db_config['PORT']}",
        "-d",
        f"{docker_db_config['POSTGRES_DB']}",
        "-U",
        f"{docker_db_config['POSTGRES_USER']}",
    ]

    with zipfile.ZipFile(input_file) as zf:
        for filename in zf.namelist():

            zf.extract(filename)

            if dataset == "Everything":
                filename_insert = filename
            else:
                filename_insert = filter_zensus_population(filename, dataset)

            command = [
                "-c",
                rf"\copy {population_table} (grid_id, x_mp, y_mp, population)"
                rf" FROM '{filename_insert}' DELIMITER ';' CSV HEADER;",
            ]
            subprocess.run(
                psql_args + command,
                env={"PGPASSWORD": docker_db_config["POSTGRES_PASSWORD"]},
            )

            # Remove each extracted file right after its import. This used
            # to run once after the loop, which left all but the last file
            # behind and raised a NameError for an empty archive.
            os.remove(filename)

    db.execute_sql(
        f"UPDATE {population_table} zs"
        " SET geom_point=ST_SetSRID(ST_MakePoint(zs.x_mp, zs.y_mp), 3035);"
    )

    db.execute_sql(
        f"UPDATE {population_table} zs"
        """ SET geom=ST_SetSRID(
                (ST_MakeEnvelope(zs.x_mp-50,zs.y_mp-50,zs.x_mp+50,zs.y_mp+50)),
                3035
            );
        """
    )

    db.execute_sql(
        f"CREATE INDEX {zensus_population_processed['table']}_geom_idx ON"
        f" {population_table} USING gist (geom);"
    )

    db.execute_sql(
        f"CREATE INDEX"
        f" {zensus_population_processed['table']}_geom_point_idx"
        f" ON  {population_table} USING gist (geom_point);"
    )
419
420
421
def zensus_misc_to_postgres():
    """Import data on buildings, households and apartments to postgres db.

    For every configured zip archive, extracts its csv members, optionally
    filters them to the test-mode boundary, loads them via `psql \\copy`,
    links the rows to the population table and adds a foreign key. Finally
    the combined table is created and unpopulated cells are removed.
    """

    # Get information from data configuration file
    data_config = egon.data.config.datasets()
    zensus_misc_processed = data_config["zensus_misc"]["processed"]
    zensus_population_processed = data_config["zensus_population"]["processed"]
    file_path = Path(".") / "zensus_population"
    dataset = settings()["egon-data"]["--dataset-boundary"]

    population_table = (
        f"{zensus_population_processed['schema']}"
        f".{zensus_population_processed['table']}"
    )

    # Read database configuration from docker-compose.yml
    docker_db_config = db.credentials()

    # The psql connection arguments are the same for every file, so build
    # them once outside the loops.
    psql_args = [
        "psql",
        "-h",
        f"{docker_db_config['HOST']}",
        "-p",
        f"{docker_db_config['PORT']}",
        "-d",
        f"{docker_db_config['POSTGRES_DB']}",
        "-U",
        f"{docker_db_config['POSTGRES_USER']}",
    ]

    for input_file, table in zensus_misc_processed["file_table_map"].items():
        with zipfile.ZipFile(file_path / input_file) as zf:
            csvfiles = [n for n in zf.namelist() if n.lower().endswith("csv")]
            for filename in csvfiles:
                zf.extract(filename)

                if dataset == "Everything":
                    filename_insert = filename
                else:
                    filename_insert = filter_zensus_misc(filename, dataset)

                command = [
                    "-c",
                    rf"\copy {zensus_population_processed['schema']}.{table}"
                    f"""(grid_id,
                        grid_id_new,
                        attribute,
                        characteristics_code,
                        characteristics_text,
                        quantity,
                        quantity_q)
                        FROM '{filename_insert}' DELIMITER ','
                        CSV HEADER
                        ENCODING 'iso-8859-1';""",
                ]
                subprocess.run(
                    psql_args + command,
                    env={"PGPASSWORD": docker_db_config["POSTGRES_PASSWORD"]},
                )

                # Remove each extracted file right after its import. This
                # used to run once after the loop, which left all but the
                # last file behind and raised a NameError for archives
                # without any csv member.
                os.remove(filename)

        db.execute_sql(
            f"""UPDATE {zensus_population_processed['schema']}.{table} as b
                    SET zensus_population_id = zs.id
                    FROM {population_table} zs
                    WHERE b.grid_id = zs.grid_id;"""
        )

        db.execute_sql(
            f"""ALTER TABLE {zensus_population_processed['schema']}.{table}
                    ADD CONSTRAINT {table}_fkey
                    FOREIGN KEY (zensus_population_id)
                    REFERENCES {population_table}(id);"""
        )

    # Create combined table
    create_combined_zensus_table()

    # Delete entries for unpopulated cells
    adjust_zensus_misc()
494
495
496
def create_combined_zensus_table():
    """Create combined table with buildings, apartments and population per cell

    Only apartment and building data with acceptable data quality
    (quantity_q<2) is used, all other data is dropped. For more details on data
    quality see Zensus docs:
    https://www.zensus2011.de/DE/Home/Aktuelles/DemografischeGrunddaten.html

    If there's no data on buildings or apartments for a certain cell, the value
    for building_count resp. apartment_count contains NULL.
    """
    # The SQL lives next to this module, so resolve it relative to __file__.
    script_path = os.path.join(
        os.path.dirname(__file__), "create_combined_zensus_table.sql"
    )
    db.execute_sql_script(script_path)
511
512
513
def adjust_zensus_misc():
    """Delete unpopulated cells in zensus-households, -buildings and -apartments

    Some unpopulated zensus cells are listed in:
    - egon_destatis_zensus_household_per_ha
    - egon_destatis_zensus_building_per_ha
    - egon_destatis_zensus_apartment_per_ha

    This can be caused by missing population
    information due to privacy or other special cases (e.g. holiday homes
    are listed as buildings but are not permanently populated.)
    In the following tasks of egon-data, only data of populated cells is used.

    Returns
    -------
    None.

    """
    # Get information from data configuration file
    data_config = egon.data.config.datasets()
    zensus_population_processed = data_config["zensus_population"]["processed"]
    zensus_misc_processed = data_config["zensus_misc"]["processed"]

    population_table = (
        f"{zensus_population_processed['schema']}"
        f".{zensus_population_processed['table']}"
    )

    # Only the table names are needed here; the file names (the map's keys)
    # were previously unpacked into an unused variable.
    for table in zensus_misc_processed["file_table_map"].values():
        db.execute_sql(
            f"""
             DELETE FROM {zensus_population_processed['schema']}.{table} as b
             WHERE b.zensus_population_id IN (
                 SELECT id FROM {population_table}
                 WHERE population < 0);"""
        )
549