Passed — Pull Request — dev (#970), created by unknown, 01:37

data.datasets.zensus.ZensusPopulation.__init__() — rated A

Complexity: Conditions 1
Size: Total Lines 9, Code Lines 9
Duplication: Lines 0, Ratio 0 %
Importance: Changes 0
Metric   Value
cc       1
eloc     9
nop      2
dl       0
loc      9
rs       9.95
c        0
b        0
f        0
"""The central module containing all code dealing with importing Zensus data.
"""

from pathlib import Path
from urllib.request import urlretrieve
import csv
import json
import os
import zipfile

from shapely.geometry import Point, shape
from shapely.prepared import prep
import pandas as pd

from egon.data import db, subprocess
from egon.data.config import settings
from egon.data.datasets import Dataset
import egon.data.config


class ZensusPopulation(Dataset):
    def __init__(self, dependencies):
        super().__init__(
            name="ZensusPopulation",
            version="0.0.0",
            dependencies=dependencies,
            tasks=(
                download_zensus_pop,
                create_zensus_pop_table,
                population_to_postgres,
            ),
        )


class ZensusMiscellaneous(Dataset):
    def __init__(self, dependencies):
        super().__init__(
            name="ZensusMiscellaneous",
            version="0.0.0",
            dependencies=dependencies,
            tasks=(
                download_zensus_misc,
                create_zensus_misc_tables,
                zensus_misc_to_postgres,
            ),
        )
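Note: each tasks tuple bundles the dataset's pipeline steps. Assuming egon-data's convention that a tuple denotes tasks executed in sequence, the ZensusPopulation pipeline behaves roughly like this illustrative sketch (not part of the module):

    for task in (
        download_zensus_pop,
        create_zensus_pop_table,
        population_to_postgres,
    ):
        task()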
(Duplication notice: this code seems to be duplicated in your project.)

def download_zensus_pop():
    """Download Zensus csv file on population per hectare grid cell."""
    data_config = egon.data.config.datasets()
    zensus_population_config = data_config["zensus_population"][
        "original_data"
    ]
    download_directory = Path(".") / "zensus_population"
    # Create the folder, if it does not exist already
    if not os.path.exists(download_directory):
        os.mkdir(download_directory)

    target_file = (
        download_directory / zensus_population_config["target"]["file"]
    )

    if not os.path.isfile(target_file):
        urlretrieve(zensus_population_config["source"]["url"], target_file)


def download_zensus_misc():
    """Download Zensus csv files on data per hectare grid cell."""

    def download_and_check(target_file_misc, max_iteration=5):
        """Download the file if it doesn't exist and validate it afterwards.
        If the zip file is corrupted, remove it and re-download. Repeat until
        the file is valid or the maximum number of iterations is reached."""
        bad_file = True
        count = 0
        while bad_file:

            if not os.path.isfile(target_file_misc):
                urlretrieve(url, target_file_misc)

            # check zipfile
            try:
                _ = zipfile.ZipFile(target_file_misc)
                print(f"Zip file {target_file_misc} is good.")
                bad_file = False
            except zipfile.BadZipFile:
                os.remove(target_file_misc)
                count += 1
                if count > max_iteration:
                    raise zipfile.BadZipFile(
                        f"{target_file_misc} is not a zip file"
                    )

    # Get data config
    data_config = egon.data.config.datasets()
    download_directory = Path(".") / "zensus_population"
    # Create the folder, if it does not exist already
    if not os.path.exists(download_directory):
        os.mkdir(download_directory)
    # Download remaining zensus data set on households, buildings, apartments

    zensus_config = data_config["zensus_misc"]["original_data"]
    zensus_misc_processed = data_config["zensus_misc"]["processed"]
    zensus_url = zensus_config["source"]["url"]
    zensus_files = zensus_misc_processed["file_table_map"].keys()
    url_path_map = list(zip(zensus_url, zensus_files))

    for url, path in url_path_map:
        target_file_misc = download_directory / path

        download_and_check(target_file_misc, max_iteration=5)
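The zipfile.ZipFile() call in download_and_check only parses the archive's central directory, which catches truncated downloads but not corrupted members. A stricter, self-contained variant of the validity check (a sketch, not part of the module):

    import zipfile

    def is_valid_zip(path):
        """Return True if the archive opens and every member passes its CRC check."""
        try:
            with zipfile.ZipFile(path) as archive:
                return archive.testzip() is None  # first bad member's name, or None
        except zipfile.BadZipFile:
            return False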
def create_zensus_pop_table():
    """Create a table for Zensus population data in the postgres database"""

    # Get information from data configuration file
    data_config = egon.data.config.datasets()
    zensus_population_processed = data_config["zensus_population"]["processed"]

    # Create target schema
    db.execute_sql(
        f"CREATE SCHEMA IF NOT EXISTS {zensus_population_processed['schema']};"
    )

    # Create table for population data
    population_table = (
        f"{zensus_population_processed['schema']}"
        f".{zensus_population_processed['table']}"
    )

    db.execute_sql(f"DROP TABLE IF EXISTS {population_table} CASCADE;")

    db.execute_sql(
        f"CREATE TABLE {population_table}"
        f""" (id        SERIAL NOT NULL,
              grid_id    character varying(254) NOT NULL,
              x_mp       int,
              y_mp       int,
              population smallint,
              geom_point geometry(Point,3035),
              geom geometry (Polygon, 3035),
              CONSTRAINT {zensus_population_processed['table']}_pkey
              PRIMARY KEY (id)
        );
        """
    )


def create_zensus_misc_tables():
    """Create tables for Zensus household, apartment and building data in the
    postgres database"""

    # Get information from data configuration file
    data_config = egon.data.config.datasets()
    zensus_misc_processed = data_config["zensus_misc"]["processed"]

    # Create target schema
    db.execute_sql(
        f"CREATE SCHEMA IF NOT EXISTS {zensus_misc_processed['schema']};"
    )

    # Create tables for household, apartment and building
    for table in zensus_misc_processed["file_table_map"].values():
        misc_table = f"{zensus_misc_processed['schema']}.{table}"

        db.execute_sql(f"DROP TABLE IF EXISTS {misc_table} CASCADE;")
        db.execute_sql(
            f"CREATE TABLE {misc_table}"
            f""" (id                 SERIAL,
                  grid_id            VARCHAR(50),
                  grid_id_new        VARCHAR (50),
                  attribute          VARCHAR(50),
                  characteristics_code smallint,
                  characteristics_text text,
                  quantity           smallint,
                  quantity_q         smallint,
                  zensus_population_id int,
                  CONSTRAINT {table}_pkey PRIMARY KEY (id)
            );
            """
        )


def target(source, dataset):
    """Generate the target path corresponding to a source path.

    Parameters
    ----------
    source: Path
        Path to the source file
    dataset: str
        Toggles between production (`dataset='Everything'`) and test mode
        (e.g. `dataset='Schleswig-Holstein'`). In production mode, data
        covering entire Germany is used. In test mode, a subset of this data
        is used for testing the workflow.

    Returns
    -------
    Path
        Path to the target csv-file

    """
    return Path(
        os.path.join(Path("."), "zensus_population", source.stem)
        + "."
        + dataset
        + source.suffix
    )
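The function simply injects the dataset name between the stem and the suffix of the source file, inside the "zensus_population" working directory. For example (hypothetical file name):

    >>> target(Path("example.csv"), "Schleswig-Holstein")  # on a POSIX system
    PosixPath('zensus_population/example.Schleswig-Holstein.csv')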
def select_geom():
    """Select the union of the geometries of Schleswig-Holstein from the
    database, convert their projection to the one used in the CSV file,
    output the result to stdout as a GeoJSON string and read it into a
    prepared shape for filtering.

    """
    docker_db_config = db.credentials()

    geojson = subprocess.run(
        ["ogr2ogr"]
        + ["-s_srs", "epsg:4326"]
        + ["-t_srs", "epsg:3035"]
        + ["-f", "GeoJSON"]
        + ["/vsistdout/"]
        + [
            f"PG:host={docker_db_config['HOST']}"
            f" user='{docker_db_config['POSTGRES_USER']}'"
            f" password='{docker_db_config['POSTGRES_PASSWORD']}'"
            f" port={docker_db_config['PORT']}"
            f" dbname='{docker_db_config['POSTGRES_DB']}'"
        ]
        + ["-sql", "SELECT ST_Union(geometry) FROM boundaries.vg250_lan"],
        text=True,
    )
    features = json.loads(geojson.stdout)["features"]
    assert (
        len(features) == 1
    ), f"Found {len(features)} geometry features, expected exactly one."

    return prep(shape(features[0]["geometry"]))
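prep() wraps the geometry in a prepared variant that caches spatial indexes, which pays off when the same shape is tested against millions of grid-cell points. A self-contained sketch of the pattern (not part of the module):

    from shapely.geometry import Point, Polygon
    from shapely.prepared import prep

    area = prep(Polygon([(0, 0), (100, 0), (100, 100), (0, 100)]))
    points = [Point(50, 50), Point(150, 50)]
    inside = [p for p in points if area.intersects(p)]
    # keeps only Point(50, 50)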
def filter_zensus_population(filename, dataset):
    """Filter lines in the source CSV file and copy the appropriate ones to
    the destination, based on geometry.

    Parameters
    ----------
    filename : str
        Path to the input csv-file
    dataset: str
        Toggles between production (`dataset='Everything'`) and test mode
        (e.g. `dataset='Schleswig-Holstein'`). In production mode, data
        covering entire Germany is used. In test mode, a subset of this data
        is used for testing the workflow.

    Returns
    -------
    Path
        Path to the output csv-file

    """

    csv_file = Path(filename).resolve(strict=True)

    schleswig_holstein = select_geom()

    if not os.path.isfile(target(csv_file, dataset)):

        with open(csv_file, mode="r", newline="") as input_lines:
            rows = csv.DictReader(input_lines, delimiter=";")
            gitter_ids = set()
            with open(
                target(csv_file, dataset), mode="w", newline=""
            ) as destination:
                output = csv.DictWriter(
                    destination, delimiter=";", fieldnames=rows.fieldnames
                )
                output.writeheader()
                output.writerows(
                    gitter_ids.add(row["Gitter_ID_100m"]) or row
                    for row in rows
                    if schleswig_holstein.intersects(
                        Point(float(row["x_mp_100m"]), float(row["y_mp_100m"]))
                    )
                )
    return target(csv_file, dataset)
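The generator passed to writerows() exploits the fact that set.add() returns None: the `or` records each row's grid id as a side effect and still yields the row itself. A minimal sketch of the idiom:

    seen = set()
    rows = [{"id": "a"}, {"id": "b"}]
    kept = [seen.add(row["id"]) or row for row in rows]
    # kept == rows and seen == {"a", "b"}

(In the code shown, the collected gitter_ids set appears unused afterwards; filter_zensus_misc below builds its own id set from the database instead.)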
def filter_zensus_misc(filename, dataset):
    """Filter lines in the source CSV file and copy the appropriate ones to
    the destination, based on grid_id values.

    Parameters
    ----------
    filename : str
        Path to the input csv-file
    dataset: str
        Toggles between production (`dataset='Everything'`) and test mode
        (e.g. `dataset='Schleswig-Holstein'`). In production mode, data
        covering entire Germany is used. In test mode, a subset of this data
        is used for testing the workflow.

    Returns
    -------
    Path
        Path to the output csv-file

    """
    csv_file = Path(filename).resolve(strict=True)

    gitter_ids = set(
        pd.read_sql(
            "SELECT grid_id from society.destatis_zensus_population_per_ha",
            con=db.engine(),
        ).grid_id.values
    )

    if not os.path.isfile(target(csv_file, dataset)):
        with open(
            csv_file, mode="r", newline="", encoding="iso-8859-1"
        ) as inputs:
            rows = csv.DictReader(inputs, delimiter=",")
            with open(
                target(csv_file, dataset),
                mode="w",
                newline="",
                encoding="iso-8859-1",
            ) as destination:
                output = csv.DictWriter(
                    destination, delimiter=",", fieldnames=rows.fieldnames
                )
                output.writeheader()
                output.writerows(
                    row for row in rows if row["Gitter_ID_100m"] in gitter_ids
                )
    return target(csv_file, dataset)
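Unlike the semicolon-separated population file, the misc CSVs are comma-separated and latin-1 (iso-8859-1) encoded; both parameters have to match when inspecting a filtered file. A quick sanity check with pandas (the file name is hypothetical):

    import pandas as pd

    df = pd.read_csv("Haushalte100m.csv", delimiter=",", encoding="iso-8859-1")
    print(df["Gitter_ID_100m"].nunique())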
def population_to_postgres():
    """Import Zensus population data to postgres database"""
    # Get information from data configuration file
    data_config = egon.data.config.datasets()
    zensus_population_orig = data_config["zensus_population"]["original_data"]
    zensus_population_processed = data_config["zensus_population"]["processed"]
    input_file = (
        Path(".")
        / "zensus_population"
        / zensus_population_orig["target"]["file"]
    )
    dataset = settings()["egon-data"]["--dataset-boundary"]

    # Read database configuration from docker-compose.yml
    docker_db_config = db.credentials()

    population_table = (
        f"{zensus_population_processed['schema']}"
        f".{zensus_population_processed['table']}"
    )

    with zipfile.ZipFile(input_file) as zf:
        for filename in zf.namelist():

            zf.extract(filename)

            if dataset == "Everything":
                filename_insert = filename
            else:
                filename_insert = filter_zensus_population(filename, dataset)

            host = ["-h", f"{docker_db_config['HOST']}"]
            port = ["-p", f"{docker_db_config['PORT']}"]
            pgdb = ["-d", f"{docker_db_config['POSTGRES_DB']}"]
            user = ["-U", f"{docker_db_config['POSTGRES_USER']}"]
            command = [
                "-c",
                rf"\copy {population_table} (grid_id, x_mp, y_mp, population)"
                rf" FROM '{filename_insert}' DELIMITER ';' CSV HEADER;",
            ]
            subprocess.run(
                ["psql"] + host + port + pgdb + user + command,
                env={"PGPASSWORD": docker_db_config["POSTGRES_PASSWORD"]},
            )

        os.remove(filename)

(Issue: the variable `filename` does not seem to be defined in case the for loop above is not entered. Are you sure this can never be the case?)

    db.execute_sql(
        f"UPDATE {population_table} zs"
        " SET geom_point=ST_SetSRID(ST_MakePoint(zs.x_mp, zs.y_mp), 3035);"
    )

    db.execute_sql(
        f"UPDATE {population_table} zs"
        """ SET geom=ST_SetSRID(
                (ST_MakeEnvelope(zs.x_mp-50,zs.y_mp-50,zs.x_mp+50,zs.y_mp+50)),
                3035
            );
        """
    )

    db.execute_sql(
        f"CREATE INDEX {zensus_population_processed['table']}_geom_idx ON"
        f" {population_table} USING gist (geom);"
    )

    db.execute_sql(
        f"CREATE INDEX"
        f" {zensus_population_processed['table']}_geom_point_idx"
        f" ON {population_table} USING gist (geom_point);"
    )
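The import above shells out to psql because \copy streams the CSV from the client side, so it works even when the server cannot read local files (unlike server-side COPY). A standalone sketch of the same technique; host, port, database, user, password and file name are placeholders:

    import os
    import subprocess

    copy_cmd = (
        r"\copy society.destatis_zensus_population_per_ha"
        r" (grid_id, x_mp, y_mp, population)"
        r" FROM 'population.csv' DELIMITER ';' CSV HEADER;"
    )
    subprocess.run(
        ["psql", "-h", "localhost", "-p", "5432", "-d", "egon-data",
         "-U", "postgres", "-c", copy_cmd],
        env={**os.environ, "PGPASSWORD": "example"},  # placeholder credentials
    )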
def zensus_misc_to_postgres():
    """Import data on buildings, households and apartments to postgres db"""

    # Get information from data configuration file
    data_config = egon.data.config.datasets()
    zensus_misc_processed = data_config["zensus_misc"]["processed"]
    zensus_population_processed = data_config["zensus_population"]["processed"]
    file_path = Path(".") / "zensus_population"
    dataset = settings()["egon-data"]["--dataset-boundary"]

    population_table = (
        f"{zensus_population_processed['schema']}"
        f".{zensus_population_processed['table']}"
    )

    # Read database configuration from docker-compose.yml
    docker_db_config = db.credentials()

    for input_file, table in zensus_misc_processed["file_table_map"].items():
        with zipfile.ZipFile(file_path / input_file) as zf:
            csvfiles = [n for n in zf.namelist() if n.lower()[-3:] == "csv"]
            for filename in csvfiles:
                zf.extract(filename)

                if dataset == "Everything":
                    filename_insert = filename
                else:
                    filename_insert = filter_zensus_misc(filename, dataset)

                host = ["-h", f"{docker_db_config['HOST']}"]
                port = ["-p", f"{docker_db_config['PORT']}"]
                pgdb = ["-d", f"{docker_db_config['POSTGRES_DB']}"]
                user = ["-U", f"{docker_db_config['POSTGRES_USER']}"]
                command = [
                    "-c",
                    rf"\copy {zensus_population_processed['schema']}.{table}"
                    f"""(grid_id,
                        grid_id_new,
                        attribute,
                        characteristics_code,
                        characteristics_text,
                        quantity,
                        quantity_q)
                        FROM '{filename_insert}' DELIMITER ','
                        CSV HEADER
                        ENCODING 'iso-8859-1';""",
                ]
                subprocess.run(
                    ["psql"] + host + port + pgdb + user + command,
                    env={"PGPASSWORD": docker_db_config["POSTGRES_PASSWORD"]},
                )

            os.remove(filename)

(Issue: the variable `filename` does not seem to be defined for all execution paths.)

        db.execute_sql(
            f"""UPDATE {zensus_population_processed['schema']}.{table} as b
                    SET zensus_population_id = zs.id
                    FROM {population_table} zs
                    WHERE b.grid_id = zs.grid_id;"""
        )

        db.execute_sql(
            f"""ALTER TABLE {zensus_population_processed['schema']}.{table}
                    ADD CONSTRAINT {table}_fkey
                    FOREIGN KEY (zensus_population_id)
                    REFERENCES {population_table}(id);"""
        )

    # Create combined table
    create_combined_zensus_table()

    # Delete entries for unpopulated cells
    adjust_zensus_misc()


def create_combined_zensus_table():
    """Create a combined table with buildings, apartments and population per
    cell

    Only apartment and building data with acceptable data quality
    (quantity_q < 2) is used; all other data is dropped. For more details on
    data quality, see the Zensus docs:
    https://www.zensus2011.de/DE/Home/Aktuelles/DemografischeGrunddaten.html

    If there is no data on buildings or apartments for a certain cell, the
    value for building_count resp. apartment_count contains NULL.
    """
    sql_script = os.path.join(
        os.path.dirname(__file__), "create_combined_zensus_table.sql"
    )
    db.execute_sql_script(sql_script)


def adjust_zensus_misc():
    """Delete unpopulated cells in zensus-households, -buildings and
    -apartments

    Some unpopulated zensus cells are listed in:
    - egon_destatis_zensus_household_per_ha
    - egon_destatis_zensus_building_per_ha
    - egon_destatis_zensus_apartment_per_ha

    This can be caused by missing population information due to privacy or
    other special cases (e.g. holiday homes are listed as buildings but are
    not permanently populated). In the following tasks of egon-data, only
    data of populated cells is used.

    Returns
    -------
    None.

    """
    # Get information from data configuration file
    data_config = egon.data.config.datasets()
    zensus_population_processed = data_config["zensus_population"]["processed"]
    zensus_misc_processed = data_config["zensus_misc"]["processed"]

    population_table = (
        f"{zensus_population_processed['schema']}"
        f".{zensus_population_processed['table']}"
    )

    for input_file, table in zensus_misc_processed["file_table_map"].items():
        db.execute_sql(
            f"""
             DELETE FROM {zensus_population_processed['schema']}.{table} as b
             WHERE b.zensus_population_id IN (
                 SELECT id FROM {population_table}
                 WHERE population < 0);"""
        )