Passed — Pull Request dev (#1304), created by unknown, 01:59

data.datasets.zensus.population_to_postgres() — rated A

Complexity:   Conditions 4
Size:         Total Lines 58, Code Lines 30
Duplication:  Lines 0, Ratio 0 %
Importance:   Changes 0

Metric  Value
eloc    30
dl      0
loc     58
rs      9.16
c       0
b       0
f       0
cc      4
nop     0

How to fix: Long Method

Small methods make your code easier to understand, particularly when combined with a good name. And if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, that is usually a good sign to extract the commented part into a new method, and to use the comment as a starting point when coming up with a good name for the new method.

Commonly applied refactorings include:

- Extract Method
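As a concrete illustration for the method flagged in this report, the repeated psql invocation in population_to_postgres() (shown in the listing below) could be pulled out into a helper, leaving the long method as a short orchestration of extract, filter and copy steps. This is only a sketch: the helper name copy_population_csv and its signature are invented for the example, while the psql arguments and the \copy command are taken from the method itself.

def copy_population_csv(filename, table, docker_db_config):
    # Hypothetical helper extracted from population_to_postgres():
    # bulk-load one CSV file into the given table using psql's \copy.
    subprocess.run(
        ["psql"]
        + ["-h", f"{docker_db_config['HOST']}"]
        + ["-p", f"{docker_db_config['PORT']}"]
        + ["-d", f"{docker_db_config['POSTGRES_DB']}"]
        + ["-U", f"{docker_db_config['POSTGRES_USER']}"]
        + [
            "-c",
            rf"\copy {table} (grid_id, x_mp, y_mp, population)"
            rf" FROM '{filename}' DELIMITER ';' CSV HEADER;",
        ],
        env={"PGPASSWORD": docker_db_config["POSTGRES_PASSWORD"]},
    )

With the copy step extracted, the loop body in population_to_postgres() shrinks to extract, filter, copy, which lowers the method's line count and makes each remaining condition easier to read in isolation.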

"""The central module containing all code dealing with importing Zensus data.
"""

from pathlib import Path
import csv
import json
import os
import requests
import zipfile

from shapely.geometry import Point, shape
from shapely.prepared import prep
import pandas as pd

from egon.data import db, subprocess
from egon.data.config import settings
from egon.data.datasets import Dataset, DatasetSources, DatasetTargets


class ZensusPopulation(Dataset):
    sources = DatasetSources(
        urls={
            "original_data": (
                "https://www.zensus2011.de/SharedDocs/Downloads/DE/"
                "Pressemitteilung/DemografischeGrunddaten/"
                "csv_Bevoelkerung_100m_Gitter.zip?__blob=publicationFile&v=3"
            ),
        }
    )

    targets = DatasetTargets(
        files={
            "zensus_population":
                "zensus_population/csv_Bevoelkerung_100m_Gitter.zip"},
        tables={
            "zensus_population":
                "society.destatis_zensus_population_per_ha"},
    )

    def __init__(self, dependencies):
        super().__init__(
            name="ZensusPopulation",
            version="0.0.2",
            dependencies=dependencies,
            tasks=(
                download_zensus_pop,
                create_zensus_pop_table,
                population_to_postgres,
            ),
        )


class ZensusMiscellaneous(Dataset):
    sources = DatasetSources(
        urls={
            "zensus_households": (
                "https://www.zensus2011.de/SharedDocs/Downloads/DE/"
                "Pressemitteilung/DemografischeGrunddaten/"
                "csv_Haushalte_100m_Gitter.zip?__blob=publicationFile&v=2"
            ),
            "zensus_buildings": (
                "https://www.zensus2011.de/SharedDocs/Downloads/DE/"
                "Pressemitteilung/DemografischeGrunddaten/"
                "csv_Gebaeude_100m_Gitter.zip?__blob=publicationFile&v=2"
            ),
            "zensus_apartments": (
                "https://www.zensus2011.de/SharedDocs/Downloads/DE/"
                "Pressemitteilung/DemografischeGrunddaten/"
                "csv_Wohnungen_100m_Gitter.zip?__blob=publicationFile&v=5"
            ),
        }
    )

    targets = DatasetTargets(
        files={
            "zensus_households":
                "zensus_population/csv_Haushalte_100m_Gitter.zip",
            "zensus_buildings":
                "zensus_population/csv_Gebaeude_100m_Gitter.zip",
            "zensus_apartments":
                "zensus_population/csv_Wohnungen_100m_Gitter.zip",
        },
        tables={
            "zensus_households":
                "society.egon_destatis_zensus_household_per_ha",
            "zensus_buildings":
                "society.egon_destatis_zensus_building_per_ha",
            "zensus_apartments":
                "society.egon_destatis_zensus_apartment_per_ha",
        },
    )

    def __init__(self, dependencies):
        super().__init__(
            name="ZensusMiscellaneous",
            version="0.0.1",
            dependencies=dependencies,
            tasks=(
                download_zensus_misc,
                create_zensus_misc_tables,
                zensus_misc_to_postgres,
            ),
        )


def download_and_check(url, target_file, max_iteration=5):
    """Download a file from a http url if it doesn't exist and check it.

    If the downloaded zip file is bad, remove it and download it again.
    Repeat until the file is fine or the maximum number of iterations is
    reached."""
    bad_file = True
    count = 0
    while bad_file:

        # Download the file if it doesn't exist yet
        if not os.path.isfile(target_file):
            if url.lower().startswith("http"):
                print("Downloading: ", url)
                req = requests.get(
                    url, headers={"User-Agent": "Mozilla/5.0"}, stream=True
                )
                with open(target_file, "wb") as file:
                    file.write(req.content)
            else:
                raise ValueError("No http url")

        # Check the zip file's integrity
        try:
            with zipfile.ZipFile(target_file):
                print(f"Zip file {target_file} is good.")
            bad_file = False
        except zipfile.BadZipFile as ex:
            os.remove(target_file)
            count += 1
            if count > max_iteration:
                raise StopIteration(
                    f"Max iteration of {max_iteration} is exceeded"
                ) from ex


def download_zensus_pop():
    """Download Zensus csv file on population per hectare grid cell."""

    # The download directory is the parent of the target file
    download_directory = (
        Path(".") / ZensusPopulation.targets.files["zensus_population"]
    ).parent
    # Create the folder, if it does not exist already
    if not os.path.exists(download_directory):
        os.mkdir(download_directory)

    download_and_check(
        ZensusPopulation.sources.urls["original_data"],
        ZensusPopulation.targets.files["zensus_population"],
        max_iteration=5,
    )


def download_zensus_misc():
    """Download Zensus csv files on data per hectare grid cell."""

    # The download directory is the parent of the target files
    download_directory = (
        Path(".") / ZensusMiscellaneous.targets.files["zensus_buildings"]
    ).parent
    # Create the folder, if it does not exist already
    if not os.path.exists(download_directory):
        os.mkdir(download_directory)
    # Download the remaining zensus data sets on households, buildings and
    # apartments
    for key in ZensusMiscellaneous.sources.urls:
        download_and_check(
            ZensusMiscellaneous.sources.urls[key],
            ZensusMiscellaneous.targets.files[key],
            max_iteration=5,
        )


def create_zensus_pop_table():
    """Create table for zensus population data in postgres database"""

    population_table = ZensusPopulation.targets.tables["zensus_population"]

    # Create target schema
    db.execute_sql(
        f"""
        CREATE SCHEMA IF NOT EXISTS
        {ZensusPopulation.targets.get_table_schema("zensus_population")};
        """
    )

    db.execute_sql(f"DROP TABLE IF EXISTS {population_table} CASCADE;")

    db.execute_sql(
        f"CREATE TABLE {population_table}"
        f""" (id         SERIAL NOT NULL,
              grid_id    character varying(254) NOT NULL,
              x_mp       int,
              y_mp       int,
              population smallint,
              geom_point geometry(Point, 3035),
              geom       geometry(Polygon, 3035),
              CONSTRAINT {population_table.split('.')[1]}_pkey
              PRIMARY KEY (id)
        );
        """
    )


def create_zensus_misc_tables():
    """Create tables for miscellaneous zensus data in postgres database"""

    # Create tables for households, buildings and apartments
    for table in ZensusMiscellaneous.targets.tables:
        table_name = ZensusMiscellaneous.targets.tables[table]
        # Create target schema
        db.execute_sql(
            f"CREATE SCHEMA IF NOT EXISTS {table_name.split('.')[0]};"
        )

        db.execute_sql(f"DROP TABLE IF EXISTS {table_name} CASCADE;")
        db.execute_sql(
            f"CREATE TABLE {table_name}"
            f""" (id                   SERIAL,
                  grid_id              VARCHAR(50),
                  grid_id_new          VARCHAR(50),
                  attribute            VARCHAR(50),
                  characteristics_code smallint,
                  characteristics_text text,
                  quantity             smallint,
                  quantity_q           smallint,
                  zensus_population_id int,
                  CONSTRAINT {table_name.split('.')[1]}_pkey PRIMARY KEY (id)
            );
            """
        )


def target(source, dataset):
    """Generate the target path corresponding to a source path.

    Parameters
    ----------
    source : Path
        Path to the input csv-file
    dataset : str
        Toggles between production (`dataset='Everything'`) and test mode
        (e.g. `dataset='Schleswig-Holstein'`). In production mode, data
        covering all of Germany is used. In test mode, a subset of this data
        is used for testing the workflow.

    Returns
    -------
    Path
        Path to the target csv-file

    """
    return Path(
        os.path.join(Path("."), "zensus_population", source.stem)
        + "."
        + dataset
        + source.suffix
    )


def select_geom():
    """Select the union of the geometries of Schleswig-Holstein from the
    database, convert their projection to the one used in the CSV files,
    output the result to stdout as a GeoJSON string and read it into a
    prepared shape for filtering.

    """
    docker_db_config = db.credentials()

    geojson = subprocess.run(
        ["ogr2ogr"]
        + ["-s_srs", "epsg:4326"]
        + ["-t_srs", "epsg:3035"]
        + ["-f", "GeoJSON"]
        + ["/vsistdout/"]
        + [
            f"PG:host={docker_db_config['HOST']}"
            f" user='{docker_db_config['POSTGRES_USER']}'"
            f" password='{docker_db_config['POSTGRES_PASSWORD']}'"
            f" port={docker_db_config['PORT']}"
            f" dbname='{docker_db_config['POSTGRES_DB']}'"
        ]
        + ["-sql", "SELECT ST_Union(geometry) FROM boundaries.vg250_lan"],
        text=True,
    )
    features = json.loads(geojson.stdout)["features"]
    assert (
        len(features) == 1
    ), f"Found {len(features)} geometry features, expected exactly one."

    return prep(shape(features[0]["geometry"]))


def filter_zensus_population(filename, dataset):
    """Filter the lines in the source CSV file and copy the appropriate ones
    to the destination, based on geometry.

    Parameters
    ----------
    filename : str
        Path to the input csv-file
    dataset : str
        Toggles between production (`dataset='Everything'`) and test mode
        (e.g. `dataset='Schleswig-Holstein'`). In production mode, data
        covering all of Germany is used. In test mode, a subset of this data
        is used for testing the workflow.

    Returns
    -------
    Path
        Path to the output csv-file

    """

    csv_file = Path(filename).resolve(strict=True)

    schleswig_holstein = select_geom()

    if not os.path.isfile(target(csv_file, dataset)):

        with open(csv_file, mode="r", newline="") as input_lines:
            rows = csv.DictReader(input_lines, delimiter=";")
            gitter_ids = set()
            with open(
                target(csv_file, dataset), mode="w", newline=""
            ) as destination:
                output = csv.DictWriter(
                    destination, delimiter=";", fieldnames=rows.fieldnames
                )
                output.writeheader()
                # `gitter_ids.add(...)` returns None, so `... or row` yields
                # the row unchanged while recording its grid id as a side
                # effect.
                output.writerows(
                    gitter_ids.add(row["Gitter_ID_100m"]) or row
                    for row in rows
                    if schleswig_holstein.intersects(
                        Point(float(row["x_mp_100m"]), float(row["y_mp_100m"]))
                    )
                )
    return target(csv_file, dataset)


def filter_zensus_misc(filename, dataset):
    """Filter the lines in the source CSV file and copy the appropriate ones
    to the destination, based on grid_id values.

    Parameters
    ----------
    filename : str
        Path to the input csv-file
    dataset : str
        Toggles between production (`dataset='Everything'`) and test mode
        (e.g. `dataset='Schleswig-Holstein'`). In production mode, data
        covering all of Germany is used. In test mode, a subset of this data
        is used for testing the workflow.

    Returns
    -------
    Path
        Path to the output csv-file

    """
    csv_file = Path(filename).resolve(strict=True)

    # Grid ids present in the already imported population table
    gitter_ids = set(
        pd.read_sql(
            "SELECT grid_id from society.destatis_zensus_population_per_ha",
            con=db.engine(),
        ).grid_id.values
    )

    if not os.path.isfile(target(csv_file, dataset)):
        with open(
            csv_file, mode="r", newline="", encoding="iso-8859-1"
        ) as inputs:
            rows = csv.DictReader(inputs, delimiter=",")
            with open(
                target(csv_file, dataset),
                mode="w",
                newline="",
                encoding="iso-8859-1",
            ) as destination:
                output = csv.DictWriter(
                    destination, delimiter=",", fieldnames=rows.fieldnames
                )
                output.writeheader()
                output.writerows(
                    row for row in rows if row["Gitter_ID_100m"] in gitter_ids
                )
    return target(csv_file, dataset)


def population_to_postgres():
    """Import Zensus population data to postgres database"""
    # Get the input file and the dataset boundary from the configuration
    input_file = ZensusPopulation.targets.files["zensus_population"]
    dataset = settings()["egon-data"]["--dataset-boundary"]

    # Read database configuration from docker-compose.yml
    docker_db_config = db.credentials()

    population_table = ZensusPopulation.targets.tables["zensus_population"]

    with zipfile.ZipFile(input_file) as zf:
        for filename in zf.namelist():

            zf.extract(filename)

            if dataset == "Everything":
                filename_insert = filename
            else:
                filename_insert = filter_zensus_population(filename, dataset)

            host = ["-h", f"{docker_db_config['HOST']}"]
            port = ["-p", f"{docker_db_config['PORT']}"]
            pgdb = ["-d", f"{docker_db_config['POSTGRES_DB']}"]
            user = ["-U", f"{docker_db_config['POSTGRES_USER']}"]
            command = [
                "-c",
                rf"\copy {population_table} (grid_id, x_mp, y_mp, population)"
                rf" FROM '{filename_insert}' DELIMITER ';' CSV HEADER;",
            ]
            subprocess.run(
                ["psql"] + host + port + pgdb + user + command,
                env={"PGPASSWORD": docker_db_config["POSTGRES_PASSWORD"]},
            )

        os.remove(filename)
Issue (introduced by this pull request): The variable filename does not seem to be defined in case the for loop over zf.namelist() is not entered. Are you sure this can never be the case?
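A minimal fix, sketched under the assumption that each extracted file may be deleted as soon as its \copy call has finished: move the cleanup into the loop body, so filename is always bound when it is removed, and every extracted file (not just the last one) gets cleaned up.

            subprocess.run(
                ["psql"] + host + port + pgdb + user + command,
                env={"PGPASSWORD": docker_db_config["POSTGRES_PASSWORD"]},
            )
            os.remove(filename)  # cleanup now runs once per extracted file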

    db.execute_sql(
        f"UPDATE {population_table} zs"
        " SET geom_point=ST_SetSRID(ST_MakePoint(zs.x_mp, zs.y_mp), 3035);"
    )

    db.execute_sql(
        f"UPDATE {population_table} zs"
        """ SET geom=ST_SetSRID(
                (ST_MakeEnvelope(zs.x_mp-50,zs.y_mp-50,zs.x_mp+50,zs.y_mp+50)),
                3035
            );
        """
    )

    db.execute_sql(
        f"CREATE INDEX {population_table.split('.')[1]}_geom_idx ON"
        f" {population_table} USING gist (geom);"
    )

    db.execute_sql(
        f"CREATE INDEX"
        f" {population_table.split('.')[1]}_geom_point_idx"
        f" ON {population_table} USING gist (geom_point);"
    )


def zensus_misc_to_postgres():
    """Import data on buildings, households and apartments to postgres db"""

    dataset = settings()["egon-data"]["--dataset-boundary"]

    population_table = ZensusPopulation.targets.tables["zensus_population"]

    # Read database configuration from docker-compose.yml
    docker_db_config = db.credentials()

    for key in ZensusMiscellaneous.sources.urls:

        with zipfile.ZipFile(ZensusMiscellaneous.targets.files[key]) as zf:
            csvfiles = [n for n in zf.namelist() if n.lower().endswith("csv")]
            for filename in csvfiles:
                zf.extract(filename)

                if dataset == "Everything":
                    filename_insert = filename
                else:
                    filename_insert = filter_zensus_misc(filename, dataset)

                host = ["-h", f"{docker_db_config['HOST']}"]
                port = ["-p", f"{docker_db_config['PORT']}"]
                pgdb = ["-d", f"{docker_db_config['POSTGRES_DB']}"]
                user = ["-U", f"{docker_db_config['POSTGRES_USER']}"]
                command = [
                    "-c",
                    rf"\copy {ZensusMiscellaneous.targets.tables[key]}"
                    f"""(grid_id,
                        grid_id_new,
                        attribute,
                        characteristics_code,
                        characteristics_text,
                        quantity,
                        quantity_q)
                        FROM '{filename_insert}' DELIMITER ','
                        CSV HEADER
                        ENCODING 'iso-8859-1';""",
                ]
                subprocess.run(
                    ["psql"] + host + port + pgdb + user + command,
                    env={"PGPASSWORD": docker_db_config["POSTGRES_PASSWORD"]},
                )

            os.remove(filename)
Issue (introduced by this pull request): The variable filename does not seem to be defined for all execution paths; the for loop over csvfiles may not be entered.
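The same remedy as above applies here: moving os.remove(filename) into the for filename in csvfiles loop guarantees the name is always bound and removes every extracted file instead of only the last one; alternatively, guard the cleanup against an empty csvfiles list.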

        db.execute_sql(
            f"""UPDATE {ZensusMiscellaneous.targets.tables[key]} as b
                    SET zensus_population_id = zs.id
                    FROM {population_table} zs
                    WHERE b.grid_id = zs.grid_id;"""
        )

        db.execute_sql(
            f"""ALTER TABLE {ZensusMiscellaneous.targets.tables[key]}
                    ADD CONSTRAINT
                    {ZensusMiscellaneous.targets.get_table_name(key)}_fkey
                    FOREIGN KEY (zensus_population_id)
                    REFERENCES {population_table}(id);"""
        )

    # Create combined table
    create_combined_zensus_table()

    # Delete entries for unpopulated cells
    adjust_zensus_misc()


def create_combined_zensus_table():
    """Create combined table with buildings, apartments and population per cell

    Only apartment and building data with acceptable data quality
    (quantity_q < 2) is used; all other data is dropped. For more details on
    data quality, see the Zensus docs:
    https://www.zensus2011.de/DE/Home/Aktuelles/DemografischeGrunddaten.html

    If there's no data on buildings or apartments for a certain cell, the
    value for building_count resp. apartment_count contains NULL.
    """
    sql_script = os.path.join(
        os.path.dirname(__file__), "create_combined_zensus_table.sql"
    )
    db.execute_sql_script(sql_script)


def adjust_zensus_misc():
    """Delete unpopulated cells in zensus households, buildings and apartments

    Some unpopulated zensus cells are listed in:
    - egon_destatis_zensus_household_per_ha
    - egon_destatis_zensus_building_per_ha
    - egon_destatis_zensus_apartment_per_ha

    This can be caused by missing population information due to privacy or
    other special cases (e.g. holiday homes are listed as buildings but are
    not permanently populated). In the following tasks of egon-data, only
    data of populated cells is used.

    Returns
    -------
    None.

    """

    for table in ZensusMiscellaneous.targets.tables:
        db.execute_sql(
            f"""
            DELETE FROM {ZensusMiscellaneous.targets.tables[table]} as b
            WHERE b.zensus_population_id IN (
                SELECT id FROM {
                    ZensusPopulation.targets.tables["zensus_population"]}
                WHERE population < 0);"""
        )