Passed
Pull Request — dev (#1344)
by unknown, created 02:18

migrate_json_to_split_yaml._compute_template(): F

Complexity
  Conditions: 14

Size
  Total Lines: 30
  Code Lines: 23

Duplication
  Lines: 0
  Ratio: 0 %

Importance
  Changes: 0

Metric  Value
eloc    23
dl      0
loc     30
rs      3.6
c       0
b       0
f       0
cc      14
nop     1

How to fix

Complexity

Complex functions like migrate_json_to_split_yaml._compute_template() often do several different things. To break such a function down, we need to identify a cohesive component within it. A common approach to finding such a component is to look for blocks of code that share the same structure or operate on related names; here, the three loops over _DICT_TEMPLATE_KEYS, _LIST_EQUALITY_KEYS and _LIST_TEMPLATE_KEYS are natural candidates.

Once you have determined which pieces belong together, you can apply the Extract Method refactoring and pull each cohesive block into its own helper function; where the component makes sense as its own type, Extract Class (or Extract Subclass) is also a candidate. A sketch of this approach is shown below.
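As a minimal sketch (not part of the pull request), the three loops in _compute_template() could each move into a helper. The helper names _collect_dict_template_keys and _collect_identical_list_keys are hypothetical; the sketch assumes the module-level constants and helpers from the listing below (_DICT_TEMPLATE_KEYS, _LIST_EQUALITY_KEYS, _LIST_TEMPLATE_KEYS, _deep_intersection_dict, _all_identical).

# Hypothetical refactoring sketch; helper names are illustrative, not in the PR.
# Assumes the constants and helpers defined in the module listed below.
from copy import deepcopy
from typing import Any, Dict, List, Set


def _collect_dict_template_keys(
    resources: List[Dict[str, Any]], template: Dict[str, Any]
) -> None:
    # dict keys: keep only the deep intersection shared by every resource
    for k in _DICT_TEMPLATE_KEYS:
        dicts = [r.get(k, {}) for r in resources if isinstance(r.get(k), dict)]
        if len(dicts) == len(resources) and dicts:
            inter = _deep_intersection_dict(dicts)
            if inter:
                template[k] = inter


def _collect_identical_list_keys(
    resources: List[Dict[str, Any]],
    template: Dict[str, Any],
    keys: Set[str],
) -> None:
    # list keys: keep only lists that are identical across all resources
    for k in keys:
        vals = [r.get(k) for r in resources]
        if all(isinstance(v, list) for v in vals):
            same, val = _all_identical(vals)
            if same and val:
                template[k] = deepcopy(val)


def _compute_template(resources: List[Dict[str, Any]]) -> Dict[str, Any]:
    # same behaviour as the original below, one delegating call per key group
    template: Dict[str, Any] = {}
    if not resources:
        return template
    _collect_dict_template_keys(resources, template)
    _collect_identical_list_keys(resources, template, _LIST_EQUALITY_KEYS)
    _collect_identical_list_keys(resources, template, _LIST_TEMPLATE_KEYS)
    return template

Each helper then stays well below the complexity threshold, and _compute_template() reads as three delegating calls. The full, unrefactored listing that the report analysed follows.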

#!/usr/bin/env python3
from __future__ import annotations

from copy import deepcopy
from pathlib import Path
from typing import Any, Dict, List, Tuple
import argparse
import json

import yaml

# ---------- YAML helpers ----------


def _dump_yaml(path: Path, obj: object) -> None:
    path.parent.mkdir(parents=True, exist_ok=True)
    with path.open("w", encoding="utf-8") as f:
        yaml.safe_dump(obj, f, sort_keys=False, allow_unicode=True)


# ---------- list/dict helpers for template inference ----------

_LIST_TEMPLATE_KEYS = {"keywords", "topics", "languages"}
_DICT_TEMPLATE_KEYS = {"context"}
_LIST_EQUALITY_KEYS = {
    "licenses"
}  # consider as common only if identical across all


def _deep_intersection_dict(dicts: List[Dict[str, Any]]) -> Dict[str, Any]:
    if not dicts:
        return {}
    keys_in_all = set(dicts[0].keys())
    for d in dicts[1:]:
        keys_in_all &= set(d.keys())

    out: Dict[str, Any] = {}
    for k in keys_in_all:
        vals = [d[k] for d in dicts]
        if not all(type(v) is type(vals[0]) for v in vals):  # noqa: E721
            continue
        v0 = vals[0]
        if isinstance(v0, dict):
            sub = _deep_intersection_dict(vals)  # type: ignore[arg-type]
            if sub:
                out[k] = sub
        elif isinstance(v0, list):
            if all(v == v0 for v in vals[1:]):
                out[k] = deepcopy(v0)
        else:
            if all(v == v0 for v in vals[1:]):
                out[k] = v0
    return out


def _all_identical(values: List[Any]) -> Tuple[bool, Any]:
    if not values:
        return False, None
    v0 = values[0]
    return (all(v == v0 for v in values[1:])), v0


# ---------- schema fix ----------


def _ensure_nullable_in_fields(resource: Dict[str, Any]) -> None:
    schema = resource.get("schema")
    if not isinstance(schema, dict):
        return
    fields = schema.get("fields")
    if not isinstance(fields, list):
        return
    for fld in fields:
        if isinstance(fld, dict) and "nullable" not in fld:
            fld["nullable"] = False


# ---------- names ----------


def _slug(s: str) -> str:
    return "".join(c if c.isalnum() or c in ("-", "_") else "_" for c in s)


def _resource_filename(name: str) -> str:
    return f"{_slug(name)}.resource.yaml"


# ---------- core migration ----------


def _strip_resource_by_template(
    res: Dict[str, Any], tmpl: Dict[str, Any]
) -> Dict[str, Any]:
    out = deepcopy(res)

    # strip list-keys if identical
    for k in list(_LIST_TEMPLATE_KEYS | _LIST_EQUALITY_KEYS):
        if k in tmpl and k in out and out[k] == tmpl[k]:
            out.pop(k, None)

    # strip dict-keys by matching deep subset of template keys
    for k in _DICT_TEMPLATE_KEYS:
        if (
            k not in tmpl
            or k not in out
            or not isinstance(out[k], dict)
            or not isinstance(tmpl[k], dict)
        ):
            continue
        pruned = {}
        for kk, vv in out[k].items():
            if kk in tmpl[k] and tmpl[k][kk] == vv:
                continue
            pruned[kk] = vv
        if pruned:
            out[k] = pruned
        else:
            out.pop(k, None)

    return out


def _compute_template(resources: List[Dict[str, Any]]) -> Dict[str, Any]:
    template: Dict[str, Any] = {}
    if not resources:
        return template

    # dict keys (deep intersection)
    for k in _DICT_TEMPLATE_KEYS:
        dicts = [r.get(k, {}) for r in resources if isinstance(r.get(k), dict)]
        if len(dicts) == len(resources) and dicts:
            inter = _deep_intersection_dict(dicts)
            if inter:
                template[k] = inter

    # list equality keys (identical lists across all)
    for k in _LIST_EQUALITY_KEYS:
        lists = [r.get(k) for r in resources]
        if all(isinstance(v, list) for v in lists):
            same, val = _all_identical(lists)
            if same and val:
                template[k] = deepcopy(val)

    # simple list keys that must be identical across resources
    for k in _LIST_TEMPLATE_KEYS:
        vals = [r.get(k) for r in resources]
        if all(isinstance(v, list) for v in vals):
            same, val = _all_identical(vals)
            if same and val:
                template[k] = deepcopy(val)

    return template


def migrate_monolithic(
    data: Dict[str, Any], out_dir: Path, dataset_id: str
) -> None:
    resources = data.get("resources", [])
    if not isinstance(resources, list):
        resources = []

    # Build dataset block (skip @context/metaMetadata/resources)
    dataset: Dict[str, Any] = {
        k: v
        for k, v in data.items()
        if k not in {"@context", "metaMetadata", "resources"}
    }
    version = dataset.get("version", "OEMetadata-2.0.4")

    # Fix fields
    for r in resources:
        if isinstance(r, dict):
            _ensure_nullable_in_fields(r)

    # Compute template & strip
    template = _compute_template(resources)
    stripped_resources = [
        _strip_resource_by_template(r, template) for r in resources
    ]

    # Write YAMLs
    ds_path = out_dir / "datasets" / f"{dataset_id}.dataset.yaml"
    _dump_yaml(ds_path, {"version": version, "dataset": dataset})

    if template:
        tp_path = out_dir / "datasets" / f"{dataset_id}.template.yaml"
        _dump_yaml(tp_path, template)

    res_dir = out_dir / "resources" / dataset_id
    for r in stripped_resources:
        name = str(r.get("name") or "resource")
        _dump_yaml(res_dir / _resource_filename(name), r)

    print(
        f"[OK] {dataset_id}: wrote dataset + {len(stripped_resources)} "
        "resources"
    )


def migrate_resource_only(
    data: Dict[str, Any], out_dir: Path, dataset_id: str
) -> None:
    """
    For JSONs that contain a single resource (no top-level 'resources').
    We just write one resource YAML. Dataset/template can be added later.
    """
    name = str(data.get("name") or "resource")
    _ensure_nullable_in_fields(data)
    res_dir = out_dir / "resources" / dataset_id
    _dump_yaml(res_dir / _resource_filename(name), data)
    print(f"[OK] {dataset_id}: wrote single resource '{name}'")


# ---------- dataset_id inference ----------


def pick_dataset_id(
    mode: str,
    data: Dict[str, Any],
    file_path: Path,
    fixed_id: str | None,
) -> str:
    if mode == "fixed":
        if not fixed_id:
            raise ValueError(
                "--dataset-id is required when --dataset-id-mode=fixed"
            )
        return fixed_id
    if mode == "name":
        n = data.get("name")
        if isinstance(n, str) and n.strip():
            return _slug(n)
        return _slug(file_path.stem)
    if mode == "filename":
        return _slug(file_path.stem)
    if mode == "parent":
        return _slug(file_path.parent.name)
    raise ValueError(f"Unknown dataset-id-mode: {mode}")


# ---------- CLI ----------


def main() -> None:
    ap = argparse.ArgumentParser(
        description="Migrate OEMetadata JSON → split YAML "
        "(handles many files)."
    )
    ap.add_argument(
        "--input",
        required=True,
        type=Path,
        help="Path to a JSON file or a directory to scan recursively.",
    )
    ap.add_argument(
        "--out-dir",
        required=True,
        type=Path,
        help="Root of the split-YAML metadata tree to write.",
    )
    ap.add_argument(
        "--dataset-id-mode",
        choices=["name", "filename", "parent", "fixed"],
        default="name",
        help="How to determine dataset_id per input file (default: name).",
    )
    ap.add_argument(
        "--dataset-id",
        default=None,
        help="Required if --dataset-id-mode=fixed; ignored otherwise.",
    )
    args = ap.parse_args()

    inputs: List[Path]
    if args.input.is_dir():
        inputs = sorted(args.input.rglob("*.json"))
        if not inputs:
            print(f"[WARN] No JSON files found under {args.input}")
            return
    else:
        inputs = [args.input]

    out_dir = args.out_dir

    for j in inputs:
        try:
            data = json.loads(j.read_text(encoding="utf-8"))
        except Exception as e:
            print(f"[SKIP] {j}: cannot read/parse JSON ({e})")
            continue

        try:
            dsid = pick_dataset_id(
                args.dataset_id_mode, data, j, args.dataset_id
            )
        except Exception as e:
            print(f"[SKIP] {j}: {e}")
            continue

        if isinstance(data.get("resources"), list):
            migrate_monolithic(data, out_dir, dsid)
        else:
            # treat as a single resource JSON (best-effort)
            migrate_resource_only(data, out_dir, dsid)


if __name__ == "__main__":
    main()
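For reference, a typical invocation over a directory of JSON files might look like the following; only the flags come from the argparse definition above, and the paths are placeholders:

# placeholder paths; flags as defined in main()
python migrate_json_to_split_yaml.py \
    --input ./old_metadata \
    --out-dir ./metadata \
    --dataset-id-mode name

With --dataset-id-mode fixed, a --dataset-id value must also be supplied; otherwise pick_dataset_id() raises a ValueError and the file is skipped.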