Completed
Pull Request — master (#51)
by Olivier
30s
created

ModelManager   B

Complexity

Total Complexity 42

Size/Duplication

Total Lines 222
Duplicated Lines 0 %

Importance

Changes 3
Bugs 0 Features 0
Metric Value
wmc 42
c 3
b 0
f 0
dl 0
loc 222
rs 8.295

24 Methods

Rating   Name   Duplication   Size   Complexity  
A new_model() 0 17 2
A import_xml() 0 8 2
A __init__() 0 9 1
A delete_node() 0 8 4
A paste_node() 0 11 2
A close_model() 0 9 3
A save_ua_model() 0 11 2
A save_xml() 0 10 1
A _open_xml() 0 5 1
A _attr_written() 0 7 3
A _after_add() 0 8 2
A _open_ua_model() 0 9 2
A add_data_type() 0 6 1
A open() 0 5 2
A open_ua_model() 0 7 2
A add_object() 0 7 1
A open_xml() 0 7 2
A add_property() 0 7 1
A add_method() 0 8 1
A add_folder() 0 6 1
A add_variable_type() 0 7 1
A _get_path() 0 8 3
A add_object_type() 0 5 1
A add_variable() 0 6 1

How to fix   Complexity   

Complex Class

Complex classes like ModelManager often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to finding such a component is to look for fields/methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
import logging
2
import os
3
import xml.etree.ElementTree as Et
4
5
6
from PyQt5.QtCore import pyqtSignal, QObject, QSettings
7
8
from opcua import ua
9
from opcua import copy_node
10
from opcua.common.instantiate import instantiate
11
from opcua.common.ua_utils import get_node_children
12
13
from uawidgets.utils import trycatchslot
14
15
from uamodeler.server_manager import ServerManager
16
17
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
18
19
20
class ModelManager(QObject):
    """
    Manage the model being edited: start and stop the server, load and save
    XML / .uamodel files, and add, paste or delete nodes.

    No dialogs at this level, only API. Widgets are reached through the
    ``modeler`` object passed to the constructor.
    """

    # Carries an Exception; emitted by paste_node() when copying a node fails.
    error = pyqtSignal(Exception)

    def __init__(self, modeler):
        """
        Create the manager as a Qt child of *modeler*.

        :param modeler: owning object; must expose ``ui``, ``attrs_ui``,
            ``tree_ui``, ``idx_ui``, ``nodesets_ui`` and ``actions``, which
            are used by the methods of this class.
        """
        QObject.__init__(self, modeler)
        self.modeler = modeler
        self.server_mgr = ServerManager(self.modeler.ui.actionUseOpenUa)
        self.new_nodes = []  # the added nodes we will save
        self.current_path = None
        self.settings = QSettings()
        self.modified = False
        self.modeler.attrs_ui.attr_written.connect(self._attr_written)

    def delete_node(self, node):
        """
        Delete every node returned by ``get_node_children(node)``.

        Deleted nodes are also dropped from ``new_nodes`` so they are not
        exported later, the current tree item is removed and the model is
        flagged as modified. Does nothing when *node* is falsy.
        """
        if not node:
            return
        for n in get_node_children(node):
            n.delete()
            if n in self.new_nodes:
                self.new_nodes.remove(n)
        # The server content changed, so an unforced close must be refused.
        self.modified = True
        self.modeler.tree_ui.remove_current_item()

    def paste_node(self, node):
        """
        Copy *node* under the currently selected node.

        On failure the exception is emitted through the ``error`` signal and
        then re-raised. On success the copied nodes are recorded in
        ``new_nodes``, the views are refreshed and the model is flagged as
        modified.
        """
        parent = self.modeler.get_current_node()
        try:
            added_nodes = copy_node(parent, node)
        except Exception as ex:
            # Report through the declared signal; this class defines no
            # show_error() method.
            self.error.emit(ex)
            raise
        self.new_nodes.extend(added_nodes)
        self.modeler.tree_ui.reload_current()
        self.modeler.show_refs()
        self.modified = True

    def close_model(self, force=False):
        """
        Stop the server and reset all widgets and state.

        :param force: close even if the model has unsaved modifications.
        :raises RuntimeError: if the model is modified and *force* is false.
        """
        if not force and self.modified:
            raise RuntimeError("Model is modified, use force to close it")
        self.modeler.actions.disable_all_actions()
        self.server_mgr.stop_server()
        self.current_path = None
        self.modified = False
        self.modeler.update_title("")
        self.modeler.clear_all_widgets()

    def new_model(self):
        """
        Start a server and attach the widgets to an empty, unnamed model.

        :returns: ``True``.
        :raises RuntimeError: if the current model has unsaved modifications.
        """
        if self.modified:
            raise RuntimeError("Model is modified, cannot create new model")
        del self.new_nodes[:]  # empty list while keeping reference

        endpoint = "opc.tcp://0.0.0.0:48400/freeopcua/uamodeler/"
        logger.info("Starting server on %s", endpoint)
        self.server_mgr.start_server(endpoint)

        self.modeler.tree_ui.set_root_node(self.server_mgr.nodes.root)
        self.modeler.idx_ui.set_node(self.server_mgr.get_node(ua.ObjectIds.Server_NamespaceArray))
        self.modeler.nodesets_ui.set_server_mgr(self.server_mgr)
        self.modified = False
        self.modeler.actions.enable_model_actions()
        self.current_path = None
        self.modeler.update_title("No Name")
        return True

    def import_xml(self, path):
        """
        Import the XML file at *path* into the running server.

        The imported nodes are recorded in ``new_nodes``, the model is flagged
        as modified and the tree and namespace views are reloaded.

        :returns: *path*, unchanged.
        """
        new_nodes = self.server_mgr.import_xml(path)
        self.new_nodes.extend([self.server_mgr.get_node(node) for node in new_nodes])
        self.modified = True
        # we maybe should only reload the imported nodes
        self.modeler.tree_ui.reload()
        self.modeler.idx_ui.reload()
        return path

    def open_xml(self, path):
        """
        Create a new model and load the XML file at *path* into it.

        If loading fails the model is force-closed and the original
        exception is re-raised.
        """
        self.new_model()
        try:
            self._open_xml(path)
        except BaseException:
            # Clean up on any failure, including interrupts, then propagate.
            self.close_model(force=True)
            raise

    def _open_xml(self, path):
        """Import *path* and make it the current, unmodified model."""
        path = self.import_xml(path)
        self.modified = False
        self.current_path = path
        self.modeler.update_title(self.current_path)

    def open(self, path):
        """
        Open *path*, dispatching on its extension.

        Paths ending in ``.xml`` (case-insensitive) are opened as XML;
        anything else is treated as a .uamodel file.
        """
        if path.lower().endswith(".xml"):
            self.open_xml(path)
        else:
            self.open_ua_model(path)

    def open_ua_model(self, path):
        """
        Create a new model and load the .uamodel file at *path* into it.

        If loading fails the model is force-closed and the original
        exception is re-raised.
        """
        self.new_model()
        try:
            self._open_ua_model(path)
        except BaseException:
            # Clean up on any failure, including interrupts, then propagate.
            self.close_model(force=True)
            raise

    def _open_ua_model(self, path):
        """
        Parse the .uamodel file at *path* and load what it points to.

        Every ``Reference`` element's ``path`` is imported as a nodeset, then
        the ``path`` of the ``Model`` element is opened as XML.

        :raises ValueError: if the file contains no ``Model`` element.
        """
        root = Et.parse(path).getroot()
        for ref_el in root.findall("Reference"):
            refpath = ref_el.attrib['path']
            self.modeler.nodesets_ui.import_nodeset(refpath)
        mod_el = root.find("Model")
        if mod_el is None:
            # Element.find() returns None when nothing matches.
            raise ValueError("No Model element found in %s" % path)
        self._open_xml(mod_el.attrib['path'])

    def _get_path(self, path):
        """
        Resolve the extension-less base path to save to.

        Falls back to ``current_path`` when *path* is ``None``. The result is
        stored as the new ``current_path`` and shown in the window title.

        :raises ValueError: if neither *path* nor ``current_path`` is set.
        """
        if path is None:
            path = self.current_path
        if path is None:
            raise ValueError("No path is defined")
        self.current_path = os.path.splitext(path)[0]
        self.modeler.update_title(self.current_path)
        return self.current_path

    def save_xml(self, path=None):
        """
        Export the nodes in ``new_nodes`` to ``<base path>.xml``.

        The namespace array without its first entry is passed to the export
        as the list of URIs. Clears the modified flag on success.

        :param path: target path; its extension is replaced by ``.xml``.
            Defaults to the current path.
        """
        path = self._get_path(path)
        path += ".xml"
        logger.info("Saving nodes to %s", path)
        logger.info("Exporting  %s nodes: %s", len(self.new_nodes), self.new_nodes)
        uris = self.server_mgr.get_namespace_array()[1:]
        logger.info("and namespaces: %s ", uris)
        self.server_mgr.export_xml(self.new_nodes, uris, path)
        self.modified = False
        logger.info("%s saved", path)

    def save_ua_model(self, path=None):
        """
        Write ``<base path>.uamodel``.

        The file has a ``UAModel`` root with one ``Model`` element pointing
        at ``<base path>.xml`` and one ``Reference`` element per entry of
        ``nodesets_ui.nodesets``.

        :param path: target path; its extension is replaced by ``.uamodel``.
            Defaults to the current path.
        """
        path = self._get_path(path)
        model_path = path + ".uamodel"
        logger.info("Saving model to %s", model_path)
        etree = Et.ElementTree(Et.Element('UAModel'))
        node_el = Et.SubElement(etree.getroot(), "Model")
        node_el.attrib["path"] = path + ".xml"
        for refpath in self.modeler.nodesets_ui.nodesets:
            node_el = Et.SubElement(etree.getroot(), "Reference")
            node_el.attrib["path"] = refpath
        etree.write(model_path, encoding='utf-8', xml_declaration=True)

    def _after_add(self, new_nodes):
        """
        Record freshly created node(s) and refresh the views.

        :param new_nodes: a single node, or a list/tuple of nodes.
        """
        if isinstance(new_nodes, (list, tuple)):
            self.new_nodes.extend(new_nodes)
        else:
            self.new_nodes.append(new_nodes)
        self.modeler.tree_ui.reload_current()
        self.modeler.show_refs()
        self.modified = True

    def add_method(self, *args):
        """
        Add a method under the current tree node, forwarding *args*.

        The method node and the children it was created with are all
        recorded as new nodes.
        """
        logger.info("Creating method type with args: %s", args)
        parent = self.modeler.tree_ui.get_current_node()
        new_node = parent.add_method(*args)
        new_nodes = [new_node]
        new_nodes.extend(new_node.get_children())
        self._after_add(new_nodes)

    def add_object_type(self, *args):
        """Add an object type under the current tree node, forwarding *args*."""
        logger.info("Creating object type with args: %s", args)
        parent = self.modeler.tree_ui.get_current_node()
        new_node = parent.add_object_type(*args)
        self._after_add(new_node)

    def add_folder(self, *args):
        """Add a folder under the current tree node and return it."""
        parent = self.modeler.tree_ui.get_current_node()
        logger.info("Creating folder with args: %s", args)
        new_node = parent.add_folder(*args)
        self._after_add(new_node)
        return new_node

    def add_object(self, *args):
        """
        Instantiate an object type under the current tree node.

        :param args: exactly ``(nodeid, bname, otype)``; the display name is
            built from ``bname.Name``.
        :returns: whatever ``instantiate`` returns for the created nodes.
        """
        parent = self.modeler.tree_ui.get_current_node()
        logger.info("Creating object with args: %s", args)
        nodeid, bname, otype = args
        new_nodes = instantiate(parent, otype, bname=bname, nodeid=nodeid, dname=ua.LocalizedText(bname.Name))
        self._after_add(new_nodes)
        return new_nodes

    def add_data_type(self, *args):
        """Add a data type under the current tree node and return it."""
        parent = self.modeler.tree_ui.get_current_node()
        logger.info("Creating data type with args: %s", args)
        new_node = parent.add_data_type(*args)
        self._after_add(new_node)
        return new_node

    def add_variable(self, *args):
        """Add a variable under the current tree node and return it."""
        parent = self.modeler.tree_ui.get_current_node()
        logger.info("Creating variable with args: %s", args)
        new_node = parent.add_variable(*args)
        self._after_add(new_node)
        return new_node

    def add_property(self, *args):
        """
        Add a property under the current tree node and return it.

        ``args[4]`` is stored in the settings under ``"last_datatype"``
        before the property is created.
        """
        parent = self.modeler.tree_ui.get_current_node()
        logger.info("Creating property with args: %s", args)
        self.settings.setValue("last_datatype", args[4])
        new_node = parent.add_property(*args)
        self._after_add(new_node)
        return new_node

    def add_variable_type(self, *args):
        """
        Add a variable type under the current tree node and return it.

        :param args: exactly ``(nodeid, bname, datatype)``; only
            ``datatype.nodeid`` is passed on to the server.
        """
        parent = self.modeler.tree_ui.get_current_node()
        logger.info("Creating variable type with args: %s", args)
        nodeid, bname, datatype = args
        new_node = parent.add_variable_type(nodeid, bname, datatype.nodeid)
        self._after_add(new_node)
        return new_node

    @trycatchslot
    def _attr_written(self, attr, dv):
        """
        Slot connected to ``attrs_ui.attr_written``.

        Flags the model as modified and, when the browse name or display
        name was written, updates the current tree item with
        ``dv.Value.Value``.
        """
        self.modified = True
        if attr == ua.AttributeIds.BrowseName:
            self.modeler.tree_ui.update_browse_name_current_item(dv.Value.Value)
        elif attr == ua.AttributeIds.DisplayName:
            self.modeler.tree_ui.update_display_name_current_item(dv.Value.Value)
242