Passed
Pull Request — dev (#1344)
by
unknown
02:05
created

package.OEMetadataPackage.add_from_full_document()   B

Complexity

Conditions 6

Size

Total Lines 11
Code Lines 9

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 9
dl 0
loc 11
rs 8.6666
c 0
b 0
f 0
cc 6
nop 4
1
from copy import deepcopy
2
from typing import Any, Dict
3
import json
4
5
from omi.base import get_metadata_specification
6
from omi.validation import validate_metadata  # parse_metadata
7
from sqlalchemy.engine import Engine
8
9
from egon.data.metadata import settings
10
11
12
class OEMetadataPackage:
13
    def __init__(self, version: str = settings.OEMETADATA_VERSION) -> None:
14
        self.spec = get_metadata_specification(version)
15
        self._doc: Dict[str, Any] = {
16
            "@context": "https://raw.githubusercontent.com/OpenEnergyPlatform/oemetadata/production/oemetadata/latest/context.json",  # noqa: E501
17
            "name": "",
18
            "title": "",
19
            "description": "",
20
            "@id": "",
21
            "resources": [],
22
            "metaMetadata": {
23
                "metadataVersion": version,
24
                "metadataLicense": {"name": "CC0-1.0"},
25
            },
26
        }
27
        self._validated = False
28
29
    def set_root(
30
        self,
31
        *,
32
        name: str,
33
        title: str = "",
34
        description: str = "",
35
        id_: str = "",
36
    ) -> "OEMetadataPackage":
37
        self._doc["name"] = name
38
        self._doc["title"] = title
39
        self._doc["description"] = description
40
        self._doc["@id"] = id_
41
        self._validated = False
42
        return self
43
44
    def add_resource(
45
        self,
46
        resource: dict,
47
        *,
48
        dedupe_by: str = "name",
49
        overwrite: bool = True,
50
    ) -> "OEMetadataPackage":
51
        if dedupe_by and overwrite:
52
            self._doc["resources"] = [
53
                r
54
                for r in self._doc["resources"]
55
                if r.get(dedupe_by) != resource.get(dedupe_by)
56
            ]
57
        self._doc["resources"].append(deepcopy(resource))
58
        self._validated = False
59
        return self
60
61
    def add_from_full_document(
62
        self, full_doc: dict, *, take_root_if_empty: bool = False
63
    ) -> "OEMetadataPackage":
64
        # Optionally fill root if still empty
65
        if take_root_if_empty and not self._doc["name"]:
66
            for k in ("name", "title", "description", "@id"):
67
                if k in full_doc:
68
                    self._doc[k] = full_doc[k]
69
        for r in full_doc.get("resources", []):
70
            self.add_resource(r)
71
        return self
72
73
    def add_from_table_comment(
74
        self, engine: Engine, schema: str, table: str
75
    ) -> "OEMetadataPackage":
76
        sql = """
77
        SELECT obj_description((quote_ident(%s)||'.'||quote_ident(%s))::regclass, 'pg_class') AS comment
78
        """  # noqa: E501
79
        with engine.begin() as conn:
80
            comment = conn.exec_driver_sql(sql, (schema, table)).scalar()
81
        if not comment:
82
            return self
83
        try:
84
            full_doc = json.loads(comment)
85
        except Exception:
86
            return self
87
        # Optionally validate the doc before merging
88
        try:
89
            validate_metadata(full_doc, check_license=False)
90
        except Exception:
91
            pass
92
        return self.add_from_full_document(full_doc)
93
94
    def finalize(self, *, license_check: bool = True) -> "OEMetadataPackage":
95
        validate_metadata(self._doc, check_license=license_check)
96
        self._validated = True
97
        return self
98
99
    def as_dict(self) -> dict:
100
        if not self._validated:
101
            self.finalize()
102
        return deepcopy(self._doc)
103
104
    def as_json(self) -> str:
105
        return json.dumps(self.as_dict(), ensure_ascii=False)
106