Completed
Pull Request — master (#144)
by
unknown
08:10
created

NodeManagementService._add_node_attr()   A

Complexity

Conditions 3

Size

Total Lines 7
Code Lines 6

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 3
eloc 6
nop 6
dl 0
loc 7
rs 10
c 0
b 0
f 0
1
import asyncio
2
import pickle
3
import shelve
4
import logging
5
import collections
6
from datetime import datetime
7
from concurrent.futures import ThreadPoolExecutor
8
from functools import partial
9
10
from asyncua import ua
11
from .users import User, UserRole
12
13
_logger = logging.getLogger(__name__)
14
15
16
class AttributeValue(object):
    """Container for a single attribute of a node.

    Attributes:
        value: the stored attribute value.
        value_callback: optional callable used instead of ``value``; unset
            (``None``) on construction.
        datachange_callbacks: dict of registered data-change callbacks; empty
            on construction.
    """

    def __init__(self, value):
        self.value = value
        self.value_callback = None
        self.datachange_callbacks = {}

    def __repr__(self):
        return "AttributeValue({})".format(self.value)

    # str() and repr() share one rendering.
    __str__ = __repr__
27
28
29
class NodeData:
    """In-memory record of one node.

    Attributes:
        nodeid: identifier the record was created with.
        attributes: dict of the node's attributes; empty on construction.
        references: list of the node's references; empty on construction.
        call: optional method callback; ``None`` on construction.
    """

    def __init__(self, nodeid):
        self.nodeid = nodeid
        self.attributes = {}
        self.references = []
        self.call = None

    def __repr__(self):
        return "NodeData(id:{}, attrs:{}, refs:{})".format(
            self.nodeid, self.attributes, self.references
        )

    # str() and repr() share one rendering.
    __str__ = __repr__
41
42
43
class AttributeService:
    """Service that reads and writes node attributes through the address space."""

    def __init__(self, aspace: "AddressSpace"):
        self.logger = logging.getLogger(__name__)
        self._aspace: "AddressSpace" = aspace

    async def read(self, params):
        """Read every entry of ``params.NodesToRead``.

        Returns:
            A list holding one ``read_attribute_value`` result per requested
            entry, in request order.
        """
        res = []
        for readvalue in params.NodesToRead:
            res.append(await self._aspace.read_attribute_value(readvalue.NodeId, readvalue.AttributeId))
        return res

    async def write(self, params, user=User(role=UserRole.Admin)):
        """Write every entry of ``params.NodesToWrite`` on behalf of ``user``.

        Admin users write unconditionally. Any other user may only write the
        Value attribute, and only when both AccessLevel and UserAccessLevel of
        the node carry the CurrentWrite bit.

        Args:
            params: request object exposing ``NodesToWrite``.
            user: user performing the write. The default is a single Admin
                instance created once when the method is defined.

        Returns:
            A list with one status code per entry, in request order;
            ``BadUserAccessDenied`` for entries the user may not write.
        """
        res = []
        for writevalue in params.NodesToWrite:
            if user.role != UserRole.Admin and not await self._user_may_write(writevalue):
                res.append(ua.StatusCode(ua.StatusCodes.BadUserAccessDenied))
                continue
            res.append(await self._aspace.write_attribute_value(writevalue.NodeId, writevalue.AttributeId, writevalue.Value))
        return res

    async def _user_may_write(self, writevalue):
        """Tell whether a non-admin user is allowed to perform ``writevalue``."""
        if writevalue.AttributeId != ua.AttributeIds.Value:
            return False
        al = await self._aspace.read_attribute_value(writevalue.NodeId, ua.AttributeIds.AccessLevel)
        ual = await self._aspace.read_attribute_value(writevalue.NodeId, ua.AttributeIds.UserAccessLevel)
        # Both reads must have succeeded before their values are inspected;
        # a failed read carries no usable bit mask.
        if not al.StatusCode.is_good() or not ual.StatusCode.is_good():
            return False
        return bool(
            ua.ua_binary.test_bit(al.Value.Value, ua.AccessLevel.CurrentWrite)
            and ua.ua_binary.test_bit(ual.Value.Value, ua.AccessLevel.CurrentWrite)
        )
73
74
75
class ViewService(object):
76
77
    def __init__(self, aspace: "AddressSpace"):
78
        self.logger = logging.getLogger(__name__)
79
        self._aspace: "AddressSpace" = aspace
80
81
    def browse(self, params):
82
        #self.logger.debug("browse %s", params)
83
        res = []
84
        for desc in params.NodesToBrowse:
85
            res.append(self._browse(desc))
86
        return res
87
88
    def _browse(self, desc):
        """Collect the references of ``desc.NodeId`` that match the browse description.

        Returns a ``ua.BrowseResult``; its status is ``BadNodeIdInvalid`` when
        the node is not in the address space.
        """
        result = ua.BrowseResult()
        if desc.NodeId not in self._aspace:
            result.StatusCode = ua.StatusCode(ua.StatusCodes.BadNodeIdInvalid)
            return result
        for candidate in self._aspace[desc.NodeId].references:
            if self._is_suitable_ref(desc, candidate):
                result.References.append(candidate)
        return result
99
100
    def _is_suitable_ref(self, desc, ref):
        """Return True when ``ref`` passes the direction, type and node-class filters of ``desc``."""
        # Direction is checked first, then reference type; the second check
        # only runs when the first one passes.
        if not (
            self._suitable_direction(desc.BrowseDirection, ref.IsForward)
            and self._suitable_reftype(desc.ReferenceTypeId, ref.ReferenceTypeId, desc.IncludeSubtypes)
        ):
            return False
        # A zero/empty mask means "no class filtering".
        if desc.NodeClassMask and (desc.NodeClassMask & ref.NodeClass) == 0:
            return False
        return True
112
113
    def _suitable_reftype(self, ref1, ref2, subtypes):
        """Decide whether reference type ``ref2`` satisfies the requested type ``ref1``.

        Args:
            ref1: reference type id requested by the browse description.
            ref2: reference type id of the candidate reference.
            subtypes: the description's IncludeSubtypes flag.
        """
        if ref1 == ua.NodeId(ua.ObjectIds.Null):
            # If ReferenceTypeId is not specified in the BrowseDescription,
            # all References are returned and includeSubtypes is ignored.
            return True
        has_subtype = ua.ObjectIds.HasSubtype
        if not subtypes and ref2.Identifier == has_subtype:
            return False
        if ref1.Identifier == ref2.Identifier:
            return True
        # NOTE(review): subtypes of ref1 are accepted here even when
        # ``subtypes`` is false (only HasSubtype itself is dropped) — confirm
        # this is intended.
        candidates = self._get_sub_ref(ref1)
        if not subtypes:
            subtype_id = ua.NodeId(has_subtype)
            if subtype_id in candidates:
                candidates.remove(subtype_id)
        return ref2 in candidates
128
129
    def _get_sub_ref(self, ref):
130
        res = []
131
        nodedata = self._aspace[ref]
132
        if nodedata is not None:
133
            for ref in nodedata.references:
134
                if ref.ReferenceTypeId.Identifier == ua.ObjectIds.HasSubtype and ref.IsForward:
135
                    res.append(ref.NodeId)
136
                    res += self._get_sub_ref(ref.NodeId)
137
        return res
138
139
    def _suitable_direction(self, direction, isforward):
        """Return True when a reference with orientation ``isforward`` matches ``direction``."""
        directions = ua.BrowseDirection
        if direction == directions.Both:
            return True
        if direction == directions.Forward:
            return True if isforward else False
        if direction == directions.Inverse:
            return False if isforward else True
        return False
147
148
    def translate_browsepaths_to_nodeids(self, browsepaths):
149
        #self.logger.debug("translate browsepath: %s", browsepaths)
150
        results = []
151
        for path in browsepaths:
152
            results.append(self._translate_browsepath_to_nodeid(path))
153
        return results
154
155
    def _translate_browsepath_to_nodeid(self, path):
        """Resolve one browse path to a node id.

        Starting at ``path.StartingNode``, each element of
        ``path.RelativePath.Elements`` is followed in turn.

        Returns:
            A ``ua.BrowsePathResult``. On success ``Targets`` holds a single
            target with the resolved node id. Otherwise ``StatusCode`` is:
            ``BadNothingToDo`` for an empty relative path,
            ``BadBrowseNameInvalid`` when the last element has no target name,
            ``BadNodeIdInvalid`` when the starting node is unknown, or
            ``BadNoMatch`` when an element cannot be followed.
        """
        res = ua.BrowsePathResult()
        elements = path.RelativePath.Elements
        if not elements:
            # An empty element list has no "last element" to inspect; report
            # it instead of failing on the index below.
            res.StatusCode = ua.StatusCode(ua.StatusCodes.BadNothingToDo)
            return res
        if not elements[-1].TargetName:
            # OPC UA Part 4: Services, 5.8.4 TranslateBrowsePathsToNodeIds
            # it's unclear if this check should also handle empty strings
            res.StatusCode = ua.StatusCode(ua.StatusCodes.BadBrowseNameInvalid)
            return res
        if path.StartingNode not in self._aspace:
            res.StatusCode = ua.StatusCode(ua.StatusCodes.BadNodeIdInvalid)
            return res
        current = path.StartingNode
        for el in elements:
            nodeid = self._find_element_in_node(el, current)
            if not nodeid:
                res.StatusCode = ua.StatusCode(ua.StatusCodes.BadNoMatch)
                return res
            current = nodeid
        target = ua.BrowsePathTarget()
        target.TargetId = current
        # 4294967295 == 2**32 - 1, the largest unsigned 32-bit value.
        target.RemainingPathIndex = 4294967295
        res.Targets = [target]
        return res
178
179
    def _find_element_in_node(self, el, nodeid):
180
        nodedata = self._aspace[nodeid]
181
        for ref in nodedata.references:
182
            if ref.BrowseName != el.TargetName:
183
                continue
184
            if ref.IsForward == el.IsInverse:
185
                continue
186
            if not el.IncludeSubtypes and ref.ReferenceTypeId != el.ReferenceTypeId:
187
                continue
188
            elif el.IncludeSubtypes and ref.ReferenceTypeId != el.ReferenceTypeId:
189
                if ref.ReferenceTypeId not in self._get_sub_ref(el.ReferenceTypeId):
190
                    continue
191
            return ref.NodeId
192
        self.logger.info("element %s was not found in node %s", el, nodeid)
193
        return None
194
195
196
class NodeManagementService:
197
198
    def __init__(self, aspace: "AddressSpace"):
        """Bind the service to the address space it modifies."""
        self._aspace: "AddressSpace" = aspace
        self.logger = logging.getLogger(__name__)
201
202
203
    async def add_nodes(self, addnodeitems, user=User(role=UserRole.Admin)):
        """Add each item in turn; returns one add result per item, in order."""
        return [await self._add_node(item, user) for item in addnodeitems]
208
209
210
    async def try_add_nodes(self, addnodeitems, user=User(role=UserRole.Admin), check=True):
        """Add each item and yield the ones whose result status is not good."""
        for item in addnodeitems:
            outcome = await self._add_node(item, user, check=check)
            if outcome.StatusCode.is_good():
                continue
            yield item
215
216
    async def _add_node(self, item, user, check=True):
        """Create the node described by ``item`` and link it to its parent.

        Args:
            item: add-node request item.
            user: requesting user; only Admin may add nodes.
            check: when true, a null parent node id is rejected; the flag is
                also forwarded as ``add_timestamps`` for the node attributes.

        Returns:
            A ``ua.AddNodesResult`` whose status is ``BadUserAccessDenied``,
            ``BadNodeIdExists`` or ``BadParentNodeIdInvalid`` on rejection;
            on success ``AddedNodeId`` holds the new node id.
        """
        result = ua.AddNodesResult()

        if not user.role == UserRole.Admin:
            result.StatusCode = ua.StatusCode(ua.StatusCodes.BadUserAccessDenied)
            return result

        if item.RequestedNewNodeId.has_null_identifier():
            # If Identifier of requested NodeId is null we generate a new NodeId using
            # the namespace of the nodeid, this is an extension of the spec to allow
            # requesting the server to generate a new nodeid in a specified namespace
            item.RequestedNewNodeId = self._aspace.generate_nodeid(item.RequestedNewNodeId.NamespaceIndex)
        elif item.RequestedNewNodeId in self._aspace:
            self.logger.warning("AddNodesItem: Requested NodeId %s already exists", item.RequestedNewNodeId)
            result.StatusCode = ua.StatusCode(ua.StatusCodes.BadNodeIdExists)
            return result

        if item.ParentNodeId.is_null() and check:
            result.StatusCode = ua.StatusCode(ua.StatusCodes.BadParentNodeIdInvalid)
            return result

        parentdata = self._aspace.get(item.ParentNodeId)
        if parentdata is None and not item.ParentNodeId.is_null():
            self.logger.info("add_node: while adding node %s, requested parent node %s does not exists",
                item.RequestedNewNodeId, item.ParentNodeId)
            result.StatusCode = ua.StatusCode(ua.StatusCodes.BadParentNodeIdInvalid)
            return result

        nodedata = NodeData(item.RequestedNewNodeId)
        self._add_node_attributes(nodedata, item, add_timestamps=check)

        # Register the node before wiring references, which look it up.
        self._aspace[nodedata.nodeid] = nodedata

        if parentdata is not None:
            self._add_ref_from_parent(nodedata, item, parentdata)
            await self._add_ref_to_parent(nodedata, item, parentdata)

        # add type definition
        if item.TypeDefinition != ua.NodeId():
            await self._add_type_definition(nodedata, item)

        result.StatusCode = ua.StatusCode()
        result.AddedNodeId = nodedata.nodeid
        return result
269
270
    def _add_node_attributes(self, nodedata, item, add_timestamps):
        """Populate ``nodedata.attributes`` from ``item``.

        The NodeId, BrowseName and NodeClass attributes are always set; the
        attributes requested in ``item.NodeAttributes`` are added afterwards.
        """
        # add common attrs
        common = (
            (ua.AttributeIds.NodeId, nodedata.nodeid, ua.VariantType.NodeId),
            (ua.AttributeIds.BrowseName, item.BrowseName, ua.VariantType.QualifiedName),
            (ua.AttributeIds.NodeClass, item.NodeClass, ua.VariantType.Int32),
        )
        for attr_id, raw, vtype in common:
            nodedata.attributes[attr_id] = AttributeValue(ua.DataValue(ua.Variant(raw, vtype)))
        # add requested attrs
        self._add_nodeattributes(item.NodeAttributes, nodedata, add_timestamps)
283
284
    def _add_unique_reference(self, nodedata, desc):
        """Append ``desc`` to ``nodedata.references`` unless an equivalent one exists.

        Two references are equivalent when reference type and target node id
        match. An existing equivalent with the opposite direction is a
        conflict: it is logged and ``BadReferenceNotAllowed`` is returned.
        Otherwise a good status code is returned.
        """
        for existing in nodedata.references:
            same_link = existing.ReferenceTypeId == desc.ReferenceTypeId and existing.NodeId == desc.NodeId
            if not same_link:
                continue
            if existing.IsForward != desc.IsForward:
                self.logger.error("Cannot add conflicting reference %s ", str(desc))
                return ua.StatusCode(ua.StatusCodes.BadReferenceNotAllowed)
            # ref already exists: nothing to add
            return ua.StatusCode()
        nodedata.references.append(desc)
        return ua.StatusCode()
294
295
    def _add_ref_from_parent(self, nodedata, item, parentdata):
        """Add the forward reference parent -> new node to ``parentdata``."""
        forward_ref = ua.ReferenceDescription()
        forward_ref.IsForward = True
        forward_ref.NodeId = nodedata.nodeid
        forward_ref.ReferenceTypeId = item.ReferenceTypeId
        forward_ref.TypeDefinition = item.TypeDefinition
        forward_ref.NodeClass = item.NodeClass
        forward_ref.BrowseName = item.BrowseName
        forward_ref.DisplayName = item.NodeAttributes.DisplayName
        self._add_unique_reference(parentdata, forward_ref)
305
306
    async def _add_ref_to_parent(self, nodedata, item, parentdata):
        """Add the inverse reference new node -> parent to ``nodedata``."""
        parent_class = parentdata.attributes[ua.AttributeIds.NodeClass].value.Value.Value
        inverse_ref = ua.AddReferencesItem()
        inverse_ref.IsForward = False
        inverse_ref.SourceNodeId = nodedata.nodeid
        inverse_ref.TargetNodeId = item.ParentNodeId
        inverse_ref.TargetNodeClass = parent_class
        inverse_ref.ReferenceTypeId = item.ReferenceTypeId
        await self._add_reference_no_check(nodedata, inverse_ref)
314
315
    async def _add_type_definition(self, nodedata, item):
        """Add a forward HasTypeDefinition reference from the node to ``item.TypeDefinition``."""
        typedef_ref = ua.AddReferencesItem()
        typedef_ref.ReferenceTypeId = ua.NodeId(ua.ObjectIds.HasTypeDefinition)
        typedef_ref.IsForward = True
        typedef_ref.SourceNodeId = nodedata.nodeid
        typedef_ref.TargetNodeId = item.TypeDefinition
        typedef_ref.TargetNodeClass = ua.NodeClass.DataType
        await self._add_reference_no_check(nodedata, typedef_ref)
323
324
    def delete_nodes(self, deletenodeitems, user=User(role=UserRole.Admin)):
        """Delete every entry of ``deletenodeitems.NodesToDelete``; one status code each, in order."""
        return [self._delete_node(item, user) for item in deletenodeitems.NodesToDelete]
329
330
    def _delete_node(self, item, user):
        """Remove the node ``item.NodeId`` from the address space.

        Args:
            item: delete request exposing ``NodeId`` and
                ``DeleteTargetReferences``.
            user: requesting user; only Admin may delete nodes.

        Returns:
            ``BadUserAccessDenied`` for non-admin users, ``BadNodeIdUnknown``
            when the node does not exist, otherwise a good status code.
        """
        if user.role != UserRole.Admin:
            return ua.StatusCode(ua.StatusCodes.BadUserAccessDenied)

        if item.NodeId not in self._aspace:
            self.logger.warning("DeleteNodesItem: NodeId %s does not exists", item.NodeId)
            return ua.StatusCode(ua.StatusCodes.BadNodeIdUnknown)

        if item.DeleteTargetReferences:
            for elem in self._aspace.keys():
                references = self._aspace[elem].references
                # Filter into a fresh list and write it back in place:
                # removing entries while iterating the same list skips the
                # element that follows each removal.
                references[:] = [rdesc for rdesc in references if not (rdesc.NodeId == item.NodeId)]

        # Notify subscribers of the node's Value before the node disappears.
        self._delete_node_callbacks(self._aspace[item.NodeId])

        del (self._aspace[item.NodeId])

        return ua.StatusCode()
349
350
    def _delete_node_callbacks(self, nodedata):
        """Notify and unregister the data-change callbacks on the node's Value attribute.

        Each callback is invoked with ``(handle, None, BadNodeIdUnknown)``;
        failures are logged and do not stop the remaining callbacks.
        """
        # NOTE(review): callbacks are invoked synchronously here, while
        # AddressSpace.write_attribute_value awaits them — confirm both
        # calling conventions are supported by the registered callbacks.
        value_attr = nodedata.attributes.get(ua.AttributeIds.Value) if ua.AttributeIds.Value in nodedata.attributes else None
        if value_attr is None:
            return
        # Snapshot: unregistering mutates the callback dict.
        registered = list(value_attr.datachange_callbacks.items())
        for handle, callback in registered:
            try:
                callback(handle, None, ua.StatusCode(ua.StatusCodes.BadNodeIdUnknown))
                self._aspace.delete_datachange_callback(handle)
            except Exception as ex:
                self.logger.exception("Error calling delete node callback callback %s, %s, %s", nodedata,
                    ua.AttributeIds.Value, ex)
359
360
    async def add_references(self, refs, user=User(role=UserRole.Admin)):
        """Add each reference in turn; returns one status code per reference, in order."""
        return [await self._add_reference(ref, user) for ref in refs]
365
366
    async def try_add_references(self, refs, user=User(role=UserRole.Admin)):
        """Add each reference and yield the ones whose status code is not good."""
        for ref in refs:
            status = await self._add_reference(ref, user)
            if status.is_good():
                continue
            yield ref
371
372
    async def _add_reference(self, addref, user):
        """Validate ``addref`` and add it to its source node.

        Checks run in this order: source exists, target exists, user is Admin.
        Returns the matching bad status code on the first failed check,
        otherwise the result of adding the reference.
        """
        sourcedata = self._aspace.get(addref.SourceNodeId)
        if sourcedata is None:
            failure = ua.StatusCodes.BadSourceNodeIdInvalid
        elif addref.TargetNodeId not in self._aspace:
            failure = ua.StatusCodes.BadTargetNodeIdInvalid
        elif user.role != UserRole.Admin:
            failure = ua.StatusCodes.BadUserAccessDenied
        else:
            return await self._add_reference_no_check(sourcedata, addref)
        return ua.StatusCode(failure)
381
382
    async def _add_reference_no_check(self, sourcedata, addref):
        """Build a reference description from ``addref`` and attach it to ``sourcedata``.

        No permission or existence checks are made here. The target's node
        class is read from the address space when ``addref`` leaves it
        unspecified; the target's browse name and display name are copied
        onto the description when they are set.

        Args:
            sourcedata: node data the reference is added to.
            addref: add-reference request item.

        Returns:
            The status code produced by ``_add_unique_reference``.
        """
        rdesc = ua.ReferenceDescription()
        rdesc.ReferenceTypeId = addref.ReferenceTypeId
        rdesc.IsForward = addref.IsForward
        rdesc.NodeId = addref.TargetNodeId
        if addref.TargetNodeClass == ua.NodeClass.Unspecified:
            rdesc.NodeClass = (await self._aspace.read_attribute_value(
                addref.TargetNodeId, ua.AttributeIds.NodeClass)).Value.Value
        else:
            rdesc.NodeClass = addref.TargetNodeClass
        bname = (await self._aspace.read_attribute_value(addref.TargetNodeId, ua.AttributeIds.BrowseName)).Value.Value
        if bname:
            rdesc.BrowseName = bname
        dname = (await self._aspace.read_attribute_value(addref.TargetNodeId, ua.AttributeIds.DisplayName)).Value.Value
        if dname:
            # The description's field is DisplayName; assigning to any other
            # attribute name leaves the display name unset.
            rdesc.DisplayName = dname
        return self._add_unique_reference(sourcedata, rdesc)
399
400
    def delete_references(self, refs, user=User(role=UserRole.Admin)):
        """Delete each reference in turn; returns one status code per reference, in order."""
        return [self._delete_reference(ref, user) for ref in refs]
405
406
    def _delete_unique_reference(self, item, invert=False):
        """Remove the first reference matching ``item`` from its source node.

        With ``invert`` the roles of source and target are swapped and the
        direction is negated, i.e. the opposite end of the link is removed.

        Returns a good status code when a reference was removed, otherwise
        ``BadNotFound``.
        """
        if invert:
            source = item.TargetNodeId
            target = item.SourceNodeId
            forward = not item.IsForward
        else:
            source = item.SourceNodeId
            target = item.TargetNodeId
            forward = item.IsForward
        for rdesc in self._aspace[source].references:
            if rdesc.NodeId == target and rdesc.ReferenceTypeId == item.ReferenceTypeId and rdesc.IsForward == forward:
                # Returning right after the removal keeps the iteration safe.
                self._aspace[source].references.remove(rdesc)
                return ua.StatusCode()
        return ua.StatusCode(ua.StatusCodes.BadNotFound)
417
418
    def _delete_reference(self, item, user):
        """Validate ``item`` and delete the reference it describes.

        Checks run in this order: source, target and reference type exist in
        the address space, then the user is Admin. With
        ``item.DeleteBidirectional`` the opposite end is removed first; the
        returned status is always that of the forward removal.
        """
        required = (
            (item.SourceNodeId, ua.StatusCodes.BadSourceNodeIdInvalid),
            (item.TargetNodeId, ua.StatusCodes.BadTargetNodeIdInvalid),
            (item.ReferenceTypeId, ua.StatusCodes.BadReferenceTypeIdInvalid),
        )
        for nodeid, failure in required:
            if nodeid not in self._aspace:
                return ua.StatusCode(failure)
        if user.role != UserRole.Admin:
            return ua.StatusCode(ua.StatusCodes.BadUserAccessDenied)

        if item.DeleteBidirectional:
            self._delete_unique_reference(item, True)
        return self._delete_unique_reference(item)
431
432
    def _add_node_attr(self, item, nodedata, name, vtype=None, add_timestamps=False):
        """Copy attribute ``name`` from ``item`` to ``nodedata`` when it is flagged as specified.

        Args:
            item: node attributes object exposing ``SpecifiedAttributes`` and
                the attribute itself.
            nodedata: node record whose ``attributes`` dict is updated.
            name: attribute name; looked up on ``ua.NodeAttributesMask``,
                ``item`` and ``ua.AttributeIds``.
            vtype: variant type passed to ``ua.Variant``.
            add_timestamps: when true, the source timestamp is set to the
                current UTC time.
        """
        if not (item.SpecifiedAttributes & getattr(ua.NodeAttributesMask, name)):
            return
        datavalue = ua.DataValue(ua.Variant(getattr(item, name), vtype))
        if add_timestamps:
            # Only the source timestamp is set; the server timestamp is
            # deliberately left untouched.
            datavalue.SourceTimestamp = datetime.utcnow()
        nodedata.attributes[getattr(ua.AttributeIds, name)] = AttributeValue(datavalue)
439
440
    def _add_nodeattributes(self, item, nodedata, add_timestamps):
        """Copy every specified attribute of ``item`` onto ``nodedata``.

        Each attribute is handed to ``_add_node_attr`` with its variant type.
        ``add_timestamps`` is forwarded for the Value attribute only, which is
        also the only attribute added without an explicit variant type.

        "UserWriteMask" appears once, as UInt32: an additional Byte-typed
        registration would be overwritten by the UInt32 one and is therefore
        not made.
        """
        vt = ua.VariantType
        typed_attributes = (
            ("AccessLevel", vt.Byte),
            ("ArrayDimensions", vt.UInt32),
            ("BrowseName", vt.QualifiedName),
            ("ContainsNoLoops", vt.Boolean),
            ("DataType", vt.NodeId),
            ("Description", vt.LocalizedText),
            ("DisplayName", vt.LocalizedText),
            ("EventNotifier", vt.Byte),
            ("Executable", vt.Boolean),
            ("Historizing", vt.Boolean),
            ("InverseName", vt.LocalizedText),
            ("IsAbstract", vt.Boolean),
            ("MinimumSamplingInterval", vt.Double),
            ("NodeClass", vt.Int32),
            ("NodeId", vt.NodeId),
            ("Symmetric", vt.Boolean),
            ("UserAccessLevel", vt.Byte),
            ("UserExecutable", vt.Boolean),
            ("UserWriteMask", vt.UInt32),
            ("ValueRank", vt.Int32),
            ("WriteMask", vt.UInt32),
        )
        for name, vtype in typed_attributes:
            self._add_node_attr(item, nodedata, name, vtype)
        self._add_node_attr(item, nodedata, "Value", add_timestamps=add_timestamps)
464
465
466
class MethodService:
    """Dispatch method calls to the callbacks stored on method nodes.

    Plain (non-coroutine) callbacks are executed in a thread pool owned by
    the service; ``stop`` shuts that pool down.
    """

    def __init__(self, aspace: "AddressSpace"):
        self._aspace: "AddressSpace" = aspace
        self.logger = logging.getLogger(__name__)
        self._pool = ThreadPoolExecutor()

    def stop(self):
        """Shut down the thread pool used for plain callbacks."""
        self._pool.shutdown()

    async def call(self, methods):
        """Call each method in turn; returns one result per request, in order."""
        return [await self._call(method) for method in methods]

    async def _call(self, method):
        """Execute one method call request and return a ``ua.CallMethodResult``.

        Status is ``BadNodeIdInvalid`` when the object or method node is
        unknown, ``BadNothingToDo`` when the method node has no callback and
        ``BadUnexpectedError`` when the callback raises.
        """
        self.logger.info("Calling: %s", method)
        res = ua.CallMethodResult()
        if not (method.ObjectId in self._aspace and method.MethodId in self._aspace):
            res.StatusCode = ua.StatusCode(ua.StatusCodes.BadNodeIdInvalid)
            return res

        node = self._aspace[method.MethodId]
        if node.call is None:
            res.StatusCode = ua.StatusCode(ua.StatusCodes.BadNothingToDo)
            return res

        try:
            result = await self._run_method(node.call, method.ObjectId, *method.InputArguments)
        except Exception:
            self.logger.exception("Error executing method call %s, an exception was raised: ", method)
            res.StatusCode = ua.StatusCode(ua.StatusCodes.BadUnexpectedError)
            return res

        # A callback may return a full result, a bare status code, or the
        # output arguments themselves.
        if isinstance(result, ua.CallMethodResult):
            res = result
        elif isinstance(result, ua.StatusCode):
            res.StatusCode = result
        else:
            res.OutputArguments = result
        # Pad with good status codes so there is one per input argument.
        while len(res.InputArgumentResults) < len(method.InputArguments):
            res.InputArgumentResults.append(ua.StatusCode())
        return res

    async def _run_method(self, func, parent, *args):
        """Invoke ``func(parent, *args)``, awaiting it or running it in the thread pool."""
        if not asyncio.iscoroutinefunction(func):
            bound = partial(func, parent, *args)
            loop = asyncio.get_event_loop()
            return await loop.run_in_executor(self._pool, bound)
        return await func(parent, *args)
515
516
517
class AddressSpace:
518
    """
519
    The address space object stores all the nodes of the OPC-UA server and helper methods.
520
    The methods are thread safe
521
    """
522
523
    def __init__(self):
524
        self.logger = logging.getLogger(__name__)
525
        self._nodes = {}
526
        self._datachange_callback_counter = 200
527
        self._handle_to_attribute_map = {}
528
        self._default_idx = 2
529
        self._nodeid_counter = {0: 20000, 1: 2000}
530
531
    def __getitem__(self, nodeid):
532
        return self._nodes.__getitem__(nodeid)
533
534
    def get(self, nodeid):
535
        return self._nodes.get(nodeid, None)
536
537
    def __setitem__(self, nodeid, value):
538
        return self._nodes.__setitem__(nodeid, value)
539
540
    def __contains__(self, nodeid):
541
        return self._nodes.__contains__(nodeid)
542
543
    def __delitem__(self, nodeid):
544
        self._nodes.__delitem__(nodeid)
545
546
    def generate_nodeid(self, idx=None):
        """Return a numeric node id in namespace ``idx`` that is not yet in use.

        Args:
            idx: namespace index; the default index of the address space is
                used when ``None``.

        The per-namespace counter is advanced on every call. For a namespace
        seen for the first time the counter starts at the largest numeric
        identifier already stored in that namespace, or at 1 when there is
        none.
        """
        if idx is None:
            idx = self._default_idx
        counters = self._nodeid_counter
        if idx in counters:
            counters[idx] += 1
        else:
            # get the biggest identifier number from the existing nodes in address space
            numeric_kinds = (ua.NodeIdType.Numeric, ua.NodeIdType.TwoByte, ua.NodeIdType.FourByte)
            taken = [
                nodeid.Identifier for nodeid in self._nodes.keys()
                if nodeid.NamespaceIndex == idx and nodeid.NodeIdType in numeric_kinds
            ]
            counters[idx] = max(taken) if taken else 1
        candidate = ua.NodeId(counters[idx], idx)
        # Keep advancing the counter until the candidate is free.
        while candidate in self._nodes:
            counters[idx] += 1
            candidate = ua.NodeId(counters[idx], idx)
        return candidate
569
570
    def keys(self):
571
        return self._nodes.keys()
572
573
    def empty(self):
574
        """Delete all nodes in address space"""
575
        self._nodes = {}
576
577
    def dump(self, path):
578
        """
579
        Dump address space as binary to file; note that server must be stopped for this method to work
580
        DO NOT DUMP AN ADDRESS SPACE WHICH IS USING A SHELF (load_aspace_shelf), ONLY CACHED NODES WILL GET DUMPED!
581
        """
582
        # prepare nodes in address space for being serialized
583
        for nodeid, ndata in self._nodes.items():
584
            # if the node has a reference to a method call, remove it so the object can be serialized
585
            if ndata.call is not None:
586
                self._nodes[nodeid].call = None
587
588
        with open(path, 'wb') as f:
589
            pickle.dump(self._nodes, f, pickle.HIGHEST_PROTOCOL)
590
591
    def load(self, path):
592
        """
593
        Load address space from a binary file, overwriting everything in the current address space
594
        """
595
        with open(path, 'rb') as f:
596
            self._nodes = pickle.load(f)
597
598
    def make_aspace_shelf(self, path):
599
        """
600
        Make a shelf for containing the nodes from the standard address space; this is typically only done on first
601
        start of the server. Subsequent server starts will load the shelf, nodes are then moved to a cache
602
        by the LazyLoadingDict class when they are accessed. Saving data back to the shelf
603
        is currently NOT supported, it is only used for the default OPC UA standard address space
604
605
        Note: Intended for slow devices, such as Raspberry Pi, to greatly improve start up time
606
        """
607
        with shelve.open(path, 'n', protocol=pickle.HIGHEST_PROTOCOL) as s:
608
            for nodeid, ndata in self._nodes.items():
609
                s[nodeid.to_string()] = ndata
610
611
    def load_aspace_shelf(self, path):
612
        """
613
        Load the standard address space nodes from a python shelve via LazyLoadingDict as needed.
614
        The dump() method can no longer be used if the address space is being loaded from a shelf
615
616
        Note: Intended for slow devices, such as Raspberry Pi, to greatly improve start up time
617
        """
618
        raise NotImplementedError
619
620
        # ToDo: async friendly implementation - load all at once?
621
        class LazyLoadingDict(collections.MutableMapping):
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable collections does not seem to be defined.
Loading history...
622
            """
623
            Special dict that only loads nodes as they are accessed. If a node is accessed it gets copied from the
624
            shelve to the cache dict. All user nodes are saved in the cache ONLY. Saving data back to the shelf
625
            is currently NOT supported
626
            """
627
628
            def __init__(self, source):
629
                self.source = source  # python shelf
630
                self.cache = {}  # internal dict
631
632
            def __getitem__(self, key):
633
                # try to get the item (node) from the cache, if it isn't there get it from the shelf
634
                try:
635
                    return self.cache[key]
636
                except KeyError:
637
                    node = self.cache[key] = self.source[key.to_string()]
638
                    return node
639
640
            def __setitem__(self, key, value):
641
                # add a new item to the cache; if this item is in the shelf it is not updated
642
                self.cache[key] = value
643
644
            def __contains__(self, key):
645
                return key in self.cache or key.to_string() in self.source
646
647
            def __delitem__(self, key):
648
                # only deleting items from the cache is allowed
649
                del self.cache[key]
650
651
            def __iter__(self):
652
                # only the cache can be iterated over
653
                return iter(self.cache.keys())
654
655
            def __len__(self):
656
                # only returns the length of items in the cache, not unaccessed items in the shelf
657
                return len(self.cache)
658
659
        self._nodes = LazyLoadingDict(shelve.open(path, "r"))
660
661
    async def read_attribute_value(self, nodeid, attr):
662
        # self.logger.debug("get attr val: %s %s", nodeid, attr)
663
        if nodeid not in self._nodes:
664
            dv = ua.DataValue()
665
            dv.StatusCode = ua.StatusCode(ua.StatusCodes.BadNodeIdUnknown)
666
            return dv
667
        node = self._nodes[nodeid]
668
        if attr not in node.attributes:
669
            dv = ua.DataValue()
670
            dv.StatusCode = ua.StatusCode(ua.StatusCodes.BadAttributeIdInvalid)
671
            return dv
672
        attval = node.attributes[attr]
673
        if attval.value_callback:
674
            if asyncio.iscoroutinefunction(attval.value_callback):
675
                await attval.value_callback()
676
            else:
677
                return attval.value_callback()
678
        return attval.value
679
680
    async def write_attribute_value(self, nodeid, attr, value):
        """Store ``value`` as attribute ``attr`` of node ``nodeid`` and notify subscribers.

        Data-change callbacks are awaited with ``(handle, value)``, but only
        when the ``Value`` member of the new data differs from the previous
        one. A failing callback is logged and does not affect the others.

        Returns:
            ``BadNodeIdUnknown`` or ``BadAttributeIdInvalid`` when the node or
            attribute does not exist, otherwise a good status code.
        """
        node = self._nodes.get(nodeid, None)
        if node is None:
            return ua.StatusCode(ua.StatusCodes.BadNodeIdUnknown)
        attval = node.attributes.get(attr, None)
        if attval is None:
            return ua.StatusCode(ua.StatusCodes.BadAttributeIdInvalid)

        previous = attval.value
        attval.value = value

        # only notify when a value change has happened
        if previous.Value != value.Value:
            # Snapshot so callbacks may (un)register during notification.
            subscribers = list(attval.datachange_callbacks.items())
            for handle, callback in subscribers:
                try:
                    await callback(handle, value)
                except Exception as ex:
                    self.logger.exception("Error calling datachange callback %s, %s, %s", handle, callback, ex)

        return ua.StatusCode()
702
703
    def add_datachange_callback(self, nodeid, attr, callback):
        """Register ``callback`` for changes of attribute ``attr`` of node ``nodeid``.

        Returns:
            A ``(status, handle)`` tuple. ``handle`` is a newly allocated
            integer identifying the registration, or 0 together with
            ``BadNodeIdUnknown`` / ``BadAttributeIdInvalid`` on failure.
        """
        self.logger.debug("set attr callback: %s %s %s", nodeid, attr, callback)
        if nodeid not in self._nodes:
            return ua.StatusCode(ua.StatusCodes.BadNodeIdUnknown), 0
        attributes = self._nodes[nodeid].attributes
        if attr not in attributes:
            return ua.StatusCode(ua.StatusCodes.BadAttributeIdInvalid), 0
        self._datachange_callback_counter += 1
        handle = self._datachange_callback_counter
        attributes[attr].datachange_callbacks[handle] = callback
        # Remember where the handle lives so it can be removed by handle alone.
        self._handle_to_attribute_map[handle] = (nodeid, attr)
        return ua.StatusCode(), handle
716
717
    def delete_datachange_callback(self, handle):
718
        if handle in self._handle_to_attribute_map:
719
            nodeid, attr = self._handle_to_attribute_map.pop(handle)
720
            self._nodes[nodeid].attributes[attr].datachange_callbacks.pop(handle)
721
722
    def add_method_callback(self, methodid, callback):
723
        node = self._nodes[methodid]
724
        node.call = callback
725
726
    def add_value_callback_to_node(self, nodeid, attr, callback):
        """Install ``callback`` as the value callback of attribute ``attr`` of node ``nodeid``.

        Returns:
            A good status code on success. On failure a ``(status, 0)`` tuple
            with ``BadNodeIdUnknown`` or ``BadAttributeIdInvalid``.
        """
        # NOTE(review): failures return a tuple while success returns a bare
        # status code — confirm callers handle both shapes.
        if nodeid not in self._nodes:
            return ua.StatusCode(ua.StatusCodes.BadNodeIdUnknown), 0
        if attr not in self._nodes[nodeid].attributes:
            return ua.StatusCode(ua.StatusCodes.BadAttributeIdInvalid), 0
        self._nodes[nodeid].attributes[attr].value_callback = callback
        return ua.StatusCode()
735