Passed
Pull Request — dev (#1375)
by
unknown
02:18
created

data.datasets.zensus.create_zensus_pop_table()   A

Complexity

Conditions 1

Size

Total Lines 22
Code Lines 10

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 10
nop 0
dl 0
loc 22
rs 9.9
c 0
b 0
f 0
1
"""The central module containing all code dealing with importing Zensus data.
2
"""
3
4
from pathlib import Path
5
import csv
6
import json
7
import os
8
import zipfile
9
10
from shapely.geometry import Point, shape
11
from shapely.prepared import prep
12
import pandas as pd
13
import requests
14
15
from egon.data import db, subprocess
16
from egon.data.config import settings
17
from egon.data.datasets import Dataset
18
import egon.data.config
19
20
from egon_validation import(
21
    RowCountValidation,
22
    DataTypeValidation,
23
    NotNullAndNotNaNValidation,
24
    WholeTableNotNullAndNotNaNValidation,
25
    SRIDUniqueNonZero
26
)
27
28
29
class ZensusPopulation(Dataset):
    """Dataset that creates and fills the Zensus population table.

    The dataset runs ``create_zensus_pop_table`` followed by
    ``population_to_postgres`` and registers a list of "data-quality"
    validation rules; ``on_validation_failure`` is set to ``"continue"``.
    """

    def __init__(self, dependencies):
        # NOTE(review): none of this dataset's own tasks visibly creates the
        # table validated below (``create_zensus_pop_table`` builds the
        # configured population table) -- confirm the rules target the
        # intended table.
        table_name = (
            "egon_destatis_zensus_apartment_building_population_per_ha"
        )
        qualified_table = f"society.{table_name}"

        # Column name -> expected database type. The key order doubles as
        # the column order of the not-null check.
        expected_types = {
            "grid_id": "character varying",
            "zensus_population_id": "integer",
            "building_count": "smallint",
            "apartment_count": "smallint",
            "geom": "geometry",
            "geom_point": "geometry",
        }

        quality_rules = [
            RowCountValidation(
                table=qualified_table,
                rule_id=f"TEST_ROW_COUNT.{table_name}",
                expected_count={
                    "Schleswig-Holstein": 145634,
                    "Everything": 3206490,
                },
            ),
            DataTypeValidation(
                table=qualified_table,
                rule_id=f"TEST_DATA_MULTIPLE_TYPES.{table_name}",
                column_types=expected_types,
            ),
            NotNullAndNotNaNValidation(
                table=qualified_table,
                rule_id=f"TEST_NOT_NAN.{table_name}",
                columns=list(expected_types),
            ),
            WholeTableNotNullAndNotNaNValidation(
                table=qualified_table,
                rule_id=f"TEST_WHOLE_TABLE_NOT_NAN.{table_name}",
            ),
        ]
        # One SRID rule per geometry column, in the order geom, geom_point.
        for geometry_column in ("geom", "geom_point"):
            quality_rules.append(
                SRIDUniqueNonZero(
                    table=qualified_table,
                    rule_id=(
                        f"SRIDUniqueNonZero.{table_name}.{geometry_column}"
                    ),
                    column=geometry_column,
                )
            )

        super().__init__(
            name="ZensusPopulation",
            version="0.0.2",
            dependencies=dependencies,
            tasks=(
                create_zensus_pop_table,
                population_to_postgres,
            ),
            validation={"data-quality": quality_rules},
            on_validation_failure="continue",
        )
77
78
79
class ZensusMiscellaneous(Dataset):
    """Dataset that creates and fills the remaining Zensus tables.

    The dataset runs ``create_zensus_misc_tables`` followed by
    ``zensus_misc_to_postgres`` and registers "data-quality" validation
    rules for four tables in the ``society`` schema;
    ``on_validation_failure`` is set to ``"continue"``.
    """

    def __init__(self, dependencies):
        # Column layout shared by the apartment, building and household
        # tables (column name -> expected database type).
        per_ha_types = {
            "id": "integer",
            "grid_id": "character varying",
            "grid_id_new": "character varying",
            "attribute": "character varying",
            "characteristics_code": "smallint",
            "characteristics_text": "text",
            "quantity": "smallint",
            "quantity_q": "smallint",
            "zensus_population_id": "integer",
        }
        # NOTE(review): the refined household table is not created anywhere
        # in this module -- confirm it exists when the validation runs.
        refined_types = {
            "id": "integer",
            "cell_id": "integer",
            "grid_id": "character varying",
            "nuts3": "character varying",
            "nuts1": "character varying",
            "characteristics_code": "integer",
            "hh_5types": "integer",
            "hh_type": "character",
            "hh_10types": "integer",
        }

        # (table name, expected row counts per dataset boundary, columns)
        checked_tables = (
            (
                "egon_destatis_zensus_apartment_per_ha",
                {"Schleswig-Holstein": 1946300, "Everything": 51095280},
                per_ha_types,
            ),
            (
                "egon_destatis_zensus_building_per_ha",
                {"Schleswig-Holstein": 978493, "Everything": 24297136},
                per_ha_types,
            ),
            (
                "egon_destatis_zensus_household_per_ha",
                {"Schleswig-Holstein": 724970, "Everything": 18788917},
                per_ha_types,
            ),
            (
                "egon_destatis_zensus_household_per_ha_refined",
                {"Schleswig-Holstein": 551678, "Everything": 13304814},
                refined_types,
            ),
        )

        quality_rules = []
        for table_name, row_counts, column_types in checked_tables:
            qualified_table = f"society.{table_name}"
            quality_rules.extend(
                [
                    RowCountValidation(
                        table=qualified_table,
                        rule_id=f"TEST_ROW_COUNT.{table_name}",
                        expected_count=row_counts,
                    ),
                    DataTypeValidation(
                        table=qualified_table,
                        rule_id=f"TEST_DATA_MULTIPLE_TYPES.{table_name}",
                        # Copy so that every rule owns its own mapping.
                        column_types=dict(column_types),
                    ),
                    NotNullAndNotNaNValidation(
                        table=qualified_table,
                        rule_id=f"TEST_NOT_NAN.{table_name}",
                        columns=list(column_types),
                    ),
                    WholeTableNotNullAndNotNaNValidation(
                        table=qualified_table,
                        rule_id=f"TEST_WHOLE_TABLE_NOT_NAN.{table_name}",
                    ),
                ]
            )

        super().__init__(
            name="ZensusMiscellaneous",
            version="0.0.1",
            dependencies=dependencies,
            tasks=(
                create_zensus_misc_tables,
                zensus_misc_to_postgres,
            ),
            validation={"data-quality": quality_rules},
            on_validation_failure="continue",
        )
240
241
242
def download_and_check(url, target_file, max_iteration=5):
    """Download a zip archive if it is missing and verify that it opens.

    If ``target_file`` does not exist it is downloaded from ``url``. The
    file is then opened with :class:`zipfile.ZipFile`; when that fails the
    file is deleted and the download is repeated.

    Parameters
    ----------
    url : str
        Source address; must start with ``http`` (case-insensitive).
    target_file : str or path-like
        Location of the archive on disk.
    max_iteration : int
        Number of failed zip checks that is tolerated. The error is raised
        once the failure count exceeds this value.

    Raises
    ------
    ValueError
        If a download is required and ``url`` is not an http(s) address.
    StopIteration
        If the archive is still broken after more than ``max_iteration``
        failed checks.
    """
    failed_checks = 0
    while True:
        # Download the file only if it is not on disk yet.
        if not os.path.isfile(target_file):
            if not url.lower().startswith("http"):
                raise ValueError("No http url")
            print("Downloading: ", url)
            # Stream the body to disk chunk by chunk so that large archives
            # are never held in memory as a whole, and make sure both the
            # connection and the file handle are closed afterwards.
            with requests.get(
                url, headers={"User-Agent": "Mozilla/5.0"}, stream=True
            ) as response:
                with open(target_file, "wb") as download:
                    for chunk in response.iter_content(
                        chunk_size=1024 * 1024
                    ):
                        download.write(chunk)

        # Check that the file can be opened as a zip archive.
        try:
            with zipfile.ZipFile(target_file):
                print(f"Zip file {target_file} is good.")
        except zipfile.BadZipFile as ex:
            # Drop the broken file so the next pass downloads it again.
            os.remove(target_file)
            failed_checks += 1
            if failed_checks > max_iteration:
                raise StopIteration(
                    f"Max iteration of {max_iteration} is exceeded"
                ) from ex
        else:
            return
274
275
276
def download_zensus_pop():
    """Download the Zensus archive on population per hectare grid cell.

    Source url and target file name are read from the
    ``zensus_population`` / ``original_data`` section of the dataset
    configuration. The file is stored in ``./zensus_population`` and
    fetched and verified through ``download_and_check``.
    """
    data_config = egon.data.config.datasets()
    zensus_population_config = data_config["zensus_population"][
        "original_data"
    ]

    # Create the folder if it does not exist already; ``exist_ok`` avoids
    # the race between a separate existence check and the creation.
    download_directory = Path(".") / "zensus_population"
    download_directory.mkdir(exist_ok=True)

    target_file = (
        download_directory / zensus_population_config["target"]["file"]
    )
    url = zensus_population_config["source"]["url"]

    download_and_check(url, target_file, max_iteration=5)
294
295
296
def download_zensus_misc():
    """Download the remaining Zensus archives on data per hectare grid cell.

    The urls are read from ``zensus_misc`` / ``original_data`` and the
    file names from the keys of ``zensus_misc`` / ``processed`` /
    ``file_table_map`` in the dataset configuration. Every file is stored
    in ``./zensus_population`` and fetched and verified through
    ``download_and_check``.
    """
    # Get data config
    data_config = egon.data.config.datasets()
    zensus_config = data_config["zensus_misc"]["original_data"]
    zensus_misc_processed = data_config["zensus_misc"]["processed"]

    # Create the folder if it does not exist already; ``exist_ok`` avoids
    # the race between a separate existence check and the creation.
    download_directory = Path(".") / "zensus_population"
    download_directory.mkdir(exist_ok=True)

    zensus_url = zensus_config["source"]["url"]
    zensus_files = zensus_misc_processed["file_table_map"]

    # Urls and file names are paired purely by position; zip() stops at the
    # shorter of the two sequences, so both config entries must line up.
    for url, path in zip(zensus_url, zensus_files):
        download_and_check(url, download_directory / path, max_iteration=5)
317
318
319
def create_zensus_pop_table():
    """Create the schema and an empty table for the Zensus population data.

    Schema and table name are taken from the ``zensus_population`` /
    ``processed`` section of the dataset configuration. An existing table
    of that name is dropped (``CASCADE``) before it is created again.
    """
    processed = egon.data.config.datasets()["zensus_population"]["processed"]
    schema = processed["schema"]
    table = processed["table"]
    population_table = f"{schema}.{table}"

    # Make sure the target schema is there.
    db.execute_sql(f"CREATE SCHEMA IF NOT EXISTS {schema};")

    # Start from a clean table on every run.
    db.execute_sql(f"DROP TABLE IF EXISTS {population_table} CASCADE;")

    create_statement = (
        f"CREATE TABLE {population_table}"
        f""" (id        SERIAL NOT NULL,
              grid_id    character varying(254) NOT NULL,
              x_mp       int,
              y_mp       int,
              population smallint,
              geom_point geometry(Point,3035),
              geom geometry (Polygon, 3035),
              CONSTRAINT {table}_pkey
              PRIMARY KEY (id)
        );
        """
    )
    db.execute_sql(create_statement)
353
354
355
def create_zensus_misc_tables():
    """Create the schema and empty tables for the remaining Zensus data.

    One table is created for every value of ``file_table_map`` in the
    ``zensus_misc`` / ``processed`` section of the dataset configuration.
    Existing tables of the same name are dropped (``CASCADE``) first.
    """
    processed = egon.data.config.datasets()["zensus_misc"]["processed"]
    schema = processed["schema"]

    # Make sure the target schema is there.
    db.execute_sql(f"CREATE SCHEMA IF NOT EXISTS {schema};")

    # All tables share the same column layout.
    for table_name in processed["file_table_map"].values():
        qualified_name = f"{schema}.{table_name}"

        db.execute_sql(f"DROP TABLE IF EXISTS {qualified_name} CASCADE;")
        create_statement = (
            f"CREATE TABLE {qualified_name}"
            f""" (id                 SERIAL,
                  grid_id            VARCHAR(50),
                  grid_id_new        VARCHAR (50),
                  attribute          VARCHAR(50),
                  characteristics_code smallint,
                  characteristics_text text,
                  quantity           smallint,
                  quantity_q         smallint,
                  zensus_population_id int,
                  CONSTRAINT {table_name}_pkey PRIMARY KEY (id)
            );
            """
        )
        db.execute_sql(create_statement)
387
388
389
def target(source, dataset):
    """Generate the target path corresponding to a source path.

    The result lives directly in ``data_bundle_egon_data`` and its file
    name is ``<source stem>zensus_population.<dataset><source suffix>``,
    i.e. the literal ``zensus_population`` is appended to the stem without
    any separator.

    Parameters
    ----------
    source: Path
        Path of the source file; only its stem and suffix are used.
    dataset: str
        Toggles between production (`dataset='Everything'`) and test mode
        e.g. (`dataset='Schleswig-Holstein'`). The value becomes part of
        the target file name.

    Returns
    -------
    Path
        Path to target csv-file

    """
    # str.join (like the "+" chain it replaces) insists on str parts.
    file_name = "".join(
        (source.stem, "zensus_population", ".", dataset, source.suffix)
    )
    return Path(".") / "data_bundle_egon_data" / file_name
413
414
415
def select_geom():
    """Return the unioned boundary geometry as a prepared shape.

    ``ogr2ogr`` is run against the database to select the union of all
    geometries in ``boundaries.vg250_lan``, reproject it from EPSG:4326 to
    EPSG:3035 (the projection used in the CSV file) and emit it as GeoJSON
    on stdout. The GeoJSON is parsed and turned into a prepared shapely
    geometry for fast filtering.

    NOTE(review): presumably ``boundaries.vg250_lan`` only holds the
    selected dataset boundary (e.g. Schleswig-Holstein) in test mode --
    confirm.

    Returns
    -------
    shapely.prepared.PreparedGeometry
        Prepared geometry of the single returned feature.

    """
    credentials = db.credentials()

    # GDAL connection string for the PostgreSQL data source.
    connection = (
        f"PG:host={credentials['HOST']}"
        f" user='{credentials['POSTGRES_USER']}'"
        f" password='{credentials['POSTGRES_PASSWORD']}'"
        f" port={credentials['PORT']}"
        f" dbname='{credentials['POSTGRES_DB']}'"
    )
    ogr2ogr_call = [
        "ogr2ogr",
        "-s_srs",
        "epsg:4326",
        "-t_srs",
        "epsg:3035",
        "-f",
        "GeoJSON",
        "/vsistdout/",
        connection,
        "-sql",
        "SELECT ST_Union(geometry) FROM boundaries.vg250_lan",
    ]

    # ``subprocess`` is the egon.data wrapper; its result is expected to
    # expose the captured output as ``stdout``.
    completed = subprocess.run(ogr2ogr_call, text=True)
    features = json.loads(completed.stdout)["features"]
    feature_count = len(features)
    assert (
        feature_count == 1
    ), f"Found {feature_count} geometry features, expected exactly one."

    return prep(shape(features[0]["geometry"]))
446
447
448
def filter_zensus_population(filename, dataset):
    """Copy the rows of a population CSV file that lie inside the boundary.

    A row is kept when the point built from its ``x_mp_100m`` and
    ``y_mp_100m`` columns intersects the geometry returned by
    ``select_geom``. If the target file already exists it is reused and
    nothing is filtered again.

    Parameters
    ----------
    filename : str
        Path to input csv-file (``;`` separated); must exist.
    dataset: str
        Toggles between production (`dataset='Everything'`) and test mode
        e.g. (`dataset='Schleswig-Holstein'`). Here it only determines the
        name of the target file.

    Returns
    -------
    Path
        Path to output csv-file

    """
    csv_file = Path(filename).resolve(strict=True)
    filtered_file = target(csv_file, dataset)

    # Reuse an existing result; in that case the (comparatively expensive)
    # boundary lookup is not needed at all.
    if os.path.isfile(filtered_file):
        return filtered_file

    boundary = select_geom()

    with open(csv_file, mode="r", newline="") as input_lines, open(
        filtered_file, mode="w", newline=""
    ) as destination:
        rows = csv.DictReader(input_lines, delimiter=";")
        output = csv.DictWriter(
            destination, delimiter=";", fieldnames=rows.fieldnames
        )
        output.writeheader()
        # Rows are streamed through a generator, so the input file is never
        # loaded into memory as a whole.
        output.writerows(
            row
            for row in rows
            if boundary.intersects(
                Point(float(row["x_mp_100m"]), float(row["y_mp_100m"]))
            )
        )

    return filtered_file
494
495
496
def filter_zensus_misc(filename, dataset):
    """Copy the rows of a Zensus CSV file that belong to known grid cells.

    A row is kept when its ``Gitter_ID_100m`` value occurs as ``grid_id``
    in the configured Zensus population table. If the target file already
    exists it is reused and nothing is filtered again.

    Parameters
    ----------
    filename : str
        Path to input csv-file (``,`` separated, iso-8859-1 encoded); must
        exist.
    dataset: str
        Toggles between production (`dataset='Everything'`) and test mode
        e.g. (`dataset='Schleswig-Holstein'`). Here it only determines the
        name of the target file.

    Returns
    -------
    Path
        Path to output csv-file

    """
    csv_file = Path(filename).resolve(strict=True)
    filtered_file = target(csv_file, dataset)

    # Reuse an existing result; in that case the grid ids do not have to be
    # loaded from the database at all.
    if os.path.isfile(filtered_file):
        return filtered_file

    # Take the population table from the configuration, like the functions
    # that create and fill it, instead of hard-coding its name.
    population_processed = egon.data.config.datasets()["zensus_population"][
        "processed"
    ]
    population_table = (
        f"{population_processed['schema']}"
        f".{population_processed['table']}"
    )
    gitter_ids = set(
        pd.read_sql(
            f"SELECT grid_id from {population_table}",
            con=db.engine(),
        ).grid_id.values
    )

    with open(
        csv_file, mode="r", newline="", encoding="iso-8859-1"
    ) as inputs, open(
        filtered_file, mode="w", newline="", encoding="iso-8859-1"
    ) as destination:
        rows = csv.DictReader(inputs, delimiter=",")
        output = csv.DictWriter(
            destination, delimiter=",", fieldnames=rows.fieldnames
        )
        output.writeheader()
        output.writerows(
            row for row in rows if row["Gitter_ID_100m"] in gitter_ids
        )

    return filtered_file
545
546
547
def population_to_postgres():
    """Import Zensus population data to postgres database.

    Every member of the configured zip archive is extracted into the
    working directory, filtered with ``filter_zensus_population`` unless
    the dataset boundary is ``"Everything"``, loaded into the population
    table with ``psql``'s ``\\copy`` and removed again. Afterwards the
    point and polygon geometries are derived from the cell centre
    coordinates and a gist index is created on each of them.
    """
    # Get information from data configuration file
    data_config = egon.data.config.datasets()
    zensus_population_orig = data_config["zensus_population"]["original_data"]
    zensus_population_processed = data_config["zensus_population"]["processed"]
    input_file = (
        Path(".")
        / "data_bundle_egon_data"
        / "zensus_population"
        / zensus_population_orig["target"]["file"]
    )
    dataset = settings()["egon-data"]["--dataset-boundary"]

    # Read database configuration from docker-compose.yml
    docker_db_config = db.credentials()

    population_table = (
        f"{zensus_population_processed['schema']}"
        f".{zensus_population_processed['table']}"
    )

    # The connection arguments are the same for every psql call.
    psql_call = [
        "psql",
        "-h",
        f"{docker_db_config['HOST']}",
        "-p",
        f"{docker_db_config['PORT']}",
        "-d",
        f"{docker_db_config['POSTGRES_DB']}",
        "-U",
        f"{docker_db_config['POSTGRES_USER']}",
    ]
    psql_env = {"PGPASSWORD": docker_db_config["POSTGRES_PASSWORD"]}

    with zipfile.ZipFile(input_file) as zf:
        for filename in zf.namelist():

            zf.extract(filename)

            if dataset == "Everything":
                filename_insert = filename
            else:
                filename_insert = filter_zensus_population(filename, dataset)

            command = [
                "-c",
                rf"\copy {population_table} (grid_id, x_mp, y_mp, population)"
                rf" FROM '{filename_insert}' DELIMITER ';' CSV HEADER;",
            ]
            subprocess.run(psql_call + command, env=psql_env)

            # Remove each extracted file right after its import. Doing this
            # once after the loop failed with a NameError for an empty
            # archive and left all but the last extracted file behind.
            os.remove(filename)

    # Cell centre as point geometry.
    db.execute_sql(
        f"UPDATE {population_table} zs"
        " SET geom_point=ST_SetSRID(ST_MakePoint(zs.x_mp, zs.y_mp), 3035);"
    )

    # Cell area: a square reaching 50 units from the centre in each
    # direction.
    db.execute_sql(
        f"UPDATE {population_table} zs"
        """ SET geom=ST_SetSRID(
                (ST_MakeEnvelope(zs.x_mp-50,zs.y_mp-50,zs.x_mp+50,zs.y_mp+50)),
                3035
            );
        """
    )

    db.execute_sql(
        f"CREATE INDEX {zensus_population_processed['table']}_geom_idx ON"
        f" {population_table} USING gist (geom);"
    )

    db.execute_sql(
        f"CREATE INDEX"
        f" {zensus_population_processed['table']}_geom_point_idx"
        f" ON  {population_table} USING gist (geom_point);"
    )
619
620
621
def zensus_misc_to_postgres():
    """Import data on buildings, households and apartments to postgres db.

    For every entry of ``file_table_map`` the csv members of the zip
    archive are extracted into the working directory, filtered with
    ``filter_zensus_misc`` unless the dataset boundary is
    ``"Everything"``, loaded into the mapped table with ``psql``'s
    ``\\copy`` and removed again. Each table is then linked to the
    population table via ``zensus_population_id`` and gets a foreign key
    on that column. Finally ``create_combined_zensus_table`` and
    ``adjust_zensus_misc`` are run.
    """
    # Get information from data configuration file
    data_config = egon.data.config.datasets()
    zensus_misc_processed = data_config["zensus_misc"]["processed"]
    zensus_population_processed = data_config["zensus_population"]["processed"]
    file_path = Path(".") / "data_bundle_egon_data" / "zensus_population"
    dataset = settings()["egon-data"]["--dataset-boundary"]

    population_table = (
        f"{zensus_population_processed['schema']}"
        f".{zensus_population_processed['table']}"
    )
    # The misc tables are created in the zensus_misc schema by
    # ``create_zensus_misc_tables``, so they are addressed through that
    # schema here as well (and not through the population schema).
    misc_schema = zensus_misc_processed["schema"]

    # Read database configuration from docker-compose.yml
    docker_db_config = db.credentials()

    # The connection arguments are the same for every psql call.
    psql_call = [
        "psql",
        "-h",
        f"{docker_db_config['HOST']}",
        "-p",
        f"{docker_db_config['PORT']}",
        "-d",
        f"{docker_db_config['POSTGRES_DB']}",
        "-U",
        f"{docker_db_config['POSTGRES_USER']}",
    ]
    psql_env = {"PGPASSWORD": docker_db_config["POSTGRES_PASSWORD"]}

    for input_file, table in zensus_misc_processed["file_table_map"].items():
        misc_table = f"{misc_schema}.{table}"

        with zipfile.ZipFile(file_path / input_file) as zf:
            csvfiles = [n for n in zf.namelist() if n.lower().endswith("csv")]
            for filename in csvfiles:
                zf.extract(filename)

                if dataset == "Everything":
                    filename_insert = filename
                else:
                    filename_insert = filter_zensus_misc(filename, dataset)

                command = [
                    "-c",
                    rf"\copy {misc_table}"
                    f"""(grid_id,
                        grid_id_new,
                        attribute,
                        characteristics_code,
                        characteristics_text,
                        quantity,
                        quantity_q)
                        FROM '{filename_insert}' DELIMITER ','
                        CSV HEADER
                        ENCODING 'iso-8859-1';""",
                ]
                subprocess.run(psql_call + command, env=psql_env)

                # Remove each extracted file right after its import. Doing
                # this once after the loop could hit an undefined or stale
                # name (archive without csv members) and left all but the
                # last extracted file behind.
                os.remove(filename)

        # Link every row to its population cell via the grid id.
        db.execute_sql(
            f"""UPDATE {misc_table} as b
                    SET zensus_population_id = zs.id
                    FROM {population_table} zs
                    WHERE b.grid_id = zs.grid_id;"""
        )

        db.execute_sql(
            f"""ALTER TABLE {misc_table}
                    ADD CONSTRAINT {table}_fkey
                    FOREIGN KEY (zensus_population_id)
                    REFERENCES {population_table}(id);"""
        )

    # Create combined table
    create_combined_zensus_table()

    # Delete entries for unpopulated cells
    adjust_zensus_misc()
694
695
696
def create_combined_zensus_table():
    """Create combined table with buildings, apartments and population per cell

    Runs the SQL script ``create_combined_zensus_table.sql`` that is
    located next to this module.

    As documented for that script (its content is not part of this
    module): only apartment and building data with acceptable data quality
    (quantity_q<2) is used, all other data is dropped. For more details on
    data quality see Zensus docs:
    https://www.zensus2011.de/DE/Home/Aktuelles/DemografischeGrunddaten.html

    If there's no data on buildings or apartments for a certain cell, the
    value for building_count resp. apartment_count contains NULL.
    """
    module_directory = os.path.dirname(__file__)
    script_path = os.path.join(
        module_directory, "create_combined_zensus_table.sql"
    )
    db.execute_sql_script(script_path)
711
712
713
def adjust_zensus_misc():
    """Delete unpopulated cells in zensus-households, -buildings and -apartments

    For every table listed in ``file_table_map`` of the ``zensus_misc``
    configuration, all rows are deleted whose ``zensus_population_id``
    refers to a population cell with ``population < 0``.

    Some unpopulated zensus cells are listed in:
    - egon_destatis_zensus_household_per_ha
    - egon_destatis_zensus_building_per_ha
    - egon_destatis_zensus_apartment_per_ha

    This can be caused by missing population
    information due to privacy or other special cases (e.g. holiday homes
    are listed as buildings but are not permanently populated.)
    In the following tasks of egon-data, only data of populated cells is used.

    NOTE(review): presumably a negative population is the marker for such
    cells in the source data -- confirm against the Zensus documentation.

    Returns
    -------
    None.

    """
    # Get information from data configuration file
    data_config = egon.data.config.datasets()
    zensus_population_processed = data_config["zensus_population"]["processed"]
    zensus_misc_processed = data_config["zensus_misc"]["processed"]

    population_table = (
        f"{zensus_population_processed['schema']}"
        f".{zensus_population_processed['table']}"
    )
    # The misc tables are created in the zensus_misc schema by
    # ``create_zensus_misc_tables``, so they are addressed through that
    # schema here as well (and not through the population schema).
    misc_schema = zensus_misc_processed["schema"]

    # Only the table names are needed, not the archive file names.
    for table in zensus_misc_processed["file_table_map"].values():
        db.execute_sql(
            f"""
             DELETE FROM {misc_schema}.{table} as b
             WHERE b.zensus_population_id IN (
                 SELECT id FROM {population_table}
                 WHERE population < 0);"""
        )
749