Passed
Pull Request — dev (#1344)
by
unknown
02:05
created

OEMetadataResourceBuilder.save_as_table_comment()   A

Complexity

Conditions 2

Size

Total Lines 13
Code Lines 8

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 8
dl 0
loc 13
rs 10
c 0
b 0
f 0
cc 2
nop 4
1
from __future__ import annotations
2
3
from copy import deepcopy
4
from dataclasses import dataclass
5
from typing import Any, Dict, List, Optional
6
import datetime as dt
7
import json
8
9
from geoalchemy2 import Geometry
10
from omi.base import MetadataSpecification, get_metadata_specification
11
from omi.validation import validate_metadata  # parse_metadata
12
from sqlalchemy import MetaData, Table, inspect
13
from sqlalchemy.dialects.postgresql.base import ischema_names
14
from sqlalchemy.engine import Engine
15
import yaml  # PyYAML
16
17
# ---- Optional: your project settings/hooks
18
# from egon.data import db, logger
19
from egon.data.metadata import settings
20
21
# Teach the PostgreSQL dialect's reflection type map about PostGIS: a
# reflected column type named "geometry" is resolved to geoalchemy2's
# generic Geometry type. More specific geometry settings can still be
# passed per table later via keyword arguments.
ischema_names.update(geometry=Geometry)
24
25
26
def _today() -> str:
    """Return today's local date as an ISO-8601 string (``YYYY-MM-DD``)."""
    current_date = dt.date.today()
    return current_date.isoformat()
28
29
30
def _deep_merge(base: dict, override: dict) -> dict:
    """
    Return a new dict that is ``base`` recursively updated by ``override``.

    Values from ``override`` take precedence. When both sides hold a dict
    under the same key, those two dicts are merged recursively; every other
    value (lists included) replaces the base value wholesale, so list entries
    are never concatenated. Neither input is mutated: the result is built
    from deep copies.
    """
    merged = deepcopy(base)
    for key, incoming in override.items():
        existing = merged.get(key)
        both_dicts = isinstance(incoming, dict) and isinstance(existing, dict)
        merged[key] = (
            _deep_merge(existing, incoming) if both_dicts else deepcopy(incoming)
        )
    return merged
42
43
44
def _sqlatype_to_oem_type(sa_type: str) -> str:
    """
    Map a reflected SQLAlchemy type string to an OEM v2 ``field.type``.

    ``sa_type`` is the text of ``str(column.type)``, e.g. ``"INTEGER"``,
    ``"DOUBLE PRECISION"``, ``"TIMESTAMP WITHOUT TIME ZONE"`` or
    ``"geometry(POINT,4326)"``. Matching is case-insensitive and
    deterministic; anything not recognised (text, char, uuid, json, ...)
    falls back to ``"string"``.
    """
    t = sa_type.lower()
    # Split into alphanumeric words, e.g. "time(6) with time zone" ->
    # ["time", "6", "with", "time", "zone"]. Integer detection works on whole
    # words: a plain substring test for "int" also matched "interval",
    # "point" or "int4range" and mislabelled them as integers.
    words = "".join(ch if ch.isalnum() else " " for ch in t).split()
    integer_words = {
        "int",
        "integer",
        "tinyint",
        "smallint",
        "mediumint",
        "bigint",
        "serial",
        "smallserial",
        "bigserial",
    }

    # geometry
    if "geometry" in t:
        return "geometry"
    # integers; trailing digits are stripped so "int4"/"serial8" match too
    if any(w.rstrip("0123456789") in integer_words for w in words):
        return "integer"
    # floats / numeric
    if any(x in t for x in ("float", "double", "numeric", "real", "decimal")):
        return "number"
    # booleans
    if "bool" in t:
        return "boolean"
    # "timestamp" also covers "timestamptz" and the WITH/WITHOUT TIME ZONE
    # spellings; "datetime" is the MySQL-style name.
    if "timestamp" in t or "datetime" in t:
        return "datetime"
    if t == "date":
        return "date"
    # PostgreSQL reflection renders TIME as e.g. "TIME WITHOUT TIME ZONE",
    # so look at the leading word instead of requiring an exact "time".
    if words[:1] in (["time"], ["timetz"]):
        return "time"
    # text-ish types and everything else
    return "string"
76
77
78
@dataclass
class ResourceField:
    """
    One column ("field") entry of an oemetadata v2 resource schema.

    A thin Python-side container; optional attributes left as ``None`` are
    omitted when serialized, so only known information reaches the metadata.
    """

    name: str
    description: Optional[str] = None
    type: str = "string"
    unit: Optional[str] = None
    nullable: Optional[bool] = None

    def to_dict(self) -> dict:
        """
        Serialize to a plain dict.

        ``name`` and ``type`` are always present; ``description``, ``unit``
        and ``nullable`` follow in that order, but only when not ``None``.
        """
        serialized = {"name": self.name, "type": self.type}
        optional_items = (
            ("description", self.description),
            ("unit", self.unit),
            ("nullable", self.nullable),
        )
        serialized.update(
            (key, value) for key, value in optional_items if value is not None
        )
        return serialized
104
105
106
class OEMetadataResourceBuilder:
    """
    Single, reusable builder for one OEP oemetadata v2 resource, using omi
    as the source of truth for the template and for validation.

    Every mutating method returns ``self`` (so calls can be chained) and
    marks the builder as not validated; ``as_dict``/``as_json`` run
    ``finalize`` lazily when needed.

    Typical flow:
      builder = (
          OEMetadataResourceBuilder()
          .from_template()
          .apply_yaml("dataset_meta.yaml")
          .auto_resource_from_table(engine, "schema", "table", geom_cols=["geom"])
          .set_basic(name="schema.table", title="...", description="...")
          .finalize()
      )
      payload = builder.as_json()  # validated JSON string
      builder.save_as_table_comment(engine, "schema", "table")  # optional
    """  # noqa: E501

    def __init__(self, version: str = settings.OEMETADATA_VERSION) -> None:
        # omi specification for the requested version; from_template() reads
        # its template and example.
        self.spec: MetadataSpecification = get_metadata_specification(version)
        # The resource dict under construction.
        self._meta: Dict[str, Any] = {}
        # True only after finalize(); every mutation resets it.
        self._validated: bool = False

    # ---- Internal helpers

    def _merge(self, patch: dict) -> "OEMetadataResourceBuilder":
        """Deep-merge ``patch`` into the metadata (patch wins); return self."""
        self._meta = _deep_merge(self._meta, patch)
        self._validated = False
        return self

    def _append_item(
        self, key: str, item: dict
    ) -> "OEMetadataResourceBuilder":
        """
        Append a deep copy of ``item`` to the list stored under ``key``.

        A missing key or an explicit ``None`` (e.g. an empty YAML entry such
        as ``sources:``) is treated as an empty list. Copying prevents later
        changes to the caller's dict from leaking into the builder.
        """
        if self._meta.get(key) is None:
            self._meta[key] = []
        self._meta[key].append(deepcopy(item))
        self._validated = False
        return self

    # ---- Required steps

    def from_template(self) -> "OEMetadataResourceBuilder":
        """
        Start from omi's template plus selected bits from the example
        (@context/metaMetadata).

        The first entry of the template's ``resources`` list becomes the
        metadata being built, so its keys exist (empty strings/structures
        as per spec).

        :raises KeyError: if the template has no ``resources`` entry.
        """
        tpl = deepcopy(self.spec.template) if self.spec.template else {}
        example = self.spec.example or {}
        for key in ("@context", "metaMetadata"):
            if key in example:
                tpl[key] = deepcopy(example[key])
        # NOTE(review): only the first template resource is kept, so the
        # @context/metaMetadata copied into ``tpl`` above are discarded here.
        # Confirm whether they were meant to be part of the payload.
        self._meta = tpl["resources"][0]
        self._validated = False
        return self

    def apply_yaml(
        self, yaml_path: str | None = None, yaml_text: str | None = None
    ) -> "OEMetadataResourceBuilder":
        """
        Merge user-provided YAML overrides into the current metadata.

        Either a file path or a YAML string (handy for tests) is accepted;
        ``yaml_path`` wins if both are given. An empty document is a no-op.

        :raises TypeError: if the YAML document is not a mapping.
        """
        if yaml_path:
            with open(yaml_path, "r", encoding="utf-8") as fh:
                override = yaml.safe_load(fh) or {}
        elif yaml_text:
            override = yaml.safe_load(yaml_text) or {}
        else:
            override = {}

        if not isinstance(override, dict):
            raise TypeError(
                "YAML override must be a mapping at the top level, "
                f"got {type(override).__name__}"
            )
        return self._merge(override)

    def set_basic(
        self,
        name: str,
        title: Optional[str] = None,
        description: Optional[str] = None,
        language: Optional[List[str]] = None,
        publication_date: Optional[str] = None,
        dataset_id: Optional[str] = None,
    ) -> "OEMetadataResourceBuilder":
        """
        Convenience setter for common top-level fields.

        ``publicationDate`` defaults to today's date (ISO format); the other
        optional arguments are only written when given.
        """
        if publication_date is None:
            publication_date = _today()
        patch: Dict[str, Any] = {
            "name": name,
            "publicationDate": publication_date,
        }
        if title is not None:
            patch["title"] = title
        if description is not None:
            patch["description"] = description
        if language is not None:
            patch["language"] = language
        if dataset_id is not None:
            patch["id"] = dataset_id
        return self._merge(patch)

    def set_context(self, context_obj: dict) -> "OEMetadataResourceBuilder":
        """Deep-merge ``context_obj`` into the ``context`` section."""
        return self._merge({"context": context_obj})

    def set_spatial(
        self,
        extent: Optional[str] = None,
        resolution: Optional[str] = None,
        location: Optional[Any] = None,
    ) -> "OEMetadataResourceBuilder":
        """Deep-merge the given (non-None) values into ``spatial``."""
        patch: Dict[str, Any] = {"spatial": {}}
        if location is not None:
            patch["spatial"]["location"] = location
        if extent is not None:
            patch["spatial"]["extent"] = extent
        if resolution is not None:
            patch["spatial"]["resolution"] = resolution
        return self._merge(patch)

    def set_temporal(
        self,
        reference_date: Optional[str] = None,
        timeseries: Optional[dict] = None,
    ) -> "OEMetadataResourceBuilder":
        """Deep-merge the given (non-None) values into ``temporal``."""
        patch: Dict[str, Any] = {"temporal": {}}
        if reference_date is not None:
            # Older code mixed 'referenceDate' and 'reference_date'; OEM v2
            # uses camelCase 'referenceDate', so stay consistent with it.
            patch["temporal"]["referenceDate"] = reference_date
        if timeseries is not None:
            patch["temporal"]["timeseries"] = timeseries
        return self._merge(patch)

    # ---- Sources, licenses, contributors

    def add_source(self, source: dict) -> "OEMetadataResourceBuilder":
        """Append a copy of ``source`` to ``sources``."""
        return self._append_item("sources", source)

    def add_license(self, lic: dict) -> "OEMetadataResourceBuilder":
        """Append a copy of ``lic`` to ``licenses``."""
        return self._append_item("licenses", lic)

    def add_contributor(
        self, contributor: dict
    ) -> "OEMetadataResourceBuilder":
        """Append a copy of ``contributor`` to ``contributors``."""
        return self._append_item("contributors", contributor)

    # ---- Resources

    def auto_resource_from_table(
        self,
        engine: Engine,
        schema: str,
        table: str,
        *,
        resource_name: Optional[str] = None,
        format_: str = "PostgreSQL",
        encoding: str = "UTF-8",
        primary_key: Optional[List[str]] = None,
        foreign_keys: Optional[List[dict]] = None,
        geom_cols: Optional[List[str]] = None,
        dialect: Optional[dict] = None,
        overwrite_existing: bool = False,
    ) -> "OEMetadataResourceBuilder":
        """
        Introspect a DB table and create a single tabular data resource entry.

        - Maps SQLA types to OEM types (see ``_sqlatype_to_oem_type``)
        - Marks 'nullable' from the reflected columns
        - Treats columns named in ``geom_cols`` as 'geometry' (defaults to
          geom, geometry, geom_point, geom_polygon)
        - ``primaryKey`` comes from ``primary_key`` if non-empty, otherwise
          from ``_best_guess_pk``

        If overwrite_existing=False and a resource already exists with the
        same name, it is left as-is; with True it is replaced.
        """
        if geom_cols is None:
            geom_cols = ["geom", "geometry", "geom_point", "geom_polygon"]
        geom_names = set(geom_cols)

        # reflect
        tbl = Table(table, MetaData(), schema=schema, autoload_with=engine)

        fields = [
            ResourceField(
                name=col.name,
                type=(
                    "geometry"
                    if col.name in geom_names
                    else _sqlatype_to_oem_type(str(col.type))
                ),
                nullable=col.nullable,
            )
            for col in tbl.columns
        ]

        if not resource_name:
            resource_name = f"{schema}.{table}"

        resource = {
            "name": resource_name,
            # TODO: @jh-RLI The OEP will set this,
            # consider if local usage is important
            "path": None,
            "type": "table",
            "format": format_,
            "encoding": encoding,
            "schema": {
                "fields": [f.to_dict() for f in fields],
                "primaryKey": (
                    list(primary_key)
                    if primary_key
                    else self._best_guess_pk(engine, schema, table)
                ),
                "foreignKeys": deepcopy(foreign_keys) if foreign_keys else [],
            },
            "dialect": (
                deepcopy(dialect)
                if dialect
                else {"delimiter": None, "decimalSeparator": "."}
            ),
        }

        # NOTE(review): from_template() makes self._meta a single resource,
        # so this nests a "resources" list inside that resource. Confirm this
        # is what omi validation / the OEP expect.
        if self._meta.get("resources") is None:
            self._meta["resources"] = []
        if overwrite_existing:
            self._meta["resources"] = [
                r
                for r in self._meta["resources"]
                if r.get("name") != resource_name
            ]
        # only add if no resource with this name remains
        if all(
            r.get("name") != resource_name for r in self._meta["resources"]
        ):
            self._meta["resources"].append(resource)

        self._validated = False
        return self

    def _best_guess_pk(
        self, engine: Engine, schema: str, table: str
    ) -> List[str]:
        """
        Read PK columns via the SQLAlchemy inspector; if there is no PK
        constraint, fall back to ['id'] when such a column exists, else [].
        """
        insp = inspect(engine)
        pk = insp.get_pk_constraint(table, schema=schema)
        cols = pk.get("constrained_columns") if pk else None
        if cols:
            return cols
        # common fallback
        columns = [c["name"] for c in insp.get_columns(table, schema=schema)]
        return ["id"] if "id" in columns else []

    # ---- Finalize/validate/serialize

    def finalize(
        self, license_check: bool = False
    ) -> "OEMetadataResourceBuilder":
        """
        Apply minimal defaults and validate the metadata with omi.

        :param license_check: forwarded to omi's ``validate_metadata`` as
            ``check_license``; keep False while licenses are mid-migration.
        Whatever ``validate_metadata`` raises for invalid metadata
        propagates unchanged.
        """
        # Fill sane defaults if missing
        self._meta.setdefault("language", ["en-EN"])

        # TODO: @jh-RLI might be expensive. Consider round-tripping through
        # omi's parse_metadata first to normalize the structure.
        validate_metadata(self._meta, check_license=license_check)

        self._validated = True
        return self

    def as_dict(self) -> dict:
        """Return a deep copy of the metadata, finalizing first if needed."""
        if not self._validated:
            self.finalize()
        return deepcopy(self._meta)

    def as_json(self) -> str:
        """Return the (finalized) metadata as a JSON string, non-ASCII kept."""
        return json.dumps(self.as_dict(), ensure_ascii=False)

    # ---- Optional convenience: store as comment on a table

    def save_as_table_comment(
        self, engine: Engine, schema: str, table: str
    ) -> None:
        """
        Store the metadata JSON as the table comment (COMMENT ON TABLE).

        The statement is produced by SQLAlchemy's ``SetTableComment`` DDL
        construct, which quotes the schema/table identifiers when needed and
        renders the JSON as a string literal escaped for the engine's
        dialect. Hand-built SQL is avoided because it left identifiers
        unquoted. It also breaks with pyformat drivers such as psycopg2, which
        treat any ``%`` in the payload as a placeholder.
        """
        from sqlalchemy.schema import SetTableComment

        target = Table(
            table, MetaData(), schema=schema, comment=self.as_json()
        )
        with engine.begin() as conn:
            conn.execute(SetTableComment(target))
412