Passed
Push — dev ( 97f75d...d1bef8 )
by Olivier
03:40
created

opcua.server.NodeManagementService._add_node()   B

Complexity

Conditions 6

Size

Total Lines 55

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 38
CRAP Score 6.0045
Metric Value
cc 6
dl 0
loc 55
ccs 38
cts 40
cp 0.95
crap 6.0045
rs 7.8834

How to fix   Long Method   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1 1
from threading import RLock
2 1
import logging
3 1
from datetime import datetime
4 1
try:
5 1
    import cPickle as pickle
6 1
except:
7 1
    import pickle
8
9 1
from opcua import ua
10 1
from opcua.server.users import User
11
12
13 1
class AttributeValue(object):
14
15 1
    def __init__(self, value):
16 1
        self.value = value
17 1
        self.value_callback = None
18 1
        self.datachange_callbacks = {}
19
20 1
    def __str__(self):
21
        return "AttributeValue({})".format(self.value)
22 1
    __repr__ = __str__
23
24
25 1
class NodeData(object):
26
27 1
    def __init__(self, nodeid):
28 1
        self.nodeid = nodeid
29 1
        self.attributes = {}
30 1
        self.references = []
31 1
        self.call = None
32
33 1
    def __str__(self):
34
        return "NodeData(id:{}, attrs:{}, refs:{})".format(self.nodeid, self.attributes, self.references)
35 1
    __repr__ = __str__
36
37
38 1
class AttributeService(object):
39
40 1
    def __init__(self, aspace):
41 1
        self.logger = logging.getLogger(__name__)
42 1
        self._aspace = aspace
43
44 1
    def read(self, params):
45 1
        self.logger.debug("read %s", params)
46 1
        res = []
47 1
        for readvalue in params.NodesToRead:
48 1
            res.append(self._aspace.get_attribute_value(readvalue.NodeId, readvalue.AttributeId))
49 1
        return res
50
51 1
    def write(self, params, user=User.Admin):
52 1
        self.logger.debug("write %s as user %s", params, user)
53 1
        res = []
54 1
        for writevalue in params.NodesToWrite:
55 1
            if user != User.Admin:
56 1
                if writevalue.AttributeId != ua.AttributeIds.Value:
57 1
                    res.append(ua.StatusCode(ua.StatusCodes.BadUserAccessDenied))
58 1
                    continue
59 1
                al = self._aspace.get_attribute_value(writevalue.NodeId, ua.AttributeIds.AccessLevel)
60 1
                ual = self._aspace.get_attribute_value(writevalue.NodeId, ua.AttributeIds.UserAccessLevel)
61 1
                if not al.Value.Value & ua.AccessLevelMask.CurrentWrite or not ual.Value.Value & ua.AccessLevelMask.CurrentWrite:
62 1
                    res.append(ua.StatusCode(ua.StatusCodes.BadUserAccessDenied))
63 1
                    continue
64 1
            res.append(self._aspace.set_attribute_value(writevalue.NodeId, writevalue.AttributeId, writevalue.Value))
65 1
        return res
66
67
68 1
class ViewService(object):
69
70 1
    def __init__(self, aspace):
71 1
        self.logger = logging.getLogger(__name__)
72 1
        self._aspace = aspace
73
74 1
    def browse(self, params):
75 1
        self.logger.debug("browse %s", params)
76 1
        res = []
77 1
        for desc in params.NodesToBrowse:
78 1
            res.append(self._browse(desc))
79 1
        return res
80
81 1
    def _browse(self, desc):
82 1
        res = ua.BrowseResult()
83 1
        if desc.NodeId not in self._aspace:
84
            res.StatusCode = ua.StatusCode(ua.StatusCodes.BadNodeIdInvalid)
85
            return res
86 1
        node = self._aspace[desc.NodeId]
87 1
        for ref in node.references:
88 1
            if not self._is_suitable_ref(desc, ref):
89 1
                continue
90 1
            res.References.append(ref)
91 1
        return res
92
93 1
    def _is_suitable_ref(self, desc, ref):
94 1
        if not self._suitable_direction(desc.BrowseDirection, ref.IsForward):
95
            self.logger.debug("%s is not suitable due to direction", ref)
96
            return False
97 1
        if not self._suitable_reftype(desc.ReferenceTypeId, ref.ReferenceTypeId, desc.IncludeSubtypes):
98 1
            self.logger.debug("%s is not suitable due to type", ref)
99 1
            return False
100 1
        if desc.NodeClassMask and ((desc.NodeClassMask & ref.NodeClass) == 0):
101 1
            self.logger.debug("%s is not suitable due to class", ref)
102 1
            return False
103 1
        self.logger.debug("%s is a suitable ref for desc %s", ref, desc)
104 1
        return True
105
106 1
    def _suitable_reftype(self, ref1, ref2, subtypes):
107
        """
108
        """
109 1
        if ref1.Identifier == ref2.Identifier:
110 1
            return True
111 1
        if not subtypes and ref2.Identifier == ua.ObjectIds.HasSubtype:
112
            return False
113 1
        oktypes = self._get_sub_ref(ref1)
114 1
        return ref2 in oktypes
115
116 1
    def _get_sub_ref(self, ref):
117 1
        res = []
118 1
        nodedata = self._aspace[ref]
119 1
        for ref in nodedata.references:
120 1
            if ref.ReferenceTypeId.Identifier == ua.ObjectIds.HasSubtype:
121 1
                res.append(ref.NodeId)
122 1
                res += self._get_sub_ref(ref.NodeId)
123 1
        return res
124
125 1
    def _suitable_direction(self, desc, isforward):
126 1
        if desc == ua.BrowseDirection.Both:
127
            return True
128 1
        if desc == ua.BrowseDirection.Forward and isforward:
129 1
            return True
130
        return False
131
132 1
    def translate_browsepaths_to_nodeids(self, browsepaths):
133 1
        self.logger.debug("translate browsepath: %s", browsepaths)
134 1
        results = []
135 1
        for path in browsepaths:
136 1
            results.append(self._translate_browsepath_to_nodeid(path))
137 1
        return results
138
139 1
    def _translate_browsepath_to_nodeid(self, path):
140 1
        self.logger.debug("looking at path: %s", path)
141 1
        res = ua.BrowsePathResult()
142 1
        if path.StartingNode not in self._aspace:
143 1
            res.StatusCode = ua.StatusCode(ua.StatusCodes.BadNodeIdInvalid)
144 1
            return res
145 1
        current = path.StartingNode
146 1
        for el in path.RelativePath.Elements:
147 1
            nodeid = self._find_element_in_node(el, current)
148 1
            if not nodeid:
149 1
                res.StatusCode = ua.StatusCode(ua.StatusCodes.BadNoMatch)
150 1
                return res
151 1
            current = nodeid
152 1
        target = ua.BrowsePathTarget()
153 1
        target.TargetId = current
154 1
        target.RemainingPathIndex = 4294967295
155 1
        res.Targets = [target]
156 1
        return res
157
158 1
    def _find_element_in_node(self, el, nodeid):
159 1
        nodedata = self._aspace[nodeid]
160 1
        for ref in nodedata.references:
161
            # FIXME: here we should check other arguments!!
162 1
            if ref.BrowseName == el.TargetName:
163 1
                return ref.NodeId
164 1
        self.logger.info("element %s was not found in node %s", el, nodeid)
165 1
        return None
166
167
168 1
class NodeManagementService(object):
169
170 1
    def __init__(self, aspace):
171 1
        self.logger = logging.getLogger(__name__)
172 1
        self._aspace = aspace
173
174 1
    def add_nodes(self, addnodeitems, user=User.Admin):
175 1
        results = []
176 1
        for item in addnodeitems:
177 1
            results.append(self._add_node(item, user))
178 1
        return results
179
180 1
    def _add_node(self, item, user):
181 1
        result = ua.AddNodesResult()
182
183 1
        if item.RequestedNewNodeId in self._aspace:
184 1
            self.logger.warning("AddNodesItem: node already exists")
185 1
            result.StatusCode = ua.StatusCode(ua.StatusCodes.BadNodeIdExists)
186 1
            return result
187 1
        nodedata = NodeData(item.RequestedNewNodeId)
188
        # add common attrs
189 1
        nodedata.attributes[ua.AttributeIds.NodeId] = AttributeValue(ua.DataValue(ua.Variant(item.RequestedNewNodeId, ua.VariantType.NodeId)))
190 1
        nodedata.attributes[ua.AttributeIds.BrowseName] = AttributeValue(ua.DataValue(ua.Variant(item.BrowseName, ua.VariantType.QualifiedName)))
191 1
        nodedata.attributes[ua.AttributeIds.NodeClass] = AttributeValue(ua.DataValue(ua.Variant(item.NodeClass, ua.VariantType.Int32)))
192
        # add requested attrs
193 1
        self._add_nodeattributes(item.NodeAttributes, nodedata)
194
195
        # add parent
196 1
        if item.ParentNodeId == ua.NodeId():
197
            #self.logger.warning("add_node: creating node %s without parent", item.RequestedNewNodeId)
198 1
            pass
199 1
        elif item.ParentNodeId not in self._aspace:
200
            #self.logger.warning("add_node: while adding node %s, requested parent node %s does not exists", item.RequestedNewNodeId, item.ParentNodeId)
201
            result.StatusCode = ua.StatusCode(ua.StatusCodes.BadParentNodeIdInvalid)
202
            return result
203
        else:
204 1
            if not user == User.Admin:
205 1
                result.StatusCode = ua.StatusCode(ua.StatusCodes.BadUserAccessDenied)
206 1
                return result
207
208 1
            desc = ua.ReferenceDescription()
209 1
            desc.ReferenceTypeId = item.ReferenceTypeId
210 1
            desc.NodeId = item.RequestedNewNodeId
211 1
            desc.NodeClass = item.NodeClass
212 1
            desc.BrowseName = item.BrowseName
213 1
            desc.DisplayName = ua.LocalizedText(item.BrowseName.Name)
214 1
            desc.TypeDefinition = item.TypeDefinition
215 1
            desc.IsForward = True
216 1
            self._aspace[item.ParentNodeId].references.append(desc)
217
218
        # now add our node to db
219 1
        self._aspace[item.RequestedNewNodeId] = nodedata
220
221
        # add type definition
222 1
        if item.TypeDefinition != ua.NodeId():
223 1
            addref = ua.AddReferencesItem()
224 1
            addref.SourceNodeId = item.RequestedNewNodeId
225 1
            addref.IsForward = True
226 1
            addref.ReferenceTypeId = ua.NodeId(ua.ObjectIds.HasTypeDefinition)
227 1
            addref.TargetNodeId = item.TypeDefinition
228 1
            addref.TargetNodeClass = ua.NodeClass.DataType
229 1
            self._add_reference(addref, user)
230
231 1
        result.StatusCode = ua.StatusCode()
232 1
        result.AddedNodeId = item.RequestedNewNodeId
233
234 1
        return result
235
236 1
    def delete_nodes(self, deletenodeitems, user=User.Admin):
237
        results = []
238
        for item in deletenodeitems:
239
            results.append(self._delete_node(item, user))
240
        return results
241
242 1
    def _delete_node(self, item, user):
243
        if not user == User.Admin:
244
            return ua.StatusCode(ua.StatusCodes.BadUserAccessDenied)
245
246
        if item.NodeId not in self._aspace:
247
            self.logger.warning("DeleteNodesItem: node does not exists")
248
            return ua.StatusCode(ua.StatusCodes.BadNodeIdUnknown)
249
250
        if item.DeleteTargetReferences:
251
            for elem in self._aspace.keys():
252
                for rdesc in self._aspace[elem].references:
253
                    if rdesc.NodeId == item.NodeId:
254
                        self._aspace[elem].references.remove(rdesc)
255
256
        for handle, callback in list(self._aspace[item.NodeId].attributes[ua.AttributeIds.Value].datachange_callbacks.items()):
257
            try:
258
                callback(handle, None, ua.StatusCode(ua.StatusCodes.BadNodeIdUnknown))
259
                self._aspace.delete_datachange_callback(handle)
260
            except Exception as ex:
261
                self.logger.exception("Error calling datachange callback %s, %s, %s", k, v, ex)
262
263
        del self._aspace[item.NodeId]
264
265
        return ua.StatusCode()
266
267 1
    def add_references(self, refs, user=User.Admin):
268 1
        result = []
269 1
        for ref in refs:
270 1
            result.append(self._add_reference(ref, user))
271 1
        return result
272
273 1
    def _add_reference(self, addref, user):
274 1
        if addref.SourceNodeId not in self._aspace:
275
            return ua.StatusCode(ua.StatusCodes.BadSourceNodeIdInvalid)
276 1
        if addref.TargetNodeId not in self._aspace:
277 1
            return ua.StatusCode(ua.StatusCodes.BadTargetNodeIdInvalid)
278 1
        if not user == User.Admin:
279
            return ua.StatusCode(ua.StatusCodes.BadUserAccessDenied)
280 1
        rdesc = ua.ReferenceDescription()
281 1
        rdesc.ReferenceTypeId = addref.ReferenceTypeId
282 1
        rdesc.IsForward = addref.IsForward
283 1
        rdesc.NodeId = addref.TargetNodeId
284 1
        rdesc.NodeClass = addref.TargetNodeClass
285 1
        bname = self._aspace.get_attribute_value(addref.TargetNodeId, ua.AttributeIds.BrowseName).Value.Value
286 1
        if bname:
287 1
            rdesc.BrowseName = bname
288 1
        dname = self._aspace.get_attribute_value(addref.TargetNodeId, ua.AttributeIds.DisplayName).Value.Value
289 1
        if dname:
290 1
            rdesc.DisplayName = dname
291 1
        self._aspace[addref.SourceNodeId].references.append(rdesc)
292 1
        return ua.StatusCode()
293
294 1
    def delete_references(self, refs, user=User.Admin):
295
        result = []
296
        for ref in refs:
297
            result.append(self._delete_reference(ref, user))
298
        return result
299
300 1
    def _delete_reference(self, item, user):
301
        if item.SourceNodeId not in self._aspace:
302
            return ua.StatusCode(ua.StatusCodes.BadSourceNodeIdInvalid)
303
        if item.TargetNodeId not in self._aspace:
304
            return ua.StatusCode(ua.StatusCodes.BadTargetNodeIdInvalid)
305
        if not user == User.Admin:
306
            return ua.StatusCode(ua.StatusCodes.BadUserAccessDenied)
307
308
        for rdesc in self._aspace[item.SourceNodeId].references:
309
            if rdesc.NodeId is item.TargetNodeId:
310
                if rdesc.RefrenceTypeId != item.RefrenceTypeId:
311
                    return ua.StatusCode(ua.StatusCode.BadReferenceTypeInvalid)
312
                if rdesc.IsForward == item.IsForward or item.DeleteBidirectional:
313
                    self._aspace[item.SourceNodeId].references.remove(rdesc)
314
315
        for rdesc in self._aspace[item.TargetNodeId].references:
316
            if rdesc.NodeId is item.SourceNodeId:
317
                if rdesc.RefrenceTypeId != item.RefrenceTypeId:
318
                    return ua.StatusCode(ua.StatusCode.BadReferenceTypeInvalid)
319
                if rdesc.IsForward == item.IsForward or item.DeleteBidirectional:
320
                    self._aspace[item.SourceNodeId].references.remove(rdesc)
321
322
        return ua.StatusCode()
323
324 1
    def _add_node_attr(self, item, nodedata, name, vtype=None):
325 1
        if item.SpecifiedAttributes & getattr(ua.NodeAttributesMask, name):
326 1
            dv = ua.DataValue(ua.Variant(getattr(item, name), vtype))
327 1
            dv.ServerTimestamp = datetime.utcnow()
328 1
            dv.SourceTimestamp = datetime.utcnow()
329 1
            nodedata.attributes[getattr(ua.AttributeIds, name)] = AttributeValue(dv)
330
331 1
    def _add_nodeattributes(self, item, nodedata):
332 1
        self._add_node_attr(item, nodedata, "AccessLevel", ua.VariantType.Byte)
333 1
        self._add_node_attr(item, nodedata, "ArrayDimensions", ua.VariantType.Int32)
334 1
        self._add_node_attr(item, nodedata, "BrowseName", ua.VariantType.QualifiedName)
335 1
        self._add_node_attr(item, nodedata, "ContainsNoLoops", ua.VariantType.Boolean)
336 1
        self._add_node_attr(item, nodedata, "DataType", ua.VariantType.NodeId)
337 1
        self._add_node_attr(item, nodedata, "Description", ua.VariantType.LocalizedText)
338 1
        self._add_node_attr(item, nodedata, "DisplayName", ua.VariantType.LocalizedText)
339 1
        self._add_node_attr(item, nodedata, "EventNotifier", ua.VariantType.Byte)
340 1
        self._add_node_attr(item, nodedata, "Executable", ua.VariantType.Boolean)
341 1
        self._add_node_attr(item, nodedata, "Historizing", ua.VariantType.Boolean)
342 1
        self._add_node_attr(item, nodedata, "InverseName", ua.VariantType.LocalizedText)
343 1
        self._add_node_attr(item, nodedata, "IsAbstract", ua.VariantType.Boolean)
344 1
        self._add_node_attr(item, nodedata, "MinimumSamplingInterval", ua.VariantType.Double)
345 1
        self._add_node_attr(item, nodedata, "NodeClass", ua.VariantType.UInt32)
346 1
        self._add_node_attr(item, nodedata, "NodeId", ua.VariantType.NodeId)
347 1
        self._add_node_attr(item, nodedata, "Symmetric", ua.VariantType.Boolean)
348 1
        self._add_node_attr(item, nodedata, "UserAccessLevel", ua.VariantType.Byte)
349 1
        self._add_node_attr(item, nodedata, "UserExecutable", ua.VariantType.Byte)
350 1
        self._add_node_attr(item, nodedata, "UserWriteMask", ua.VariantType.Byte)
351 1
        self._add_node_attr(item, nodedata, "ValueRank", ua.VariantType.Int32)
352 1
        self._add_node_attr(item, nodedata, "WriteMask", ua.VariantType.Byte)
353 1
        self._add_node_attr(item, nodedata, "UserWriteMask", ua.VariantType.Byte)
354 1
        self._add_node_attr(item, nodedata, "Value")
355
356
357 1
class MethodService(object):
358
359 1
    def __init__(self, aspace):
360 1
        self.logger = logging.getLogger(__name__)
361 1
        self._aspace = aspace
362
363 1
    def call(self, methods):
364 1
        results = []
365 1
        for method in methods:
366 1
            results.append(self._call(method))
367 1
        return results
368
369 1
    def _call(self, method):
370 1
        res = ua.CallMethodResult()
371 1
        if method.ObjectId not in self._aspace or method.MethodId not in self._aspace:
372 1
            res.StatusCode = ua.StatusCode(ua.StatusCodes.BadNodeIdInvalid)
373
        else:
374 1
            node = self._aspace[method.MethodId]
375 1
            if node.call is None:
376
                res.StatusCode = ua.StatusCode(ua.StatusCodes.BadNothingToDo)
377
            else:
378 1
                try:
379 1
                    res.OutputArguments = node.call(method.ObjectId, *method.InputArguments)
380 1
                    for _ in method.InputArguments:
381 1
                        res.InputArgumentResults.append(ua.StatusCode())
382 1
                except Exception:
383 1
                    self.logger.exception("Error executing method call %s, an exception was raised: ", method)
384 1
                    res.StatusCode = ua.StatusCode(ua.StatusCodes.BadUnexpectedError)
385 1
        return res
386
387
388 1
class AddressSpace(object):
389
390
    """
391
    The address space object stores all the nodes og the OPC-UA server
392
    and helper methods.
393
    The methods are threadsafe
394
    """
395
396 1
    def __init__(self):
397 1
        self.logger = logging.getLogger(__name__)
398 1
        self._nodes = {}
399 1
        self._lock = RLock()  # FIXME: should use multiple reader, one writter pattern
400 1
        self._datachange_callback_counter = 200
401 1
        self._handle_to_attribute_map = {}
402
403 1
    def __getitem__(self, nodeid):
404 1
        with self._lock:
405 1
            return self._nodes.__getitem__(nodeid)
406
407 1
    def __setitem__(self, nodeid, value):
408 1
        with self._lock:
409 1
            return self._nodes.__setitem__(nodeid, value)
410
411 1
    def __contains__(self, nodeid):
412 1
        with self._lock:
413 1
            return self._nodes.__contains__(nodeid)
414
415 1
    def __delitem__(self, nodeid):
416
        with self._lock:
417
            self._nodes.__delitem__(nodeid)
418
419 1
    def keys(self):
420
        with self._lock:
421
            return self._nodes.keys()
422
423 1
    def dump(self, path):
424
        """
425
        dump address space as binary to file
426
        """
427
        with open(path, 'wb') as f:
428
            pickle.dump(self._nodes, f, pickle.HIGHEST_PROTOCOL)
429
430 1
    def load(self, path):
431
        """
432
        load address space from file, overwritting everything current address space
433
        """
434
        with open(path, 'rb') as f:
435
            self._nodes = pickle.load(f)
436
437 1
    def get_attribute_value(self, nodeid, attr):
438 1
        with self._lock:
439
            #self.logger.debug("get attr val: %s %s", nodeid, attr)
440 1
            if nodeid not in self._nodes:
441 1
                dv = ua.DataValue()
442 1
                dv.StatusCode = ua.StatusCode(ua.StatusCodes.BadNodeIdUnknown)
443 1
                return dv
444 1
            node = self._nodes[nodeid]
445 1
            if attr not in node.attributes:
446 1
                dv = ua.DataValue()
447 1
                dv.StatusCode = ua.StatusCode(ua.StatusCodes.BadAttributeIdInvalid)
448 1
                return dv
449 1
            attval = node.attributes[attr]
450 1
            if attval.value_callback:
451
                return attval.value_callback()
452 1
            return attval.value
453
454 1
    def set_attribute_value(self, nodeid, attr, value):
455 1
        with self._lock:
456 1
            self.logger.debug("set attr val: %s %s %s", nodeid, attr, value)
457 1
            if nodeid not in self._nodes:
458 1
                return ua.StatusCode(ua.StatusCodes.BadNodeIdUnknown)
459 1
            node = self._nodes[nodeid]
460 1
            if attr not in node.attributes:
461 1
                return ua.StatusCode(ua.StatusCodes.BadAttributeIdInvalid)
462 1
            if not value.SourceTimestamp:
463 1
                value.SourceTimestamp = datetime.utcnow()
464 1
            if not value.ServerTimestamp:
465 1
                value.ServerTimestamp = datetime.utcnow()
466
467 1
            attval = node.attributes[attr]
468 1
            old = attval.value
469 1
            attval.value = value
470 1
            cbs = []
471 1
            if old.Value != value.Value:  # only send call callback when a value change has happend
472 1
                cbs = list(attval.datachange_callbacks.items())
473
474 1
        for k, v in cbs:
475 1
            try:
476 1
                v(k, value)
477
            except Exception as ex:
478
                self.logger.exception("Error calling datachange callback %s, %s, %s", k, v, ex)
479
480 1
        return ua.StatusCode()
481
482 1
    def add_datachange_callback(self, nodeid, attr, callback):
483 1
        with self._lock:
484 1
            self.logger.debug("set attr callback: %s %s %s", nodeid, attr, callback)
485 1
            if nodeid not in self._nodes:
486
                return ua.StatusCode(ua.StatusCodes.BadNodeIdUnknown), 0
487 1
            node = self._nodes[nodeid]
488 1
            if attr not in node.attributes:
489 1
                return ua.StatusCode(ua.StatusCodes.BadAttributeIdInvalid), 0
490 1
            attval = node.attributes[attr]
491 1
            self._datachange_callback_counter += 1
492 1
            handle = self._datachange_callback_counter
493 1
            attval.datachange_callbacks[handle] = callback
494 1
            self._handle_to_attribute_map[handle] = (nodeid, attr)
495 1
            return ua.StatusCode(), handle
496
497 1
    def delete_datachange_callback(self, handle):
498 1
        with self._lock:
499 1
            nodeid, attr = self._handle_to_attribute_map.pop(handle)
500 1
            self._nodes[nodeid].attributes[attr].datachange_callbacks.pop(handle)
501
502 1
    def add_method_callback(self, methodid, callback):
503 1
        with self._lock:
504 1
            node = self._nodes[methodid]
505
            node.call = callback
506