1
|
|
|
import os |
2
|
|
|
|
3
|
|
|
from sqlalchemy import create_engine, text |
4
|
|
|
import yaml |
5
|
|
|
|
6
|
|
|
import egon |
7
|
|
|
|
8
|
|
|
|
9
|
|
|
def credentials(): |
10
|
|
|
"""Return local database connection parameters. |
11
|
|
|
|
12
|
|
|
Returns |
13
|
|
|
------- |
14
|
|
|
dict |
15
|
|
|
Complete DB connection information |
16
|
|
|
""" |
17
|
|
|
# Read database configuration from docker-compose.yml |
18
|
|
|
package_path = egon.data.__path__[0] |
19
|
|
|
docker_compose_file = os.path.join( |
20
|
|
|
package_path, "airflow", "docker-compose.yml" |
21
|
|
|
) |
22
|
|
|
docker_compose = yaml.load( |
23
|
|
|
open(docker_compose_file), Loader=yaml.SafeLoader |
24
|
|
|
) |
25
|
|
|
|
26
|
|
|
# Select basic connection details |
27
|
|
|
docker_db_config = docker_compose["services"]["egon-data-local-database"][ |
28
|
|
|
"environment" |
29
|
|
|
] |
30
|
|
|
|
31
|
|
|
# Add HOST and PORT |
32
|
|
|
docker_db_config_additional = docker_compose["services"][ |
33
|
|
|
"egon-data-local-database" |
34
|
|
|
]["ports"][0].split(":") |
35
|
|
|
docker_db_config["HOST"] = docker_db_config_additional[0] |
36
|
|
|
docker_db_config["PORT"] = docker_db_config_additional[1] |
37
|
|
|
|
38
|
|
|
return docker_db_config |
39
|
|
|
|
40
|
|
|
|
41
|
|
|
def execute_sql(sql_string): |
42
|
|
|
"""Execute a SQL expression given as string. |
43
|
|
|
|
44
|
|
|
The SQL expression passed as plain string is convert to a |
45
|
|
|
`sqlalchemy.sql.expression.TextClause`. |
46
|
|
|
|
47
|
|
|
Parameters |
48
|
|
|
---------- |
49
|
|
|
sql_string : str |
50
|
|
|
SQL expression |
51
|
|
|
|
52
|
|
|
""" |
53
|
|
|
db_config = credentials() |
54
|
|
|
|
55
|
|
|
engine_local = create_engine( |
56
|
|
|
f"postgresql+psycopg2://{db_config['POSTGRES_USER']}:" |
57
|
|
|
f"{db_config['POSTGRES_PASSWORD']}@{db_config['HOST']}:" |
58
|
|
|
f"{db_config['PORT']}/{db_config['POSTGRES_DB']}", |
59
|
|
|
echo=False, |
60
|
|
|
) |
61
|
|
|
|
62
|
|
|
with engine_local.connect().execution_options(autocommit=True) as con: |
63
|
|
|
con.execute(text(sql_string)) |
64
|
|
|
|
65
|
|
|
|
66
|
|
|
def submit_comment(json, schema, table): |
67
|
|
|
"""Add comment to table. |
68
|
|
|
|
69
|
|
|
Parameters |
70
|
|
|
---------- |
71
|
|
|
json : str |
72
|
|
|
JSON string reflecting comment |
73
|
|
|
schema : str |
74
|
|
|
The target table's database schema |
75
|
|
|
table : str |
76
|
|
|
Database table on which to put the given comment |
77
|
|
|
""" |
78
|
|
|
prefix_str = "COMMENT ON TABLE {0}.{1} IS ".format(schema, table) |
79
|
|
|
|
80
|
|
|
check_json_str = ( |
81
|
|
|
"SELECT obj_description('{0}.{1}'::regclass)::json".format( |
82
|
|
|
schema, table |
83
|
|
|
) |
84
|
|
|
) |
85
|
|
|
|
86
|
|
|
execute_sql(prefix_str + json + ";") |
87
|
|
|
|
88
|
|
|
execute_sql(check_json_str) |
89
|
|
|
|