Completed
Pull Request — master (#114)
by Denis
02:22
created

opcua.server.NodeManagementService._add_node()   B

Complexity

Conditions 6

Size

Total Lines 55

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 38
CRAP Score 6.0045
Metric Value
cc 6
dl 0
loc 55
ccs 38
cts 40
cp 0.95
crap 6.0045
rs 7.8834

How to fix   Long Method   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

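Commonly applied refactorings include Extract Method: move a cohesive part of the long method into a new, well-named method and call it from the original.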

1 1
from threading import RLock
2 1
import logging
3 1
from datetime import datetime
4 1
try:
5 1
    import cPickle as pickle
6 1
except:
7 1
    import pickle
8
9 1
from opcua import ua
10 1
from opcua.server.users import User
11
12
13 1
class AttributeValue(object):
    """Storage cell for one attribute of a node.

    Keeps the current value, an optional callable used to compute the
    value on demand, and the data-change callbacks registered for the
    attribute.
    """

    def __init__(self, value):
        self.value = value
        # Optional zero-argument callable; when set, its result is what a
        # reader of the attribute receives instead of ``value``.
        self.value_callback = None
        # Maps a subscription handle to the callback to notify on change.
        self.datachange_callbacks = {}

    def __str__(self):
        return "AttributeValue({})".format(self.value)

    __repr__ = __str__
23
24
25 1
class NodeData(object):
    """Everything the address space stores about a single node."""

    def __init__(self, nodeid):
        self.nodeid = nodeid
        # attribute id -> AttributeValue
        self.attributes = {}
        # reference descriptions whose source is this node
        self.references = []
        # callable executed when the node is invoked as a method, or None
        self.call = None

    def __str__(self):
        return "NodeData(id:{}, attrs:{}, refs:{})".format(self.nodeid, self.attributes, self.references)

    __repr__ = __str__
36
37
38 1
class AttributeService(object):
    """Read and write node attributes held by an address space."""

    def __init__(self, aspace):
        self.logger = logging.getLogger(__name__)
        self._aspace = aspace

    def read(self, params):
        """Return one data value per entry of ``params.NodesToRead``, in order."""
        self.logger.debug("read %s", params)
        return [
            self._aspace.get_attribute_value(readvalue.NodeId, readvalue.AttributeId)
            for readvalue in params.NodesToRead
        ]

    def write(self, params, user=User.Admin):
        """Write every entry of ``params.NodesToWrite``.

        Returns one status code per entry, in order.  Admin users write
        unconditionally; any other user may only write the Value attribute
        of nodes whose AccessLevel and UserAccessLevel both allow
        CurrentWrite, and gets BadUserAccessDenied otherwise.
        """
        self.logger.debug("write %s as user %s", params, user)
        res = []
        for writevalue in params.NodesToWrite:
            if user != User.Admin and not self._user_may_write(writevalue):
                res.append(ua.StatusCode(ua.StatusCodes.BadUserAccessDenied))
                continue
            res.append(self._aspace.set_attribute_value(writevalue.NodeId, writevalue.AttributeId, writevalue.Value))
        return res

    def _user_may_write(self, writevalue):
        """Tell whether a non-admin user is allowed to perform ``writevalue``."""
        if writevalue.AttributeId != ua.AttributeIds.Value:
            return False
        for attr in (ua.AttributeIds.AccessLevel, ua.AttributeIds.UserAccessLevel):
            level = self._aspace.get_attribute_value(writevalue.NodeId, attr).Value.Value
            # A missing node or attribute yields an empty value (None);
            # treat it as "no access" instead of failing on ``None & mask``.
            if not level or not level & ua.AccessLevelMask.CurrentWrite:
                return False
        return True
66
67
68 1
class ViewService(object):
    """Browse references and resolve browse paths inside an address space."""

    def __init__(self, aspace):
        self.logger = logging.getLogger(__name__)
        self._aspace = aspace

    def browse(self, params):
        """Return one browse result per entry of ``params.NodesToBrowse``."""
        self.logger.debug("browse %s", params)
        return [self._browse(desc) for desc in params.NodesToBrowse]

    def _browse(self, desc):
        """Collect the references of ``desc.NodeId`` that match ``desc``."""
        res = ua.BrowseResult()
        if desc.NodeId not in self._aspace:
            res.StatusCode = ua.StatusCode(ua.StatusCodes.BadNodeIdInvalid)
            return res
        node = self._aspace[desc.NodeId]
        for ref in node.references:
            if self._is_suitable_ref(desc, ref):
                res.References.append(ref)
        return res

    def _is_suitable_ref(self, desc, ref):
        """Check ``ref`` against direction, reference type and node class mask."""
        if not self._suitable_direction(desc.BrowseDirection, ref.IsForward):
            self.logger.debug("%s is not suitable due to direction", ref)
            return False
        if not self._suitable_reftype(desc.ReferenceTypeId, ref.ReferenceTypeId, desc.IncludeSubtypes):
            self.logger.debug("%s is not suitable due to type", ref)
            return False
        if desc.NodeClassMask and ((desc.NodeClassMask & ref.NodeClass) == 0):
            self.logger.debug("%s is not suitable due to class", ref)
            return False
        self.logger.debug("%s is a suitable ref for desc %s", ref, desc)
        return True

    def _suitable_reftype(self, ref1, ref2, subtypes):
        """Tell whether reference type ``ref2`` satisfies requested type ``ref1``.

        ``ref1`` is the type asked for in the browse description, ``ref2``
        the type of the candidate reference and ``subtypes`` the
        IncludeSubtypes flag of the request.
        """
        if ref1.Identifier == ref2.Identifier:
            return True
        if not subtypes and ref2.Identifier == ua.ObjectIds.HasSubtype:
            return False
        # NOTE(review): sub-types are accepted here even when ``subtypes`` is
        # false (except for HasSubtype itself); kept as in the original.
        oktypes = self._get_sub_ref(ref1)
        return ref2 in oktypes

    def _get_sub_ref(self, ref):
        """Return the node ids of all direct and indirect sub-types of ``ref``."""
        if ref not in self._aspace:
            # unknown reference type: it has no known sub-types
            return []
        res = []
        for subref in self._aspace[ref].references:
            # Only forward HasSubtype references lead to sub-types; following
            # inverse ones would walk back up to the super-type.
            if subref.ReferenceTypeId.Identifier == ua.ObjectIds.HasSubtype and subref.IsForward:
                res.append(subref.NodeId)
                res += self._get_sub_ref(subref.NodeId)
        return res

    def _suitable_direction(self, desc, isforward):
        """Tell whether a reference direction matches browse direction ``desc``."""
        if desc == ua.BrowseDirection.Both:
            return True
        if desc == ua.BrowseDirection.Forward and isforward:
            return True
        if desc == ua.BrowseDirection.Inverse and not isforward:
            return True
        return False

    def translate_browsepaths_to_nodeids(self, browsepaths):
        """Return one browse path result per entry of ``browsepaths``."""
        self.logger.debug("translate browsepath: %s", browsepaths)
        return [self._translate_browsepath_to_nodeid(path) for path in browsepaths]

    def _translate_browsepath_to_nodeid(self, path):
        """Follow ``path.RelativePath`` element by element from ``path.StartingNode``."""
        self.logger.debug("looking at path: %s", path)
        res = ua.BrowsePathResult()
        if path.StartingNode not in self._aspace:
            res.StatusCode = ua.StatusCode(ua.StatusCodes.BadNodeIdInvalid)
            return res
        current = path.StartingNode
        for el in path.RelativePath.Elements:
            nodeid = self._find_element_in_node(el, current)
            if not nodeid:
                res.StatusCode = ua.StatusCode(ua.StatusCodes.BadNoMatch)
                return res
            current = nodeid
        target = ua.BrowsePathTarget()
        target.TargetId = current
        # 0xFFFFFFFF (largest UInt32); presumably signals that the whole
        # path was consumed -- confirm against the OPC UA specification.
        target.RemainingPathIndex = 4294967295
        res.Targets = [target]
        return res

    def _find_element_in_node(self, el, nodeid):
        """Return the target of the first reference of ``nodeid`` named ``el.TargetName``.

        Returns None when no reference carries that browse name.
        """
        nodedata = self._aspace[nodeid]
        for ref in nodedata.references:
            # FIXME: here we should check other arguments!!
            if ref.BrowseName == el.TargetName:
                return ref.NodeId
        self.logger.info("element %s was not found in node %s", el, nodeid)
        return None
166
167
168 1
class NodeManagementService(object):
    """Add and delete nodes and references in an address space."""

    def __init__(self, aspace):
        self.logger = logging.getLogger(__name__)
        self._aspace = aspace

    def add_nodes(self, addnodeitems, user=User.Admin):
        """Add every item and return one AddNodesResult per item, in order."""
        return [self._add_node(item, user) for item in addnodeitems]

    def _add_node(self, item, user):
        """Create the node described by ``item`` and link it to its parent.

        The returned AddNodesResult carries BadNodeIdExists,
        BadParentNodeIdInvalid or BadUserAccessDenied on failure; on success
        it carries a default status code and the id of the new node.
        """
        result = ua.AddNodesResult()

        if item.RequestedNewNodeId in self._aspace:
            self.logger.warning("AddNodesItem: node already exists")
            result.StatusCode = ua.StatusCode(ua.StatusCodes.BadNodeIdExists)
            return result

        nodedata = self._build_nodedata(item)

        # A default-constructed ParentNodeId means "no parent": the node is
        # stored without being referenced from anywhere.
        if not item.ParentNodeId == ua.NodeId():
            if item.ParentNodeId not in self._aspace:
                result.StatusCode = ua.StatusCode(ua.StatusCodes.BadParentNodeIdInvalid)
                return result
            if not user == User.Admin:
                result.StatusCode = ua.StatusCode(ua.StatusCodes.BadUserAccessDenied)
                return result
            self._aspace[item.ParentNodeId].references.append(self._parent_reference(item))

        # now add our node to db
        self._aspace[item.RequestedNewNodeId] = nodedata

        # The type definition reference can only be added once the node is
        # stored, because _add_reference requires the source node to exist.
        if item.TypeDefinition != ua.NodeId():
            self._add_reference(self._type_definition_reference(item), user)

        result.StatusCode = ua.StatusCode()
        result.AddedNodeId = item.RequestedNewNodeId
        return result

    def _build_nodedata(self, item):
        """Create the NodeData for ``item`` with common and requested attributes."""
        nodedata = NodeData(item.RequestedNewNodeId)
        # add common attrs
        nodedata.attributes[ua.AttributeIds.NodeId] = AttributeValue(ua.DataValue(ua.Variant(item.RequestedNewNodeId, ua.VariantType.NodeId)))
        nodedata.attributes[ua.AttributeIds.BrowseName] = AttributeValue(ua.DataValue(ua.Variant(item.BrowseName, ua.VariantType.QualifiedName)))
        nodedata.attributes[ua.AttributeIds.NodeClass] = AttributeValue(ua.DataValue(ua.Variant(item.NodeClass, ua.VariantType.Int32)))
        # add requested attrs
        self._add_nodeattributes(item.NodeAttributes, nodedata)
        return nodedata

    def _parent_reference(self, item):
        """Build the forward reference stored on the parent of the new node."""
        desc = ua.ReferenceDescription()
        desc.ReferenceTypeId = item.ReferenceTypeId
        desc.NodeId = item.RequestedNewNodeId
        desc.NodeClass = item.NodeClass
        desc.BrowseName = item.BrowseName
        desc.DisplayName = ua.LocalizedText(item.BrowseName.Name)
        desc.TypeDefinition = item.TypeDefinition
        desc.IsForward = True
        return desc

    def _type_definition_reference(self, item):
        """Build the HasTypeDefinition reference request for the new node."""
        addref = ua.AddReferencesItem()
        addref.SourceNodeId = item.RequestedNewNodeId
        addref.IsForward = True
        addref.ReferenceTypeId = ua.NodeId(ua.ObjectIds.HasTypeDefinition)
        addref.TargetNodeId = item.TypeDefinition
        addref.TargetNodeClass = ua.NodeClass.DataType
        return addref

    def delete_nodes(self, deletenodeitems, user=User.Admin):
        """Delete every item and return one status code per item, in order."""
        return [self._delete_node(item, user) for item in deletenodeitems]

    def _delete_node(self, item, user):
        """Remove ``item.NodeId`` and, if requested, the references targeting it."""
        #TODO: Check if node is monitored if it is: "When any of the Nodes deleted by an invocation of this Service is being monitored, then a Notification containing the status code Bad_NodeIdUnknown is sent to the monitoring Client indicating that the Node has been deleted."
        if not user == User.Admin:
            return ua.StatusCode(ua.StatusCodes.BadUserAccessDenied)

        if item.NodeId not in self._aspace:
            self.logger.warning("DeleteNodesItem: node does not exists")
            return ua.StatusCode(ua.StatusCodes.BadNodeIdUnknown)

        if item.DeleteTargetReferences:
            for elem in list(self._aspace.keys()):
                references = self._aspace[elem].references
                # Rebuild in place: removing entries while iterating the same
                # list would skip the element following each removal.
                references[:] = [rdesc for rdesc in references if not rdesc.NodeId == item.NodeId]

        del self._aspace[item.NodeId]

        return ua.StatusCode()

    def add_references(self, refs, user=User.Admin):
        """Add every reference and return one status code per item, in order."""
        return [self._add_reference(ref, user) for ref in refs]

    def _add_reference(self, addref, user):
        """Store on the source node a reference description pointing at the target."""
        if addref.SourceNodeId not in self._aspace:
            return ua.StatusCode(ua.StatusCodes.BadSourceNodeIdInvalid)
        if addref.TargetNodeId not in self._aspace:
            return ua.StatusCode(ua.StatusCodes.BadTargetNodeIdInvalid)
        if not user == User.Admin:
            return ua.StatusCode(ua.StatusCodes.BadUserAccessDenied)
        rdesc = ua.ReferenceDescription()
        rdesc.ReferenceTypeId = addref.ReferenceTypeId
        rdesc.IsForward = addref.IsForward
        rdesc.NodeId = addref.TargetNodeId
        rdesc.NodeClass = addref.TargetNodeClass
        # Names are copied from the target node only when it has them.
        bname = self._aspace.get_attribute_value(addref.TargetNodeId, ua.AttributeIds.BrowseName).Value.Value
        if bname:
            rdesc.BrowseName = bname
        dname = self._aspace.get_attribute_value(addref.TargetNodeId, ua.AttributeIds.DisplayName).Value.Value
        if dname:
            rdesc.DisplayName = dname
        self._aspace[addref.SourceNodeId].references.append(rdesc)
        return ua.StatusCode()

    def delete_references(self, refs, user=User.Admin):
        """Delete every reference and return one status code per item, in order."""
        return [self._delete_reference(ref, user) for ref in refs]

    def _delete_reference(self, item, user):
        """Remove the references between ``item.SourceNodeId`` and ``item.TargetNodeId``.

        The references of the source node pointing at the target are
        processed first, then those of the target node pointing at the
        source.  Returns BadReferenceTypeIdInvalid as soon as a reference
        between the two nodes has a type other than ``item.ReferenceTypeId``.
        """
        if item.SourceNodeId not in self._aspace:
            return ua.StatusCode(ua.StatusCodes.BadSourceNodeIdInvalid)
        if item.TargetNodeId not in self._aspace:
            return ua.StatusCode(ua.StatusCodes.BadTargetNodeIdInvalid)
        if not user == User.Admin:
            return ua.StatusCode(ua.StatusCodes.BadUserAccessDenied)

        for owner, peer in ((item.SourceNodeId, item.TargetNodeId),
                            (item.TargetNodeId, item.SourceNodeId)):
            status = self._remove_references_to(owner, peer, item)
            if status is not None:
                return status

        return ua.StatusCode()

    def _remove_references_to(self, owner, peer, item):
        """Drop from ``owner`` the references to ``peer`` selected by ``item``.

        Returns a bad status code on a reference type mismatch, else None.
        """
        references = self._aspace[owner].references
        # Iterate over a copy because matching entries are removed on the fly.
        for rdesc in list(references):
            # Node ids are compared by value: two ids naming the same node
            # are generally distinct objects.
            if not rdesc.NodeId == peer:
                continue
            if rdesc.ReferenceTypeId != item.ReferenceTypeId:
                return ua.StatusCode(ua.StatusCodes.BadReferenceTypeIdInvalid)
            # NOTE(review): the direction test is the same on both sides, as
            # in the original; the opposite reference may need the inverted
            # direction -- confirm against the DeleteReferences service spec.
            if rdesc.IsForward == item.IsForward or item.DeleteBidirectional:
                references.remove(rdesc)
        return None

    def _add_node_attr(self, item, nodedata, name, vtype=None):
        """Copy attribute ``name`` from ``item`` to ``nodedata`` if it is specified.

        An attribute counts as specified when its bit is set in
        ``item.SpecifiedAttributes``.  ``vtype`` is the variant type used to
        wrap the value; None leaves the choice to ``ua.Variant``.
        """
        if item.SpecifiedAttributes & getattr(ua.NodeAttributesMask, name):
            dv = ua.DataValue(ua.Variant(getattr(item, name), vtype))
            now = datetime.utcnow()
            dv.ServerTimestamp = now
            dv.SourceTimestamp = now
            nodedata.attributes[getattr(ua.AttributeIds, name)] = AttributeValue(dv)

    def _add_nodeattributes(self, item, nodedata):
        """Copy every attribute specified in ``item`` to ``nodedata``."""
        typed_attributes = (
            ("AccessLevel", ua.VariantType.Byte),
            ("ArrayDimensions", ua.VariantType.Int32),
            ("BrowseName", ua.VariantType.QualifiedName),
            ("ContainsNoLoops", ua.VariantType.Boolean),
            ("DataType", ua.VariantType.NodeId),
            ("Description", ua.VariantType.LocalizedText),
            ("DisplayName", ua.VariantType.LocalizedText),
            ("EventNotifier", ua.VariantType.Byte),
            ("Executable", ua.VariantType.Boolean),
            ("Historizing", ua.VariantType.Boolean),
            ("InverseName", ua.VariantType.LocalizedText),
            ("IsAbstract", ua.VariantType.Boolean),
            ("MinimumSamplingInterval", ua.VariantType.Double),
            ("NodeClass", ua.VariantType.UInt32),
            ("NodeId", ua.VariantType.NodeId),
            ("Symmetric", ua.VariantType.Boolean),
            ("UserAccessLevel", ua.VariantType.Byte),
            # a flag like Executable, hence Boolean rather than Byte
            ("UserExecutable", ua.VariantType.Boolean),
            ("UserWriteMask", ua.VariantType.Byte),
            ("ValueRank", ua.VariantType.Int32),
            ("WriteMask", ua.VariantType.Byte),
        )
        for name, vtype in typed_attributes:
            self._add_node_attr(item, nodedata, name, vtype)
        # The variant type of Value is left for ua.Variant to determine.
        self._add_node_attr(item, nodedata, "Value")
349
350
351 1
class MethodService(object):
    """Execute the callbacks attached to method nodes."""

    def __init__(self, aspace):
        self.logger = logging.getLogger(__name__)
        self._aspace = aspace

    def call(self, methods):
        """Run every call request and return one result per request, in order."""
        return [self._call(method) for method in methods]

    def _call(self, method):
        """Run one call request and return its CallMethodResult.

        The status is BadNodeIdInvalid when the object or the method node is
        unknown, BadNothingToDo when the method node has no callback, and
        BadUnexpectedError when the callback raises.
        """
        res = ua.CallMethodResult()
        if method.ObjectId not in self._aspace or method.MethodId not in self._aspace:
            res.StatusCode = ua.StatusCode(ua.StatusCodes.BadNodeIdInvalid)
            return res

        node = self._aspace[method.MethodId]
        if node.call is None:
            res.StatusCode = ua.StatusCode(ua.StatusCodes.BadNothingToDo)
            return res

        try:
            res.OutputArguments = node.call(method.ObjectId, *method.InputArguments)
            # one default status code per input argument
            for _ in method.InputArguments:
                res.InputArgumentResults.append(ua.StatusCode())
        except Exception:
            # A failing user callback must not break the service; report it
            # through the result status instead.
            self.logger.exception("Error executing method call %s, an exception was raised: ", method)
            res.StatusCode = ua.StatusCode(ua.StatusCodes.BadUnexpectedError)
        return res
380
381
382 1
class AddressSpace(object):

    """
    The address space object stores all the nodes of the OPC-UA server
    and helper methods.
    Access to the node dictionary is serialized with a reentrant lock.
    """

    def __init__(self):
        self.logger = logging.getLogger(__name__)
        self._nodes = {}
        self._lock = RLock()  # FIXME: should use multiple reader, one writer pattern
        # data-change handles are allocated by incrementing this counter
        self._datachange_callback_counter = 200
        # handle -> (nodeid, attribute id) the callback was registered on
        self._handle_to_attribute_map = {}

    def __getitem__(self, nodeid):
        with self._lock:
            return self._nodes[nodeid]

    def __setitem__(self, nodeid, value):
        with self._lock:
            self._nodes[nodeid] = value

    def __contains__(self, nodeid):
        with self._lock:
            return nodeid in self._nodes

    def __delitem__(self, nodeid):
        with self._lock:
            del self._nodes[nodeid]

    def keys(self):
        """Return a snapshot list of the node ids currently stored."""
        with self._lock:
            # A copy, so the caller never iterates a live view after the
            # lock has been released.
            return list(self._nodes.keys())

    def dump(self, path):
        """
        dump address space as binary to file
        """
        with self._lock:
            with open(path, 'wb') as f:
                pickle.dump(self._nodes, f, pickle.HIGHEST_PROTOCOL)

    def load(self, path):
        """
        load address space from file, overwriting the whole current address space

        SECURITY: the file is unpickled, which can execute arbitrary code;
        only load files coming from a trusted source.
        """
        with open(path, 'rb') as f:
            nodes = pickle.load(f)
        with self._lock:
            self._nodes = nodes

    def get_attribute_value(self, nodeid, attr):
        """Return the data value of attribute ``attr`` of node ``nodeid``.

        An unknown node or attribute yields an empty data value whose status
        is BadNodeIdUnknown or BadAttributeIdInvalid.  When the attribute
        has a value callback, the callback result is returned instead of
        the stored value.
        """
        with self._lock:
            #self.logger.debug("get attr val: %s %s", nodeid, attr)
            if nodeid not in self._nodes:
                dv = ua.DataValue()
                dv.StatusCode = ua.StatusCode(ua.StatusCodes.BadNodeIdUnknown)
                return dv
            node = self._nodes[nodeid]
            if attr not in node.attributes:
                dv = ua.DataValue()
                dv.StatusCode = ua.StatusCode(ua.StatusCodes.BadAttributeIdInvalid)
                return dv
            attval = node.attributes[attr]
            if attval.value_callback:
                return attval.value_callback()
            return attval.value

    def set_attribute_value(self, nodeid, attr, value):
        """Store ``value`` and notify data-change callbacks if it changed.

        Missing source/server timestamps are filled in with the current UTC
        time.  Returns BadNodeIdUnknown or BadAttributeIdInvalid when the
        target does not exist, a default status code otherwise.
        """
        with self._lock:
            self.logger.debug("set attr val: %s %s %s", nodeid, attr, value)
            if nodeid not in self._nodes:
                return ua.StatusCode(ua.StatusCodes.BadNodeIdUnknown)
            node = self._nodes[nodeid]
            if attr not in node.attributes:
                return ua.StatusCode(ua.StatusCodes.BadAttributeIdInvalid)
            if not value.SourceTimestamp:
                value.SourceTimestamp = datetime.utcnow()
            if not value.ServerTimestamp:
                value.ServerTimestamp = datetime.utcnow()

            attval = node.attributes[attr]
            old = attval.value
            attval.value = value
            cbs = []
            if old.Value != value.Value:  # only call callbacks when a value change has happened
                cbs = list(attval.datachange_callbacks.items())

        # Callbacks run on a snapshot taken under the lock, after the lock
        # has been released.
        for k, v in cbs:
            try:
                v(k, value)
            except Exception as ex:
                self.logger.exception("Error calling datachange callback %s, %s, %s", k, v, ex)

        return ua.StatusCode()

    def add_datachange_callback(self, nodeid, attr, callback):
        """Register ``callback`` for changes of an attribute.

        Returns a ``(status, handle)`` pair; the handle is 0 when the node
        or the attribute does not exist.
        """
        with self._lock:
            self.logger.debug("set attr callback: %s %s %s", nodeid, attr, callback)
            if nodeid not in self._nodes:
                return ua.StatusCode(ua.StatusCodes.BadNodeIdUnknown), 0
            node = self._nodes[nodeid]
            if attr not in node.attributes:
                return ua.StatusCode(ua.StatusCodes.BadAttributeIdInvalid), 0
            attval = node.attributes[attr]
            self._datachange_callback_counter += 1
            handle = self._datachange_callback_counter
            attval.datachange_callbacks[handle] = callback
            self._handle_to_attribute_map[handle] = (nodeid, attr)
            return ua.StatusCode(), handle

    def delete_datachange_callback(self, handle):
        """Unregister the callback identified by ``handle``.

        Raises KeyError for a handle that was never issued or was already
        deleted.
        """
        with self._lock:
            nodeid, attr = self._handle_to_attribute_map.pop(handle)
            node = self._nodes.get(nodeid)
            if node is None or attr not in node.attributes:
                # The node or attribute was removed after registration, so
                # the callback went away with it.
                return
            node.attributes[attr].datachange_callbacks.pop(handle, None)

    def add_method_callback(self, methodid, callback):
        """Attach ``callback`` as the implementation of method node ``methodid``."""
        with self._lock:
            node = self._nodes[methodid]
            node.call = callback
500