Passed
Pull Request — dev (#31)
by
unknown
54s
created

data.db.submit_comment()   A

Complexity

Conditions 1

Size

Total Lines 24
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 7
dl 0
loc 24
rs 10
c 0
b 0
f 0
cc 1
nop 3
1
import os
2
3
from sqlalchemy import create_engine, text
4
import yaml
5
6
import egon
7
8
9
def credentials():
    """
    Return local database connection parameters.

    The parameters are read from the ``docker-compose.yml`` located in the
    ``airflow`` directory of the ``egon.data`` package. The ``environment``
    mapping of the ``egon-data-local-database`` service is extended by the
    keys ``HOST`` and ``PORT``, which are taken from the first two
    colon-separated fields of the service's first ``ports`` entry.

    Returns
    -------
    dict
        Complete DB connection information
    """
    # Read database configuration from docker-compose.yml
    package_path = egon.data.__path__[0]
    docker_compose_file = os.path.join(
        package_path, "airflow", "docker-compose.yml"
    )
    # Use a context manager so the file handle is closed deterministically
    # instead of being left to the garbage collector
    with open(docker_compose_file) as compose_stream:
        docker_compose = yaml.load(compose_stream, Loader=yaml.SafeLoader)

    database_service = docker_compose["services"]["egon-data-local-database"]

    # Select basic connection details
    docker_db_config = database_service["environment"]

    # Add HOST and PORT
    # NOTE(review): assumes the entry looks like "<host>:<port>:..." - confirm
    docker_db_config_additional = database_service["ports"][0].split(":")
    docker_db_config["HOST"] = docker_db_config_additional[0]
    docker_db_config["PORT"] = docker_db_config_additional[1]

    return docker_db_config
40
41
42
def execute_sql(sql_string):
    """
    Execute a SQL expression given as string.

    The SQL expression passed as plain string is converted to a
    `sqlalchemy.sql.expression.TextClause`. The connection parameters are
    obtained from `credentials()`; a new engine is created for every call
    and disposed again once the statement has been executed.

    Parameters
    ----------
    sql_string : str
        SQL expression

    """
    # Imported locally so the function does not depend on a module-level
    # import being added
    from urllib.parse import quote

    db_config = credentials()

    # Percent-encode user and password so that characters like "@", ":" or
    # "/" cannot break the structure of the connection URL
    user = quote(str(db_config["POSTGRES_USER"]), safe="")
    password = quote(str(db_config["POSTGRES_PASSWORD"]), safe="")

    engine_local = create_engine(
        f"postgresql+psycopg2://{user}:{password}@{db_config['HOST']}:"
        f"{db_config['PORT']}/{db_config['POSTGRES_DB']}",
        echo=False,
    )

    try:
        # NOTE(review): the `autocommit` execution option is tied to the
        # SQLAlchemy version in use - confirm against the pinned release
        with engine_local.connect().execution_options(autocommit=True) as con:
            con.execute(text(sql_string))
    finally:
        # Release the connection pool of this one-off engine; otherwise
        # every call would leave pooled connections behind
        engine_local.dispose()
66
67
68
def submit_comment(json, schema, table):
    """
    Attach a comment to a database table.

    First a ``COMMENT ON TABLE`` statement is executed for
    ``schema.table``. Afterwards the stored comment is read back with
    ``obj_description`` and cast to ``json`` (presumably so that a comment
    which is not valid JSON makes the second statement fail - confirm).

    Parameters
    ----------
    json : str
        Comment text, inserted verbatim after ``IS`` in the SQL statement
    schema : str
        Schema the table belongs to
    table : str
        Name of the table that receives the comment
    """
    qualified_name = f"{schema}.{table}"

    # Both statements are assembled up front, then run one after another
    comment_statement = f"COMMENT ON TABLE {qualified_name} IS " + json + ";"
    readback_statement = (
        f"SELECT obj_description('{qualified_name}'::regclass)::json"
    )

    execute_sql(comment_statement)
    execute_sql(readback_statement)
92