Passed
Pull Request — dev (#1344)
by
unknown
02:07
created

package.OEMetadataPackage.add_from_full_document()   B

Complexity

Conditions 6

Size

Total Lines 11
Code Lines 9

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 9
dl 0
loc 11
rs 8.6666
c 0
b 0
f 0
cc 6
nop 4
1
from copy import deepcopy
2
from typing import Any, Dict
3
import json
4
5
from omi.base import get_metadata_specification
6
from omi.validation import validate_metadata  # parse_metadata
7
from sqlalchemy.engine import Engine
8
9
from egon.data.metadata import settings
10
11
12
class OEMetadataPackage:
13
    def __init__(self, version: str = settings.OEMETADATA_VERSION) -> None:
14
        self.spec = get_metadata_specification(version)
15
16
        self._doc: Dict[str, Any] = {
17
            "@context": self.spec.example["@context"],
18
            "name": "",
19
            "title": "",
20
            "description": "",
21
            "@id": "",
22
            "resources": [],
23
            "metaMetadata": self.spec.example["metaMetadata"],
24
        }
25
        self._validated = False
26
27
    def set_root(
28
        self,
29
        *,
30
        name: str,
31
        title: str = "",
32
        description: str = "",
33
        id_: str = "",
34
    ) -> "OEMetadataPackage":
35
        self._doc["name"] = name
36
        self._doc["title"] = title
37
        self._doc["description"] = description
38
        self._doc["@id"] = id_
39
        self._validated = False
40
        return self
41
42
    def add_resource(
43
        self,
44
        resource: dict,
45
        *,
46
        dedupe_by: str = "name",
47
        overwrite: bool = True,
48
    ) -> "OEMetadataPackage":
49
        if dedupe_by and overwrite:
50
            self._doc["resources"] = [
51
                r
52
                for r in self._doc["resources"]
53
                if r.get(dedupe_by) != resource.get(dedupe_by)
54
            ]
55
        self._doc["resources"].append(deepcopy(resource))
56
        self._validated = False
57
        return self
58
59
    def add_from_full_document(
60
        self, full_doc: dict, *, take_root_if_empty: bool = False
61
    ) -> "OEMetadataPackage":
62
        # Optionally fill root if still empty
63
        if take_root_if_empty and not self._doc["name"]:
64
            for k in ("name", "title", "description", "@id"):
65
                if k in full_doc:
66
                    self._doc[k] = full_doc[k]
67
        for r in full_doc.get("resources", []):
68
            self.add_resource(r)
69
        return self
70
71
    def add_from_table_comment(
72
        self, engine: Engine, schema: str, table: str
73
    ) -> "OEMetadataPackage":
74
        sql = """
75
        SELECT obj_description((quote_ident(%s)||'.'||quote_ident(%s))::regclass, 'pg_class') AS comment
76
        """  # noqa: E501
77
        with engine.begin() as conn:
78
            comment = conn.exec_driver_sql(sql, (schema, table)).scalar()
79
        if not comment:
80
            return self
81
        try:
82
            full_doc = json.loads(comment)
83
        except Exception:
84
            return self
85
        # Optionally validate the doc before merging
86
        try:
87
            validate_metadata(full_doc, check_license=False)
88
        except Exception:
89
            pass
90
        return self.add_from_full_document(full_doc)
91
92
    def finalize(self, *, license_check: bool = True) -> "OEMetadataPackage":
93
        validate_metadata(self._doc, check_license=license_check)
94
        self._validated = True
95
        return self
96
97
    def as_dict(self) -> dict:
98
        if not self._validated:
99
            self.finalize()
100
        return deepcopy(self._doc)
101
102
    def as_json(self) -> str:
103
        return json.dumps(self.as_dict(), ensure_ascii=False)
104