Passed
Pull Request — dev (#1344)
by
unknown
01:57
created

OEMetadataBuilder.auto_resource_from_table()   B

Complexity

Conditions 7

Size

Total Lines 87
Code Lines 57

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 57
dl 0
loc 87
rs 7.0072
c 0
b 0
f 0
cc 7
nop 13

How to fix   Long Method    Many Parameters   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include: Extract Method.

Many Parameters

Methods with many parameters are not only hard to understand, but their parameters also often become inconsistent when you need more, or different data.

There are several approaches to avoid long parameter lists: Replace Parameter with Method, Introduce Parameter Object, and Preserve Whole Object.

1
from __future__ import annotations
2
3
from copy import deepcopy
4
from dataclasses import dataclass
5
from typing import Any, Dict, List, Optional
6
import datetime as dt
7
import json
8
9
from geoalchemy2 import Geometry
10
from omi.base import MetadataSpecification, get_metadata_specification
11
from omi.validation import validate_metadata  # parse_metadata
12
from sqlalchemy import MetaData, Table, inspect
13
from sqlalchemy.dialects.postgresql.base import ischema_names
14
from sqlalchemy.engine import Engine
15
import yaml  # PyYAML
16
17
# ---- Optional: your project settings/hooks
18
# from egon.data import db, logger
19
from egon.data.metadata import settings
20
21
# Geometry awareness for reflection
22
# Import-time side effect: register the type name "geometry" in the
# PostgreSQL dialect's ischema_names mapping, pointing it at GeoAlchemy2's
# Geometry type (intended for table reflection, per the note above).
ischema_names["geometry"] = Geometry  # generic
23
# You can add specific geometry columns later per-table via kwargs
24
25
26
def _today() -> str:
27
    return dt.date.today().isoformat()
28
29
30
def _deep_merge(base: dict, override: dict) -> dict:
31
    """
32
    Deep merge with 'override wins', recursively.
33
    Lists are replaced (not merged) by default to avoid subtle duplication.
34
    """
35
    out = deepcopy(base)
36
    for k, v in override.items():
37
        if isinstance(v, dict) and isinstance(out.get(k), dict):
38
            out[k] = _deep_merge(out[k], v)
39
        else:
40
            out[k] = deepcopy(v)
41
    return out
42
43
44
def _sqlatype_to_oem_type(sa_type: str) -> str:
45
    """
46
    Map SQLAlchemy reflected type string -> OEM v2 field.type
47
    Keep it simple and deterministic; adjust as needed.
48
    """
49
    t = sa_type.lower()
50
    # geometry
51
    if "geometry" in t:
52
        return "geometry"
53
    # integers
54
    if any(x in t for x in ["int", "serial", "bigint", "smallint"]):
55
        return "integer"
56
    # floats / numeric
57
    if any(x in t for x in ["float", "double", "numeric", "real", "decimal"]):
58
        return "number"
59
    # booleans
60
    if "bool" in t:
61
        return "boolean"
62
    # timestamp/date/time
63
    if "timestamp" in t or "timestamptz" in t:
64
        return "datetime"
65
    if t == "date":
66
        return "date"
67
    if t == "time":
68
        return "time"
69
    # text-ish
70
    if any(
71
        x in t for x in ["text", "char", "string", "uuid", "json", "jsonb"]
72
    ):
73
        return "string"
74
    # fallback
75
    return "string"
76
77
78
@dataclass
class ResourceField:
    """
    Lightweight Python-side description of one field (column) inside an
    oemetadata v2 resource schema.

    Only ``name`` is required. ``type`` defaults to ``"string"``; the other
    attributes default to ``None``, meaning "not provided".
    """

    name: str
    description: Optional[str] = None
    type: str = "string"
    unit: Optional[str] = None
    nullable: Optional[bool] = None

    def to_dict(self) -> dict:
        """
        Serialise the field to a plain dict.

        ``name`` and ``type`` are always present; ``description``, ``unit``
        and ``nullable`` are emitted (in that order) only when they are not
        ``None``.
        """
        payload = {"name": self.name, "type": self.type}
        for key in ("description", "unit", "nullable"):
            value = getattr(self, key)
            if value is not None:
                payload[key] = value
        return payload
104
105
106
class OEMetadataBuilder:
    """
    Single, reusable builder for OEP oemetadata v2 using omi as source of truth.

    The builder keeps one working dictionary (``_meta``) that every setter
    patches, plus a ``_validated`` flag that each mutating call resets and
    that ``finalize()`` sets. All mutators return ``self`` so calls can be
    chained.

    Typical flow:
      builder = OEMetadataBuilder().from_template()
                                   .apply_yaml("dataset_meta.yaml")
                                   .auto_resource_from_table(engine, "schema", "table", geom_cols=["geom"])
                                   .set_basic(name="schema.table", title="...", description="...")
                                   .finalize()
      payload = builder.as_json()  # validated JSON string
      builder.save_as_table_comment(db_engine, "schema", "table")  # optional
    """  # noqa: E501

    def __init__(self, version: str = settings.OEMETADATA_VERSION) -> None:
        """
        Load the omi metadata specification for ``version``.

        The working metadata starts out empty; call ``from_template()`` to
        seed it.
        """
        self.spec: MetadataSpecification = get_metadata_specification(version)
        self._meta: Dict[str, Any] = {}
        self._validated: bool = False

    # ---- Required steps

    def from_template(self) -> "OEMetadataBuilder":
        """
        Start from omi's template plus selected bits from example
        (context/metaMetadata).

        The working metadata becomes the first entry of the template's
        ``resources`` list. When the template has no such entry (or the
        specification ships no template at all) the top-level template
        object is used instead of failing with ``KeyError``/``IndexError``.
        """
        tpl = deepcopy(self.spec.template) if self.spec.template else {}
        example = self.spec.example
        if example:
            # Copy @context + metaMetadata if present in example
            for key in ("@context", "metaMetadata"):
                if key in example:
                    tpl[key] = deepcopy(example[key])

        # NOTE(review): "@context"/"metaMetadata" live on the top-level
        # template, so they are not part of the working metadata whenever a
        # resource entry is selected here -- confirm this is intended.
        resources = tpl.get("resources") or []
        self._meta = resources[0] if resources else tpl
        self._validated = False
        return self

    def apply_yaml(
        self, yaml_path: str | None = None, yaml_text: str | None = None
    ) -> "OEMetadataBuilder":
        """
        Merge user-provided YAML overrides into the current metadata object.

        Either a file path or a YAML string (handy for testing) may be
        given; ``yaml_path`` takes precedence. With neither given, or with
        an empty document, nothing is changed apart from resetting the
        validation flag.

        Raises:
            TypeError: if the YAML document's top level is not a mapping.
        """
        if yaml_path:
            with open(yaml_path, "r", encoding="utf-8") as fh:
                override = yaml.safe_load(fh) or {}
        elif yaml_text:
            override = yaml.safe_load(yaml_text) or {}
        else:
            override = {}

        # A list or scalar at the top level cannot be merged key by key;
        # fail with a clear message instead of deep inside the merge.
        if not isinstance(override, dict):
            raise TypeError(
                "YAML overrides must be a mapping at the top level, got "
                f"{type(override).__name__}"
            )

        self._meta = _deep_merge(self._meta, override)
        self._validated = False
        return self

    def set_basic(
        self,
        name: str,
        title: Optional[str] = None,
        description: Optional[str] = None,
        language: Optional[List[str]] = None,
        publication_date: Optional[str] = None,
        dataset_id: Optional[str] = None,
    ) -> "OEMetadataBuilder":
        """
        Convenience setter for common top-level fields.

        ``name`` and ``publicationDate`` are always written; the latter
        defaults to today's date. The remaining keys (``title``,
        ``description``, ``language``, ``id``) are only written when a
        value is passed.
        """
        if publication_date is None:
            publication_date = _today()
        patch = {
            "name": name,
            "publicationDate": publication_date,
        }
        if title is not None:
            patch["title"] = title
        if description is not None:
            patch["description"] = description
        if language is not None:
            patch["language"] = language
        if dataset_id is not None:
            patch["id"] = dataset_id

        self._meta = _deep_merge(self._meta, patch)
        self._validated = False
        return self

    def set_context(self, context_obj: dict) -> "OEMetadataBuilder":
        """
        Merge ``context_obj`` into the ``context`` key of the metadata.
        """
        # NOTE(review): from_template() copies the key "@context" while this
        # setter writes "context" -- confirm which key is intended.
        self._meta = _deep_merge(self._meta, {"context": context_obj})
        self._validated = False
        return self

    def set_spatial(
        self,
        extent: Optional[str] = None,
        resolution: Optional[str] = None,
        location: Optional[Any] = None,
    ) -> "OEMetadataBuilder":
        """
        Merge the given values into the ``spatial`` section.

        Arguments left as ``None`` are not written.
        """
        patch = {"spatial": {}}
        if location is not None:
            patch["spatial"]["location"] = location
        if extent is not None:
            patch["spatial"]["extent"] = extent
        if resolution is not None:
            patch["spatial"]["resolution"] = resolution
        self._meta = _deep_merge(self._meta, patch)
        self._validated = False
        return self

    def set_temporal(
        self,
        reference_date: Optional[str] = None,
        timeseries: Optional[dict] = None,
    ) -> "OEMetadataBuilder":
        """
        Merge the given values into the ``temporal`` section.

        Arguments left as ``None`` are not written.
        """
        patch = {"temporal": {}}
        if reference_date is not None:
            # NOTE: your older code used 'referenceDate' vs
            # 'reference_date' in places.
            # OEM v2 uses 'referenceDate' (camelCase). Keep consistent here:
            patch["temporal"]["referenceDate"] = reference_date
        if timeseries is not None:
            patch["temporal"]["timeseries"] = timeseries
        self._meta = _deep_merge(self._meta, patch)
        self._validated = False
        return self

    # ---- Sources, licenses, contributors

    def add_source(self, source: dict) -> "OEMetadataBuilder":
        """Append ``source`` to the ``sources`` list, creating it if absent."""
        self._meta.setdefault("sources", [])
        self._meta["sources"].append(source)
        self._validated = False
        return self

    def add_license(self, lic: dict) -> "OEMetadataBuilder":
        """Append ``lic`` to the ``licenses`` list, creating it if absent."""
        self._meta.setdefault("licenses", [])
        self._meta["licenses"].append(lic)
        self._validated = False
        return self

    def add_contributor(self, contributor: dict) -> "OEMetadataBuilder":
        """
        Append ``contributor`` to the ``contributors`` list, creating it if
        absent.
        """
        self._meta.setdefault("contributors", [])
        self._meta["contributors"].append(contributor)
        self._validated = False
        return self

    # ---- Resources

    def auto_resource_from_table(
        self,
        engine: Engine,
        schema: str,
        table: str,
        *,
        resource_name: Optional[str] = None,
        format_: str = "PostgreSQL",
        encoding: str = "UTF-8",
        primary_key: Optional[List[str]] = None,
        foreign_keys: Optional[List[dict]] = None,
        geom_cols: Optional[List[str]] = None,
        dialect: Optional[dict] = None,
        overwrite_existing: bool = False,
    ) -> "OEMetadataBuilder":
        """
        Introspect a DB table and create a single tabular data resource entry.

        - Maps SQLA types to OEM types
        - Marks 'nullable' where possible
        - Treats columns named in ``geom_cols`` as 'geometry'; when
          ``geom_cols`` is not given, the names ``geom``, ``geometry``,
          ``geom_point`` and ``geom_polygon`` are used

        ``resource_name`` defaults to ``"<schema>.<table>"``. A falsy
        ``primary_key`` is replaced by a best guess read from the database,
        a falsy ``foreign_keys`` by an empty list and a falsy ``dialect`` by
        a default dialect description.

        If overwrite_existing=False and a resource already exists with the
        same name, it will be left as-is; with overwrite_existing=True the
        existing entry is replaced.
        """
        if geom_cols is None:
            geom_cols = ["geom", "geometry", "geom_point", "geom_polygon"]

        fields = self._reflect_fields(engine, schema, table, geom_cols)

        if not resource_name:
            resource_name = f"{schema}.{table}"

        resource = {
            "name": resource_name,
            # TODO: @jh-RLI The OEP will set this,
            # consider if local usage is important
            "path": None,
            "type": "table",
            "format": format_,
            "encoding": encoding,
            "schema": {
                "fields": [f.to_dict() for f in fields],
                "primaryKey": primary_key
                or self._best_guess_pk(engine, schema, table),
                "foreignKeys": foreign_keys or [],
            },
            "dialect": dialect or {"delimiter": None, "decimalSeparator": "."},
        }

        self._install_resource(resource, overwrite_existing)
        self._validated = False
        return self

    def _reflect_fields(
        self,
        engine: Engine,
        schema: str,
        table: str,
        geom_cols: List[str],
    ) -> List[ResourceField]:
        """Reflect ``schema.table`` and describe each column as a field."""
        tbl = Table(table, MetaData(), schema=schema, autoload_with=engine)
        return [
            ResourceField(
                name=col.name,
                # if explicitly geometry by name, treat as geometry
                type=(
                    "geometry"
                    if col.name in geom_cols
                    else _sqlatype_to_oem_type(str(col.type))
                ),
                nullable=col.nullable,
            )
            for col in tbl.columns
        ]

    def _install_resource(
        self, resource: dict, overwrite_existing: bool
    ) -> None:
        """Add ``resource`` to the resources list, honouring the flag."""
        name = resource["name"]
        resources = self._meta.setdefault("resources", [])
        if overwrite_existing:
            # Drop any entry of the same name so the new one replaces it.
            resources = [r for r in resources if r.get("name") != name]
            self._meta["resources"] = resources
        # only add if not present
        if not any(r.get("name") == name for r in resources):
            resources.append(resource)

    def _best_guess_pk(
        self, engine: Engine, schema: str, table: str
    ) -> List[str]:
        """
        Try to read PK columns via SQLAlchemy inspector, fallback to
        ['id'] if found, else [].
        """
        insp = inspect(engine)
        pk = insp.get_pk_constraint(table, schema=schema)
        cols = pk.get("constrained_columns") if pk else None
        if cols:
            return cols
        # common fallback
        columns = [c["name"] for c in insp.get_columns(table, schema=schema)]
        return ["id"] if "id" in columns else []

    # ---- Finalize/validate/serialize

    def finalize(self, license_check: bool = False) -> "OEMetadataBuilder":
        """
        Make minimal guarantees & validate with omi.

        Sets ``language`` to ``["en-EN"]`` when the key is missing, then
        passes the metadata to omi's ``validate_metadata``; whatever that
        call raises propagates to the caller. On success the builder is
        marked as validated.
        """
        # Fill sane defaults if missing
        # self._meta.setdefault("publicationDate", _today())
        self._meta.setdefault("language", ["en-EN"])

        # TODO: @jh-RLI might be expensive
        # parse + validate with omi
        # parse_metadata expects string; serialize & round-trip to normalize
        # text = json.dumps(self._meta, ensure_ascii=False)
        # parsed = parse_metadata(text)

        # You can toggle license checks if you are mid-migration:
        validate_metadata(self._meta, check_license=license_check)

        # Reassign parsed (it may normalize the structure)
        # self._meta = parsed
        self._validated = True
        return self

    def as_dict(self) -> dict:
        """
        Return a deep copy of the metadata, running ``finalize()`` first
        when the current state has not been validated yet.
        """
        if not self._validated:
            self.finalize()
        return deepcopy(self._meta)

    def as_json(self) -> str:
        """Return the validated metadata as a JSON string (non-ASCII kept)."""
        return json.dumps(self.as_dict(), ensure_ascii=False)

    # ---- Optional convenience: store as comment on a table

    def save_as_table_comment(
        self, engine: Engine, schema: str, table: str
    ) -> None:
        """
        Store metadata JSON as a COMMENT ON TABLE ... (PostgreSQL).

        The schema and table names are quoted by the engine's dialect where
        quoting is required. The JSON payload is handed to SQLAlchemy as a
        literal-rendered bind parameter, so escaping of quotes (and of
        percent signs, for drivers that need it) is left to the dialect
        instead of being done by hand.
        """
        # Local import: only this method needs these two names.
        from sqlalchemy import bindparam, text

        quote = engine.dialect.identifier_preparer.quote
        # text() treats ":name" as a bind parameter; escape stray colons
        # that may occur inside a quoted identifier.
        target = f"{quote(schema)}.{quote(table)}".replace(":", "\\:")
        stmt = text(f"COMMENT ON TABLE {target} IS :payload").bindparams(
            # literal_execute renders the value inline at execution time;
            # the statement is sent without bound parameters.
            bindparam("payload", self.as_json(), literal_execute=True)
        )
        with engine.begin() as conn:
            conn.execute(stmt)
408