Passed
Pull Request — master (#144)
by
unknown
02:29
created

AddressSpace.add_value_callback_to_node()   A

Complexity

Conditions 3

Size

Total Lines 9
Code Lines 9

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 3
eloc 9
nop 4
dl 0
loc 9
rs 9.95
c 0
b 0
f 0
1
import asyncio
2
import pickle
3
import shelve
4
import logging
5
import collections
6
from datetime import datetime
7
from concurrent.futures import ThreadPoolExecutor
8
from functools import partial
9
10
from asyncua import ua
11
from .users import User, UserRole
12
13
# Module-level logger named after this module.
_logger = logging.getLogger(__name__)
14
15
16
class AttributeValue:
    """Container for the state kept for a single attribute.

    Bundles the attribute's current value with an optional value callback
    and the data-change callbacks registered for it.
    """

    def __init__(self, value):
        # Current value of the attribute.
        self.value = value
        # Optional callback; stays None until one is assigned.
        self.value_callback = None
        # Data-change callbacks; starts out empty.
        self.datachange_callbacks = {}

    def __repr__(self):
        """Return a short text form showing only the wrapped value."""
        return f"AttributeValue({self.value})"

    # str() and repr() share the same representation.
    __str__ = __repr__
27
28
29
class NodeData:
    """In-memory record for one node.

    Stores the node id, an attribute mapping, a list of references and an
    optional callable stored in ``call``.
    """

    def __init__(self, nodeid):
        # Identifier of the node this record describes.
        self.nodeid = nodeid
        # Attribute mapping; starts out empty.
        self.attributes = {}
        # References attached to this node; starts out empty.
        self.references = []
        # Optional callable; None until one is assigned.
        self.call = None

    def __repr__(self):
        """Return a text form listing the id, attributes and references."""
        return f"NodeData(id:{self.nodeid}, attrs:{self.attributes}, refs:{self.references})"

    # str() and repr() share the same representation.
    __str__ = __repr__
41
42
43
class AttributeService:
    """Read and write node attributes through an address space."""

    def __init__(self, aspace: "AddressSpace"):
        self.logger = logging.getLogger(__name__)
        self._aspace: "AddressSpace" = aspace

    async def read(self, params):
        """Read every entry of ``params.NodesToRead``.

        Returns a list holding, in request order, whatever the address space
        returns for each (NodeId, AttributeId) pair.
        """
        return [
            await self._aspace.read_attribute_value(item.NodeId, item.AttributeId)
            for item in params.NodesToRead
        ]

    async def write(self, params, user=User(role=UserRole.Admin)):
        """Write every entry of ``params.NodesToWrite`` on behalf of ``user``.

        Admin users write without further checks. Any other user may only
        write the Value attribute, and only when the node's access levels
        allow it; otherwise the entry yields ``BadUserAccessDenied``.

        Returns a list with one status per requested write, in request order.
        """
        statuses = []
        for item in params.NodesToWrite:
            if user.role != UserRole.Admin and not await self._user_may_write(item):
                statuses.append(ua.StatusCode(ua.StatusCodes.BadUserAccessDenied))
                continue
            statuses.append(
                await self._aspace.write_attribute_value(item.NodeId, item.AttributeId, item.Value)
            )
        return statuses

    async def _user_may_write(self, writevalue):
        """Tell whether a non-admin user is allowed to perform ``writevalue``."""
        if writevalue.AttributeId != ua.AttributeIds.Value:
            return False
        # Both access levels are read before either is inspected.
        access = await self._aspace.read_attribute_value(writevalue.NodeId, ua.AttributeIds.AccessLevel)
        user_access = await self._aspace.read_attribute_value(
            writevalue.NodeId, ua.AttributeIds.UserAccessLevel)
        return (
            access.StatusCode.is_good()
            and ua.ua_binary.test_bit(access.Value.Value, ua.AccessLevel.CurrentWrite)
            and ua.ua_binary.test_bit(user_access.Value.Value, ua.AccessLevel.CurrentWrite)
        )
73
74
75
class ViewService(object):
    """Browse references and translate browse paths in an address space."""

    def __init__(self, aspace: "AddressSpace"):
        self.logger = logging.getLogger(__name__)
        self._aspace: "AddressSpace" = aspace

    def browse(self, params):
        """Return one browse result per entry of ``params.NodesToBrowse``."""
        return [self._browse(desc) for desc in params.NodesToBrowse]

    def _browse(self, desc):
        """Collect the references of ``desc.NodeId`` that match ``desc``.

        The result carries ``BadNodeIdInvalid`` when the node is not in the
        address space.
        """
        res = ua.BrowseResult()
        if desc.NodeId not in self._aspace:
            res.StatusCode = ua.StatusCode(ua.StatusCodes.BadNodeIdInvalid)
            return res
        node = self._aspace[desc.NodeId]
        for ref in node.references:
            if self._is_suitable_ref(desc, ref):
                res.References.append(ref)
        return res

    def _is_suitable_ref(self, desc, ref):
        """Check ``ref`` against the direction, type and class filters of ``desc``."""
        if not self._suitable_direction(desc.BrowseDirection, ref.IsForward):
            return False
        if not self._suitable_reftype(desc.ReferenceTypeId, ref.ReferenceTypeId, desc.IncludeSubtypes):
            return False
        # A zero NodeClassMask disables the node class filter.
        if desc.NodeClassMask and ((desc.NodeClassMask & ref.NodeClass) == 0):
            return False
        return True

    def _suitable_reftype(self, ref1, ref2, subtypes):
        """Tell whether reference type ``ref2`` satisfies the requested ``ref1``.

        ``ref1`` is the requested reference type, ``ref2`` the type of the
        reference being examined and ``subtypes`` the IncludeSubtypes flag.
        """
        if ref1 == ua.NodeId(ua.ObjectIds.Null):
            # If ReferenceTypeId is not specified in the BrowseDescription,
            # all References are returned and includeSubtypes is ignored.
            return True
        if not subtypes and ref2.Identifier == ua.ObjectIds.HasSubtype:
            return False
        if ref1.Identifier == ref2.Identifier:
            return True
        oktypes = self._get_sub_ref(ref1)
        if not subtypes and ua.NodeId(ua.ObjectIds.HasSubtype) in oktypes:
            oktypes.remove(ua.NodeId(ua.ObjectIds.HasSubtype))
        # NOTE(review): subtypes of ref1 are accepted here even when
        # ``subtypes`` is false (only HasSubtype itself is excluded) - confirm
        # that this is intended.
        return ref2 in oktypes

    def _get_sub_ref(self, ref):
        """Return the node ids reachable from ``ref`` via forward HasSubtype references.

        The search is recursive. A ``ref`` that is not in the address space
        yields an empty list instead of raising ``KeyError``.
        """
        nodedata = self._aspace.get(ref)
        if nodedata is None:
            return []
        res = []
        for subref in nodedata.references:
            if subref.ReferenceTypeId.Identifier == ua.ObjectIds.HasSubtype and subref.IsForward:
                res.append(subref.NodeId)
                res += self._get_sub_ref(subref.NodeId)
        return res

    def _suitable_direction(self, direction, isforward):
        """Tell whether a reference with ``isforward`` matches ``direction``."""
        if direction == ua.BrowseDirection.Both:
            return True
        if direction == ua.BrowseDirection.Forward and isforward:
            return True
        if direction == ua.BrowseDirection.Inverse and not isforward:
            return True
        return False

    def translate_browsepaths_to_nodeids(self, browsepaths):
        """Return one browse path result per entry of ``browsepaths``."""
        return [self._translate_browsepath_to_nodeid(path) for path in browsepaths]

    def _translate_browsepath_to_nodeid(self, path):
        """Resolve ``path`` element by element, starting at ``path.StartingNode``.

        The result holds a single target on success, or one of
        ``BadNothingToDo`` (no path elements), ``BadBrowseNameInvalid``,
        ``BadNodeIdInvalid`` or ``BadNoMatch``.
        """
        res = ua.BrowsePathResult()
        elements = path.RelativePath.Elements
        if not elements:
            # Without this guard the [-1] lookup below raises IndexError.
            res.StatusCode = ua.StatusCode(ua.StatusCodes.BadNothingToDo)
            return res
        if not elements[-1].TargetName:
            # OPC UA Part 4: Services, 5.8.4 TranslateBrowsePathsToNodeIds
            # it's unclear if this check should also handle empty strings
            res.StatusCode = ua.StatusCode(ua.StatusCodes.BadBrowseNameInvalid)
            return res
        if path.StartingNode not in self._aspace:
            res.StatusCode = ua.StatusCode(ua.StatusCodes.BadNodeIdInvalid)
            return res
        current = path.StartingNode
        for el in elements:
            nodeid = self._find_element_in_node(el, current)
            if not nodeid:
                res.StatusCode = ua.StatusCode(ua.StatusCodes.BadNoMatch)
                return res
            current = nodeid
        target = ua.BrowsePathTarget()
        target.TargetId = current
        # 4294967295 == 0xFFFFFFFF, the largest unsigned 32-bit value.
        target.RemainingPathIndex = 4294967295
        res.Targets = [target]
        return res

    def _find_element_in_node(self, el, nodeid):
        """Return the target of the first reference of ``nodeid`` matching ``el``.

        A reference matches on browse name, direction and reference type
        (optionally including subtypes). Returns None when nothing matches.
        """
        nodedata = self._aspace[nodeid]
        for ref in nodedata.references:
            if ref.BrowseName != el.TargetName:
                continue
            if ref.IsForward == el.IsInverse:
                continue
            if ref.ReferenceTypeId != el.ReferenceTypeId:
                if not el.IncludeSubtypes:
                    continue
                if ref.ReferenceTypeId not in self._get_sub_ref(el.ReferenceTypeId):
                    continue
            return ref.NodeId
        self.logger.info("element %s was not found in node %s", el, nodeid)
        return None
194
195
196
class NodeManagementService:
    """Add and delete nodes and references in an address space.

    Every mutating operation takes a ``user``; requests from a user whose
    role is not ``UserRole.Admin`` are answered with ``BadUserAccessDenied``.
    """

    def __init__(self, aspace: "AddressSpace"):
        self.logger = logging.getLogger(__name__)
        self._aspace: "AddressSpace" = aspace

    async def add_nodes(self, addnodeitems, user=User(role=UserRole.Admin)):
        """Add every item and return one ``AddNodesResult`` per item, in order."""
        results = []
        for item in addnodeitems:
            results.append(await self._add_node(item, user))
        return results

    async def try_add_nodes(self, addnodeitems, user=User(role=UserRole.Admin), check=True):
        """Add every item and yield the items whose result status is not good."""
        for item in addnodeitems:
            ret = await self._add_node(item, user, check=check)
            if not ret.StatusCode.is_good():
                yield item

    async def _add_node(self, item, user, check=True):
        """Create the node described by ``item`` and link it to its parent.

        With ``check`` false a null parent node id is accepted and no source
        timestamp is set on the Value attribute.
        Returns an ``AddNodesResult`` carrying the status and, on success,
        the id of the added node.
        """
        result = ua.AddNodesResult()

        if user.role != UserRole.Admin:
            result.StatusCode = ua.StatusCode(ua.StatusCodes.BadUserAccessDenied)
            return result

        if item.RequestedNewNodeId.has_null_identifier():
            # If Identifier of requested NodeId is null we generate a new NodeId using
            # the namespace of the nodeid, this is an extension of the spec to allow
            # requesting the server to generate a new nodeid in a specified namespace
            item.RequestedNewNodeId = self._aspace.generate_nodeid(item.RequestedNewNodeId.NamespaceIndex)
        elif item.RequestedNewNodeId in self._aspace:
            self.logger.warning("AddNodesItem: Requested NodeId %s already exists", item.RequestedNewNodeId)
            result.StatusCode = ua.StatusCode(ua.StatusCodes.BadNodeIdExists)
            return result

        if item.ParentNodeId.is_null() and check:
            result.StatusCode = ua.StatusCode(ua.StatusCodes.BadParentNodeIdInvalid)
            return result

        parentdata = self._aspace.get(item.ParentNodeId)
        if parentdata is None and not item.ParentNodeId.is_null():
            self.logger.info("add_node: while adding node %s, requested parent node %s does not exists",
                item.RequestedNewNodeId, item.ParentNodeId)
            result.StatusCode = ua.StatusCode(ua.StatusCodes.BadParentNodeIdInvalid)
            return result

        nodedata = NodeData(item.RequestedNewNodeId)

        self._add_node_attributes(nodedata, item, add_timestamps=check)

        # now add our node to db
        self._aspace[nodedata.nodeid] = nodedata

        if parentdata is not None:
            self._add_ref_from_parent(nodedata, item, parentdata)
            await self._add_ref_to_parent(nodedata, item, parentdata)

        # add type definition
        if item.TypeDefinition != ua.NodeId():
            await self._add_type_definition(nodedata, item)

        result.StatusCode = ua.StatusCode()
        result.AddedNodeId = nodedata.nodeid

        return result

    def _add_node_attributes(self, nodedata, item, add_timestamps):
        """Fill ``nodedata.attributes`` with the common and the requested attributes."""
        # add common attrs
        nodedata.attributes[ua.AttributeIds.NodeId] = AttributeValue(
            ua.DataValue(ua.Variant(nodedata.nodeid, ua.VariantType.NodeId))
        )
        nodedata.attributes[ua.AttributeIds.BrowseName] = AttributeValue(
            ua.DataValue(ua.Variant(item.BrowseName, ua.VariantType.QualifiedName))
        )
        nodedata.attributes[ua.AttributeIds.NodeClass] = AttributeValue(
            ua.DataValue(ua.Variant(item.NodeClass, ua.VariantType.Int32))
        )
        # add requested attrs
        self._add_nodeattributes(item.NodeAttributes, nodedata, add_timestamps)

    def _add_unique_reference(self, nodedata, desc):
        """Append ``desc`` to ``nodedata.references`` unless it is already there.

        An existing reference with the same type and target but the opposite
        direction is a conflict and yields ``BadReferenceNotAllowed``.
        """
        for r in nodedata.references:
            if r.ReferenceTypeId == desc.ReferenceTypeId and r.NodeId == desc.NodeId:
                if r.IsForward != desc.IsForward:
                    self.logger.error("Cannot add conflicting reference %s ", str(desc))
                    return ua.StatusCode(ua.StatusCodes.BadReferenceNotAllowed)
                break  # ref already exists
        else:
            nodedata.references.append(desc)
        return ua.StatusCode()

    def _add_ref_from_parent(self, nodedata, item, parentdata):
        """Add the forward reference from the parent to the new node."""
        desc = ua.ReferenceDescription()
        desc.ReferenceTypeId = item.ReferenceTypeId
        desc.NodeId = nodedata.nodeid
        desc.NodeClass = item.NodeClass
        desc.BrowseName = item.BrowseName
        desc.DisplayName = item.NodeAttributes.DisplayName
        desc.TypeDefinition = item.TypeDefinition
        desc.IsForward = True
        self._add_unique_reference(parentdata, desc)

    async def _add_ref_to_parent(self, nodedata, item, parentdata):
        """Add the inverse reference from the new node back to its parent."""
        addref = ua.AddReferencesItem()
        addref.ReferenceTypeId = item.ReferenceTypeId
        addref.SourceNodeId = nodedata.nodeid
        addref.TargetNodeId = item.ParentNodeId
        addref.TargetNodeClass = parentdata.attributes[ua.AttributeIds.NodeClass].value.Value.Value
        addref.IsForward = False
        await self._add_reference_no_check(nodedata, addref)

    async def _add_type_definition(self, nodedata, item):
        """Add a forward HasTypeDefinition reference to ``item.TypeDefinition``."""
        addref = ua.AddReferencesItem()
        addref.SourceNodeId = nodedata.nodeid
        addref.IsForward = True
        addref.ReferenceTypeId = ua.NodeId(ua.ObjectIds.HasTypeDefinition)
        addref.TargetNodeId = item.TypeDefinition
        # NOTE(review): the target class is hard-coded to DataType - confirm
        # this is right for object and variable type definitions.
        addref.TargetNodeClass = ua.NodeClass.DataType
        await self._add_reference_no_check(nodedata, addref)

    def delete_nodes(self, deletenodeitems, user=User(role=UserRole.Admin)):
        """Delete every entry of ``deletenodeitems.NodesToDelete``; one status each."""
        results = []
        for item in deletenodeitems.NodesToDelete:
            results.append(self._delete_node(item, user))
        return results

    def _delete_node(self, item, user):
        """Delete the node ``item.NodeId`` and return the resulting status.

        With ``item.DeleteTargetReferences`` set, every reference in the
        address space that points at the node is removed as well.
        """
        if user.role != UserRole.Admin:
            return ua.StatusCode(ua.StatusCodes.BadUserAccessDenied)

        if item.NodeId not in self._aspace:
            self.logger.warning("DeleteNodesItem: NodeId %s does not exists", item.NodeId)
            return ua.StatusCode(ua.StatusCodes.BadNodeIdUnknown)

        if item.DeleteTargetReferences:
            for elem in self._aspace.keys():
                refs = self._aspace[elem].references
                # Filter in place: calling remove() while iterating the same
                # list skips the element that follows each removed one.
                refs[:] = [rdesc for rdesc in refs if not (rdesc.NodeId == item.NodeId)]

        self._delete_node_callbacks(self._aspace[item.NodeId])

        del (self._aspace[item.NodeId])

        return ua.StatusCode()

    def _delete_node_callbacks(self, nodedata):
        """Notify and unregister the data-change callbacks of the node's Value attribute.

        Each callback is called with its handle, None and a
        ``BadNodeIdUnknown`` status. Exceptions are logged and do not stop
        the remaining callbacks from being processed.
        """
        if ua.AttributeIds.Value in nodedata.attributes:
            # Iterate over a copy: delete_datachange_callback mutates the dict.
            for handle, callback in list(nodedata.attributes[ua.AttributeIds.Value].datachange_callbacks.items()):
                try:
                    callback(handle, None, ua.StatusCode(ua.StatusCodes.BadNodeIdUnknown))
                    self._aspace.delete_datachange_callback(handle)
                except Exception as ex:
                    self.logger.exception("Error calling delete node callback callback %s, %s, %s", nodedata,
                        ua.AttributeIds.Value, ex)

    async def add_references(self, refs, user=User(role=UserRole.Admin)):
        """Add every reference and return one status per reference, in order."""
        result = []
        for ref in refs:
            result.append(await self._add_reference(ref, user))
        return result

    async def try_add_references(self, refs, user=User(role=UserRole.Admin)):
        """Add every reference and yield the ones whose status is not good."""
        for ref in refs:
            ret = await self._add_reference(ref, user)
            if not ret.is_good():
                yield ref

    async def _add_reference(self, addref, user):
        """Validate source, target and user, then add the reference."""
        sourcedata = self._aspace.get(addref.SourceNodeId)
        if sourcedata is None:
            return ua.StatusCode(ua.StatusCodes.BadSourceNodeIdInvalid)
        if addref.TargetNodeId not in self._aspace:
            return ua.StatusCode(ua.StatusCodes.BadTargetNodeIdInvalid)
        if user.role != UserRole.Admin:
            return ua.StatusCode(ua.StatusCodes.BadUserAccessDenied)
        return await self._add_reference_no_check(sourcedata, addref)

    async def _add_reference_no_check(self, sourcedata, addref):
        """Build a reference description from ``addref`` and attach it to ``sourcedata``.

        Node class, browse name and display name are read from the target
        node when they are available.
        """
        rdesc = ua.ReferenceDescription()
        rdesc.ReferenceTypeId = addref.ReferenceTypeId
        rdesc.IsForward = addref.IsForward
        rdesc.NodeId = addref.TargetNodeId
        if addref.TargetNodeClass == ua.NodeClass.Unspecified:
            rdesc.NodeClass = (await self._aspace.read_attribute_value(
                addref.TargetNodeId, ua.AttributeIds.NodeClass)).Value.Value
        else:
            rdesc.NodeClass = addref.TargetNodeClass
        bname = (await self._aspace.read_attribute_value(addref.TargetNodeId, ua.AttributeIds.BrowseName)).Value.Value
        if bname:
            rdesc.BrowseName = bname
        dname = (await self._aspace.read_attribute_value(addref.TargetNodeId, ua.AttributeIds.DisplayName)).Value.Value
        if dname:
            # The field is DisplayName (as set in _add_ref_from_parent);
            # this used to assign to a misspelled "DisplayUser" attribute.
            rdesc.DisplayName = dname
        return self._add_unique_reference(sourcedata, rdesc)

    def delete_references(self, refs, user=User(role=UserRole.Admin)):
        """Delete every reference and return one status per reference, in order."""
        result = []
        for ref in refs:
            result.append(self._delete_reference(ref, user))
        return result

    def _delete_unique_reference(self, item, invert=False):
        """Remove the first reference matching ``item`` from its source node.

        With ``invert`` set, source and target are swapped and the direction
        is flipped, which addresses the opposite half of a bidirectional
        reference. Returns ``BadNotFound`` when nothing matches.
        """
        if invert:
            source, target, forward = item.TargetNodeId, item.SourceNodeId, not item.IsForward
        else:
            source, target, forward = item.SourceNodeId, item.TargetNodeId, item.IsForward
        for rdesc in self._aspace[source].references:
            if rdesc.NodeId == target and rdesc.ReferenceTypeId == item.ReferenceTypeId:
                if rdesc.IsForward == forward:
                    # Safe although we are iterating: we return right away.
                    self._aspace[source].references.remove(rdesc)
                    return ua.StatusCode()
        return ua.StatusCode(ua.StatusCodes.BadNotFound)

    def _delete_reference(self, item, user):
        """Validate ``item`` and ``user``, then delete the reference.

        The returned status is that of the forward deletion; the result of
        the optional inverse deletion is ignored.
        """
        if item.SourceNodeId not in self._aspace:
            return ua.StatusCode(ua.StatusCodes.BadSourceNodeIdInvalid)
        if item.TargetNodeId not in self._aspace:
            return ua.StatusCode(ua.StatusCodes.BadTargetNodeIdInvalid)
        if item.ReferenceTypeId not in self._aspace:
            return ua.StatusCode(ua.StatusCodes.BadReferenceTypeIdInvalid)
        if user.role != UserRole.Admin:
            return ua.StatusCode(ua.StatusCodes.BadUserAccessDenied)

        if item.DeleteBidirectional:
            self._delete_unique_reference(item, True)
        return self._delete_unique_reference(item)

    def _add_node_attr(self, item, nodedata, name, vtype=None, add_timestamps=False):
        """Copy attribute ``name`` from ``item`` to ``nodedata`` if it is specified.

        An attribute counts as specified when its bit is set in
        ``item.SpecifiedAttributes``. ``vtype`` is the variant type used to
        wrap the value; with ``add_timestamps`` the source timestamp is set.
        """
        if item.SpecifiedAttributes & getattr(ua.NodeAttributesMask, name):
            dv = ua.DataValue(ua.Variant(getattr(item, name), vtype))
            if add_timestamps:
                # dv.ServerTimestamp = datetime.utcnow()  # Disabled until someone explains us it should be there
                dv.SourceTimestamp = datetime.utcnow()
            nodedata.attributes[getattr(ua.AttributeIds, name)] = AttributeValue(dv)

    def _add_nodeattributes(self, item, nodedata, add_timestamps):
        """Copy every specified attribute of ``item`` to ``nodedata``.

        Only the Value attribute receives ``add_timestamps``.
        """
        typed_attributes = (
            ("AccessLevel", ua.VariantType.Byte),
            ("ArrayDimensions", ua.VariantType.UInt32),
            ("BrowseName", ua.VariantType.QualifiedName),
            ("ContainsNoLoops", ua.VariantType.Boolean),
            ("DataType", ua.VariantType.NodeId),
            ("Description", ua.VariantType.LocalizedText),
            ("DisplayName", ua.VariantType.LocalizedText),
            ("EventNotifier", ua.VariantType.Byte),
            ("Executable", ua.VariantType.Boolean),
            ("Historizing", ua.VariantType.Boolean),
            ("InverseName", ua.VariantType.LocalizedText),
            ("IsAbstract", ua.VariantType.Boolean),
            ("MinimumSamplingInterval", ua.VariantType.Double),
            ("NodeClass", ua.VariantType.Int32),
            ("NodeId", ua.VariantType.NodeId),
            ("Symmetric", ua.VariantType.Boolean),
            ("UserAccessLevel", ua.VariantType.Byte),
            ("UserExecutable", ua.VariantType.Boolean),
            ("ValueRank", ua.VariantType.Int32),
            ("WriteMask", ua.VariantType.UInt32),
            # UserWriteMask used to be listed twice (Byte, then UInt32); the
            # later UInt32 entry always overwrote the first, so only it is kept.
            ("UserWriteMask", ua.VariantType.UInt32),
        )
        for name, vtype in typed_attributes:
            self._add_node_attr(item, nodedata, name, vtype)
        self._add_node_attr(item, nodedata, "Value", add_timestamps=add_timestamps)
467
468
469
class MethodService:
    """Execute the method callbacks stored on nodes of an address space."""

    def __init__(self, aspace: "AddressSpace"):
        self.logger = logging.getLogger(__name__)
        self._aspace: "AddressSpace" = aspace
        # Thread pool used to run callbacks that are not coroutine functions.
        self._pool = ThreadPoolExecutor()

    def stop(self):
        """Shut down the thread pool."""
        self._pool.shutdown()

    async def call(self, methods):
        """Call each requested method in turn and return the results in order."""
        return [await self._call(request) for request in methods]

    async def _call(self, method):
        """Run one method call request and return its ``CallMethodResult``.

        Status is ``BadNodeIdInvalid`` when the object or method node is
        unknown, ``BadNothingToDo`` when the method node has no callback and
        ``BadUnexpectedError`` when the callback raises.
        """
        self.logger.info("Calling: %s", method)
        res = ua.CallMethodResult()

        if not (method.ObjectId in self._aspace and method.MethodId in self._aspace):
            res.StatusCode = ua.StatusCode(ua.StatusCodes.BadNodeIdInvalid)
            return res

        handler = self._aspace[method.MethodId].call
        if handler is None:
            res.StatusCode = ua.StatusCode(ua.StatusCodes.BadNothingToDo)
            return res

        try:
            outcome = await self._run_method(handler, method.ObjectId, *method.InputArguments)
        except Exception:
            self.logger.exception("Error executing method call %s, an exception was raised: ", method)
            res.StatusCode = ua.StatusCode(ua.StatusCodes.BadUnexpectedError)
            return res

        # A callback may return a full result, a bare status or output arguments.
        if isinstance(outcome, ua.CallMethodResult):
            res = outcome
        elif isinstance(outcome, ua.StatusCode):
            res.StatusCode = outcome
        else:
            res.OutputArguments = outcome
        # Pad with default statuses so every input argument has a result.
        while len(res.InputArgumentResults) < len(method.InputArguments):
            res.InputArgumentResults.append(ua.StatusCode())
        return res

    async def _run_method(self, func, parent, *args):
        """Invoke ``func(parent, *args)`` and return its result.

        Coroutine functions are awaited directly; any other callable is run
        in the thread pool.
        """
        if not asyncio.iscoroutinefunction(func):
            bound = partial(func, parent, *args)
            return await asyncio.get_event_loop().run_in_executor(self._pool, bound)
        return await func(parent, *args)
518
519
520
class AddressSpace:
    """
    The address space object stores all the nodes of the OPC-UA server and helper methods.

    NOTE(review): this class performs no locking of its own; the earlier
    claim that its methods are thread safe is not backed by the code here.
    """

    def __init__(self):
        self.logger = logging.getLogger(__name__)
        # Mapping of node id -> node data.
        self._nodes = {}
        # Last data-change callback handle handed out.
        self._datachange_callback_counter = 200
        # Mapping of callback handle -> (nodeid, attr) it is registered on.
        self._handle_to_attribute_map = {}
        # Namespace index used by generate_nodeid() when none is given.
        self._default_idx = 2
        # Last numeric identifier handed out, per namespace index.
        self._nodeid_counter = {0: 20000, 1: 2000}

    def __getitem__(self, nodeid):
        return self._nodes[nodeid]

    def get(self, nodeid):
        """Return the node data for ``nodeid``, or None when it is unknown."""
        return self._nodes.get(nodeid, None)

    def __setitem__(self, nodeid, value):
        self._nodes[nodeid] = value

    def __contains__(self, nodeid):
        return nodeid in self._nodes

    def __delitem__(self, nodeid):
        del self._nodes[nodeid]

    def generate_nodeid(self, idx=None):
        """Return a node id in namespace ``idx`` that is not in use yet.

        ``idx`` defaults to the default namespace index. The per-namespace
        counter is advanced until the resulting id is free.
        """
        if idx is None:
            idx = self._default_idx
        if idx in self._nodeid_counter:
            self._nodeid_counter[idx] += 1
        else:
            # get the biggest identifier number from the existing nodes in address space
            numeric_types = (ua.NodeIdType.Numeric, ua.NodeIdType.TwoByte, ua.NodeIdType.FourByte)
            self._nodeid_counter[idx] = max(
                (
                    nodeid.Identifier for nodeid in self._nodes.keys()
                    if nodeid.NamespaceIndex == idx and nodeid.NodeIdType in numeric_types
                ),
                default=1,
            )
        # Skip identifiers that are already taken. This loop replaces the
        # former recursion, which went one call deeper per occupied id.
        nodeid = ua.NodeId(self._nodeid_counter[idx], idx)
        while nodeid in self._nodes:
            self._nodeid_counter[idx] += 1
            nodeid = ua.NodeId(self._nodeid_counter[idx], idx)
        return nodeid

    def keys(self):
        """Return the node ids currently stored."""
        return self._nodes.keys()

    def empty(self):
        """Delete all nodes in address space"""
        self._nodes = {}

    def dump(self, path):
        """
        Dump address space as binary to file; note that server must be stopped for this method to work
        DO NOT DUMP AN ADDRESS SPACE WHICH IS USING A SHELF (load_aspace_shelf), ONLY CACHED NODES WILL GET DUMPED!

        Side effect: the ``call`` callback of every node is reset to None.
        """
        # prepare nodes in address space for being serialized
        for nodeid, ndata in self._nodes.items():
            # if the node has a reference to a method call, remove it so the object can be serialized
            if ndata.call is not None:
                self._nodes[nodeid].call = None

        with open(path, 'wb') as f:
            pickle.dump(self._nodes, f, pickle.HIGHEST_PROTOCOL)

    def load(self, path):
        """
        Load address space from a binary file, overwriting everything in the current address space
        """
        # SECURITY: unpickling can execute arbitrary code; only load files
        # that come from a trusted source.
        with open(path, 'rb') as f:
            self._nodes = pickle.load(f)

    def make_aspace_shelf(self, path):
        """
        Make a shelf for containing the nodes from the standard address space; this is typically only done on first
        start of the server. Subsequent server starts will load the shelf, nodes are then moved to a cache
        by the LazyLoadingDict class when they are accessed. Saving data back to the shelf
        is currently NOT supported, it is only used for the default OPC UA standard address space

        Note: Intended for slow devices, such as Raspberry Pi, to greatly improve start up time
        """
        with shelve.open(path, 'n', protocol=pickle.HIGHEST_PROTOCOL) as s:
            for nodeid, ndata in self._nodes.items():
                s[nodeid.to_string()] = ndata

    def load_aspace_shelf(self, path):
        """
        Load the standard address space nodes from a python shelve via LazyLoadingDict as needed.
        The dump() method can no longer be used if the address space is being loaded from a shelf

        Note: Intended for slow devices, such as Raspberry Pi, to greatly improve start up time

        Currently disabled: always raises NotImplementedError.
        """
        raise NotImplementedError

        # Everything below is unreachable and kept as the reference
        # implementation for the ToDo.
        # ToDo: async friendly implementation - load all at once?
        # collections.MutableMapping was removed in Python 3.10; the ABC
        # lives in collections.abc.
        from collections.abc import MutableMapping

        class LazyLoadingDict(MutableMapping):
            """
            Special dict that only loads nodes as they are accessed. If a node is accessed it gets copied from the
            shelve to the cache dict. All user nodes are saved in the cache ONLY. Saving data back to the shelf
            is currently NOT supported
            """

            def __init__(self, source):
                self.source = source  # python shelf
                self.cache = {}  # internal dict

            def __getitem__(self, key):
                # try to get the item (node) from the cache, if it isn't there get it from the shelf
                try:
                    return self.cache[key]
                except KeyError:
                    node = self.cache[key] = self.source[key.to_string()]
                    return node

            def __setitem__(self, key, value):
                # add a new item to the cache; if this item is in the shelf it is not updated
                self.cache[key] = value

            def __contains__(self, key):
                return key in self.cache or key.to_string() in self.source

            def __delitem__(self, key):
                # only deleting items from the cache is allowed
                del self.cache[key]

            def __iter__(self):
                # only the cache can be iterated over
                return iter(self.cache.keys())

            def __len__(self):
                # only returns the length of items in the cache, not unaccessed items in the shelf
                return len(self.cache)

        self._nodes = LazyLoadingDict(shelve.open(path, "r"))

    async def read_attribute_value(self, nodeid, attr):
        """Return the value of attribute ``attr`` of node ``nodeid``.

        When a value callback is set on the attribute, its result is
        returned (awaited first if the callback is a coroutine function);
        otherwise the stored value is returned. An unknown node or attribute
        yields a ``ua.DataValue`` with status ``BadNodeIdUnknown`` or
        ``BadAttributeIdInvalid``.
        """
        if nodeid not in self._nodes:
            dv = ua.DataValue()
            dv.StatusCode = ua.StatusCode(ua.StatusCodes.BadNodeIdUnknown)
            return dv
        node = self._nodes[nodeid]
        if attr not in node.attributes:
            dv = ua.DataValue()
            dv.StatusCode = ua.StatusCode(ua.StatusCodes.BadAttributeIdInvalid)
            return dv
        attval = node.attributes[attr]
        if attval.value_callback:
            if asyncio.iscoroutinefunction(attval.value_callback):
                # The awaited result used to be discarded, so coroutine
                # callbacks had no effect on the value returned.
                return await attval.value_callback()
            return attval.value_callback()
        return attval.value

    async def write_attribute_value(self, nodeid, attr, value):
        """Store ``value`` on attribute ``attr`` of node ``nodeid``.

        The attribute's data-change callbacks are awaited with
        ``(handle, value)``, but only when ``value.Value`` differs from the
        previous one. Exceptions raised by a callback are logged and
        swallowed. Returns a status code.
        """
        node = self._nodes.get(nodeid, None)
        if node is None:
            return ua.StatusCode(ua.StatusCodes.BadNodeIdUnknown)
        attval = node.attributes.get(attr, None)
        if attval is None:
            return ua.StatusCode(ua.StatusCodes.BadAttributeIdInvalid)

        old = attval.value
        attval.value = value
        cbs = []
        if old.Value != value.Value:  # only call callbacks when the value has changed
            # Copy, so callbacks may register or remove callbacks while we iterate.
            cbs = list(attval.datachange_callbacks.items())

        for k, v in cbs:
            try:
                await v(k, value)
            except Exception as ex:
                self.logger.exception("Error calling datachange callback %s, %s, %s", k, v, ex)

        return ua.StatusCode()

    def add_datachange_callback(self, nodeid, attr, callback):
        """Register ``callback`` for changes of attribute ``attr`` of ``nodeid``.

        Returns a ``(status, handle)`` tuple; the handle is 0 on failure.
        """
        self.logger.debug("set attr callback: %s %s %s", nodeid, attr, callback)
        if nodeid not in self._nodes:
            return ua.StatusCode(ua.StatusCodes.BadNodeIdUnknown), 0
        node = self._nodes[nodeid]
        if attr not in node.attributes:
            return ua.StatusCode(ua.StatusCodes.BadAttributeIdInvalid), 0
        attval = node.attributes[attr]
        self._datachange_callback_counter += 1
        handle = self._datachange_callback_counter
        attval.datachange_callbacks[handle] = callback
        self._handle_to_attribute_map[handle] = (nodeid, attr)
        return ua.StatusCode(), handle

    def delete_datachange_callback(self, handle):
        """Unregister the data-change callback ``handle``; unknown handles are ignored."""
        if handle in self._handle_to_attribute_map:
            nodeid, attr = self._handle_to_attribute_map.pop(handle)
            self._nodes[nodeid].attributes[attr].datachange_callbacks.pop(handle)

    def add_method_callback(self, methodid, callback):
        """Store ``callback`` as the ``call`` of node ``methodid``.

        Raises KeyError when the node does not exist.
        """
        node = self._nodes[methodid]
        node.call = callback

    def add_value_callback_to_node(self, nodeid, attr, callback):
        """Set ``callback`` as the value callback of attribute ``attr`` of ``nodeid``.

        NOTE(review): the error paths return a ``(status, 0)`` tuple while
        success returns a bare status code. The shape is left unchanged so
        existing callers keep working.
        """
        if nodeid not in self._nodes:
            return ua.StatusCode(ua.StatusCodes.BadNodeIdUnknown), 0
        node = self._nodes[nodeid]
        if attr not in node.attributes:
            return ua.StatusCode(ua.StatusCodes.BadAttributeIdInvalid), 0
        attval = node.attributes[attr]
        attval.value_callback = callback
        return ua.StatusCode()
738