schemastore   B
last analyzed

Complexity

Total Complexity 44

Size/Duplication

Total Lines 235
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 142
dl 0
loc 235
rs 8.8798
c 0
b 0
f 0
wmc 44

5 Functions

Rating   Name   Duplication   Size   Complexity  
F build_schemastore_new() 0 125 28
A validate_schemastore() 0 6 3
C build_l10n_schemastore() 0 41 9
A schemata_log() 0 4 1
A test_schemata() 0 12 3

How to fix   Complexity   

Complexity

Complex classes like schemastore often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
#!/usr/bin/env python
2
# -*- coding: UTF-8 -*-
3
4
# Isomer - The distributed application framework
5
# ==============================================
6
# Copyright (C) 2011-2020 Heiko 'riot' Weinen <[email protected]> and others.
7
#
8
# This program is free software: you can redistribute it and/or modify
9
# it under the terms of the GNU Affero General Public License as published by
10
# the Free Software Foundation, either version 3 of the License, or
11
# (at your option) any later version.
12
#
13
# This program is distributed in the hope that it will be useful,
14
# but WITHOUT ANY WARRANTY; without even the implied warranty of
15
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16
# GNU Affero General Public License for more details.
17
#
18
# You should have received a copy of the GNU Affero General Public License
19
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
20
21
"""Schemastore builder"""
22
23
import formal
24
import jsonschema
25
26
from copy import deepcopy
27
from pkg_resources import iter_entry_points, DistributionNotFound
28
from json import dumps
29
30
from isomer.logger import isolog, verbose, warn, debug
31
from isomer.misc import all_languages, i18n as _
32
33
34
def schemata_log(*args, **kwargs):
35
    """Log as emitter 'SCHEMATA'"""
36
    kwargs.update({"emitter": "SCHEMATA", "frame_ref": 2})
37
    isolog(*args, **kwargs)
38
39
40
schemastore = {}
41
l10n_schemastore = {}
42
restrictions = {}
43
configschemastore = {}
44
45
46
def build_schemastore_new():
47
    available = {}
48
49
    for schema_entrypoint in iter_entry_points(group="isomer.schemata", name=None):
50
        try:
51
            schemata_log("Schemata found: ", schema_entrypoint.name, lvl=verbose)
52
            schema = schema_entrypoint.load()
53
            available[schema_entrypoint.name] = schema
54
        except (ImportError, DistributionNotFound) as e:
55
            schemata_log(
56
                "Problematic schema: ", schema_entrypoint.name, exc=True, lvl=warn
57
            )
58
59
    def schema_insert(dictionary, insert_path, insert_object):
60
        insert_path = insert_path.split("/")
61
62
        place = dictionary
63
64
        for element in insert_path:
65
            if element != "":
66
                place = place[element]
67
68
        place.update(insert_object)
69
70
        return dictionary
71
72
    def form_insert(insert_form, insert_index, insert_path, insert_object):
73
        insert_path = insert_path.split("/")
74
        place = None
75
        if isinstance(insert_index, str):
76
            for widget in insert_form:
77
                if isinstance(widget, dict) and widget.get("id", None) is not None:
78
                    place = widget
79
        else:
80
            place = insert_form[insert_index]
81
82
        if place is None:
83
            schemata_log("No place to insert into form found:", insert_path, insert_form, insert_object)
84
            return
85
86
        for element in insert_path:
87
            schemata_log(element, place, lvl=verbose)
88
            try:
89
                element = int(element)
90
            except ValueError:
91
                pass
92
            if element != "":
93
                place = place[element]
94
95
        if isinstance(place, dict):
96
            place.update(insert_object)
97
        else:
98
            place.append(insert_object)
99
100
        return insert_form
101
102
    def _get_field_restrictions(item):
103
        result = {}
104
105
        for key, thing in item['schema']['properties'].items():
106
            if 'roles' in thing:
107
                # schemata_log(thing)
108
                result[key] = thing['roles']
109
110
        return result
111
112
    for key, item in available.items():
113
        restrictions[key] = _get_field_restrictions(item)
114
        schemata_log("Schema", key, "restrictions:", restrictions[key], lvl=debug)
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable restrictions does not seem to be defined.
Loading history...
115
116
        extends = item.get("extends", None)
117
        if extends is not None:
118
            schemata_log(key, "extends:", extends, pretty=True, lvl=verbose)
119
120
            for model, extension_group in extends.items():
121
                schema_extensions = extension_group.get("schema", None)
122
                form_extensions = extension_group.get("form", None)
123
                schema = available[model].get("schema", None)
124
                form = available[model].get("form", None)
125
126
                original_schema = deepcopy(schema)
127
128
                if schema_extensions is not None:
129
                    schemata_log("Extending schema", model, "from", key, lvl=debug)
130
                    for path, extensions in schema_extensions.items():
131
                        schemata_log(
132
                            "Item:", path, "Extensions:", extensions, lvl=verbose
133
                        )
134
                        for obj in extensions:
135
                            available[model]["schema"] = schema_insert(
136
                                schema, path, obj
137
                            )
138
                            schemata_log("Path:", path, "obj:", obj, lvl=verbose)
139
140
                if form_extensions is not None:
141
                    schemata_log("Extending form of", model, "with", key, lvl=verbose)
142
                    for index, extensions in form_extensions.items():
143
                        schemata_log(
144
                            "Item:", index, "Extensions:", extensions, lvl=verbose
145
                        )
146
                        for path, obj in extensions.items():
147
                            if not isinstance(obj, list):
148
                                obj = [obj]
149
                            for thing in obj:
150
                                available[model]["form"] = form_insert(
151
                                    form, index, path, thing
152
                                )
153
                                schemata_log("Path:", path, "obj:", thing, lvl=verbose)
154
155
                # schemata_log(available[model]['form'], pretty=True, lvl=warn)
156
                try:
157
                    jsonschema.Draft4Validator.check_schema(schema)
158
                except jsonschema.SchemaError as e:
159
                    schemata_log(
160
                        "Schema extension failed:", model, extension_group, exc=True
161
                    )
162
                    available[model]["schema"] = original_schema
163
164
    schemata_log(
165
        "Found", len(available), "schemata: ", sorted(available.keys()), lvl=debug
166
    )
167
168
    validate_schemastore(available)
169
170
    return available
171
172
def validate_schemastore(store):
173
    for key, item in store.items():
174
        try:
175
            _ = dumps(item)
176
        except Exception as e:
177
            schemata_log("Schema did not validate:", key, e, pretty=True, lvl=warn)
178
179
180
def build_l10n_schemastore(available):
181
    l10n_schemata = {}
182
183
    for lang in all_languages():
184
185
        language_schemata = {}
186
187
        def translate(schema):
188
            """Generate a translated copy of a schema"""
189
190
            localized = deepcopy(schema)
191
192
            def walk(branch):
193
                """Inspect a schema recursively to translate descriptions and titles"""
194
195
                if isinstance(branch, dict):
196
197
                    if "title" in branch and isinstance(branch["title"], str):
198
                        # schemata_log(branch['title'])
199
                        branch["title"] = _(branch["title"], lang=lang)
200
                    if "description" in branch and isinstance(
201
                        branch["description"], str
202
                    ):
203
                        # schemata_log(branch['description'])
204
                        branch["description"] = _(branch["description"], lang=lang)
205
206
                    for branch_item in branch.values():
207
                        walk(branch_item)
208
209
            walk(localized)
210
211
            return localized
212
213
        for key, item in available.items():
214
            language_schemata[key] = translate(item)
215
216
        l10n_schemata[lang] = language_schemata
217
218
        # schemata_log(l10n_schemata['de']['client'], pretty=True, lvl=error)
219
220
    return l10n_schemata
221
222
223
def test_schemata():
224
    """Validates all registered schemata"""
225
226
    objects = {}
227
228
    for schemaname in schemastore.keys():
229
        objects[schemaname] = formal.model_factory(schemastore[schemaname]["schema"])
230
        try:
231
            testobject = objects[schemaname]()
232
            testobject.validate()
233
        except Exception as e:
234
            schemata_log("Blank schema did not validate:", schemaname, exc=True)
235
236
            # pprint(objects)
237