Passed
Pull Request — dev (#31)
by Stephan
03:17
created

data.db   A

Complexity

Total Complexity 4

Size/Duplication

Total Lines 89
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 4
eloc 33
dl 0
loc 89
rs 10
c 0
b 0
f 0

3 Functions

Rating   Name   Duplication   Size   Complexity  
A execute_sql() 0 23 2
A submit_comment() 0 23 1
A credentials() 0 30 1
1
import os
2
3
from sqlalchemy import create_engine, text
4
import yaml
5
6
import egon
7
8
9
def credentials():
    """Return local database connection parameters.

    The parameters are read from the ``docker-compose.yml`` located in the
    ``airflow`` directory of the ``egon.data`` package.

    Returns
    -------
    dict
        Complete DB connection information: the ``environment`` entry of
        the ``egon-data-local-database`` service, extended by the keys
        ``HOST`` and ``PORT``. These are the first and second
        ``:``-separated fields of the service's first ``ports`` entry.
    """
    # Read database configuration from docker-compose.yml
    package_path = egon.data.__path__[0]
    docker_compose_file = os.path.join(
        package_path, "airflow", "docker-compose.yml"
    )
    # The context manager closes the file handle as soon as parsing is done
    # instead of leaving that to the garbage collector.
    with open(docker_compose_file) as compose_stream:
        docker_compose = yaml.load(compose_stream, Loader=yaml.SafeLoader)

    database_service = docker_compose["services"]["egon-data-local-database"]

    # Select basic connection details
    docker_db_config = database_service["environment"]

    # Add HOST and PORT
    # NOTE(review): assumes the port mapping has the form
    # "<host>:<host port>:<container port>" -- confirm against the compose file
    docker_db_config_additional = database_service["ports"][0].split(":")
    docker_db_config["HOST"] = docker_db_config_additional[0]
    docker_db_config["PORT"] = docker_db_config_additional[1]

    return docker_db_config
39
40
41
def execute_sql(sql_string):
    """Execute a SQL expression given as string.

    The SQL expression passed as plain string is converted to a
    `sqlalchemy.sql.expression.TextClause` and executed on the local
    database described by :func:`credentials`. The statement runs inside
    a transaction that is committed if it succeeds and rolled back if it
    raises. The engine created for the call is disposed afterwards.

    Parameters
    ----------
    sql_string : str
        SQL expression

    """
    # Function-scope import: only needed to build the connection URL
    from urllib.parse import quote

    db_config = credentials()

    # Percent-encode user and password so that characters with a special
    # meaning inside a URL (e.g. "@", "/" or ":") do not corrupt it.
    user = quote(str(db_config["POSTGRES_USER"]), safe="")
    password = quote(str(db_config["POSTGRES_PASSWORD"]), safe="")

    engine_local = create_engine(
        f"postgresql+psycopg2://{user}:{password}@{db_config['HOST']}:"
        f"{db_config['PORT']}/{db_config['POSTGRES_DB']}",
        echo=False,
    )

    try:
        # ``begin()`` commits on success; this replaces the library-level
        # ``autocommit`` execution option, which newer SQLAlchemy versions
        # no longer provide.
        with engine_local.begin() as con:
            con.execute(text(sql_string))
    finally:
        # Every call creates its own engine, so release its connection pool
        # instead of accumulating open connections.
        engine_local.dispose()
64
65
66
def submit_comment(json, schema, table):
    """Attach a comment to a database table and read it back as JSON.

    Two statements are sent through `execute_sql`, in this order: a
    ``COMMENT ON TABLE`` statement that stores the comment, and a query
    that fetches the stored comment and casts it to ``json``.

    Parameters
    ----------
    json : str
        JSON string reflecting comment. It is appended verbatim after
        ``IS`` in the ``COMMENT ON TABLE`` statement.
    schema : str
        The target table's database schema
    table : str
        Database table on which to put the given comment
    """
    target = "{schema}.{table}".format(schema=schema, table=table)

    # Both statements are assembled before anything is sent to the database.
    statements = (
        "COMMENT ON TABLE " + target + " IS " + json + ";",
        "SELECT obj_description('" + target + "'::regclass)::json",
    )

    for statement in statements:
        execute_sql(statement)
89