Passed
Pull Request — dev (#1344)
created by unknown (02:07)

OEMetadataResourceBuilder.auto_resource_from_table() (rating: B)

Complexity: Conditions 7
Size: Total Lines 87, Code Lines 57
Duplication: Lines 0, Ratio 0 %
Importance: Changes 0
Metric  Value
eloc    57
dl      0
loc     87
rs      7.0072
c       0
b       0
f       0
cc      7
nop     13

How to fix

Long Method

Small methods make your code easier to understand, especially when combined with a good name. Conversely, if a method is small, finding a good name for it is usually much easier.

For example, if you find yourself adding comments to a method's body, that is usually a sign that the commented part should be extracted into a new method; the comment then gives you a starting point for naming it.

Commonly applied refactorings include Extract Method and Decompose Conditional; a sketch of Extract Method applied to this method is shown below.
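As a sketch only (the helper name _reflect_fields and its exact signature are hypothetical, not part of this PR), the column-reflection loop of auto_resource_from_table() could be moved into a small, named private method of the builder, which alone removes a large block from the flagged method:

# Hypothetical Extract Method applied to auto_resource_from_table().
# Reuses ResourceField, _sqlatype_to_oem_type, MetaData and Table from the
# module shown further below.
def _reflect_fields(
    self, engine: Engine, schema: str, table: str, geom_cols: List[str]
) -> List[ResourceField]:
    """Reflect the table and map every column to a ResourceField."""
    tbl = Table(table, MetaData(), schema=schema, autoload_with=engine)
    return [
        ResourceField(
            name=col.name,
            type=(
                "geometry"
                if col.name in geom_cols
                else _sqlatype_to_oem_type(str(col.type))
            ),
            nullable=col.nullable,
        )
        for col in tbl.columns
    ]

Inside auto_resource_from_table() the reflection loop then collapses to a single line, fields = self._reflect_fields(engine, schema, table, geom_cols), and the comment that used to describe the block becomes the method name and docstring.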

Many Parameters

Methods with many parameters are not only hard to understand; their parameter lists also tend to become inconsistent as soon as you need more, or different, data.

There are several approaches to avoiding long parameter lists, such as Introduce Parameter Object, Preserve Whole Object, and Replace Parameter with Method Call; a sketch of a parameter object for this method follows.
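For example, Introduce Parameter Object would bundle the eight keyword-only options of auto_resource_from_table() into one small object. The class name ResourceOptions and the slimmed signature below are a hypothetical sketch, not code from this PR:

# Hypothetical parameter object; the name and exact fields are illustrative only.
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class ResourceOptions:
    """Bundles the keyword-only options of auto_resource_from_table()."""

    resource_name: Optional[str] = None
    format_: str = "PostgreSQL"
    encoding: str = "UTF-8"
    primary_key: Optional[List[str]] = None
    foreign_keys: Optional[List[dict]] = None
    geom_cols: List[str] = field(
        default_factory=lambda: ["geom", "geometry", "geom_point", "geom_polygon"]
    )
    dialect: Optional[dict] = None
    overwrite_existing: bool = False


# The builder method would then need only engine, schema, table and options:
#     def auto_resource_from_table(self, engine, schema, table,
#                                  options: Optional[ResourceOptions] = None): ...
#     builder.auto_resource_from_table(engine, "schema", "table",
#                                      ResourceOptions(geom_cols=["geom"]))

This keeps call sites readable and lets new options be added later without growing the method signature again.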

from __future__ import annotations

from copy import deepcopy
from dataclasses import dataclass
from typing import Any, Dict, List, Optional
import datetime as dt
import json

from geoalchemy2 import Geometry
from omi.base import MetadataSpecification, get_metadata_specification
from omi.validation import validate_metadata  # parse_metadata
from sqlalchemy import MetaData, Table, inspect
from sqlalchemy.dialects.postgresql.base import ischema_names
from sqlalchemy.engine import Engine
import yaml  # PyYAML

# ---- Optional: your project settings/hooks
# from egon.data import db, logger
from egon.data.metadata import settings

# Geometry awareness for reflection
ischema_names["geometry"] = Geometry  # generic
# You can add specific geometry columns later per-table via kwargs


def _today() -> str:
    return dt.date.today().isoformat()


def _deep_merge(base: dict, override: dict) -> dict:
    """
    Deep merge with 'override wins', recursively.
    Lists are replaced (not merged) by default to avoid subtle duplication.
    """
    out = deepcopy(base)
    for k, v in override.items():
        if isinstance(v, dict) and isinstance(out.get(k), dict):
            out[k] = _deep_merge(out[k], v)
        else:
            out[k] = deepcopy(v)
    return out


def _sqlatype_to_oem_type(sa_type: str) -> str:
    """
    Map SQLAlchemy reflected type string -> OEM v2 field.type
    Keep it simple and deterministic; adjust as needed.
    """
    t = sa_type.lower()
    # geometry
    if "geometry" in t:
        return "geometry"
    # integers
    if any(x in t for x in ["int", "serial", "bigint", "smallint"]):
        return "integer"
    # floats / numeric
    if any(x in t for x in ["float", "double", "numeric", "real", "decimal"]):
        return "number"
    # booleans
    if "bool" in t:
        return "boolean"
    # timestamp/date/time
    if "timestamp" in t or "timestamptz" in t:
        return "datetime"
    if t == "date":
        return "date"
    if t == "time":
        return "time"
    # text-ish
    if any(
        x in t for x in ["text", "char", "string", "uuid", "json", "jsonb"]
    ):
        return "string"
    # fallback
    return "string"


@dataclass
class ResourceField:
    """
    Minimal implementation of oemetadata v2 resource structure.
    Eases usage in Python.
    """

    name: str
    description: Optional[str] = None
    type: str = "string"
    unit: Optional[str] = None
    nullable: Optional[bool] = None

    def to_dict(self) -> dict:
        d = {
            "name": self.name,
            "type": self.type,
        }
        # include optional keys only when provided
        if self.description is not None:
            d["description"] = self.description
        if self.unit is not None:
            d["unit"] = self.unit
        if self.nullable is not None:
            d["nullable"] = self.nullable
        return d


class OEMetadataResourceBuilder:
    """
    Single, reusable builder for OEP oemetadata v2 using omi as source of truth.

    Typical flow:
      builder = (
          OEMetadataResourceBuilder()
          .from_template()
          .apply_yaml("dataset_meta.yaml")
          .auto_resource_from_table(engine, "schema", "table", geom_cols=["geom"])
          .set_basic(name="schema.table", title="...", description="...")
          .finalize()
      )
      payload = builder.as_json()  # validated JSON string
      builder.save_as_table_comment(db_engine, "schema", "table")  # optional
    """  # noqa: E501

    def __init__(self, version: str = settings.OEMETADATA_VERSION) -> None:
        self.spec: MetadataSpecification = get_metadata_specification(version)
        self._meta: Dict[str, Any] = {}
        self._validated: bool = False

    # ---- Required steps

    def from_template(self) -> "OEMetadataResourceBuilder":
        """
        Start from omi's template plus selected bits from example
        (context/metaMetadata).
        Ensures keys exist (empty strings/structures as per spec).
        """
        tpl = deepcopy(self.spec.template) if self.spec.template else {}
        if self.spec.example:
            # Copy @context + metaMetadata if present in example
            if "@context" in self.spec.example:
                tpl["@context"] = deepcopy(self.spec.example["@context"])
            if "metaMetadata" in self.spec.example:
                tpl["metaMetadata"] = deepcopy(
                    self.spec.example["metaMetadata"]
                )
        self._meta = tpl["resources"][0]
        self._validated = False
        return self

    def apply_yaml(
        self, yaml_path: str | None = None, yaml_text: str | None = None
    ) -> "OEMetadataResourceBuilder":
        """
        Merge user-provided YAML overrides into the current metadata object.
        You can pass either a file path or a YAML string (handy for testing).
        """
        if yaml_path:
            with open(yaml_path, "r", encoding="utf-8") as fh:
                override = yaml.safe_load(fh) or {}
        elif yaml_text:
            override = yaml.safe_load(yaml_text) or {}
        else:
            override = {}

        self._meta = _deep_merge(self._meta, override)
        self._validated = False
        return self

    def set_basic(
        self,
        name: str,
        title: Optional[str] = None,
        description: Optional[str] = None,
        language: Optional[List[str]] = None,
        publication_date: Optional[str] = None,
        dataset_id: Optional[str] = None,
    ) -> "OEMetadataResourceBuilder":
        """
        Convenience setter for common top-level fields.
        """
        if publication_date is None:
            publication_date = _today()
        patch = {
            "name": name,
            "publicationDate": publication_date,
        }
        if title is not None:
            patch["title"] = title
        if description is not None:
            patch["description"] = description
        if language is not None:
            patch["language"] = language
        if dataset_id is not None:
            patch["id"] = dataset_id

        self._meta = _deep_merge(self._meta, patch)
        self._validated = False
        return self

    def set_context(self, context_obj: dict) -> "OEMetadataResourceBuilder":
        self._meta = _deep_merge(self._meta, {"context": context_obj})
        self._validated = False
        return self

    def set_spatial(
        self,
        extent: Optional[str] = None,
        resolution: Optional[str] = None,
        location: Optional[Any] = None,
    ) -> "OEMetadataResourceBuilder":
        patch = {"spatial": {}}
        if location is not None:
            patch["spatial"]["location"] = location
        if extent is not None:
            patch["spatial"]["extent"] = extent
        if resolution is not None:
            patch["spatial"]["resolution"] = resolution
        self._meta = _deep_merge(self._meta, patch)
        self._validated = False
        return self

    def set_temporal(
        self,
        reference_date: Optional[str] = None,
        timeseries: Optional[dict] = None,
    ) -> "OEMetadataResourceBuilder":
        patch = {"temporal": {}}
        if reference_date is not None:
            # NOTE: your older code used 'referenceDate' vs
            # 'reference_date' in places.
            # OEM v2 uses 'referenceDate' (camelCase). Keep consistent here:
            patch["temporal"]["referenceDate"] = reference_date
        if timeseries is not None:
            patch["temporal"]["timeseries"] = timeseries
        self._meta = _deep_merge(self._meta, patch)
        self._validated = False
        return self

    # ---- Sources, licenses, contributors

    def add_source(self, source: dict) -> "OEMetadataResourceBuilder":
        self._meta.setdefault("sources", [])
        self._meta["sources"].append(source)
        self._validated = False
        return self

    def add_license(self, lic: dict) -> "OEMetadataResourceBuilder":
        self._meta.setdefault("licenses", [])
        self._meta["licenses"].append(lic)
        self._validated = False
        return self

    def add_contributor(
        self, contributor: dict
    ) -> "OEMetadataResourceBuilder":
        self._meta.setdefault("contributors", [])
        self._meta["contributors"].append(contributor)
        self._validated = False
        return self

    # ---- Resources

    def auto_resource_from_table(
        self,
        engine: Engine,
        schema: str,
        table: str,
        *,
        resource_name: Optional[str] = None,
        format_: str = "PostgreSQL",
        encoding: str = "UTF-8",
        primary_key: Optional[List[str]] = None,
        foreign_keys: Optional[List[dict]] = None,
        geom_cols: Optional[List[str]] = None,
        dialect: Optional[dict] = None,
        overwrite_existing: bool = False,
    ) -> "OEMetadataResourceBuilder":
        """
        Introspect a DB table and create a single tabular data resource entry.

        - Maps SQLA types to OEM types
        - Marks 'nullable' where possible
        - Recognizes geometry columns (if given in geom_cols) as 'geometry'

        If overwrite_existing=False and a resource already exists with the same
        name, it will be left as-is (you could add a flag to update instead).
        """
        if geom_cols is None:
            geom_cols = ["geom", "geometry", "geom_point", "geom_polygon"]

        # reflect
        meta = MetaData()
        tbl = Table(table, meta, schema=schema, autoload_with=engine)

        fields: List[ResourceField] = []
        for col in tbl.columns:
            sa_t = str(col.type)
            # if explicitly geometry by name, treat as geometry
            col_type = (
                "geometry"
                if col.name in geom_cols
                else _sqlatype_to_oem_type(sa_t)
            )
            fields.append(
                ResourceField(
                    name=col.name,
                    description=None,
                    type=col_type,
                    unit=None,
                    nullable=col.nullable,
                )
            )

        if not resource_name:
            resource_name = f"{schema}.{table}"

        resource = {
            "name": resource_name,
            # TODO: @jh-RLI The OEP will set this,
            # consider if local usage is important
            "path": None,
            "type": "table",
            "format": format_,
            "encoding": encoding,
            "schema": {
                "fields": [f.to_dict() for f in fields],
                "primaryKey": primary_key
                or self._best_guess_pk(engine, schema, table),
                "foreignKeys": foreign_keys or [],
            },
            "dialect": dialect or {"delimiter": None, "decimalSeparator": "."},
        }

        # install resources array
        self._meta.setdefault("resources", [])
        if overwrite_existing:
            self._meta["resources"] = [
                r
                for r in self._meta["resources"]
                if r.get("name") != resource_name
            ]
        # only add if not present
        if not any(
            r.get("name") == resource_name for r in self._meta["resources"]
        ):
            self._meta["resources"].append(resource)

        self._validated = False
        return self

    def _best_guess_pk(
        self, engine: Engine, schema: str, table: str
    ) -> List[str]:
        """
        Try to read PK columns via the SQLAlchemy inspector; fall back to
        ['id'] if such a column exists, else [].
        """
        insp = inspect(engine)
        pk = insp.get_pk_constraint(table, schema=schema)
        cols = pk.get("constrained_columns") if pk else None
        if cols:
            return cols
        # common fallback
        columns = [c["name"] for c in insp.get_columns(table, schema=schema)]
        return ["id"] if "id" in columns else []

    # ---- Finalize/validate/serialize

    def finalize(
        self, license_check: bool = False
    ) -> "OEMetadataResourceBuilder":
        """
        Make minimal guarantees & validate with omi.
        """
        # Fill sane defaults if missing
        # self._meta.setdefault("publicationDate", _today())
        self._meta.setdefault("language", ["en-EN"])

        # TODO: @jh-RLI might be expensive
        # parse + validate with omi
        # parse_metadata expects string; serialize & round-trip to normalize
        # text = json.dumps(self._meta, ensure_ascii=False)
        # parsed = parse_metadata(text)

        # You can toggle license checks if you are mid-migration:
        validate_metadata(self._meta, check_license=license_check)

        # Reassign parsed (it may normalize the structure)
        # self._meta = parsed
        self._validated = True
        return self

    def as_dict(self) -> dict:
        if not self._validated:
            self.finalize()
        return deepcopy(self._meta)

    def as_json(self) -> str:
        return json.dumps(self.as_dict(), ensure_ascii=False)

    # ---- Optional convenience: store as comment on a table

    def save_as_table_comment(
        self, engine: Engine, schema: str, table: str
    ) -> None:
        """
        Store metadata JSON as a COMMENT ON TABLE ... (PostgreSQL).
        """
        payload = self.as_json().replace(
            "'", "''"
        )  # escape single-quotes for SQL literal
        full = f"{schema}.{table}"
        sql = f"COMMENT ON TABLE {full} IS '{payload}';"
        with engine.begin() as conn:
            conn.exec_driver_sql(sql)
412