Passed
Pull Request — dev (#1375)
by
unknown
02:12
created

data.datasets.vg250.to_postgres()   A

Complexity

Conditions 4

Size

Total Lines 75
Code Lines 36

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 36
dl 0
loc 75
rs 9.016
c 0
b 0
f 0
cc 4
nop 0

How to fix   Long Method   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments inside a method's body, that is usually a good sign that the commented part should be extracted into a new method; the comment is a good starting point for naming that new method.

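Commonly applied refactorings include Extract Method: move a cohesive group of statements out of the long method into a separate, well-named method.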

1
"""The central module containing all code dealing with VG250 data.
2
3
This module either directly contains the code dealing with importing VG250
4
data, or it re-exports everything needed to handle it. Please refrain
5
from importing code from any modules below this one, because it might
6
lead to unwanted behaviour.
7
8
If you have to import code from a module below this one because the code
9
isn't exported from this module, please file a bug, so we can fix this.
10
"""
11
12
from pathlib import Path
13
from urllib.request import urlretrieve
14
import codecs
15
import datetime
16
import json
17
import os
18
import time
19
20
from geoalchemy2 import Geometry
21
import geopandas as gpd
22
23
from egon.data import db
24
from egon.data.config import settings
25
from egon.data.datasets import Dataset
26
from egon.data.metadata import (
27
    context,
28
    licenses_datenlizenz_deutschland,
29
    meta_metadata,
30
)
31
import egon.data.config
32
from egon_validation import (
33
    RowCountValidation,
34
    DataTypeValidation,
35
    NotNullAndNotNaNValidation,
36
    WholeTableNotNullAndNotNaNValidation,
37
    ValueSetValidation
38
)
39
40
41
def download_files():
    """
    Download VG250 (Verwaltungsgebiete) shape files.

    The source URL is read from *datasets.yml*, section
    *vg250/original_data/source/url*. The archive is stored in the folder
    ``vg250`` below the current working directory, using the file name given
    in *vg250/original_data/target/file*.

    The folder is created if it is missing. If the target file already
    exists, nothing is downloaded.

    """
    data_config = egon.data.config.datasets()
    vg250_config = data_config["vg250"]["original_data"]

    download_directory = Path(".") / "vg250"
    # ``exist_ok=True`` replaces the former "check, then create" sequence,
    # which could fail if the folder appeared between the two steps.
    download_directory.mkdir(exist_ok=True)

    target_file = download_directory / vg250_config["target"]["file"]

    # Keep an already downloaded archive instead of fetching it again
    if not target_file.is_file():
        urlretrieve(vg250_config["source"]["url"], target_file)
62
63
64
def _read_vg250_layer(zip_file, filename):
    """Read one shapefile of the VG250 zip archive into a GeoDataFrame."""
    return gpd.read_file(
        f"zip://{zip_file}!vg250_01-01.geo84.shape.ebenen/"
        f"vg250_ebenen_0101/{filename}"
    )


def _read_boundary_state(zip_file, boundary):
    """Return the rows of VG250_LAN.shp whose GEN equals *boundary*.

    The rows are relabelled (BEZ, NUTS) so that they can be stored as the
    country borders in the table ``vg250_sta``.

    Raises
    ------
    ValueError
        If no row of VG250_LAN.shp has the name *boundary*.

    """
    data_sta = (
        _read_vg250_layer(zip_file, "VG250_LAN.shp")
        .query(f"GEN == '{boundary}'")
        .copy()
    )
    if data_sta.empty:
        raise ValueError(
            f"Dataset boundary '{boundary}' does not match any name "
            "(column GEN) in VG250_LAN.shp."
        )
    data_sta["BEZ"] = "Bundesrepublik"
    data_sta["NUTS"] = "DE"
    return data_sta


def to_postgres():
    """
    Writes original VG250 data to database.

    Creates the schema named in *vg250/processed/schema* if it does not yet
    exist. Newly creates all tables specified as values in *datasets.yml* in
    section *vg250/processed/file_table_map*; the keys of that mapping are
    the shapefile names inside the downloaded zip archive.

    If the setting ``--dataset-boundary`` is not ``"Everything"``, the data
    is restricted to the federal state of that name: the table ``vg250_sta``
    receives the borders of that state, all other tables only keep the areas
    lying within it.

    Each table gets ``id`` as primary key and a GiST index on its geometry
    column.

    Raises
    ------
    ValueError
        If the configured dataset boundary is not a state name found in
        VG250_LAN.shp.

    """

    # Get information from data configuration file
    data_config = egon.data.config.datasets()
    vg250_orig = data_config["vg250"]["original_data"]
    vg250_processed = data_config["vg250"]["processed"]
    schema = vg250_processed["schema"]

    # Create target schema
    db.execute_sql(f"CREATE SCHEMA IF NOT EXISTS {schema};")

    zip_file = Path(".") / "vg250" / vg250_orig["target"]["file"]
    engine_local_db = db.engine()

    # The boundary and the state geometry are the same for every table, so
    # they are determined once instead of once per loop iteration.
    boundary = settings()["egon-data"]["--dataset-boundary"]
    restrict_to_state = boundary != "Everything"
    if restrict_to_state:
        data_sta = _read_boundary_state(zip_file, boundary)
        # A state may consist of several rows; merge them into one geometry
        state_geometry = data_sta.dissolve(by="GEN").geometry.values[0]

    # Extract shapefiles from zip archive and send it to postgres db
    for filename, table in vg250_processed["file_table_map"].items():
        if restrict_to_state and table == "vg250_sta":
            # import borders of the selected state as borders of the country;
            # copy because the frame is modified in place below
            data = data_sta.copy()
        else:
            # Open files and read .shp (within .zip) with geopandas
            data = _read_vg250_layer(zip_file, filename)
            if restrict_to_state:
                # choose only areas within the selected state
                data = data[data.within(state_geometry)]

        # Set index column and format column headings
        data.index.set_names("id", inplace=True)
        data.columns = [x.lower() for x in data.columns]

        # Drop table before inserting data; CASCADE also removes objects
        # that depend on the table
        db.execute_sql(f"DROP TABLE IF EXISTS {schema}.{table} CASCADE;")

        # create database table from geopandas dataframe
        data.to_postgis(
            table,
            engine_local_db,
            schema=schema,
            index=True,
            if_exists="replace",
            dtype={"geometry": Geometry()},
        )

        db.execute_sql(f"ALTER TABLE {schema}.{table} ADD PRIMARY KEY (id);")

        # Add index on geometry column
        db.execute_sql(
            f"CREATE INDEX {table}_geometry_idx ON "
            f"{schema}.{table} USING gist (geometry);"
        )
141
142
143
def add_metadata():
    """
    Writes metadata JSON string into table comment.

    For every table listed in *vg250/processed/file_table_map* of
    *datasets.yml* a metadata dictionary is assembled, serialized to JSON,
    wrapped in single quotes and passed to ``db.submit_comment`` together
    with the schema and table name.

    Title and description are looked up per table name. A table name without
    an entry in that lookup raises a ``KeyError``.

    """
    # Prepare variables
    vg250_config = egon.data.config.datasets()["vg250"]

    title_and_description = {
        "vg250_sta": {
            "title": "BKG - Verwaltungsgebiete 1:250.000 - Staat (STA)",
            "description": "Staatsgrenzen der Bundesrepublik Deutschland",
        },
        "vg250_lan": {
            "title": "BKG - Verwaltungsgebiete 1:250.000 - Länder (LAN)",
            "description": "Landesgrenzen der Bundesländer in der "
            "Bundesrepublik Deutschland",
        },
        "vg250_rbz": {
            "title": "BKG - Verwaltungsgebiete 1:250.000 - Regierungsbezirke "
            "(RBZ)",
            "description": "Grenzen der Regierungsbezirke in der "
            "Bundesrepublik Deutschland",
        },
        "vg250_krs": {
            "title": "BKG - Verwaltungsgebiete 1:250.000 - Kreise (KRS)",
            "description": "Grenzen der Landkreise in der "
            "Bundesrepublik Deutschland",
        },
        "vg250_vwg": {
            "title": "BKG - Verwaltungsgebiete 1:250.000 - "
            "Verwaltungsgemeinschaften (VWG)",
            "description": "Grenzen der Verwaltungsgemeinschaften in der "
            "Bundesrepublik Deutschland",
        },
        "vg250_gem": {
            "title": "BKG - Verwaltungsgebiete 1:250.000 - Gemeinden (GEM)",
            "description": "Grenzen der Gemeinden in der "
            "Bundesrepublik Deutschland",
        },
    }

    licenses = [
        licenses_datenlizenz_deutschland(
            attribution="© Bundesamt für Kartographie und Geodäsie "
            "2020 (Daten verändert)"
        )
    ]

    vg250_source = {
        "title": "Verwaltungsgebiete 1:250 000 (Ebenen)",
        "description": "Der Datenbestand umfasst sämtliche Verwaltungseinheiten der "
        "hierarchischen Verwaltungsebenen vom Staat bis zu den Gemeinden "
        "mit ihren Grenzen, statistischen Schlüsselzahlen, Namen der "
        "Verwaltungseinheit sowie die spezifische Bezeichnung der "
        "Verwaltungsebene des jeweiligen Landes.",
        "path": vg250_config["original_data"]["source"]["url"],
        "licenses": licenses,
    }

    for table in vg250_config["processed"]["file_table_map"].values():
        schema_table = ".".join([vg250_config["processed"]["schema"], table])
        meta = {
            "name": schema_table,
            "title": title_and_description[table]["title"],
            "id": "WILL_BE_SET_AT_PUBLICATION",
            # Fixed: this field used to repeat the title, so the
            # descriptions prepared above never reached the metadata.
            "description": title_and_description[table]["description"],
            "language": ["de-DE"],
            "publicationDate": datetime.date.today().isoformat(),
            "context": context(),
            "spatial": {
                "location": None,
                "extent": "Germany",
                "resolution": "1:250000",
            },
            "temporal": {
                "referenceDate": "2020-01-01",
                "timeseries": {
                    "start": None,
                    "end": None,
                    "resolution": None,
                    "alignment": None,
                    "aggregationType": None,
                },
            },
            "sources": [vg250_source],
            "licenses": licenses,
            "contributors": [
                {
                    "title": "Guido Pleßmann",
                    "email": "http://github.com/gplssm",
                    "date": time.strftime("%Y-%m-%d"),
                    "object": None,
                    "comment": "Imported data",
                },
                {
                    "title": "Jonathan Amme",
                    "email": "http://github.com/nesnoj",
                    "date": time.strftime("%Y-%m-%d"),
                    "object": None,
                    "comment": "Metadata extended",
                },
            ],
            "resources": [
                {
                    "profile": "tabular-data-resource",
                    "name": schema_table,
                    "path": None,
                    "format": "PostgreSQL",
                    "encoding": "UTF-8",
                    "schema": {
                        "fields": vg250_metadata_resources_fields(),
                        "primaryKey": ["id"],
                        "foreignKeys": [],
                    },
                    "dialect": {"delimiter": None, "decimalSeparator": "."},
                }
            ],
            "metaMetadata": meta_metadata(),
        }

        # The JSON document is handed over wrapped in single quotes
        meta_json = "'" + json.dumps(meta) + "'"

        db.submit_comment(
            meta_json, vg250_config["processed"]["schema"], table
        )
266
267
268
def nuts_mview():
    """
    Creates MView boundaries.vg250_lan_nuts_id.

    Runs the SQL script *vg250_lan_nuts_id_mview.sql*, which is located in
    the same directory as this module.

    """
    module_directory = os.path.dirname(__file__)
    sql_script = os.path.join(module_directory, "vg250_lan_nuts_id_mview.sql")
    db.execute_sql_script(sql_script)
276
277
278
def cleaning_and_preperation():
    """
    Creates tables and MViews with cleaned and corrected geometry data.

    Runs the SQL script *cleaning_and_preparation.sql*, which is located in
    the same directory as this module.

    The following table is created:
      * boundaries.vg250_gem_clean, in which fragmented municipalities
        (Gemeinden) are cleaned from ringholes

    The following MViews are created:
      * boundaries.vg250_gem_hole
      * boundaries.vg250_gem_valid
      * boundaries.vg250_krs_area
      * boundaries.vg250_lan_union
      * boundaries.vg250_sta_bbox
      * boundaries.vg250_sta_invalid_geometry
      * boundaries.vg250_sta_tiny_buffer
      * boundaries.vg250_sta_union

    """
    module_directory = os.path.dirname(__file__)
    sql_script = os.path.join(module_directory, "cleaning_and_preparation.sql")
    db.execute_sql_script(sql_script)
301
302
303
def vg250_metadata_resources_fields():
    """
    Returns the field descriptions used in the metadata of VG250 tables.

    The result is a new list on every call. It holds one dictionary per
    column with the keys ``description``, ``name``, ``type`` and ``unit``
    (in this order); ``unit`` is always ``"none"``.

    """
    # (column name, metadata type, description), in table column order
    columns = (
        ("id", "integer", "Index"),
        ("ade", "integer", "Administrative level"),
        ("gf", "integer", "Geofactor"),
        ("bsg", "integer", "Particular areas"),
        ("ars", "string", "Territorial code"),
        ("ags", "string", "Official Municipality Key"),
        (
            "sdv_ars",
            "string",
            "Seat of the administration (territorial code)",
        ),
        ("gen", "string", "Geographical name"),
        ("bez", "string", "Designation of the administrative unit"),
        ("ibz", "integer", "Identifier"),
        ("bem", "string", "Note"),
        ("nbd", "string", "Name generation"),
        ("sn_l", "string", "Land (state)"),
        ("sn_r", "string", "Administrative district"),
        ("sn_k", "string", "District"),
        ("sn_v1", "string", "Administrative association – front part"),
        ("sn_v2", "string", "Administrative association – rear part"),
        ("sn_g", "string", "Municipality"),
        ("fk_s3", "string", "Function of the 3rd key digit"),
        ("nuts", "string", "European statistics key"),
        ("ars_0", "string", "Filled territorial code"),
        ("ags_0", "string", "Filled Official Municipality Key"),
        ("wsk", "string", "Effectiveness"),
        ("debkg_id", "string", "DLM identifier"),
        ("rs", "string", "Territorial code (deprecated column)"),
        (
            "sdv_rs",
            "string",
            "Seat of the administration (territorial code, "
            "deprecated column)",
        ),
        ("rs_0", "string", "Filled territorial code (deprecated column)"),
        (
            "geometry",
            "Geometry(Polygon, srid=4326)",
            "Geometry of areas as WKB",
        ),
    )

    return [
        {
            "description": description,
            "name": name,
            "type": field_type,
            "unit": "none",
        }
        for name, field_type, description in columns
    ]
479
480
481
class Vg250(Dataset):
    """
    Obtains and processes VG250 data and writes it to database.

    Original data is downloaded using :py:func:`download_files` function and written
    to database using :py:func:`to_postgres` function.

    *Dependencies*
      No dependencies

    *Resulting tables*
      * :py:func:`boundaries.vg250_gem <to_postgres>` is created and filled
      * :py:func:`boundaries.vg250_krs <to_postgres>` is created and filled
      * :py:func:`boundaries.vg250_lan <to_postgres>` is created and filled
      * :py:func:`boundaries.vg250_rbz <to_postgres>` is created and filled
      * :py:func:`boundaries.vg250_sta <to_postgres>` is created and filled
      * :py:func:`boundaries.vg250_vwg <to_postgres>` is created and filled
      * :py:func:`boundaries.vg250_lan_nuts_id <nuts_mview>` is created and filled
      * :py:func:`boundaries.vg250_gem_hole <cleaning_and_preperation>` is created and
        filled
      * :py:func:`boundaries.vg250_gem_valid <cleaning_and_preperation>` is created and
        filled
      * :py:func:`boundaries.vg250_krs_area <cleaning_and_preperation>` is created and
        filled
      * :py:func:`boundaries.vg250_lan_union <cleaning_and_preperation>` is created and
        filled
      * :py:func:`boundaries.vg250_sta_bbox <cleaning_and_preperation>` is created and
        filled
      * :py:func:`boundaries.vg250_sta_invalid_geometry <cleaning_and_preperation>` is
        created and filled
      * :py:func:`boundaries.vg250_sta_tiny_buffer <cleaning_and_preperation>` is
        created and filled
      * :py:func:`boundaries.vg250_sta_union <cleaning_and_preperation>` is
        created and filled

    """

    # Source URL of the original data; it is part of the dataset version
    filename = egon.data.config.datasets()["vg250"]["original_data"]["source"][
        "url"
    ]

    #:
    name: str = "VG250"
    #:
    version: str = filename + "-0.0.4.dev"

    def __init__(self, dependencies):
        # All validation rules below check the district (Kreise) table
        krs_table = "boundaries.vg250_krs"

        # Expected database type of every column of the checked table
        krs_column_types = {
            "id": "bigint",
            "ade": "bigint",
            "gf": "bigint",
            "bsg": "bigint",
            "ars": "text",
            "ags": "text",
            "sdv_ars": "text",
            "gen": "text",
            "bez": "text",
            "ibz": "bigint",
            "bem": "text",
            "nbd": "text",
            "sn_l": "text",
            "sn_r": "text",
            "sn_k": "text",
            "sn_v1": "text",
            "sn_v2": "text",
            "sn_g": "text",
            "fk_s3": "text",
            "nuts": "text",
            "ars_0": "text",
            "ags_0": "text",
            "wsk": "text",
            "debkg_id": "text",
            "rs": "text",
            "sdv_rs": "text",
            "rs_0": "text",
            "geometry": "geometry",
        }

        data_quality_rules = [
            RowCountValidation(
                table=krs_table,
                rule_id="TEST_ROW_COUNT",
                expected_count=27,
            ),
            DataTypeValidation(
                table=krs_table,
                rule_id="TEST_DATA_MULTIPLE_TYPES",
                column_types=krs_column_types,
            ),
            NotNullAndNotNaNValidation(
                table=krs_table,
                rule_id="TEST_NOT_NAN",
                columns=["gf", "bsg"],
            ),
            WholeTableNotNullAndNotNaNValidation(
                table=krs_table,
                rule_id="TEST_WHOLE_TABLE_NOT_NAN",
            ),
            ValueSetValidation(
                table=krs_table,
                rule_id="TEST_VALUE_SET",
                column="nbd",
                expected_values=["ja", "nein"],
            ),
        ]

        super().__init__(
            name=self.name,
            version=self.version,
            dependencies=dependencies,
            tasks=(
                download_files,
                to_postgres,
                nuts_mview,
                add_metadata,
                cleaning_and_preperation,
            ),
            validation={"data_quality": data_quality_rules},
            validation_on_failure="continue",
        )
        )
575