Passed
Pull Request — dev (#1020)
by
unknown
01:26
created

data.datasets.loadarea.create_landuse_table()   A

Complexity

Conditions 1

Size

Total Lines 19
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 7
nop 0
dl 0
loc 19
rs 10
c 0
b 0
f 0
1
"""The central module containing all code to create tables for osm landuse
2
extraction.
3
"""
4
5
import os
6
7
from airflow.operators.postgres_operator import PostgresOperator
8
from geoalchemy2.types import Geometry
9
from sqlalchemy import Column, Float, Integer, String
10
from sqlalchemy.dialects.postgresql import HSTORE
11
from sqlalchemy.ext.declarative import declarative_base
12
import importlib_resources as resources
13
14
from egon.data import db
15
from egon.data.datasets import Dataset
16
import egon.data.config
17
18
# will be later imported from another file ###
19
Base = declarative_base()
20
21
22
class OsmPolygonUrban(Base):
23
    __tablename__ = "osm_landuse"
24
    __table_args__ = {"schema": "openstreetmap"}
25
    id = Column(Integer, primary_key=True)
26
    osm_id = Column(Integer)
27
    name = Column(String)
28
    sector = Column(Integer)
29
    sector_name = Column(String(20))
30
    area_ha = Column(Float)
31
    tags = Column(HSTORE)
32
    vg250 = Column(String(10))
33
    geom = Column(Geometry("MultiPolygon", 3035))
34
35
36
class OsmLanduse(Dataset):
37
    def __init__(self, dependencies):
38
        super().__init__(
39
            name="OsmLanduse",
40
            version="0.0.0",
41
            dependencies=dependencies,
42
            tasks=(
43
                create_landuse_table,
44
                PostgresOperator(
45
                    task_id="osm_landuse_extraction",
46
                    sql=resources.read_text(
47
                        __name__, "osm_landuse_extraction.sql"
48
                    ),
49
                    postgres_conn_id="egon_data",
50
                    autocommit=True,
51
                ),
52
            ),
53
        )
54
55
56
class LoadArea(Dataset):
57
    def __init__(self, dependencies):
58
        super().__init__(
59
            name="LoadArea",
60
            version="0.0.1",
61
            dependencies=dependencies,
62
            tasks=(
63
                osm_landuse_melt,
64
                census_cells_melt,
65
                osm_landuse_census_cells_melt,
66
                loadareas_create,
67
                loadareas_load_data,
68
                drop_temp_tables,
69
            ),
70
        )
71
72
73
def create_landuse_table():
74
    """Create tables for landuse data
75
    Returns
76
    -------
77
    None.
78
    """
79
    cfg = egon.data.config.datasets()["landuse"]["target"]
80
81
    # Create schema if not exists
82
    db.execute_sql(f"""CREATE SCHEMA IF NOT EXISTS {cfg['schema']};""")
83
84
    # Drop tables
85
    db.execute_sql(
86
        f"""DROP TABLE IF EXISTS
87
            {cfg['schema']}.{cfg['table']} CASCADE;"""
88
    )
89
90
    engine = db.engine()
91
    OsmPolygonUrban.__table__.create(bind=engine, checkfirst=True)
92
93
94
def execute_sql_script(script):
95
    """Execute SQL script
96
97
    Parameters
98
    ----------
99
    script : str
100
        Filename of script
101
    """
102
    db.execute_sql_script(os.path.join(os.path.dirname(__file__), script))
103
104
105
def osm_landuse_melt():
106
    """Melt all OSM landuse areas by: buffer, union, unbuffer"""
107
    print("Melting OSM landuse areas from openstreetmap.osm_landuse...")
108
    execute_sql_script("osm_landuse_melt.sql")
109
110
111
def census_cells_melt():
112
    """Melt all census cells: buffer, union, unbuffer"""
113
    print(
114
        "Melting census cells from "
115
        "society.destatis_zensus_population_per_ha_inside_germany..."
116
    )
117
    execute_sql_script("census_cells_melt.sql")
118
119
120
def osm_landuse_census_cells_melt():
121
    """Melt OSM landuse areas and census cells"""
122
    print(
123
        "Melting OSM landuse areas from openstreetmap.osm_landuse_melted and "
124
        "census cells from "
125
        "society.egon_destatis_zensus_cells_melted_cluster..."
126
    )
127
    execute_sql_script("osm_landuse_census_cells_melt.sql")
128
129
130
def loadareas_create():
131
    """Create load areas from merged OSM landuse and census cells:
132
133
    * Cut Loadarea with MV Griddistrict
134
    * Identify and exclude Loadarea smaller than 100m².
135
    * Generate Centre of Loadareas with Centroid and PointOnSurface.
136
    * Calculate population from Census 2011.
137
    * Cut all 4 OSM sectors with MV Griddistricts.
138
    * Calculate statistics like NUTS and AGS code.
139
    * Check for Loadareas without AGS code.
140
    """
141
    print("Create initial load areas and add some sector stats...")
142
    execute_sql_script("loadareas_create.sql")
143
144
145
def loadareas_load_data():
146
    """Adds consumption and peak load per sector to load areas"""
147
    print("Add consumption and peak loads to load areas...")
148
    execute_sql_script("loadareas_load_data.sql")
149
150
151
def drop_temp_tables():
152
    print("Dropping temp tables...")
153
    execute_sql_script("drop_temp_tables.sql")
154