Completed
Pull Request — master (#114)
by Denis
01:56
created

opcua.server.NodeManagementService._add_node()   B

Complexity

Conditions 6

Size

Total Lines 55

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 38
CRAP Score 6.0045
Metric Value
cc 6
dl 0
loc 55
ccs 38
cts 40
cp 0.95
crap 6.0045
rs 7.8834

How to fix   Long Method   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include Extract Method: move a cohesive part of the method body into its own, well-named method.

1 1
from threading import RLock
2 1
import logging
3 1
from datetime import datetime
4 1
try:
5 1
    import cPickle as pickle
6 1
except:
7 1
    import pickle
8
9 1
from opcua import ua
10 1
from opcua.server.users import User
11
12
13 1
class AttributeValue(object):
    """
    Container for one attribute of a node.

    value: the stored value, exactly as given to the constructor
    value_callback: optional callable, None until somebody sets it
    datachange_callbacks: dict of callbacks, empty at creation
    """

    def __init__(self, value):
        self.value = value
        self.value_callback = None
        self.datachange_callbacks = {}

    def __str__(self):
        return "AttributeValue({})".format(self.value)
    __repr__ = __str__
23
24
25 1
class NodeData(object):
    """
    Everything stored for one node.

    nodeid: identifier of the node, as given to the constructor
    attributes: dict of attribute values, empty at creation
    references: list of references, empty at creation
    call: optional callable attached to the node, None at creation
    """

    def __init__(self, nodeid):
        self.nodeid = nodeid
        self.attributes = {}
        self.references = []
        self.call = None

    def __str__(self):
        return "NodeData(id:{}, attrs:{}, refs:{})".format(self.nodeid, self.attributes, self.references)
    __repr__ = __str__
36
37
38 1
class AttributeService(object):
    """
    Read and write node attributes through an address space.
    """

    def __init__(self, aspace):
        self.logger = logging.getLogger(__name__)
        self._aspace = aspace

    def read(self, params):
        """
        Return the attribute value of every entry in params.NodesToRead,
        in request order.
        """
        self.logger.debug("read %s", params)
        return [self._aspace.get_attribute_value(readvalue.NodeId, readvalue.AttributeId)
                for readvalue in params.NodesToRead]

    def write(self, params, user=User.Admin):
        """
        Write every entry of params.NodesToWrite and return one status
        code per entry, in request order.

        Users other than User.Admin may only write the Value attribute of
        nodes whose AccessLevel and UserAccessLevel both allow
        CurrentWrite; any other write yields BadUserAccessDenied.
        """
        self.logger.debug("write %s as user %s", params, user)
        res = []
        for writevalue in params.NodesToWrite:
            if user != User.Admin and not self._user_may_write(writevalue):
                res.append(ua.StatusCode(ua.StatusCodes.BadUserAccessDenied))
                continue
            res.append(self._aspace.set_attribute_value(writevalue.NodeId, writevalue.AttributeId, writevalue.Value))
        return res

    def _user_may_write(self, writevalue):
        """
        Return True when a non-admin user is allowed to perform writevalue.
        """
        if writevalue.AttributeId != ua.AttributeIds.Value:
            return False
        al = self._aspace.get_attribute_value(writevalue.NodeId, ua.AttributeIds.AccessLevel)
        ual = self._aspace.get_attribute_value(writevalue.NodeId, ua.AttributeIds.UserAccessLevel)
        # NOTE(review): assumes both attributes exist and hold an integer;
        # a node without them would make the bit test below raise -- confirm
        mask = ua.AccessLevelMask.CurrentWrite
        return bool(al.Value.Value & mask) and bool(ual.Value.Value & mask)
66
67
68 1
class ViewService(object):
    """
    Browse nodes and translate browse paths using an address space.
    """

    def __init__(self, aspace):
        self.logger = logging.getLogger(__name__)
        self._aspace = aspace

    def browse(self, params):
        """
        Browse every description of params.NodesToBrowse and return one
        BrowseResult per description, in request order.
        """
        self.logger.debug("browse %s", params)
        return [self._browse(desc) for desc in params.NodesToBrowse]

    def _browse(self, desc):
        """
        Return a BrowseResult with the references of desc.NodeId that match
        desc. The status is BadNodeIdInvalid when the node is unknown.
        """
        res = ua.BrowseResult()
        if desc.NodeId not in self._aspace:
            res.StatusCode = ua.StatusCode(ua.StatusCodes.BadNodeIdInvalid)
            return res
        node = self._aspace[desc.NodeId]
        for ref in node.references:
            if self._is_suitable_ref(desc, ref):
                res.References.append(ref)
        return res

    def _is_suitable_ref(self, desc, ref):
        """
        Return True when ref passes the direction, reference type and node
        class filters of the browse description desc.
        """
        if not self._suitable_direction(desc.BrowseDirection, ref.IsForward):
            self.logger.debug("%s is not suitable due to direction", ref)
            return False
        if not self._suitable_reftype(desc.ReferenceTypeId, ref.ReferenceTypeId, desc.IncludeSubtypes):
            self.logger.debug("%s is not suitable due to type", ref)
            return False
        # a NodeClassMask of 0 means no filtering on node class
        if desc.NodeClassMask and ((desc.NodeClassMask & ref.NodeClass) == 0):
            self.logger.debug("%s is not suitable due to class", ref)
            return False
        self.logger.debug("%s is a suitable ref for desc %s", ref, desc)
        return True

    def _suitable_reftype(self, ref1, ref2, subtypes):
        """
        Return True when reference type ref2 is accepted by a request for
        reference type ref1.

        ref1: requested reference type id
        ref2: reference type id of the candidate reference
        subtypes: IncludeSubtypes flag of the browse description
        """
        if ref1.Identifier == ref2.Identifier:
            return True
        if not subtypes and ref2.Identifier == ua.ObjectIds.HasSubtype:
            return False
        # NOTE(review): subtypes of ref1 are accepted here even when
        # ``subtypes`` is false -- confirm this is intended
        return ref2 in self._get_sub_ref(ref1)

    def _get_sub_ref(self, ref):
        """
        Return the node ids reachable from ref through forward HasSubtype
        references, recursively.

        Raises KeyError when ref is not in the address space.
        """
        res = []
        nodedata = self._aspace[ref]
        for rdesc in nodedata.references:
            # only follow forward references: an inverse HasSubtype points
            # back to the supertype and would make the recursion endless
            if rdesc.ReferenceTypeId.Identifier == ua.ObjectIds.HasSubtype and rdesc.IsForward:
                res.append(rdesc.NodeId)
                res += self._get_sub_ref(rdesc.NodeId)
        return res

    def _suitable_direction(self, desc, isforward):
        """
        Return True when a reference with direction flag isforward matches
        the requested browse direction desc.
        """
        if desc == ua.BrowseDirection.Both:
            return True
        if desc == ua.BrowseDirection.Forward and isforward:
            return True
        if desc == ua.BrowseDirection.Inverse and not isforward:
            return True
        return False

    def translate_browsepaths_to_nodeids(self, browsepaths):
        """
        Translate every browse path and return one BrowsePathResult per
        path, in request order.
        """
        self.logger.debug("translate browsepath: %s", browsepaths)
        return [self._translate_browsepath_to_nodeid(path) for path in browsepaths]

    def _translate_browsepath_to_nodeid(self, path):
        """
        Follow path.RelativePath.Elements from path.StartingNode.

        The result carries BadNodeIdInvalid when the starting node is
        unknown, BadNoMatch when an element cannot be resolved, and
        otherwise a single target holding the node id that was reached.
        """
        self.logger.debug("looking at path: %s", path)
        res = ua.BrowsePathResult()
        if path.StartingNode not in self._aspace:
            res.StatusCode = ua.StatusCode(ua.StatusCodes.BadNodeIdInvalid)
            return res
        current = path.StartingNode
        for el in path.RelativePath.Elements:
            nodeid = self._find_element_in_node(el, current)
            if not nodeid:
                res.StatusCode = ua.StatusCode(ua.StatusCodes.BadNoMatch)
                return res
            current = nodeid
        target = ua.BrowsePathTarget()
        target.TargetId = current
        # largest 32 bit unsigned value; presumably the protocol marker for
        # "whole path processed" -- TODO confirm against the specification
        target.RemainingPathIndex = 4294967295
        res.Targets = [target]
        return res

    def _find_element_in_node(self, el, nodeid):
        """
        Return the node id of the first reference of nodeid whose BrowseName
        equals el.TargetName, or None when there is no such reference.
        """
        nodedata = self._aspace[nodeid]
        for ref in nodedata.references:
            # FIXME: here we should check other arguments!!
            if ref.BrowseName == el.TargetName:
                return ref.NodeId
        self.logger.info("element %s was not found in node %s", el, nodeid)
        return None
166
167
168 1
class NodeManagementService(object):
    """
    Add and delete nodes and references in an address space.
    """

    def __init__(self, aspace):
        self.logger = logging.getLogger(__name__)
        self._aspace = aspace

    def add_nodes(self, addnodeitems, user=User.Admin):
        """
        Add every item of addnodeitems and return one AddNodesResult per
        item, in request order.
        """
        return [self._add_node(item, user) for item in addnodeitems]

    def _add_node(self, item, user):
        """
        Create the node described by item and return an AddNodesResult.

        The result status is BadNodeIdExists when the requested id is
        already used, BadParentNodeIdInvalid when a parent is given but
        unknown, and BadUserAccessDenied when a parent is given and user
        is not User.Admin. On success AddedNodeId is set.
        """
        result = ua.AddNodesResult()

        if item.RequestedNewNodeId in self._aspace:
            self.logger.warning("AddNodesItem: node already exists")
            result.StatusCode = ua.StatusCode(ua.StatusCodes.BadNodeIdExists)
            return result

        nodedata = self._create_nodedata(item)

        # a null ParentNodeId creates the node without any parent
        # NOTE(review): in that case the user is not checked -- confirm
        if not item.ParentNodeId == ua.NodeId():
            error = self._add_parent_reference(item, user)
            if error is not None:
                result.StatusCode = error
                return result

        # now add our node to db
        self._aspace[item.RequestedNewNodeId] = nodedata

        if item.TypeDefinition != ua.NodeId():
            self._add_type_definition(item, user)

        result.StatusCode = ua.StatusCode()
        result.AddedNodeId = item.RequestedNewNodeId
        return result

    def _create_nodedata(self, item):
        """
        Build the NodeData of item with its common and requested attributes.
        """
        nodedata = NodeData(item.RequestedNewNodeId)
        # add common attrs
        nodedata.attributes[ua.AttributeIds.NodeId] = AttributeValue(ua.DataValue(ua.Variant(item.RequestedNewNodeId, ua.VariantType.NodeId)))
        nodedata.attributes[ua.AttributeIds.BrowseName] = AttributeValue(ua.DataValue(ua.Variant(item.BrowseName, ua.VariantType.QualifiedName)))
        nodedata.attributes[ua.AttributeIds.NodeClass] = AttributeValue(ua.DataValue(ua.Variant(item.NodeClass, ua.VariantType.Int32)))
        # add requested attrs
        self._add_nodeattributes(item.NodeAttributes, nodedata)
        return nodedata

    def _add_parent_reference(self, item, user):
        """
        Append a forward reference to the new node on its parent.

        Return None on success, or the bad StatusCode explaining why the
        reference was not added.
        """
        if item.ParentNodeId not in self._aspace:
            return ua.StatusCode(ua.StatusCodes.BadParentNodeIdInvalid)
        if not user == User.Admin:
            return ua.StatusCode(ua.StatusCodes.BadUserAccessDenied)

        desc = ua.ReferenceDescription()
        desc.ReferenceTypeId = item.ReferenceTypeId
        desc.NodeId = item.RequestedNewNodeId
        desc.NodeClass = item.NodeClass
        desc.BrowseName = item.BrowseName
        desc.DisplayName = ua.LocalizedText(item.BrowseName.Name)
        desc.TypeDefinition = item.TypeDefinition
        desc.IsForward = True
        self._aspace[item.ParentNodeId].references.append(desc)
        return None

    def _add_type_definition(self, item, user):
        """
        Add a HasTypeDefinition reference from the new node to its type.
        """
        addref = ua.AddReferencesItem()
        addref.SourceNodeId = item.RequestedNewNodeId
        addref.IsForward = True
        addref.ReferenceTypeId = ua.NodeId(ua.ObjectIds.HasTypeDefinition)
        addref.TargetNodeId = item.TypeDefinition
        addref.TargetNodeClass = ua.NodeClass.DataType
        # NOTE(review): the returned status is ignored, so a failure to add
        # the reference does not make the AddNodes result bad -- confirm
        self._add_reference(addref, user)

    def delete_nodes(self, deletenodeitems, user=User.Admin):
        """
        Delete every item of deletenodeitems and return one StatusCode per
        item, in request order.
        """
        return [self._delete_node(item, user) for item in deletenodeitems]

    def _delete_node(self, item, user):
        """
        Delete node item.NodeId and return a StatusCode.

        Only User.Admin may delete. When item.DeleteTargetReferences is
        set, every reference pointing to the node is removed as well.
        """
        #TODO: Check if node is monitored if it is: "When any of the Nodes deleted by an invocation of this Service is being monitored, then a Notification containing the status code Bad_NodeIdUnknown is sent to the monitoring Client indicating that the Node has been deleted."
        if not user == User.Admin:
            return ua.StatusCode(ua.StatusCodes.BadUserAccessDenied)

        if item.NodeId not in self._aspace:
            self.logger.warning("DeleteNodesItem: node does not exists")
            return ua.StatusCode(ua.StatusCodes.BadNodeIdUnknown)

        if item.DeleteTargetReferences:
            for elem in list(self._aspace.keys()):
                references = self._aspace[elem].references
                # rebuild in place: removing entries while iterating the
                # same list would skip the entry following each removal
                references[:] = [rdesc for rdesc in references if not rdesc.NodeId == item.NodeId]

        self.logger.debug("deleting node %s", self._aspace[item.NodeId])
        del self._aspace[item.NodeId]

        return ua.StatusCode()

    def add_references(self, refs, user=User.Admin):
        """
        Add every item of refs and return one StatusCode per item, in
        request order.
        """
        return [self._add_reference(ref, user) for ref in refs]

    def _add_reference(self, addref, user):
        """
        Append the reference described by addref to its source node and
        return a StatusCode.

        Browse name and display name are copied from the target node when
        it has them.
        """
        if addref.SourceNodeId not in self._aspace:
            return ua.StatusCode(ua.StatusCodes.BadSourceNodeIdInvalid)
        if addref.TargetNodeId not in self._aspace:
            return ua.StatusCode(ua.StatusCodes.BadTargetNodeIdInvalid)
        if not user == User.Admin:
            return ua.StatusCode(ua.StatusCodes.BadUserAccessDenied)
        rdesc = ua.ReferenceDescription()
        rdesc.ReferenceTypeId = addref.ReferenceTypeId
        rdesc.IsForward = addref.IsForward
        rdesc.NodeId = addref.TargetNodeId
        rdesc.NodeClass = addref.TargetNodeClass
        bname = self._aspace.get_attribute_value(addref.TargetNodeId, ua.AttributeIds.BrowseName).Value.Value
        if bname:
            rdesc.BrowseName = bname
        dname = self._aspace.get_attribute_value(addref.TargetNodeId, ua.AttributeIds.DisplayName).Value.Value
        if dname:
            rdesc.DisplayName = dname
        self._aspace[addref.SourceNodeId].references.append(rdesc)
        return ua.StatusCode()

    def delete_references(self, refs, user=User.Admin):
        """
        Delete every item of refs and return one StatusCode per item, in
        request order.
        """
        return [self._delete_reference(ref, user) for ref in refs]

    def _delete_reference(self, item, user):
        """
        Remove the references between item.SourceNodeId and
        item.TargetNodeId and return a StatusCode.

        The references of the source pointing to the target are handled
        first, then the references of the target pointing to the source.
        BadReferenceTypeIdInvalid is returned as soon as one of them has a
        reference type different from item.ReferenceTypeId.
        """
        if item.SourceNodeId not in self._aspace:
            return ua.StatusCode(ua.StatusCodes.BadSourceNodeIdInvalid)
        if item.TargetNodeId not in self._aspace:
            return ua.StatusCode(ua.StatusCodes.BadTargetNodeIdInvalid)
        if not user == User.Admin:
            return ua.StatusCode(ua.StatusCodes.BadUserAccessDenied)

        sides = (
            (item.SourceNodeId, item.TargetNodeId),
            (item.TargetNodeId, item.SourceNodeId),
        )
        for owner, other in sides:
            references = self._aspace[owner].references
            # iterate over a copy, entries are removed from the real list
            for rdesc in list(references):
                if not rdesc.NodeId == other:
                    continue
                if rdesc.ReferenceTypeId != item.ReferenceTypeId:
                    return ua.StatusCode(ua.StatusCodes.BadReferenceTypeIdInvalid)
                # NOTE(review): the same direction test is applied on both
                # sides although the opposite reference on the target has
                # the inverted IsForward flag -- confirm
                if rdesc.IsForward == item.IsForward or item.DeleteBidirectional:
                    references.remove(rdesc)

        return ua.StatusCode()

    def _add_node_attr(self, item, nodedata, name, vtype=None):
        """
        Copy attribute name from item to nodedata when item.SpecifiedAttributes
        has the matching bit set.

        vtype: variant type used to wrap the value, None lets ua.Variant decide
        """
        if item.SpecifiedAttributes & getattr(ua.NodeAttributesMask, name):
            dv = ua.DataValue(ua.Variant(getattr(item, name), vtype))
            now = datetime.utcnow()
            dv.ServerTimestamp = now
            dv.SourceTimestamp = now
            nodedata.attributes[getattr(ua.AttributeIds, name)] = AttributeValue(dv)

    def _add_nodeattributes(self, item, nodedata):
        """
        Copy every attribute specified in item to nodedata.
        """
        # NOTE(review): ArrayDimensions, WriteMask and UserWriteMask look
        # like they should be UInt32 -- confirm against the specification
        attrs = (
            ("AccessLevel", ua.VariantType.Byte),
            ("ArrayDimensions", ua.VariantType.Int32),
            ("BrowseName", ua.VariantType.QualifiedName),
            ("ContainsNoLoops", ua.VariantType.Boolean),
            ("DataType", ua.VariantType.NodeId),
            ("Description", ua.VariantType.LocalizedText),
            ("DisplayName", ua.VariantType.LocalizedText),
            ("EventNotifier", ua.VariantType.Byte),
            ("Executable", ua.VariantType.Boolean),
            ("Historizing", ua.VariantType.Boolean),
            ("InverseName", ua.VariantType.LocalizedText),
            ("IsAbstract", ua.VariantType.Boolean),
            ("MinimumSamplingInterval", ua.VariantType.Double),
            ("NodeClass", ua.VariantType.UInt32),
            ("NodeId", ua.VariantType.NodeId),
            ("Symmetric", ua.VariantType.Boolean),
            ("UserAccessLevel", ua.VariantType.Byte),
            # a flag like Executable, so Boolean and not Byte
            ("UserExecutable", ua.VariantType.Boolean),
            ("UserWriteMask", ua.VariantType.Byte),
            ("ValueRank", ua.VariantType.Int32),
            ("WriteMask", ua.VariantType.Byte),
            ("Value", None),
        )
        for name, vtype in attrs:
            self._add_node_attr(item, nodedata, name, vtype)
350 1
351 1
352 1
class MethodService(object):
    """
    Execute the callables attached to method nodes of an address space.
    """

    def __init__(self, aspace):
        self.logger = logging.getLogger(__name__)
        self._aspace = aspace

    def call(self, methods):
        """
        Execute every request of methods and return one CallMethodResult
        per request, in request order.
        """
        return [self._call(method) for method in methods]

    def _call(self, method):
        """
        Execute one call request and return its CallMethodResult.

        The status is BadNodeIdInvalid when the object or the method node
        is unknown, BadNothingToDo when no callable is attached to the
        method node and BadUnexpectedError when the callable raises.
        """
        res = ua.CallMethodResult()
        if method.ObjectId not in self._aspace or method.MethodId not in self._aspace:
            res.StatusCode = ua.StatusCode(ua.StatusCodes.BadNodeIdInvalid)
            return res

        node = self._aspace[method.MethodId]
        if node.call is None:
            res.StatusCode = ua.StatusCode(ua.StatusCodes.BadNothingToDo)
            return res

        try:
            res.OutputArguments = node.call(method.ObjectId, *method.InputArguments)
            # one good status per input argument
            for _ in method.InputArguments:
                res.InputArgumentResults.append(ua.StatusCode())
        except Exception:
            # user supplied code: report the failure instead of propagating
            self.logger.exception("Error executing method call %s, an exception was raised: ", method)
            res.StatusCode = ua.StatusCode(ua.StatusCodes.BadUnexpectedError)
        return res
381 1
382
383 1
class AddressSpace(object):

    """
    The address space object stores all the nodes of the OPC-UA server
    and helper methods.
    The methods are threadsafe
    """

    def __init__(self):
        self.logger = logging.getLogger(__name__)
        self._nodes = {}
        self._lock = RLock()  # FIXME: should use multiple reader, one writer pattern
        self._datachange_callback_counter = 200
        self._handle_to_attribute_map = {}

    def __getitem__(self, nodeid):
        with self._lock:
            return self._nodes[nodeid]

    def __setitem__(self, nodeid, value):
        with self._lock:
            self._nodes[nodeid] = value

    def __contains__(self, nodeid):
        with self._lock:
            return nodeid in self._nodes

    def __delitem__(self, nodeid):
        with self._lock:
            del self._nodes[nodeid]

    def keys(self):
        """
        return a snapshot of the node ids currently stored
        """
        with self._lock:
            # copy: a live view could change under the caller once the
            # lock is released
            return list(self._nodes.keys())

    def dump(self, path):
        """
        dump address space as binary to file
        """
        with self._lock:
            with open(path, 'wb') as f:
                pickle.dump(self._nodes, f, pickle.HIGHEST_PROTOCOL)

    def load(self, path):
        """
        load address space from file, overwriting everything in the current address space
        """
        # SECURITY: unpickling can execute arbitrary code, only load files
        # coming from a trusted source
        with open(path, 'rb') as f:
            nodes = pickle.load(f)
        with self._lock:
            self._nodes = nodes

    def get_attribute_value(self, nodeid, attr):
        """
        return the value stored for attribute attr of node nodeid

        When the node or the attribute is unknown, a new DataValue with
        status BadNodeIdUnknown or BadAttributeIdInvalid is returned.
        When the attribute has a value_callback, its result is returned
        instead of the stored value.
        """
        with self._lock:
            if nodeid not in self._nodes:
                dv = ua.DataValue()
                dv.StatusCode = ua.StatusCode(ua.StatusCodes.BadNodeIdUnknown)
                return dv
            node = self._nodes[nodeid]
            if attr not in node.attributes:
                dv = ua.DataValue()
                dv.StatusCode = ua.StatusCode(ua.StatusCodes.BadAttributeIdInvalid)
                return dv
            attval = node.attributes[attr]
            if attval.value_callback:
                return attval.value_callback()
            return attval.value

    def set_attribute_value(self, nodeid, attr, value):
        """
        store value as attribute attr of node nodeid and return a StatusCode

        Missing source and server timestamps of value are set to the
        current UTC time. The datachange callbacks of the attribute are
        called when the new value differs from the previous one.
        """
        with self._lock:
            self.logger.debug("set attr val: %s %s %s", nodeid, attr, value)
            if nodeid not in self._nodes:
                return ua.StatusCode(ua.StatusCodes.BadNodeIdUnknown)
            node = self._nodes[nodeid]
            if attr not in node.attributes:
                return ua.StatusCode(ua.StatusCodes.BadAttributeIdInvalid)
            if not value.SourceTimestamp:
                value.SourceTimestamp = datetime.utcnow()
            if not value.ServerTimestamp:
                value.ServerTimestamp = datetime.utcnow()

            attval = node.attributes[attr]
            old = attval.value
            attval.value = value
            cbs = []
            if old.Value != value.Value:  # only call callbacks when a value change has happened
                cbs = list(attval.datachange_callbacks.items())

        # the callbacks run on the copy made above, after the lock is released
        for k, v in cbs:
            try:
                v(k, value)
            except Exception as ex:
                self.logger.exception("Error calling datachange callback %s, %s, %s", k, v, ex)

        return ua.StatusCode()

    def add_datachange_callback(self, nodeid, attr, callback):
        """
        register callback for changes of attribute attr of node nodeid

        Return a (StatusCode, handle) tuple; handle is 0 when the node or
        the attribute is unknown.
        """
        with self._lock:
            self.logger.debug("set attr callback: %s %s %s", nodeid, attr, callback)
            if nodeid not in self._nodes:
                return ua.StatusCode(ua.StatusCodes.BadNodeIdUnknown), 0
            node = self._nodes[nodeid]
            if attr not in node.attributes:
                return ua.StatusCode(ua.StatusCodes.BadAttributeIdInvalid), 0
            attval = node.attributes[attr]
            self._datachange_callback_counter += 1
            handle = self._datachange_callback_counter
            attval.datachange_callbacks[handle] = callback
            self._handle_to_attribute_map[handle] = (nodeid, attr)
            return ua.StatusCode(), handle

    def delete_datachange_callback(self, handle):
        """
        remove the datachange callback registered under handle

        Raises KeyError when handle, or the node and attribute it was
        registered for, is not known any more.
        """
        with self._lock:
            nodeid, attr = self._handle_to_attribute_map.pop(handle)
            self._nodes[nodeid].attributes[attr].datachange_callbacks.pop(handle)

    def add_method_callback(self, methodid, callback):
        """
        attach callback as the ``call`` attribute of node methodid

        Raises KeyError when the node is unknown.
        """
        with self._lock:
            node = self._nodes[methodid]
            node.call = callback
501