Passed — Pull Request dev (#970)
created by unknown, 01:32

data.datasets.zensus.download_and_check() — rated A

Complexity:  Conditions 5
Size:        Total Lines 24, Code Lines 17
Duplication: Lines 0, Ratio 0 %
Importance:  Changes 0
Metric  Value
------  ------
eloc    17
dl      0
loc     24
rs      9.0833
c       0
b       0
f       0
cc      5
nop     3

"""The central module containing all code dealing with importing Zensus data."""

from pathlib import Path
from urllib.request import urlretrieve
import csv
import json
import os
import zipfile

from shapely.geometry import Point, shape
from shapely.prepared import prep
import pandas as pd

from egon.data import db, subprocess
from egon.data.config import settings
from egon.data.datasets import Dataset
import egon.data.config


class ZensusPopulation(Dataset):
    def __init__(self, dependencies):
        super().__init__(
            name="ZensusPopulation",
            version="0.0.1",
            dependencies=dependencies,
            tasks=(
                download_zensus_pop,
                create_zensus_pop_table,
                population_to_postgres,
            ),
        )


class ZensusMiscellaneous(Dataset):
    def __init__(self, dependencies):
        super().__init__(
            name="ZensusMiscellaneous",
            version="0.0.1",
            dependencies=dependencies,
            tasks=(
                download_zensus_misc,
                create_zensus_misc_tables,
                zensus_misc_to_postgres,
            ),
        )


def download_and_check(url, target_file, max_iteration=5):
    """Download the file if it does not exist and check it afterwards.

    If the download is not a valid zip archive, remove the file and
    download it again. Repeat until the file is fine or the maximum
    number of iterations is reached."""
    bad_file = True
    count = 0
    while bad_file:

        if not os.path.isfile(target_file):
            urlretrieve(url, target_file)

        # Validate the zip file; opening it raises BadZipFile on corruption.
        try:
            _ = zipfile.ZipFile(target_file)
            print(f"Zip file {target_file} is good.")
            bad_file = False
        except zipfile.BadZipFile:
            os.remove(target_file)
            count += 1
            if count > max_iteration:
                raise zipfile.BadZipFile(f"{target_file} is not a zip file")
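
For orientation, a minimal usage sketch of the retry loop above, reusing the module's imports; the URL and file name are made up and not part of egon-data:

    # Minimal usage sketch; URL and file name are hypothetical.
    archive = Path(".") / "example.zip"
    try:
        download_and_check("https://example.com/data.zip", archive)
    except zipfile.BadZipFile:
        # Raised once the archive has failed validation more than
        # max_iteration times.
        print(f"Giving up on {archive}.")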

def download_zensus_pop():
    """Download the Zensus csv file on population per hectare grid cell."""
    data_config = egon.data.config.datasets()
    zensus_population_config = data_config["zensus_population"][
        "original_data"
    ]
    download_directory = Path(".") / "zensus_population"
    # Create the folder if it does not exist already
    if not os.path.exists(download_directory):
        os.mkdir(download_directory)

    target_file = (
        download_directory / zensus_population_config["target"]["file"]
    )

    url = zensus_population_config["source"]["url"]
    download_and_check(url, target_file, max_iteration=5)


def download_zensus_misc():
    """Download the Zensus csv files on data per hectare grid cell."""

    # Get data config
    data_config = egon.data.config.datasets()
    download_directory = Path(".") / "zensus_population"
    # Create the folder if it does not exist already
    if not os.path.exists(download_directory):
        os.mkdir(download_directory)
    # Download the remaining Zensus data sets on households, buildings
    # and apartments

    zensus_config = data_config["zensus_misc"]["original_data"]
    zensus_misc_processed = data_config["zensus_misc"]["processed"]
    zensus_url = zensus_config["source"]["url"]
    zensus_files = zensus_misc_processed["file_table_map"].keys()
    # Pair each URL with its file name; this relies on the URL list and
    # the file_table_map keys being in matching order.
    url_path_map = list(zip(zensus_url, zensus_files))

    for url, path in url_path_map:
        target_file_misc = download_directory / path

        download_and_check(url, target_file_misc, max_iteration=5)
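
Both download tasks read their sources and targets from egon-data's dataset configuration. Inferred from the lookups above, the relevant slice has roughly the following shape; all file names and URLs are placeholders, the real values live in the project's configuration (the table name is the one listed in adjust_zensus_misc below):

    # Sketch of the configuration shape read above; values are placeholders.
    datasets = {
        "zensus_population": {
            "original_data": {
                "source": {"url": "https://example.com/csv_Bevoelkerung_100m_Gitter.zip"},
                "target": {"file": "csv_Bevoelkerung_100m_Gitter.zip"},
            },
        },
        "zensus_misc": {
            "original_data": {
                # One URL per file in file_table_map, in matching order.
                "source": {"url": ["https://example.com/csv_Haushalte_100m_Gitter.zip"]},
            },
            "processed": {
                "file_table_map": {
                    "csv_Haushalte_100m_Gitter.zip": (
                        "egon_destatis_zensus_household_per_ha"
                    ),
                },
            },
        },
    }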

def create_zensus_pop_table():
    """Create a table for Zensus population data in the postgres database"""

    # Get information from the data configuration file
    data_config = egon.data.config.datasets()
    zensus_population_processed = data_config["zensus_population"]["processed"]

    # Create target schema
    db.execute_sql(
        f"CREATE SCHEMA IF NOT EXISTS {zensus_population_processed['schema']};"
    )

    # Create table for population data
    population_table = (
        f"{zensus_population_processed['schema']}"
        f".{zensus_population_processed['table']}"
    )

    db.execute_sql(f"DROP TABLE IF EXISTS {population_table} CASCADE;")

    db.execute_sql(
        f"CREATE TABLE {population_table}"
        f""" (id         SERIAL NOT NULL,
              grid_id    character varying(254) NOT NULL,
              x_mp       int,
              y_mp       int,
              population smallint,
              geom_point geometry(Point, 3035),
              geom       geometry(Polygon, 3035),
              CONSTRAINT {zensus_population_processed['table']}_pkey
              PRIMARY KEY (id)
        );
        """
    )


def create_zensus_misc_tables():
    """Create tables for miscellaneous Zensus data in the postgres database"""

    # Get information from the data configuration file
    data_config = egon.data.config.datasets()
    zensus_misc_processed = data_config["zensus_misc"]["processed"]

    # Create target schema
    db.execute_sql(
        f"CREATE SCHEMA IF NOT EXISTS {zensus_misc_processed['schema']};"
    )

    # Create tables for household, apartment and building data
    for table in zensus_misc_processed["file_table_map"].values():
        misc_table = f"{zensus_misc_processed['schema']}.{table}"

        db.execute_sql(f"DROP TABLE IF EXISTS {misc_table} CASCADE;")
        db.execute_sql(
            f"CREATE TABLE {misc_table}"
            f""" (id                   SERIAL,
                  grid_id              VARCHAR(50),
                  grid_id_new          VARCHAR(50),
                  attribute            VARCHAR(50),
                  characteristics_code smallint,
                  characteristics_text text,
                  quantity             smallint,
                  quantity_q           smallint,
                  zensus_population_id int,
                  CONSTRAINT {table}_pkey PRIMARY KEY (id)
            );
            """
        )

def target(source, dataset):
    """Generate the target path corresponding to a source path.

    Parameters
    ----------
    source : Path
        Path to the source csv-file
    dataset : str
        Toggles between production (`dataset='Everything'`) and test mode
        (e.g. `dataset='Schleswig-Holstein'`). In production mode, data
        covering entire Germany is used. In test mode a subset of this
        data is used for testing the workflow.

    Returns
    -------
    Path
        Path to the target csv-file

    """
    return Path(
        os.path.join(Path("."), "zensus_population", source.stem)
        + "."
        + dataset
        + source.suffix
    )
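
A worked example of the path mapping; the file name is made up, and the output is shown for a POSIX system:

    >>> target(Path("Bevoelkerung_100m_Gitter.csv"), "Schleswig-Holstein")
    PosixPath('zensus_population/Bevoelkerung_100m_Gitter.Schleswig-Holstein.csv')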

def select_geom():
    """Select the union of the geometries of Schleswig-Holstein from the
    database, convert their projection to the one used in the CSV file,
    output the result to stdout as a GeoJSON string and read it into a
    prepared shape for filtering.

    """
    docker_db_config = db.credentials()

    geojson = subprocess.run(
        ["ogr2ogr"]
        + ["-s_srs", "epsg:4326"]
        + ["-t_srs", "epsg:3035"]
        + ["-f", "GeoJSON"]
        + ["/vsistdout/"]
        + [
            f"PG:host={docker_db_config['HOST']}"
            f" user='{docker_db_config['POSTGRES_USER']}'"
            f" password='{docker_db_config['POSTGRES_PASSWORD']}'"
            f" port={docker_db_config['PORT']}"
            f" dbname='{docker_db_config['POSTGRES_DB']}'"
        ]
        + ["-sql", "SELECT ST_Union(geometry) FROM boundaries.vg250_lan"],
        text=True,
    )
    features = json.loads(geojson.stdout)["features"]
    assert (
        len(features) == 1
    ), f"Found {len(features)} geometry features, expected exactly one."

    return prep(shape(features[0]["geometry"]))
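
prep() wraps the geometry so that the many point-in-polygon tests during filtering run against a prepared, indexed shape. A minimal standalone sketch of the idea, with made-up coordinates:

    # Prepared geometries speed up repeated point tests.
    from shapely.geometry import Point, Polygon
    from shapely.prepared import prep

    region = prep(Polygon([(0, 0), (0, 100), (100, 100), (100, 0)]))
    assert region.intersects(Point(50, 50))
    assert not region.intersects(Point(500, 500))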

def filter_zensus_population(filename, dataset):
    """Filter lines in the source CSV file and copy the appropriate ones
    to the destination based on geometry.

    Parameters
    ----------
    filename : str
        Path to the input csv-file
    dataset : str
        Toggles between production (`dataset='Everything'`) and test mode
        (e.g. `dataset='Schleswig-Holstein'`). In production mode, data
        covering entire Germany is used. In test mode a subset of this
        data is used for testing the workflow.

    Returns
    -------
    Path
        Path to the output csv-file

    """

    csv_file = Path(filename).resolve(strict=True)

    schleswig_holstein = select_geom()

    if not os.path.isfile(target(csv_file, dataset)):

        with open(csv_file, mode="r", newline="") as input_lines:
            rows = csv.DictReader(input_lines, delimiter=";")
            gitter_ids = set()
            with open(
                target(csv_file, dataset), mode="w", newline=""
            ) as destination:
                output = csv.DictWriter(
                    destination, delimiter=";", fieldnames=rows.fieldnames
                )
                output.writeheader()
                # `set.add` returns None, so each generator item evaluates
                # to `row` while recording the cell's grid ID as a side
                # effect.
                output.writerows(
                    gitter_ids.add(row["Gitter_ID_100m"]) or row
                    for row in rows
                    if schleswig_holstein.intersects(
                        Point(float(row["x_mp_100m"]), float(row["y_mp_100m"]))
                    )
                )
    return target(csv_file, dataset)
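
The `set.add(...) or row` construct above is easy to misread; this tiny standalone demonstration (with a made-up grid ID) shows the side-effect pattern:

    seen = set()
    rows = [{"Gitter_ID_100m": "100mN12345E67890"}]
    # set.add returns None, so `None or row` yields the row itself.
    kept = [seen.add(row["Gitter_ID_100m"]) or row for row in rows]
    assert kept == rows
    assert "100mN12345E67890" in seen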

def filter_zensus_misc(filename, dataset):
    """Filter lines in the source CSV file and copy the appropriate ones
    to the destination based on grid_id values.

    Parameters
    ----------
    filename : str
        Path to the input csv-file
    dataset : str
        Toggles between production (`dataset='Everything'`) and test mode
        (e.g. `dataset='Schleswig-Holstein'`). In production mode, data
        covering entire Germany is used. In test mode a subset of this
        data is used for testing the workflow.

    Returns
    -------
    Path
        Path to the output csv-file

    """
    csv_file = Path(filename).resolve(strict=True)

    gitter_ids = set(
        pd.read_sql(
            "SELECT grid_id from society.destatis_zensus_population_per_ha",
            con=db.engine(),
        ).grid_id.values
    )

    if not os.path.isfile(target(csv_file, dataset)):
        with open(
            csv_file, mode="r", newline="", encoding="iso-8859-1"
        ) as inputs:
            rows = csv.DictReader(inputs, delimiter=",")
            with open(
                target(csv_file, dataset),
                mode="w",
                newline="",
                encoding="iso-8859-1",
            ) as destination:
                output = csv.DictWriter(
                    destination, delimiter=",", fieldnames=rows.fieldnames
                )
                output.writeheader()
                output.writerows(
                    row for row in rows if row["Gitter_ID_100m"] in gitter_ids
                )
    return target(csv_file, dataset)

def population_to_postgres():
    """Import Zensus population data to the postgres database"""
    # Get information from the data configuration file
    data_config = egon.data.config.datasets()
    zensus_population_orig = data_config["zensus_population"]["original_data"]
    zensus_population_processed = data_config["zensus_population"]["processed"]
    input_file = (
        Path(".")
        / "zensus_population"
        / zensus_population_orig["target"]["file"]
    )
    dataset = settings()["egon-data"]["--dataset-boundary"]

    # Read database configuration from docker-compose.yml
    docker_db_config = db.credentials()

    population_table = (
        f"{zensus_population_processed['schema']}"
        f".{zensus_population_processed['table']}"
    )

    with zipfile.ZipFile(input_file) as zf:
        for filename in zf.namelist():

            zf.extract(filename)

            if dataset == "Everything":
                filename_insert = filename
            else:
                filename_insert = filter_zensus_population(filename, dataset)

            host = ["-h", f"{docker_db_config['HOST']}"]
            port = ["-p", f"{docker_db_config['PORT']}"]
            pgdb = ["-d", f"{docker_db_config['POSTGRES_DB']}"]
            user = ["-U", f"{docker_db_config['POSTGRES_USER']}"]
            command = [
                "-c",
                rf"\copy {population_table} (grid_id, x_mp, y_mp, population)"
                rf" FROM '{filename_insert}' DELIMITER ';' CSV HEADER;",
            ]
            subprocess.run(
                ["psql"] + host + port + pgdb + user + command,
                env={"PGPASSWORD": docker_db_config["POSTGRES_PASSWORD"]},
            )

        os.remove(filename)

Issue: The variable filename does not seem to be defined in case the loop over zf.namelist() is never entered. Are you sure this can never be the case?

    db.execute_sql(
        f"UPDATE {population_table} zs"
        " SET geom_point=ST_SetSRID(ST_MakePoint(zs.x_mp, zs.y_mp), 3035);"
    )

    # Each cell geometry is a 100 m x 100 m square around the centre point.
    db.execute_sql(
        f"UPDATE {population_table} zs"
        """ SET geom=ST_SetSRID(
                (ST_MakeEnvelope(zs.x_mp-50,zs.y_mp-50,zs.x_mp+50,zs.y_mp+50)),
                3035
            );
        """
    )

    db.execute_sql(
        f"CREATE INDEX {zensus_population_processed['table']}_geom_idx ON"
        f" {population_table} USING gist (geom);"
    )

    db.execute_sql(
        f"CREATE INDEX"
        f" {zensus_population_processed['table']}_geom_point_idx"
        f" ON {population_table} USING gist (geom_point);"
    )
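
For reference, the argument list handed to subprocess.run in the import loop above resolves to something of this shape; host, port, user, database and file name are hypothetical, while the table name is the one queried elsewhere in this module:

    # Illustrative assembled psql call; connection values are made up.
    [
        "psql", "-h", "127.0.0.1", "-p", "59734",
        "-d", "egon-data", "-U", "egon",
        "-c",
        r"\copy society.destatis_zensus_population_per_ha"
        r" (grid_id, x_mp, y_mp, population)"
        r" FROM 'Bevoelkerung_100m_Gitter.csv' DELIMITER ';' CSV HEADER;",
    ]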

def zensus_misc_to_postgres():
    """Import data on buildings, households and apartments to the postgres db"""

    # Get information from the data configuration file
    data_config = egon.data.config.datasets()
    zensus_misc_processed = data_config["zensus_misc"]["processed"]
    zensus_population_processed = data_config["zensus_population"]["processed"]
    file_path = Path(".") / "zensus_population"
    dataset = settings()["egon-data"]["--dataset-boundary"]

    population_table = (
        f"{zensus_population_processed['schema']}"
        f".{zensus_population_processed['table']}"
    )

    # Read database configuration from docker-compose.yml
    docker_db_config = db.credentials()

    for input_file, table in zensus_misc_processed["file_table_map"].items():
        with zipfile.ZipFile(file_path / input_file) as zf:
            csvfiles = [n for n in zf.namelist() if n.lower()[-3:] == "csv"]
            for filename in csvfiles:
                zf.extract(filename)

                if dataset == "Everything":
                    filename_insert = filename
                else:
                    filename_insert = filter_zensus_misc(filename, dataset)

                host = ["-h", f"{docker_db_config['HOST']}"]
                port = ["-p", f"{docker_db_config['PORT']}"]
                pgdb = ["-d", f"{docker_db_config['POSTGRES_DB']}"]
                user = ["-U", f"{docker_db_config['POSTGRES_USER']}"]
                command = [
                    "-c",
                    rf"\copy {zensus_population_processed['schema']}.{table}"
                    f"""(grid_id,
                        grid_id_new,
                        attribute,
                        characteristics_code,
                        characteristics_text,
                        quantity,
                        quantity_q)
                        FROM '{filename_insert}' DELIMITER ','
                        CSV HEADER
                        ENCODING 'iso-8859-1';""",
                ]
                subprocess.run(
                    ["psql"] + host + port + pgdb + user + command,
                    env={"PGPASSWORD": docker_db_config["POSTGRES_PASSWORD"]},
                )

            os.remove(filename)

Issue: The variable filename does not seem to be defined for all execution paths, e.g. if csvfiles is empty.

        db.execute_sql(
            f"""UPDATE {zensus_population_processed['schema']}.{table} as b
                    SET zensus_population_id = zs.id
                    FROM {population_table} zs
                    WHERE b.grid_id = zs.grid_id;"""
        )

        db.execute_sql(
            f"""ALTER TABLE {zensus_population_processed['schema']}.{table}
                    ADD CONSTRAINT {table}_fkey
                    FOREIGN KEY (zensus_population_id)
                    REFERENCES {population_table}(id);"""
        )

    # Create the combined table
    create_combined_zensus_table()

    # Delete entries for unpopulated cells
    adjust_zensus_misc()

def create_combined_zensus_table():
    """Create a combined table with buildings, apartments and population per cell

    Only apartment and building data with acceptable data quality
    (quantity_q < 2) is used; all other data is dropped. For more details on
    data quality see the Zensus docs:
    https://www.zensus2011.de/DE/Home/Aktuelles/DemografischeGrunddaten.html

    If there is no data on buildings or apartments for a certain cell, the
    value for building_count resp. apartment_count contains NULL.
    """
    sql_script = os.path.join(
        os.path.dirname(__file__), "create_combined_zensus_table.sql"
    )
    db.execute_sql_script(sql_script)


def adjust_zensus_misc():
    """Delete unpopulated cells in the Zensus household, building and
    apartment tables

    Some unpopulated Zensus cells are listed in:
    - egon_destatis_zensus_household_per_ha
    - egon_destatis_zensus_building_per_ha
    - egon_destatis_zensus_apartment_per_ha

    This can be caused by missing population information due to privacy or
    other special cases (e.g. holiday homes are listed as buildings but are
    not permanently populated). In the following tasks of egon-data, only
    data of populated cells is used.

    Returns
    -------
    None.

    """
    # Get information from the data configuration file
    data_config = egon.data.config.datasets()
    zensus_population_processed = data_config["zensus_population"]["processed"]
    zensus_misc_processed = data_config["zensus_misc"]["processed"]

    population_table = (
        f"{zensus_population_processed['schema']}"
        f".{zensus_population_processed['table']}"
    )

    for input_file, table in zensus_misc_processed["file_table_map"].items():
        # Cells without a valid population value carry a negative marker,
        # so population < 0 identifies unpopulated cells.
        db.execute_sql(
            f"""
             DELETE FROM {zensus_population_processed['schema']}.{table} as b
             WHERE b.zensus_population_id IN (
                 SELECT id FROM {population_table}
                 WHERE population < 0);"""
        )