Passed
Pull Request — dev (#31)
by Stephan
01:10
created

data.db   A

Complexity

Total Complexity 4

Size/Duplication

Total Lines 96
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 33
dl 0
loc 96
rs 10
c 0
b 0
f 0
wmc 4

3 Functions

Rating   Name   Duplication   Size   Complexity  
A execute_sql() 0 23 2
A credentials() 0 30 1
A submit_comment() 0 30 1
1
import os
2
3
from sqlalchemy import create_engine, text
4
import yaml
5
6
import egon
7
8
9
def credentials():
10
    """Return local database connection parameters.
11
12
    Returns
13
    -------
14
    dict
15
        Complete DB connection information
16
    """
17
    # Read database configuration from docker-compose.yml
18
    package_path = egon.data.__path__[0]
19
    docker_compose_file = os.path.join(
20
        package_path, "airflow", "docker-compose.yml"
21
    )
22
    docker_compose = yaml.load(
23
        open(docker_compose_file), Loader=yaml.SafeLoader
24
    )
25
26
    # Select basic connection details
27
    docker_db_config = docker_compose["services"]["egon-data-local-database"][
28
        "environment"
29
    ]
30
31
    # Add HOST and PORT
32
    docker_db_config_additional = docker_compose["services"][
33
        "egon-data-local-database"
34
    ]["ports"][0].split(":")
35
    docker_db_config["HOST"] = docker_db_config_additional[0]
36
    docker_db_config["PORT"] = docker_db_config_additional[1]
37
38
    return docker_db_config
39
40
41
def execute_sql(sql_string):
42
    """Execute a SQL expression given as string.
43
44
    The SQL expression passed as plain string is convert to a
45
    `sqlalchemy.sql.expression.TextClause`.
46
47
    Parameters
48
    ----------
49
    sql_string : str
50
        SQL expression
51
52
    """
53
    db_config = credentials()
54
55
    engine_local = create_engine(
56
        f"postgresql+psycopg2://{db_config['POSTGRES_USER']}:"
57
        f"{db_config['POSTGRES_PASSWORD']}@{db_config['HOST']}:"
58
        f"{db_config['PORT']}/{db_config['POSTGRES_DB']}",
59
        echo=False,
60
    )
61
62
    with engine_local.connect().execution_options(autocommit=True) as con:
63
        con.execute(text(sql_string))
64
65
66
def submit_comment(json, schema, table):
67
    """Add comment to table.
68
69
    We use `Open Energy Metadata <https://github.com/OpenEnergyPlatform/
70
    oemetadata/blob/develop/metadata/v140/metadata_key_description.md>`_
71
    standard for describging our data. Metadata is stored as JSON in the table
72
    comment.
73
74
    Parameters
75
    ----------
76
    json : str
77
        JSON string reflecting comment
78
    schema : str
79
        The target table's database schema
80
    table : str
81
        Database table on which to put the given comment
82
    """
83
    prefix_str = "COMMENT ON TABLE {0}.{1} IS ".format(schema, table)
84
85
    check_json_str = (
86
        "SELECT obj_description('{0}.{1}'::regclass)::json".format(
87
            schema, table
88
        )
89
    )
90
91
    execute_sql(prefix_str + json + ";")
92
93
    # Query table comment and cast it into JSON
94
    # The query throws an error if JSON is invalid
95
    execute_sql(check_json_str)
96