Passed
Pull Request — dev (#1020)
by
unknown
01:56
created

data.datasets.loadarea.loadareas_create()   A

Complexity

Conditions 1

Size

Total Lines 13
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 3
dl 0
loc 13
rs 10
c 0
b 0
f 0
cc 1
nop 0
1
"""
2
OSM landuse extraction and load areas creation.
3
4
**Landuse**
5
6
* Landuse data is extracted from OpenStreetMap: residential, retail,
7
  industrial, agricultural
8
* Data is cut with German borders (VG 250), data outside is dropped
9
* Invalid geometries are fixed
10
* Results are stored in table `openstreetmap.osm_landuse`
11
12
**Load Areas**
13
14
TBD
15
16
Note: industrial demand contains:
17
* voltage levels 4-7
18
* only demand from industrial sites + OSM located in load areas (LA)!
19
"""
20
21
import os
22
23
from airflow.operators.postgres_operator import PostgresOperator
24
from geoalchemy2.types import Geometry
25
from sqlalchemy import Column, Float, Integer, String
26
from sqlalchemy.dialects.postgresql import HSTORE
27
from sqlalchemy.ext.declarative import declarative_base
28
import importlib_resources as resources
29
30
from egon.data import db
31
from egon.data.datasets import Dataset
32
import egon.data.config
33
34
# NOTE: Base will later be imported from another module.
35
# Declarative base class from which the ORM model in this module derives.
Base = declarative_base()
36
37
38
class OsmPolygonUrban(Base):
    """ORM model of the table `openstreetmap.osm_landuse`."""

    __tablename__ = "osm_landuse"
    __table_args__ = dict(schema="openstreetmap")

    # Primary key of the table
    id = Column(Integer, primary_key=True)
    osm_id = Column(Integer)
    name = Column(String)
    sector = Column(Integer)
    sector_name = Column(String(length=20))
    area_ha = Column(Float)
    tags = Column(HSTORE)
    vg250 = Column(String(length=10))
    # MultiPolygon geometry stored with SRID 3035
    geom = Column(Geometry(geometry_type="MultiPolygon", srid=3035))
50
51
52
class OsmLanduse(Dataset):
    """Dataset that creates the landuse table and runs the extraction SQL.

    Parameters
    ----------
    dependencies
        Handed over unchanged to `Dataset`.
    """

    def __init__(self, dependencies):
        # The SQL statement is read from the package resource
        # `osm_landuse_extraction.sql`.
        extraction_sql = resources.read_text(
            __name__, "osm_landuse_extraction.sql"
        )
        extraction_task = PostgresOperator(
            task_id="osm_landuse_extraction",
            sql=extraction_sql,
            postgres_conn_id="egon_data",
            autocommit=True,
        )

        super().__init__(
            name="OsmLanduse",
            version="0.0.0",
            dependencies=dependencies,
            tasks=(create_landuse_table, extraction_task),
        )
70
71
72
class LoadArea(Dataset):
    """Dataset bundling the tasks that create load areas and add demands.

    Parameters
    ----------
    dependencies
        Handed over unchanged to `Dataset`.
    """

    def __init__(self, dependencies):
        # The three demand tasks are passed as one set element of the task
        # tuple (presumably so `Dataset` can run them in parallel -- confirm).
        demand_tasks = {
            loadareas_add_demand_hh,
            loadareas_add_demand_cts,
            loadareas_add_demand_ind,
        }
        task_sequence = (
            osm_landuse_melt,
            census_cells_melt,
            osm_landuse_census_cells_melt,
            loadareas_create,
            demand_tasks,
            drop_temp_tables,
        )

        super().__init__(
            name="LoadArea",
            version="0.0.1",
            dependencies=dependencies,
            tasks=task_sequence,
        )
91
92
93
def create_landuse_table():
    """Create the table for landuse data.

    The target schema is created if it does not exist, the target table is
    dropped if it exists and the table of `OsmPolygonUrban` is created.

    Returns
    -------
    None.
    """
    target = egon.data.config.datasets()["landuse"]["target"]

    # Make sure the target schema is available
    db.execute_sql(f"""CREATE SCHEMA IF NOT EXISTS {target['schema']};""")

    # Drop the target table if it already exists
    db.execute_sql(
        f"""DROP TABLE IF EXISTS
            {target['schema']}.{target['table']} CASCADE;"""
    )

    # Create the table described by the ORM model
    db_engine = db.engine()
    OsmPolygonUrban.__table__.create(checkfirst=True, bind=db_engine)
112
113
114
def execute_sql_script(script):
    """Run an SQL script located in the directory of this module.

    Parameters
    ----------
    script : str
        Filename of script
    """
    module_dir = os.path.dirname(__file__)
    script_path = os.path.join(module_dir, script)
    db.execute_sql_script(script_path)
123
124
125
def osm_landuse_melt():
    """Run the SQL script melting OSM landuse areas.

    According to the original description the areas are melted by:
    buffer, union, unbuffer.
    """
    sql_file = "osm_landuse_melt.sql"
    print("Melting OSM landuse areas from openstreetmap.osm_landuse...")
    execute_sql_script(sql_file)
129
130
131
def census_cells_melt():
    """Run the SQL script melting census cells.

    According to the original description the cells are melted by:
    buffer, union, unbuffer.
    """
    source_table = (
        "society.destatis_zensus_population_per_ha_inside_germany"
    )
    print(f"Melting census cells from {source_table}...")
    execute_sql_script("census_cells_melt.sql")
138
139
140
def osm_landuse_census_cells_melt():
    """Run the SQL script melting OSM landuse areas and census cells."""
    landuse_table = "openstreetmap.osm_landuse_melted"
    census_table = "society.egon_destatis_zensus_cells_melted_cluster"
    message = (
        f"Melting OSM landuse areas from {landuse_table} and "
        f"census cells from {census_table}..."
    )
    print(message)
    execute_sql_script("osm_landuse_census_cells_melt.sql")
148
149
150
def loadareas_create():
    """Run the SQL script creating the load areas.

    Load areas are built from merged OSM landuse and census cells. The
    steps listed for the script are:

    * Cut load areas with MV grid districts.
    * Identify and exclude load areas smaller than 100m².
    * Generate centres of load areas with Centroid and PointOnSurface.
    * Calculate population from Census 2011.
    * Cut all 4 OSM sectors with MV grid districts.
    * Calculate statistics like NUTS and AGS code.
    * Check for load areas without AGS code.
    """
    sql_file = "loadareas_create.sql"
    print("Create initial load areas and add some sector stats...")
    execute_sql_script(sql_file)
163
164
165
def loadareas_add_demand_hh():
    """Run the SQL script adding household consumption and peak load."""
    sql_file = "loadareas_add_demand_hh.sql"
    print("Add consumption and peak loads to load areas for households...")
    execute_sql_script(sql_file)
169
170
171
def loadareas_add_demand_cts():
    """Run the SQL script adding CTS consumption and peak load."""
    sql_file = "loadareas_add_demand_cts.sql"
    print("Add consumption and peak loads to load areas for CTS...")
    execute_sql_script(sql_file)
175
176
177
def loadareas_add_demand_ind():
    """Run the SQL script adding industry consumption and peak load."""
    sql_file = "loadareas_add_demand_ind.sql"
    print("Add consumption and peak loads to load areas for industry...")
    execute_sql_script(sql_file)
181
182
183
def drop_temp_tables():
    """Run the SQL script `drop_temp_tables.sql`."""
    sql_file = "drop_temp_tables.sql"
    print("Dropping temp tables, views and sequences...")
    execute_sql_script(sql_file)
186