Passed
Pull Request — master (#306)
by
unknown
04:58
created

NodeManagementService._add_node()   F

Complexity

Conditions 18

Size

Total Lines 74
Code Lines 48

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 18
eloc 48
nop 4
dl 0
loc 74
rs 1.2
c 0
b 0
f 0

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

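Commonly applied refactorings include Extract Method and, when many parameters or temporary variables are involved, Replace Temp with Query, Introduce Parameter Object, and Preserve Whole Object.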

Complexity

Complex classes like asyncua.server.address_space.NodeManagementService, which contains the flagged _add_node() method, often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
import asyncio
2
import pickle
3
import shelve
4
import logging
5
import collections
6
from datetime import datetime
7
from concurrent.futures import ThreadPoolExecutor
8
from functools import partial
9
10
from asyncua import ua
11
from .users import User, UserRole
12
13
_logger = logging.getLogger(__name__)
14
15
16
class AttributeValue:
    """
    Container for the value of a single node attribute.

    Besides the value itself it carries an optional ``value_callback`` and a
    dict of data-change callbacks; both start out empty.
    """

    def __init__(self, value):
        self.datachange_callbacks = {}
        self.value_callback = None
        self.value = value

    def __repr__(self):
        return "AttributeValue({})".format(self.value)

    # str() and repr() intentionally produce the same text
    __str__ = __repr__
27
28
29
class NodeData:
    """
    In-memory record of one node: its id, its attributes (a dict), its
    references (a list) and an optional ``call`` object.
    """

    def __init__(self, nodeid):
        self.nodeid = nodeid
        self.call = None
        self.references = []
        self.attributes = {}

    def __repr__(self):
        return "NodeData(id:{}, attrs:{}, refs:{})".format(
            self.nodeid, self.attributes, self.references
        )

    # str() and repr() intentionally produce the same text
    __str__ = __repr__
41
42
43
class AttributeService:
    """Serve attribute Read and Write requests against an address space."""

    def __init__(self, aspace: "AddressSpace"):
        self.logger = logging.getLogger(__name__)
        self._aspace: "AddressSpace" = aspace

    def read(self, params):
        """
        Read every attribute listed in ``params.NodesToRead``.

        :return: list with one read result per requested item, in request order
        """
        return [
            self._aspace.read_attribute_value(readvalue.NodeId, readvalue.AttributeId)
            for readvalue in params.NodesToRead
        ]

    async def write(self, params, user=User(role=UserRole.Admin)):
        """
        Write every value listed in ``params.NodesToWrite``.

        Admin users write unconditionally. Any other user may only write the
        Value attribute, and only when the node allows it (see
        ``_user_may_write``); refused items get ``BadUserAccessDenied``.

        :return: list with one result per requested item, in request order
        """
        res = []
        for writevalue in params.NodesToWrite:
            if user.role != UserRole.Admin and not self._user_may_write(writevalue):
                res.append(ua.StatusCode(ua.StatusCodes.BadUserAccessDenied))
                continue
            res.append(
                await self._aspace.write_attribute_value(writevalue.NodeId, writevalue.AttributeId, writevalue.Value))
        return res

    def _user_may_write(self, writevalue):
        """
        Tell whether a non-admin user is allowed to perform ``writevalue``.

        The target must be the Value attribute, and both the AccessLevel and
        the UserAccessLevel of the node must be readable and have the
        CurrentWrite bit set.
        """
        if writevalue.AttributeId != ua.AttributeIds.Value:
            return False
        al = self._aspace.read_attribute_value(writevalue.NodeId, ua.AttributeIds.AccessLevel)
        ual = self._aspace.read_attribute_value(writevalue.NodeId, ua.AttributeIds.UserAccessLevel)
        # Both reads must have succeeded before their values are bit-tested;
        # an unreadable access level is treated as "not writable".
        if not al.StatusCode.is_good() or not ual.StatusCode.is_good():
            return False
        write_bit = ua.AccessLevel.CurrentWrite
        return bool(
            ua.ua_binary.test_bit(al.Value.Value, write_bit)
            and ua.ua_binary.test_bit(ual.Value.Value, write_bit)
        )
74
75
76
class ViewService(object):
    """
    Serve Browse and TranslateBrowsePathsToNodeIds requests from the nodes
    stored in an address space.
    """

    def __init__(self, aspace: "AddressSpace"):
        self.logger = logging.getLogger(__name__)
        self._aspace: "AddressSpace" = aspace

    def browse(self, params):
        """Return one BrowseResult per entry of ``params.NodesToBrowse``, in order."""
        return [self._browse(desc) for desc in params.NodesToBrowse]

    def _browse(self, desc):
        """
        Browse a single node.

        :return: a BrowseResult holding every reference of the node that
            matches ``desc``, or ``BadNodeIdInvalid`` when the node is unknown
        """
        res = ua.BrowseResult()
        if desc.NodeId not in self._aspace:
            res.StatusCode = ua.StatusCode(ua.StatusCodes.BadNodeIdInvalid)
            return res
        for ref in self._aspace[desc.NodeId].references:
            if self._is_suitable_ref(desc, ref):
                res.References.append(ref)
        return res

    def _is_suitable_ref(self, desc, ref):
        """Tell whether ``ref`` passes the direction, type and node-class filters of ``desc``."""
        if not self._suitable_direction(desc.BrowseDirection, ref.IsForward):
            return False
        if not self._suitable_reftype(desc.ReferenceTypeId, ref.ReferenceTypeId, desc.IncludeSubtypes):
            return False
        # a mask of 0 means "no node-class filtering"
        if desc.NodeClassMask and ((desc.NodeClassMask & ref.NodeClass) == 0):
            return False
        return True

    def _suitable_reftype(self, ref1, ref2, subtypes):
        """
        Tell whether reference type ``ref2`` is accepted by the requested
        reference type ``ref1``.

        :param ref1: reference type id requested by the browse description
        :param ref2: reference type id of the candidate reference
        :param subtypes: the IncludeSubtypes flag of the browse description
        """
        if ref1 == ua.NodeId(ua.ObjectIds.Null):
            # If ReferenceTypeId is not specified in the BrowseDescription,
            # all References are returned and includeSubtypes is ignored.
            return True
        if not subtypes and ref2.Identifier == ua.ObjectIds.HasSubtype:
            return False
        if ref1.Identifier == ref2.Identifier:
            return True
        oktypes = self._get_sub_ref(ref1)
        has_subtype = ua.NodeId(ua.ObjectIds.HasSubtype)
        if not subtypes and has_subtype in oktypes:
            oktypes.remove(has_subtype)
        return ref2 in oktypes

    def _get_sub_ref(self, ref):
        """
        Collect, depth first, the ids of all nodes reachable from ``ref``
        through forward HasSubtype references.

        :return: a new list (callers may mutate it); empty when ``ref`` is
            not a node of the address space
        """
        nodedata = self._aspace.get(ref)
        if nodedata is None:
            # unknown reference type: it has no subtypes, do not raise
            return []
        res = []
        for rdesc in nodedata.references:
            if rdesc.ReferenceTypeId.Identifier == ua.ObjectIds.HasSubtype and rdesc.IsForward:
                res.append(rdesc.NodeId)
                res += self._get_sub_ref(rdesc.NodeId)
        return res

    def _suitable_direction(self, direction, isforward):
        """Tell whether a reference with the given ``isforward`` flag matches the browse ``direction``."""
        if direction == ua.BrowseDirection.Both:
            return True
        if direction == ua.BrowseDirection.Forward and isforward:
            return True
        if direction == ua.BrowseDirection.Inverse and not isforward:
            return True
        return False

    def translate_browsepaths_to_nodeids(self, browsepaths):
        """Return one BrowsePathResult per browse path, in order."""
        return [self._translate_browsepath_to_nodeid(path) for path in browsepaths]

    def _translate_browsepath_to_nodeid(self, path):
        """
        Follow the relative path of ``path`` starting at ``path.StartingNode``.

        :return: a BrowsePathResult with a single target on success, otherwise
            a result carrying ``BadNothingToDo`` (empty relative path),
            ``BadBrowseNameInvalid`` (last element has no target name),
            ``BadNodeIdInvalid`` (unknown starting node) or ``BadNoMatch``
        """
        res = ua.BrowsePathResult()
        elements = path.RelativePath.Elements
        if not elements:
            # an empty relative path must not fail with IndexError below
            res.StatusCode = ua.StatusCode(ua.StatusCodes.BadNothingToDo)
            return res
        if not elements[-1].TargetName:
            # OPC UA Part 4: Services, 5.8.4 TranslateBrowsePathsToNodeIds
            # it is unclear whether this check should also handle empty strings
            res.StatusCode = ua.StatusCode(ua.StatusCodes.BadBrowseNameInvalid)
            return res
        if path.StartingNode not in self._aspace:
            res.StatusCode = ua.StatusCode(ua.StatusCodes.BadNodeIdInvalid)
            return res
        current = path.StartingNode
        for el in elements:
            nodeid = self._find_element_in_node(el, current)
            if not nodeid:
                res.StatusCode = ua.StatusCode(ua.StatusCodes.BadNoMatch)
                return res
            current = nodeid
        target = ua.BrowsePathTarget()
        target.TargetId = current
        # 4294967295 is the largest UInt32 value
        # NOTE(review): presumably the marker for "no remaining path" - confirm against the spec
        target.RemainingPathIndex = 4294967295
        res.Targets = [target]
        return res

    def _find_element_in_node(self, el, nodeid):
        """
        Find the first reference of node ``nodeid`` that matches path element ``el``.

        :return: the NodeId the matching reference points to, or None
        """
        nodedata = self._aspace[nodeid]
        for ref in nodedata.references:
            if ref.BrowseName != el.TargetName:
                continue
            if ref.IsForward == el.IsInverse:
                continue
            if ref.ReferenceTypeId != el.ReferenceTypeId:
                # not the exact type: only acceptable as a subtype, when allowed
                if not el.IncludeSubtypes:
                    continue
                if ref.ReferenceTypeId not in self._get_sub_ref(el.ReferenceTypeId):
                    continue
            return ref.NodeId
        self.logger.info("element %s was not found in node %s", el, nodeid)
        return None
195
196
197
class NodeManagementService:
    """
    Add and delete nodes and references in an address space.

    Every operation is refused with ``BadUserAccessDenied`` for users whose
    role is not ``UserRole.Admin``.
    """

    def __init__(self, aspace: "AddressSpace"):
        self.logger = logging.getLogger(__name__)
        self._aspace: "AddressSpace" = aspace

    def add_nodes(self, addnodeitems, user=User(role=UserRole.Admin)):
        """Add every item and return one AddNodesResult per item, in order."""
        return [self._add_node(item, user) for item in addnodeitems]

    def try_add_nodes(self, addnodeitems, user=User(role=UserRole.Admin), check=True):
        """Add every item and lazily yield the items whose result status is not good."""
        for item in addnodeitems:
            ret = self._add_node(item, user, check=check)
            if not ret.StatusCode.is_good():
                yield item

    def _add_node(self, item, user, check=True):
        """
        Validate ``item`` and, when valid, store the new node.

        :param item: the AddNodesItem; its ``RequestedNewNodeId`` is replaced
            by a generated id when its identifier is null
        :param user: the requesting user; must have the Admin role
        :param check: when true a null parent is rejected and a source
            timestamp is put on the Value attribute
        :return: an AddNodesResult with the status and, on success, the new node id
        """
        result = ua.AddNodesResult()

        if not user.role == UserRole.Admin:
            result.StatusCode = ua.StatusCode(ua.StatusCodes.BadUserAccessDenied)
            return result

        if item.RequestedNewNodeId.has_null_identifier():
            # If the Identifier of the requested NodeId is null we generate a new
            # NodeId using the namespace of the nodeid. This is an extension of the
            # spec that allows requesting the server to generate a new nodeid in a
            # specified namespace.
            item.RequestedNewNodeId = self._aspace.generate_nodeid(item.RequestedNewNodeId.NamespaceIndex)
        elif item.RequestedNewNodeId in self._aspace:
            self.logger.warning("AddNodesItem: Requested NodeId %s already exists", item.RequestedNewNodeId)
            result.StatusCode = ua.StatusCode(ua.StatusCodes.BadNodeIdExists)
            return result

        parent_is_null = item.ParentNodeId.is_null()
        if parent_is_null and check:
            result.StatusCode = ua.StatusCode(ua.StatusCodes.BadParentNodeIdInvalid)
            return result

        parentdata = self._aspace.get(item.ParentNodeId)
        if parentdata is None and not parent_is_null:
            self.logger.info("add_node: while adding node %s, requested parent node %s does not exists",
                             item.RequestedNewNodeId, item.ParentNodeId)
            result.StatusCode = ua.StatusCode(ua.StatusCodes.BadParentNodeIdInvalid)
            return result

        if self._browsename_exists_in_parent(item):
            self.logger.warning(f"AddNodesItem: Requested Browsename {item.BrowseName.Name}"
                                f" already exists in Parent Node. ParentID:{item.ParentNodeId} --- "
                                f"ItemId:{item.RequestedNewNodeId}")
            result.StatusCode = ua.StatusCode(ua.StatusCodes.BadBrowseNameDuplicated)
            return result

        nodedata = NodeData(item.RequestedNewNodeId)
        self._add_node_attributes(nodedata, item, add_timestamps=check)

        # now add our node to db
        self._aspace[nodedata.nodeid] = nodedata

        if parentdata is not None:
            self._add_ref_from_parent(nodedata, item, parentdata)
            self._add_ref_to_parent(nodedata, item, parentdata)

        # add type definition
        if item.TypeDefinition != ua.NodeId():
            self._add_type_definition(nodedata, item)

        result.StatusCode = ua.StatusCode()
        result.AddedNodeId = nodedata.nodeid
        return result

    def _browsename_exists_in_parent(self, item):
        """
        Tell whether the parent of ``item`` already has a forward "HasChild"
        reference (or one of the listed subtypes) to a node whose browse name
        equals the browse name of ``item``.

        The check is best effort: a missing parent or an unexpected error is
        logged and reported as "no duplicate".
        """
        child_ref_types = (ua.ObjectIds.HasChild, ua.ObjectIds.HasComponent,
                           ua.ObjectIds.HasProperty, ua.ObjectIds.HasSubtype,
                           ua.ObjectIds.HasOrderedComponent)
        try:
            for ref in self._aspace[item.ParentNodeId].references:
                if ref.ReferenceTypeId.Identifier in child_ref_types and ref.IsForward:
                    if item.BrowseName.Name == ref.BrowseName.Name:
                        return True
        except KeyError as e:
            if item.ParentNodeId.Identifier == 0 and item.ParentNodeId.NamespaceIndex == 0:
                self.logger.debug(f"{e} - NodeParent seems to be Root Node")
            else:
                self.logger.debug(f"{e} - NodeParent does not exist in Server")
        except Exception as e:
            # Exception, not BaseException: KeyboardInterrupt and SystemExit must propagate
            self.logger.warning(f"Unknown Exception thrown: {e}")
        return False

    def _add_node_attributes(self, nodedata, item, add_timestamps):
        """Fill ``nodedata.attributes`` with the common attributes, then with the requested ones."""
        # add common attrs
        nodedata.attributes[ua.AttributeIds.NodeId] = AttributeValue(
            ua.DataValue(ua.Variant(nodedata.nodeid, ua.VariantType.NodeId))
        )
        nodedata.attributes[ua.AttributeIds.BrowseName] = AttributeValue(
            ua.DataValue(ua.Variant(item.BrowseName, ua.VariantType.QualifiedName))
        )
        nodedata.attributes[ua.AttributeIds.NodeClass] = AttributeValue(
            ua.DataValue(ua.Variant(item.NodeClass, ua.VariantType.Int32))
        )
        # add requested attrs
        self._add_nodeattributes(item.NodeAttributes, nodedata, add_timestamps)

    def _add_unique_reference(self, nodedata, desc):
        """
        Append ``desc`` to the references of ``nodedata`` unless a reference
        with the same type and target already exists.

        :return: ``BadReferenceNotAllowed`` when the existing reference has the
            opposite direction, a default StatusCode otherwise
        """
        for r in nodedata.references:
            if r.ReferenceTypeId == desc.ReferenceTypeId and r.NodeId == desc.NodeId:
                if r.IsForward != desc.IsForward:
                    self.logger.error("Cannot add conflicting reference %s ", str(desc))
                    return ua.StatusCode(ua.StatusCodes.BadReferenceNotAllowed)
                break  # ref already exists
        else:
            nodedata.references.append(desc)
        return ua.StatusCode()

    def _add_ref_from_parent(self, nodedata, item, parentdata):
        """Add the forward reference parent -> new node to ``parentdata``."""
        desc = ua.ReferenceDescription()
        desc.ReferenceTypeId = item.ReferenceTypeId
        desc.NodeId = nodedata.nodeid
        desc.NodeClass = item.NodeClass
        desc.BrowseName = item.BrowseName
        desc.DisplayName = item.NodeAttributes.DisplayName
        desc.TypeDefinition = item.TypeDefinition
        desc.IsForward = True
        self._add_unique_reference(parentdata, desc)

    def _add_ref_to_parent(self, nodedata, item, parentdata):
        """Add the inverse reference new node -> parent to ``nodedata``."""
        addref = ua.AddReferencesItem()
        addref.ReferenceTypeId = item.ReferenceTypeId
        addref.SourceNodeId = nodedata.nodeid
        addref.TargetNodeId = item.ParentNodeId
        addref.TargetNodeClass = parentdata.attributes[ua.AttributeIds.NodeClass].value.Value.Value
        addref.IsForward = False
        self._add_reference_no_check(nodedata, addref)

    def _add_type_definition(self, nodedata, item):
        """Add a forward HasTypeDefinition reference from the new node to ``item.TypeDefinition``."""
        addref = ua.AddReferencesItem()
        addref.SourceNodeId = nodedata.nodeid
        addref.IsForward = True
        addref.ReferenceTypeId = ua.NodeId(ua.ObjectIds.HasTypeDefinition)
        addref.TargetNodeId = item.TypeDefinition
        addref.TargetNodeClass = ua.NodeClass.DataType
        self._add_reference_no_check(nodedata, addref)

    def delete_nodes(self, deletenodeitems, user=User(role=UserRole.Admin)):
        """Delete every item of ``deletenodeitems.NodesToDelete``; return one StatusCode per item."""
        return [self._delete_node(item, user) for item in deletenodeitems.NodesToDelete]

    def _delete_node(self, item, user):
        """
        Delete one node after notifying its data-change callbacks.

        When ``item.DeleteTargetReferences`` is set, references pointing to
        the node are removed from every node of the address space first.
        """
        if user.role != UserRole.Admin:
            return ua.StatusCode(ua.StatusCodes.BadUserAccessDenied)

        if item.NodeId not in self._aspace:
            self.logger.warning("DeleteNodesItem: NodeId %s does not exists", item.NodeId)
            return ua.StatusCode(ua.StatusCodes.BadNodeIdUnknown)

        if item.DeleteTargetReferences:
            for elem in self._aspace.keys():
                references = self._aspace[elem].references
                # iterate over a copy because matching entries are removed in place
                for rdesc in references[:]:
                    if rdesc.NodeId == item.NodeId:
                        references.remove(rdesc)

        self._delete_node_callbacks(self._aspace[item.NodeId])

        del self._aspace[item.NodeId]

        return ua.StatusCode()

    def _delete_node_callbacks(self, nodedata):
        """
        Call every data-change callback registered on the Value attribute of
        ``nodedata`` with ``BadNodeIdUnknown`` and unregister it. A failing
        callback is logged and does not stop the others.
        """
        if ua.AttributeIds.Value in nodedata.attributes:
            # snapshot the items: the callbacks are unregistered while looping
            for handle, callback in list(nodedata.attributes[ua.AttributeIds.Value].datachange_callbacks.items()):
                try:
                    callback(handle, None, ua.StatusCode(ua.StatusCodes.BadNodeIdUnknown))
                    self._aspace.delete_datachange_callback(handle)
                except Exception as ex:
                    self.logger.exception("Error calling delete node callback callback %s, %s, %s", nodedata,
                                          ua.AttributeIds.Value, ex)

    def add_references(self, refs, user=User(role=UserRole.Admin)):
        """Add every reference and return one StatusCode per item, in order."""
        return [self._add_reference(ref, user) for ref in refs]

    def try_add_references(self, refs, user=User(role=UserRole.Admin)):
        """Add every reference and lazily yield the items that could not be added."""
        for ref in refs:
            if not self._add_reference(ref, user).is_good():
                yield ref

    def _add_reference(self, addref, user):
        """Add one reference after checking that source and target exist and the user is Admin."""
        sourcedata = self._aspace.get(addref.SourceNodeId)
        if sourcedata is None:
            return ua.StatusCode(ua.StatusCodes.BadSourceNodeIdInvalid)
        if addref.TargetNodeId not in self._aspace:
            return ua.StatusCode(ua.StatusCodes.BadTargetNodeIdInvalid)
        if user.role != UserRole.Admin:
            return ua.StatusCode(ua.StatusCodes.BadUserAccessDenied)
        return self._add_reference_no_check(sourcedata, addref)

    def _add_reference_no_check(self, sourcedata, addref):
        """
        Build a ReferenceDescription from ``addref`` and add it to ``sourcedata``.

        Node class (when unspecified), browse name and display name are read
        from the target node.
        """
        rdesc = ua.ReferenceDescription()
        rdesc.ReferenceTypeId = addref.ReferenceTypeId
        rdesc.IsForward = addref.IsForward
        rdesc.NodeId = addref.TargetNodeId
        if addref.TargetNodeClass == ua.NodeClass.Unspecified:
            rdesc.NodeClass = self._aspace.read_attribute_value(
                addref.TargetNodeId, ua.AttributeIds.NodeClass).Value.Value
        else:
            rdesc.NodeClass = addref.TargetNodeClass
        bname = self._aspace.read_attribute_value(addref.TargetNodeId, ua.AttributeIds.BrowseName).Value.Value
        if bname:
            rdesc.BrowseName = bname
        dname = self._aspace.read_attribute_value(addref.TargetNodeId, ua.AttributeIds.DisplayName).Value.Value
        if dname:
            rdesc.DisplayName = dname
        return self._add_unique_reference(sourcedata, rdesc)

    def delete_references(self, refs, user=User(role=UserRole.Admin)):
        """Delete every reference and return one StatusCode per item, in order."""
        return [self._delete_reference(ref, user) for ref in refs]

    def _delete_unique_reference(self, item, invert=False):
        """
        Remove the first reference matching ``item`` from its source node.

        :param invert: when true, remove the opposite reference instead
            (source and target swapped, direction flipped)
        :return: a default StatusCode when a reference was removed, ``BadNotFound`` otherwise
        """
        if invert:
            source, target, forward = item.TargetNodeId, item.SourceNodeId, not item.IsForward
        else:
            source, target, forward = item.SourceNodeId, item.TargetNodeId, item.IsForward
        references = self._aspace[source].references
        for rdesc in references:
            if rdesc.NodeId == target and rdesc.ReferenceTypeId == item.ReferenceTypeId:
                if rdesc.IsForward == forward:
                    # safe although we are iterating: we return right after removing
                    references.remove(rdesc)
                    return ua.StatusCode()
        return ua.StatusCode(ua.StatusCodes.BadNotFound)

    def _delete_reference(self, item, user):
        """Delete one reference (and its opposite when ``item.DeleteBidirectional`` is set)."""
        if item.SourceNodeId not in self._aspace:
            return ua.StatusCode(ua.StatusCodes.BadSourceNodeIdInvalid)
        if item.TargetNodeId not in self._aspace:
            return ua.StatusCode(ua.StatusCodes.BadTargetNodeIdInvalid)
        if item.ReferenceTypeId not in self._aspace:
            return ua.StatusCode(ua.StatusCodes.BadReferenceTypeIdInvalid)
        if user.role != UserRole.Admin:
            return ua.StatusCode(ua.StatusCodes.BadUserAccessDenied)

        if item.DeleteBidirectional:
            # the status of the opposite deletion is deliberately not reported
            self._delete_unique_reference(item, True)
        return self._delete_unique_reference(item)

    def _add_node_attr(self, item, nodedata, name, vtype=None, add_timestamps=False):
        """
        Copy attribute ``name`` from ``item`` into ``nodedata.attributes``
        when the corresponding bit is set in ``item.SpecifiedAttributes``.

        :param vtype: variant type used to wrap the value (None lets ua.Variant decide)
        :param add_timestamps: when true the stored DataValue gets a source timestamp
        """
        if item.SpecifiedAttributes & getattr(ua.NodeAttributesMask, name):
            dv = ua.DataValue(ua.Variant(getattr(item, name), vtype))
            if add_timestamps:
                # ServerTimestamp is deliberately not set until someone explains why it should be
                dv.SourceTimestamp = datetime.utcnow()
            nodedata.attributes[getattr(ua.AttributeIds, name)] = AttributeValue(dv)

    def _add_nodeattributes(self, item, nodedata, add_timestamps):
        """
        Copy every attribute specified in ``item`` into ``nodedata``.

        Only the Value attribute is timestamped (when ``add_timestamps`` is true).
        """
        typed_attributes = (
            ("AccessLevel", ua.VariantType.Byte),
            ("ArrayDimensions", ua.VariantType.UInt32),
            ("BrowseName", ua.VariantType.QualifiedName),
            ("ContainsNoLoops", ua.VariantType.Boolean),
            ("DataType", ua.VariantType.NodeId),
            ("Description", ua.VariantType.LocalizedText),
            ("DisplayName", ua.VariantType.LocalizedText),
            ("EventNotifier", ua.VariantType.Byte),
            ("Executable", ua.VariantType.Boolean),
            ("Historizing", ua.VariantType.Boolean),
            ("InverseName", ua.VariantType.LocalizedText),
            ("IsAbstract", ua.VariantType.Boolean),
            ("MinimumSamplingInterval", ua.VariantType.Double),
            ("NodeClass", ua.VariantType.Int32),
            ("NodeId", ua.VariantType.NodeId),
            ("Symmetric", ua.VariantType.Boolean),
            ("UserAccessLevel", ua.VariantType.Byte),
            ("UserExecutable", ua.VariantType.Boolean),
            # This used to be registered twice (Byte here, UInt32 further down);
            # the later UInt32 registration always won, so only that one is kept.
            ("UserWriteMask", ua.VariantType.UInt32),
            ("ValueRank", ua.VariantType.Int32),
            ("WriteMask", ua.VariantType.UInt32),
            ("DataTypeDefinition", ua.VariantType.ExtensionObject),
        )
        for name, vtype in typed_attributes:
            self._add_node_attr(item, nodedata, name, vtype)
        self._add_node_attr(item, nodedata, "Value", add_timestamps=add_timestamps)
484
485
486
class MethodService:
    """
    Execute the callables attached to method nodes of an address space.

    Coroutine functions are awaited directly; any other callable runs in a
    thread pool owned by this service.
    """

    def __init__(self, aspace: "AddressSpace"):
        self.logger = logging.getLogger(__name__)
        self._aspace: "AddressSpace" = aspace
        # executes the non-coroutine method implementations, see _run_method
        self._pool = ThreadPoolExecutor()

    def stop(self):
        """Shut down the thread pool."""
        self._pool.shutdown()

    async def call(self, methods):
        """Call the requested methods one after the other; return one result per request."""
        return [await self._call(method) for method in methods]

    async def _call(self, method):
        """
        Execute a single call request.

        :return: a CallMethodResult. The status is ``BadNodeIdInvalid`` when
            the object or the method node is unknown, ``BadNothingToDo`` when
            the method node has no callable attached and ``BadUnexpectedError``
            when the callable raised.
        """
        self.logger.info("Calling: %s", method)
        res = ua.CallMethodResult()
        if method.ObjectId not in self._aspace or method.MethodId not in self._aspace:
            res.StatusCode = ua.StatusCode(ua.StatusCodes.BadNodeIdInvalid)
            return res
        node = self._aspace[method.MethodId]
        if node.call is None:
            res.StatusCode = ua.StatusCode(ua.StatusCodes.BadNothingToDo)
            return res
        try:
            result = await self._run_method(node.call, method.ObjectId, *method.InputArguments)
        except Exception:
            self.logger.exception("Error executing method call %s, an exception was raised: ", method)
            res.StatusCode = ua.StatusCode(ua.StatusCodes.BadUnexpectedError)
            return res
        # the callable may return a complete result, just a status, or the output arguments
        if isinstance(result, ua.CallMethodResult):
            res = result
        elif isinstance(result, ua.StatusCode):
            res.StatusCode = result
        else:
            res.OutputArguments = result
        # make sure there is one input argument result per input argument
        while len(res.InputArgumentResults) < len(method.InputArguments):
            res.InputArgumentResults.append(ua.StatusCode())
        return res

    async def _run_method(self, func, parent, *args):
        """
        Run ``func(parent, *args)`` and return its result.

        :param func: coroutine function (awaited in the running loop) or plain
            callable (executed in the thread pool so it cannot block the loop)
        :param parent: first positional argument passed to ``func``
        """
        if asyncio.iscoroutinefunction(func):
            return await func(parent, *args)
        # we are inside a coroutine, so a running loop is guaranteed to exist
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(self._pool, partial(func, parent, *args))
535
536
537
class AddressSpace:
538
    """
539
    The address space object stores all the nodes of the OPC-UA server and helper methods.
540
    The methods are thread safe
541
    """
542
543
    def __init__(self):
544
        self.logger = logging.getLogger(__name__)
545
        self._nodes = {}
546
        self._datachange_callback_counter = 200
547
        self._handle_to_attribute_map = {}
548
        self._default_idx = 2
549
        self._nodeid_counter = {0: 20000, 1: 2000}
550
551
    def __getitem__(self, nodeid):
552
        return self._nodes.__getitem__(nodeid)
553
554
    def get(self, nodeid):
555
        return self._nodes.get(nodeid, None)
556
557
    def __setitem__(self, nodeid, value):
558
        return self._nodes.__setitem__(nodeid, value)
559
560
    def __contains__(self, nodeid):
561
        return self._nodes.__contains__(nodeid)
562
563
    def __delitem__(self, nodeid):
564
        self._nodes.__delitem__(nodeid)
565
566
    def generate_nodeid(self, idx=None):
        """
        Return a numeric NodeId for namespace ``idx`` that is not yet present in the address space.

        :param idx: namespace index; ``self._default_idx`` is used when it is None
        :return: an unused ``ua.NodeId``; the per-namespace counter is left at its identifier
        """
        if idx is None:
            idx = self._default_idx
        if idx in self._nodeid_counter:
            self._nodeid_counter[idx] += 1
        else:
            # First request for this namespace: start at the biggest numeric identifier already
            # present in the address space (the loop below then steps past it), or at 1 if none.
            numeric_types = (ua.NodeIdType.Numeric, ua.NodeIdType.TwoByte, ua.NodeIdType.FourByte)
            self._nodeid_counter[idx] = max(
                (
                    nodeid.Identifier for nodeid in self._nodes.keys()
                    if nodeid.NamespaceIndex == idx and nodeid.NodeIdType in numeric_types
                ),
                default=1,
            )
        nodeid = ua.NodeId(self._nodeid_counter[idx], idx)
        # Skip identifiers that are already taken. This is a loop rather than a recursive call so
        # that a long run of occupied identifiers cannot exhaust the interpreter's recursion limit.
        while nodeid in self._nodes:
            self._nodeid_counter[idx] += 1
            nodeid = ua.NodeId(self._nodeid_counter[idx], idx)
        return nodeid
589
590
    def keys(self):
591
        return self._nodes.keys()
592
593
    def empty(self):
594
        """Delete all nodes in address space"""
595
        self._nodes = {}
596
597
    def dump(self, path):
598
        """
599
        Dump address space as binary to file; note that server must be stopped for this method to work
600
        DO NOT DUMP AN ADDRESS SPACE WHICH IS USING A SHELF (load_aspace_shelf), ONLY CACHED NODES WILL GET DUMPED!
601
        """
602
        # prepare nodes in address space for being serialized
603
        for nodeid, ndata in self._nodes.items():
604
            # if the node has a reference to a method call, remove it so the object can be serialized
605
            if ndata.call is not None:
606
                self._nodes[nodeid].call = None
607
608
        with open(path, 'wb') as f:
609
            pickle.dump(self._nodes, f, pickle.HIGHEST_PROTOCOL)
610
611
    def load(self, path):
612
        """
613
        Load address space from a binary file, overwriting everything in the current address space
614
        """
615
        with open(path, 'rb') as f:
616
            self._nodes = pickle.load(f)
617
618
    def make_aspace_shelf(self, path):
        """
        Write every node of the address space into a new shelf at ``path``, keyed by the string
        form of its node id. Opening with flag 'n' always creates a new, empty shelf.

        This is typically only done on first start of the server for the standard address space.
        Subsequent server starts will load the shelf, nodes are then moved to a cache by the
        LazyLoadingDict class when they are accessed. Saving data back to the shelf is currently
        NOT supported, it is only used for the default OPC UA standard address space

        Note: Intended for slow devices, such as Raspberry Pi, to greatly improve start up time
        """
        with shelve.open(path, 'n', protocol=pickle.HIGHEST_PROTOCOL) as shelf:
            shelf.update((nodeid.to_string(), ndata) for nodeid, ndata in self._nodes.items())
630
631
    def load_aspace_shelf(self, path):
632
        """
633
        Load the standard address space nodes from a python shelve via LazyLoadingDict as needed.
634
        The dump() method can no longer be used if the address space is being loaded from a shelf
635
636
        Note: Intended for slow devices, such as Raspberry Pi, to greatly improve start up time
637
        """
638
        raise NotImplementedError
639
640
        # ToDo: async friendly implementation - load all at once?
641
        class LazyLoadingDict(collections.MutableMapping):
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable collections does not seem to be defined.
Loading history...
642
            """
643
            Special dict that only loads nodes as they are accessed. If a node is accessed it gets copied from the
644
            shelve to the cache dict. All user nodes are saved in the cache ONLY. Saving data back to the shelf
645
            is currently NOT supported
646
            """
647
648
            def __init__(self, source):
649
                self.source = source  # python shelf
650
                self.cache = {}  # internal dict
651
652
            def __getitem__(self, key):
653
                # try to get the item (node) from the cache, if it isn't there get it from the shelf
654
                try:
655
                    return self.cache[key]
656
                except KeyError:
657
                    node = self.cache[key] = self.source[key.to_string()]
658
                    return node
659
660
            def __setitem__(self, key, value):
661
                # add a new item to the cache; if this item is in the shelf it is not updated
662
                self.cache[key] = value
663
664
            def __contains__(self, key):
665
                return key in self.cache or key.to_string() in self.source
666
667
            def __delitem__(self, key):
668
                # only deleting items from the cache is allowed
669
                del self.cache[key]
670
671
            def __iter__(self):
672
                # only the cache can be iterated over
673
                return iter(self.cache.keys())
674
675
            def __len__(self):
676
                # only returns the length of items in the cache, not unaccessed items in the shelf
677
                return len(self.cache)
678
679
        self._nodes = LazyLoadingDict(shelve.open(path, "r"))
680
681
    def read_attribute_value(self, nodeid, attr):
682
        # self.logger.debug("get attr val: %s %s", nodeid, attr)
683
        if nodeid not in self._nodes:
684
            dv = ua.DataValue()
685
            dv.StatusCode = ua.StatusCode(ua.StatusCodes.BadNodeIdUnknown)
686
            return dv
687
        node = self._nodes[nodeid]
688
        if attr not in node.attributes:
689
            dv = ua.DataValue()
690
            dv.StatusCode = ua.StatusCode(ua.StatusCodes.BadAttributeIdInvalid)
691
            return dv
692
        attval = node.attributes[attr]
693
        if attval.value_callback:
694
            return attval.value_callback()
695
        return attval.value
696
697
    async def write_attribute_value(self, nodeid, attr, value):
        """
        Store ``value`` as attribute ``attr`` of node ``nodeid`` and notify datachange callbacks.

        Callbacks are awaited one by one as ``callback(handle, value)``, and only when the ``Value``
        member of the new value differs from the one of the previous value. An exception raised
        by a callback is logged and does not abort the write.

        :return: ``BadNodeIdUnknown`` / ``BadAttributeIdInvalid`` status code for an unknown node or
            attribute, otherwise a default ``ua.StatusCode()``
        """
        # self.logger.debug("set attr val: %s %s %s", nodeid, attr, value)
        target = self._nodes.get(nodeid)
        if target is None:
            return ua.StatusCode(ua.StatusCodes.BadNodeIdUnknown)
        attribute = target.attributes.get(attr)
        if attribute is None:
            return ua.StatusCode(ua.StatusCodes.BadAttributeIdInvalid)

        previous, attribute.value = attribute.value, value

        # only call the callbacks when a value change has happened; iterate over a snapshot of
        # the registrations taken before the first callback is awaited
        if previous.Value != value.Value:
            subscribers = list(attribute.datachange_callbacks.items())
        else:
            subscribers = []

        for handle, notify in subscribers:
            try:
                await notify(handle, value)
            except Exception as ex:
                self.logger.exception("Error calling datachange callback %s, %s, %s", handle, notify, ex)

        return ua.StatusCode()
719
720
    def add_datachange_callback(self, nodeid, attr, callback):
        """
        Register ``callback`` for changes of attribute ``attr`` of node ``nodeid``.

        :return: tuple ``(status code, handle)``. The handle is a newly allocated integer that can
            be passed to delete_datachange_callback; it is 0 when the node or attribute is unknown
            (status ``BadNodeIdUnknown`` / ``BadAttributeIdInvalid``).
        """
        self.logger.debug("set attr callback: %s %s %s", nodeid, attr, callback)
        if nodeid not in self._nodes:
            return ua.StatusCode(ua.StatusCodes.BadNodeIdUnknown), 0
        attributes = self._nodes[nodeid].attributes
        if attr not in attributes:
            return ua.StatusCode(ua.StatusCodes.BadAttributeIdInvalid), 0
        attribute = attributes[attr]

        # allocate the next handle and remember where it was registered so it can be removed later
        handle = self._datachange_callback_counter + 1
        self._datachange_callback_counter = handle
        attribute.datachange_callbacks[handle] = callback
        self._handle_to_attribute_map[handle] = (nodeid, attr)
        return ua.StatusCode(), handle
733
734
    def delete_datachange_callback(self, handle):
735
        if handle in self._handle_to_attribute_map:
736
            nodeid, attr = self._handle_to_attribute_map.pop(handle)
737
            self._nodes[nodeid].attributes[attr].datachange_callbacks.pop(handle)
738
739
    def add_method_callback(self, methodid, callback):
740
        node = self._nodes[methodid]
741
        node.call = callback
742