ModelManager   B
last analyzed

Complexity

Total Complexity 42

Size/Duplication

Total Lines 227
Duplicated Lines 0 %

Importance

Changes 4
Bugs 0 Features 0
Metric Value
wmc 42
c 4
b 0
f 0
dl 0
loc 227
rs 8.295

24 Methods

Rating   Name   Duplication   Size   Complexity  
A new_model() 0 17 2
A _open_xml() 0 5 1
A open() 0 5 2
A open_ua_model() 0 7 2
A open_xml() 0 7 2
A import_xml() 0 8 2
A __init__() 0 9 1
A delete_node() 0 8 4
A paste_node() 0 11 2
A close_model() 0 9 3
A save_ua_model() 0 11 2
A save_xml() 0 10 1
A _attr_written() 0 7 3
A _after_add() 0 8 2
A _open_ua_model() 0 10 2
A add_data_type() 0 6 1
A add_object() 0 7 1
A add_property() 0 7 1
A add_method() 0 9 1
A add_folder() 0 6 1
A add_variable_type() 0 7 1
A _get_path() 0 8 3
A add_object_type() 0 6 1
A add_variable() 0 6 1

How to fix   Complexity   

Complex Class

Complex classes like ModelManager often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to finding such a component is to look for fields/methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
import logging
2
import os
3
import xml.etree.ElementTree as Et
4
5
6
from PyQt5.QtCore import pyqtSignal, QObject, QSettings
7
8
from opcua import ua
9
from opcua import copy_node
10
from opcua.common.instantiate import instantiate
11
from opcua.common.ua_utils import get_node_children
12
13
from uawidgets.utils import trycatchslot
14
15
from uamodeler.server_manager import ServerManager
16
17
logger = logging.getLogger(__name__)
18
19
20
class ModelManager(QObject):
    """
    Manage our model: load xml, start and close the server, add nodes.

    No dialogs at this level, only api.
    """

    # Emitted by paste_node() with the exception raised while copying a node.
    error = pyqtSignal(Exception)
    # Emitted with the new title (a path, "No Name" or "") whenever the
    # current model path changes.
    titleChanged = pyqtSignal(str)
    # Declared for listeners; not emitted anywhere inside this class.
    modelChanged = pyqtSignal()

    def __init__(self, modeler):
        """
        Create the manager and hook it to the modeler widgets.

        :param modeler: owning modeler object; used as Qt parent and as the
            access point to the ui widgets (tree_ui, idx_ui, attrs_ui, ...).
        """
        QObject.__init__(self, modeler)
        self.modeler = modeler
        self.server_mgr = ServerManager(self.modeler.ui.actionUseOpenUa)
        self.new_nodes = []  # the added nodes we will save
        self.current_path = None
        self.settings = QSettings()
        self.modified = False
        self.modeler.attrs_ui.attr_written.connect(self._attr_written)

    def delete_node(self, node):
        """
        Delete every node returned by ``get_node_children(node)``.

        Deleted nodes are also dropped from the list of nodes to save, and
        the current item is removed from the tree widget. Does nothing when
        *node* is falsy.
        """
        if not node:
            return
        for child in get_node_children(node):
            child.delete()
            if child in self.new_nodes:
                self.new_nodes.remove(child)
        self.modeler.tree_ui.remove_current_item()

    def paste_node(self, node):
        """
        Copy *node* under the currently selected node.

        On failure the exception is published through the ``error`` signal
        and then re-raised. On success the copied nodes are registered for
        saving, the views are refreshed and the model is marked modified.
        """
        parent = self.modeler.get_current_node()
        try:
            added_nodes = copy_node(parent, node)
        except Exception as ex:
            # This class has no show_error() method; calling it would raise
            # AttributeError and hide the real failure. Report through our
            # own error signal instead, then propagate the original error.
            self.error.emit(ex)
            raise
        self.new_nodes.extend(added_nodes)
        self.modeler.tree_ui.reload_current()
        self.modeler.show_refs()
        self.modified = True

    def close_model(self, force=False):
        """
        Stop the server and reset the ui.

        :param force: close even if the model has unsaved modifications.
        :raises RuntimeError: if the model is modified and *force* is False.
        """
        if not force and self.modified:
            raise RuntimeError("Model is modified, use force to close it")
        self.modeler.actions.disable_all_actions()
        self.server_mgr.stop_server()
        self.current_path = None
        self.modified = False
        self.titleChanged.emit("")
        self.modeler.clear_all_widgets()

    def new_model(self):
        """
        Start a server and attach the widgets to a fresh, unnamed model.

        :returns: True
        :raises RuntimeError: if the current model has unsaved modifications.
        """
        if self.modified:
            raise RuntimeError("Model is modified, cannot create new model")
        del self.new_nodes[:]  # empty list while keeping reference

        endpoint = "opc.tcp://0.0.0.0:48400/freeopcua/uamodeler/"
        logger.info("Starting server on %s", endpoint)
        self.server_mgr.start_server(endpoint)

        self.modeler.tree_ui.set_root_node(self.server_mgr.nodes.root)
        self.modeler.idx_ui.set_node(self.server_mgr.get_node(ua.ObjectIds.Server_NamespaceArray))
        self.modeler.nodesets_ui.set_server_mgr(self.server_mgr)
        self.modified = False
        self.modeler.actions.enable_model_actions()
        self.current_path = None
        self.titleChanged.emit("No Name")
        return True

    def import_xml(self, path):
        """
        Import the xml file at *path* into the running server.

        The imported nodes are registered for saving, the model is marked
        modified and the tree and namespace views are reloaded.

        :returns: *path*, unchanged.
        """
        imported = self.server_mgr.import_xml(path)
        self.new_nodes.extend([self.server_mgr.get_node(node) for node in imported])
        self.modified = True
        # we maybe should only reload the imported nodes
        self.modeler.tree_ui.reload()
        self.modeler.idx_ui.reload()
        return path

    def open_xml(self, path):
        """
        Create a new model and load the xml file at *path* into it.

        If loading fails the model is force-closed and the error re-raised.
        """
        self.new_model()
        try:
            self._open_xml(path)
        except BaseException:
            # Cleanup only: always re-raised, so nothing is swallowed.
            self.close_model(force=True)
            raise

    def _open_xml(self, path):
        """Import *path*, then record it as the current, unmodified model."""
        path = self.import_xml(path)
        # import_xml() flags the model as modified; a freshly opened file is not.
        self.modified = False
        self.current_path = path
        self.titleChanged.emit(self.current_path)

    def open(self, path):
        """Open *path* as plain xml if it ends with ".xml", else as a ua model file."""
        if path.endswith(".xml"):
            self.open_xml(path)
        else:
            self.open_ua_model(path)

    def open_ua_model(self, path):
        """
        Create a new model and load the ua model file at *path* into it.

        If loading fails the model is force-closed and the error re-raised.
        """
        self.new_model()
        try:
            self._open_ua_model(path)
        except BaseException:
            # Cleanup only: always re-raised, so nothing is swallowed.
            self.close_model(force=True)
            raise

    def _open_ua_model(self, path):
        """
        Parse the ua model file at *path* and load what it points to.

        Every "Reference" element's ``path`` is imported as a nodeset, then
        the xml file named by the "Model" element (resolved relative to the
        directory of *path*) is opened.

        :raises ValueError: if the file has no "Model" element.
        """
        root = Et.parse(path).getroot()
        for ref_el in root.findall("Reference"):
            self.modeler.nodesets_ui.import_nodeset(ref_el.attrib['path'])
        mod_el = root.find("Model")
        if mod_el is None:
            # find() returns None when the element is missing; fail with a
            # clear message instead of an AttributeError on None.
            raise ValueError("No Model element found in {}".format(path))
        xmlpath = os.path.join(os.path.dirname(path), mod_el.attrib['path'])
        self._open_xml(xmlpath)

    def _get_path(self, path):
        """
        Resolve the path to save to, without its file extension.

        Falls back to the current path when *path* is None. The result is
        stored as the current path and announced through ``titleChanged``.

        :raises ValueError: if neither *path* nor a current path is set.
        """
        if path is None:
            path = self.current_path
        if path is None:
            raise ValueError("No path is defined")
        self.current_path = os.path.splitext(path)[0]
        self.titleChanged.emit(self.current_path)
        return self.current_path

    def save_xml(self, path=None):
        """
        Export the registered nodes to "<path>.xml" and clear the modified flag.

        :param path: target path; its extension is replaced. Defaults to the
            current path.
        :raises ValueError: if no path is available (see _get_path).
        """
        path = self._get_path(path)
        path += ".xml"
        # Skip the first namespace entry; fetch the array once for both the
        # log message and the export.
        uris = self.server_mgr.get_namespace_array()[1:]
        logger.info("Saving nodes to %s", path)
        logger.info("Exporting  %s nodes: %s", len(self.new_nodes), self.new_nodes)
        logger.info("and namespaces: %s ", uris)
        self.server_mgr.export_xml(self.new_nodes, uris, path)
        self.modified = False
        logger.info("%s saved", path)

    def save_ua_model(self, path=None):
        """
        Write the "<path>.uamodel" description file.

        The file holds one "Model" element naming the sibling ".xml" file and
        one "Reference" element per entry of ``nodesets_ui.nodesets``.

        :param path: target path; its extension is replaced. Defaults to the
            current path.
        :raises ValueError: if no path is available (see _get_path).
        """
        path = self._get_path(path)
        model_path = path + ".uamodel"
        logger.info("Saving model to %s", model_path)
        etree = Et.ElementTree(Et.Element('UAModel'))
        root = etree.getroot()
        model_el = Et.SubElement(root, "Model")
        # Only the file name is stored; _open_ua_model() resolves it relative
        # to the directory of the .uamodel file.
        model_el.attrib["path"] = os.path.basename(path) + ".xml"
        for refpath in self.modeler.nodesets_ui.nodesets:
            ref_el = Et.SubElement(root, "Reference")
            ref_el.attrib["path"] = refpath
        etree.write(model_path, encoding='utf-8', xml_declaration=True)

    def _after_add(self, new_nodes):
        """
        Register freshly created node(s) and refresh the views.

        :param new_nodes: a single node, or a list/tuple of nodes.
        """
        if isinstance(new_nodes, (list, tuple)):
            self.new_nodes.extend(new_nodes)
        else:
            self.new_nodes.append(new_nodes)
        self.modeler.tree_ui.reload_current()
        self.modeler.show_refs()
        self.modified = True

    def add_method(self, *args):
        """
        Add a method under the current node; *args* go to ``parent.add_method``.

        :returns: list with the new method node followed by its children.
        """
        logger.info("Creating method type with args: %s", args)
        parent = self.modeler.tree_ui.get_current_node()
        new_node = parent.add_method(*args)
        # The children of the method node must be saved along with it.
        new_nodes = [new_node]
        new_nodes.extend(new_node.get_children())
        self._after_add(new_nodes)
        return new_nodes

    def add_object_type(self, *args):
        """Add an object type under the current node and return the new node."""
        logger.info("Creating object type with args: %s", args)
        parent = self.modeler.tree_ui.get_current_node()
        new_node = parent.add_object_type(*args)
        self._after_add(new_node)
        return new_node

    def add_folder(self, *args):
        """Add a folder under the current node and return the new node."""
        parent = self.modeler.tree_ui.get_current_node()
        logger.info("Creating folder with args: %s", args)
        new_node = parent.add_folder(*args)
        self._after_add(new_node)
        return new_node

    def add_object(self, *args):
        """
        Instantiate an object type under the current node.

        :param args: exactly ``(nodeid, bname, otype)``; the display name is
            built from ``bname.Name``.
        :returns: whatever ``instantiate`` returns for the created node(s).
        """
        parent = self.modeler.tree_ui.get_current_node()
        logger.info("Creating object with args: %s", args)
        nodeid, bname, otype = args
        new_nodes = instantiate(parent, otype, bname=bname, nodeid=nodeid, dname=ua.LocalizedText(bname.Name))
        self._after_add(new_nodes)
        return new_nodes

    def add_data_type(self, *args):
        """Add a data type under the current node and return the new node."""
        parent = self.modeler.tree_ui.get_current_node()
        logger.info("Creating data type with args: %s", args)
        new_node = parent.add_data_type(*args)
        self._after_add(new_node)
        return new_node

    def add_variable(self, *args):
        """Add a variable under the current node and return the new node."""
        parent = self.modeler.tree_ui.get_current_node()
        logger.info("Creating variable with args: %s", args)
        new_node = parent.add_variable(*args)
        self._after_add(new_node)
        return new_node

    def add_property(self, *args):
        """
        Add a property under the current node and return the new node.

        The fifth positional argument is stored in the application settings
        under "last_datatype" before the property is created.
        """
        parent = self.modeler.tree_ui.get_current_node()
        logger.info("Creating property with args: %s", args)
        self.settings.setValue("last_datatype", args[4])
        new_node = parent.add_property(*args)
        self._after_add(new_node)
        return new_node

    def add_variable_type(self, *args):
        """
        Add a variable type under the current node and return the new node.

        :param args: exactly ``(nodeid, bname, datatype)``; only
            ``datatype.nodeid`` is forwarded to the parent node.
        """
        parent = self.modeler.tree_ui.get_current_node()
        logger.info("Creating variable type with args: %s", args)
        nodeid, bname, datatype = args
        new_node = parent.add_variable_type(nodeid, bname, datatype.nodeid)
        self._after_add(new_node)
        return new_node

    @trycatchslot
    def _attr_written(self, attr, dv):
        """
        Slot connected to ``attrs_ui.attr_written``.

        Marks the model as modified and, when the browse name or display
        name was written, updates the current tree item with the new value.
        """
        self.modified = True
        if attr == ua.AttributeIds.BrowseName:
            self.modeler.tree_ui.update_browse_name_current_item(dv.Value.Value)
        elif attr == ua.AttributeIds.DisplayName:
            self.modeler.tree_ui.update_display_name_current_item(dv.Value.Value)
247