Completed
Pull Request — master (#144)
by
unknown
08:19
created

NodeManagementService.try_add_references()   A

Complexity

Conditions 3

Size

Total Lines 5
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 3
eloc 5
nop 3
dl 0
loc 5
rs 10
c 0
b 0
f 0
1
import asyncio
2
import pickle
3
import shelve
4
import logging
5
import collections
6
from datetime import datetime
7
from concurrent.futures import ThreadPoolExecutor
8
from functools import partial
9
10
from asyncua import ua
11
from .users import User, UserRole
12
13
_logger = logging.getLogger(__name__)
14
15
16
class AttributeValue(object):
17
18
    def __init__(self, value):
19
        self.value = value
20
        self.value_callback = None
21
        self.datachange_callbacks = {}
22
23
    def __str__(self):
24
        return f"AttributeValue({self.value})"
25
26
    __repr__ = __str__
27
28
29
class NodeData:
30
31
    def __init__(self, nodeid):
32
        self.nodeid = nodeid
33
        self.attributes = {}
34
        self.references = []
35
        self.call = None
36
37
    def __str__(self):
38
        return f"NodeData(id:{self.nodeid}, attrs:{self.attributes}, refs:{self.references})"
39
40
    __repr__ = __str__
41
42
43
class AttributeService:
44
45
    def __init__(self, aspace: "AddressSpace"):
46
        self.logger = logging.getLogger(__name__)
47
        self._aspace: "AddressSpace" = aspace
48
49
    async def read(self, params):
50
        #self.logger.debug("read %s", params)
51
        res = []
52
        for readvalue in params.NodesToRead:
53
            res.append(await self._aspace.read_attribute_value(readvalue.NodeId, readvalue.AttributeId))
54
        return res
55
56
    async def write(self, params, user=User(role=UserRole.Admin)):
57
        #self.logger.debug("write %s as user %s", params, user)
58
        res = []
59
        for writevalue in params.NodesToWrite:
60
            if user.role != UserRole.Admin:
61
                if writevalue.AttributeId != ua.AttributeIds.Value:
62
                    res.append(ua.StatusCode(ua.StatusCodes.BadUserAccessDenied))
63
                    continue
64
                al = await self._aspace.read_attribute_value(writevalue.NodeId, ua.AttributeIds.AccessLevel)
65
                ual = await self._aspace.read_attribute_value(writevalue.NodeId, ua.AttributeIds.UserAccessLevel)
66
                if not al.StatusCode.is_good() or not ua.ua_binary.test_bit(
67
                        al.Value.Value, ua.AccessLevel.CurrentWrite) or not ua.ua_binary.test_bit(
68
                    ual.Value.Value, ua.AccessLevel.CurrentWrite):
69
                    res.append(ua.StatusCode(ua.StatusCodes.BadUserAccessDenied))
70
                    continue
71
            res.append(await self._aspace.write_attribute_value(writevalue.NodeId, writevalue.AttributeId, writevalue.Value))
72
        return res
73
74
75
class ViewService(object):
76
77
    def __init__(self, aspace: "AddressSpace"):
78
        self.logger = logging.getLogger(__name__)
79
        self._aspace: "AddressSpace" = aspace
80
81
    def browse(self, params):
82
        #self.logger.debug("browse %s", params)
83
        res = []
84
        for desc in params.NodesToBrowse:
85
            res.append(self._browse(desc))
86
        return res
87
88
    def _browse(self, desc):
89
        res = ua.BrowseResult()
90
        if desc.NodeId not in self._aspace:
91
            res.StatusCode = ua.StatusCode(ua.StatusCodes.BadNodeIdInvalid)
92
            return res
93
        node = self._aspace[desc.NodeId]
94
        for ref in node.references:
95
            if not self._is_suitable_ref(desc, ref):
96
                continue
97
            res.References.append(ref)
98
        return res
99
100
    def _is_suitable_ref(self, desc, ref):
101
        if not self._suitable_direction(desc.BrowseDirection, ref.IsForward):
102
            #self.logger.debug("%s is not suitable due to direction", ref)
103
            return False
104
        if not self._suitable_reftype(desc.ReferenceTypeId, ref.ReferenceTypeId, desc.IncludeSubtypes):
105
            #self.logger.debug("%s is not suitable due to type", ref)
106
            return False
107
        if desc.NodeClassMask and ((desc.NodeClassMask & ref.NodeClass) == 0):
108
            #self.logger.debug("%s is not suitable due to class", ref)
109
            return False
110
        #self.logger.debug("%s is a suitable ref for desc %s", ref, desc)
111
        return True
112
113
    def _suitable_reftype(self, ref1, ref2, subtypes):
114
        """
115
        """
116
        if ref1 == ua.NodeId(ua.ObjectIds.Null):
117
            # If ReferenceTypeId is not specified in the BrowseDescription,
118
            # all References are returned and includeSubtypes is ignored.
119
            return True
120
        if not subtypes and ref2.Identifier == ua.ObjectIds.HasSubtype:
121
            return False
122
        if ref1.Identifier == ref2.Identifier:
123
            return True
124
        oktypes = self._get_sub_ref(ref1)
125
        if not subtypes and ua.NodeId(ua.ObjectIds.HasSubtype) in oktypes:
126
            oktypes.remove(ua.NodeId(ua.ObjectIds.HasSubtype))
127
        return ref2 in oktypes
128
129
    def _get_sub_ref(self, ref):
130
        res = []
131
        nodedata = self._aspace[ref]
132
        if nodedata is not None:
133
            for ref in nodedata.references:
134
                if ref.ReferenceTypeId.Identifier == ua.ObjectIds.HasSubtype and ref.IsForward:
135
                    res.append(ref.NodeId)
136
                    res += self._get_sub_ref(ref.NodeId)
137
        return res
138
139
    def _suitable_direction(self, direction, isforward):
140
        if direction == ua.BrowseDirection.Both:
141
            return True
142
        if direction == ua.BrowseDirection.Forward and isforward:
143
            return True
144
        if direction == ua.BrowseDirection.Inverse and not isforward:
145
            return True
146
        return False
147
148
    def translate_browsepaths_to_nodeids(self, browsepaths):
149
        #self.logger.debug("translate browsepath: %s", browsepaths)
150
        results = []
151
        for path in browsepaths:
152
            results.append(self._translate_browsepath_to_nodeid(path))
153
        return results
154
155
    def _translate_browsepath_to_nodeid(self, path):
156
        #self.logger.debug("looking at path: %s", path)
157
        res = ua.BrowsePathResult()
158
        if not path.RelativePath.Elements[-1].TargetName:
159
            # OPC UA Part 4: Services, 5.8.4 TranslateBrowsePathsToNodeIds
160
            # it's unclear if this the check should also handle empty strings
161
            res.StatusCode = ua.StatusCode(ua.StatusCodes.BadBrowseNameInvalid)
162
            return res
163
        if path.StartingNode not in self._aspace:
164
            res.StatusCode = ua.StatusCode(ua.StatusCodes.BadNodeIdInvalid)
165
            return res
166
        current = path.StartingNode
167
        for el in path.RelativePath.Elements:
168
            nodeid = self._find_element_in_node(el, current)
169
            if not nodeid:
170
                res.StatusCode = ua.StatusCode(ua.StatusCodes.BadNoMatch)
171
                return res
172
            current = nodeid
173
        target = ua.BrowsePathTarget()
174
        target.TargetId = current
175
        target.RemainingPathIndex = 4294967295
176
        res.Targets = [target]
177
        return res
178
179
    def _find_element_in_node(self, el, nodeid):
180
        nodedata = self._aspace[nodeid]
181
        for ref in nodedata.references:
182
            if ref.BrowseName != el.TargetName:
183
                continue
184
            if ref.IsForward == el.IsInverse:
185
                continue
186
            if not el.IncludeSubtypes and ref.ReferenceTypeId != el.ReferenceTypeId:
187
                continue
188
            elif el.IncludeSubtypes and ref.ReferenceTypeId != el.ReferenceTypeId:
189
                if ref.ReferenceTypeId not in self._get_sub_ref(el.ReferenceTypeId):
190
                    continue
191
            return ref.NodeId
192
        self.logger.info("element %s was not found in node %s", el, nodeid)
193
        return None
194
195
196
class NodeManagementService:
197
198
    def __init__(self, aspace: "AddressSpace"):
199
        self.logger = logging.getLogger(__name__)
200
        self._aspace: "AddressSpace" = aspace
201
202
    async def add_nodes(self, addnodeitems, user=User(role=UserRole.Admin)):
203
        results = []
204
        for item in addnodeitems:
205
            results.append(await self._add_node(item, user))
206
        return results
207
208
    async def try_add_nodes(self, addnodeitems, user=User(role=UserRole.Admin), check=True):
209
        for item in addnodeitems:
210
            ret = await self._add_node(item, user, check=check)
211
            if not ret.StatusCode.is_good():
212
                yield item
213
214
    async def _add_node(self, item, user, check=True):
215
        #self.logger.debug("Adding node %s %s", item.RequestedNewNodeId, item.BrowseName)
216
        result = ua.AddNodesResult()
217
218
        if not user.role == UserRole.Admin:
219
            result.StatusCode = ua.StatusCode(ua.StatusCodes.BadUserAccessDenied)
220
            return result
221
222
        if item.RequestedNewNodeId.has_null_identifier():
223
            # If Identifier of requested NodeId is null we generate a new NodeId using
224
            # the namespace of the nodeid, this is an extention of the spec to allow
225
            # to requests the server to generate a new nodeid in a specified namespace
226
            #self.logger.debug("RequestedNewNodeId has null identifier, generating Identifier")
227
            item.RequestedNewNodeId = self._aspace.generate_nodeid(item.RequestedNewNodeId.NamespaceIndex)
228
        else:
229
            if item.RequestedNewNodeId in self._aspace:
230
                self.logger.warning("AddNodesItem: Requested NodeId %s already exists", item.RequestedNewNodeId)
231
                result.StatusCode = ua.StatusCode(ua.StatusCodes.BadNodeIdExists)
232
                return result
233
234
        if item.ParentNodeId.is_null():
235
            # self.logger.info("add_node: while adding node %s, requested parent node is null %s %s",
236
            # item.RequestedNewNodeId, item.ParentNodeId, item.ParentNodeId.is_null())
237
            if check:
238
                result.StatusCode = ua.StatusCode(ua.StatusCodes.BadParentNodeIdInvalid)
239
                return result
240
241
        parentdata = self._aspace.get(item.ParentNodeId)
242
        if parentdata is None and not item.ParentNodeId.is_null():
243
            self.logger.info("add_node: while adding node %s, requested parent node %s does not exists",
244
                item.RequestedNewNodeId, item.ParentNodeId)
245
            result.StatusCode = ua.StatusCode(ua.StatusCodes.BadParentNodeIdInvalid)
246
            return result
247
248
        nodedata = NodeData(item.RequestedNewNodeId)
249
250
        self._add_node_attributes(nodedata, item, add_timestamps=check)
251
252
        # now add our node to db
253
        self._aspace[nodedata.nodeid] = nodedata
254
255
        if parentdata is not None:
256
            self._add_ref_from_parent(nodedata, item, parentdata)
257
            await self._add_ref_to_parent(nodedata, item, parentdata)
258
259
        # add type definition
260
        if item.TypeDefinition != ua.NodeId():
261
            await self._add_type_definition(nodedata, item)
262
263
        result.StatusCode = ua.StatusCode()
264
        result.AddedNodeId = nodedata.nodeid
265
266
        return result
267
268
    def _add_node_attributes(self, nodedata, item, add_timestamps):
269
        # add common attrs
270
        nodedata.attributes[ua.AttributeIds.NodeId] = AttributeValue(
271
            ua.DataValue(ua.Variant(nodedata.nodeid, ua.VariantType.NodeId))
272
        )
273
        nodedata.attributes[ua.AttributeIds.BrowseName] = AttributeValue(
274
            ua.DataValue(ua.Variant(item.BrowseName, ua.VariantType.QualifiedName))
275
        )
276
        nodedata.attributes[ua.AttributeIds.NodeClass] = AttributeValue(
277
            ua.DataValue(ua.Variant(item.NodeClass, ua.VariantType.Int32))
278
        )
279
        # add requested attrs
280
        self._add_nodeattributes(item.NodeAttributes, nodedata, add_timestamps)
281
282
    def _add_unique_reference(self, nodedata, desc):
283
        for r in nodedata.references:
284
            if r.ReferenceTypeId == desc.ReferenceTypeId and r.NodeId == desc.NodeId:
285
                if r.IsForward != desc.IsForward:
286
                    self.logger.error("Cannot add conflicting reference %s ", str(desc))
287
                    return ua.StatusCode(ua.StatusCodes.BadReferenceNotAllowed)
288
                break  # ref already exists
289
        else:
290
            nodedata.references.append(desc)
291
        return ua.StatusCode()
292
293
    def _add_ref_from_parent(self, nodedata, item, parentdata):
294
        desc = ua.ReferenceDescription()
295
        desc.ReferenceTypeId = item.ReferenceTypeId
296
        desc.NodeId = nodedata.nodeid
297
        desc.NodeClass = item.NodeClass
298
        desc.BrowseName = item.BrowseName
299
        desc.DisplayName = item.NodeAttributes.DisplayName
300
        desc.TypeDefinition = item.TypeDefinition
301
        desc.IsForward = True
302
        self._add_unique_reference(parentdata, desc)
303
304
    async def _add_ref_to_parent(self, nodedata, item, parentdata):
305
        addref = ua.AddReferencesItem()
306
        addref.ReferenceTypeId = item.ReferenceTypeId
307
        addref.SourceNodeId = nodedata.nodeid
308
        addref.TargetNodeId = item.ParentNodeId
309
        addref.TargetNodeClass = parentdata.attributes[ua.AttributeIds.NodeClass].value.Value.Value
310
        addref.IsForward = False
311
        await self._add_reference_no_check(nodedata, addref)
312
313
    async def _add_type_definition(self, nodedata, item):
314
        addref = ua.AddReferencesItem()
315
        addref.SourceNodeId = nodedata.nodeid
316
        addref.IsForward = True
317
        addref.ReferenceTypeId = ua.NodeId(ua.ObjectIds.HasTypeDefinition)
318
        addref.TargetNodeId = item.TypeDefinition
319
        addref.TargetNodeClass = ua.NodeClass.DataType
320
        await self._add_reference_no_check(nodedata, addref)
321
322
    def delete_nodes(self, deletenodeitems, user=User(role=UserRole.Admin)):
323
        results = []
324
        for item in deletenodeitems.NodesToDelete:
325
            results.append(self._delete_node(item, user))
326
        return results
327
328
    def _delete_node(self, item, user):
329
        if user.role != UserRole.Admin:
330
            return ua.StatusCode(ua.StatusCodes.BadUserAccessDenied)
331
332
        if item.NodeId not in self._aspace:
333
            self.logger.warning("DeleteNodesItem: NodeId %s does not exists", item.NodeId)
334
            return ua.StatusCode(ua.StatusCodes.BadNodeIdUnknown)
335
336
        if item.DeleteTargetReferences:
337
            for elem in self._aspace.keys():
338
                for rdesc in self._aspace[elem].references:
339
                    if rdesc.NodeId == item.NodeId:
340
                        self._aspace[elem].references.remove(rdesc)
341
342
        self._delete_node_callbacks(self._aspace[item.NodeId])
343
344
        del (self._aspace[item.NodeId])
345
346
        return ua.StatusCode()
347
348
    def _delete_node_callbacks(self, nodedata):
349
        if ua.AttributeIds.Value in nodedata.attributes:
350
            for handle, callback in list(nodedata.attributes[ua.AttributeIds.Value].datachange_callbacks.items()):
351
                try:
352
                    callback(handle, None, ua.StatusCode(ua.StatusCodes.BadNodeIdUnknown))
353
                    self._aspace.delete_datachange_callback(handle)
354
                except Exception as ex:
355
                    self.logger.exception("Error calling delete node callback callback %s, %s, %s", nodedata,
356
                        ua.AttributeIds.Value, ex)
357
358
    async def add_references(self, refs, user=User(role=UserRole.Admin)):
359
        result = []
360
        for ref in refs:
361
            result.append(await self._add_reference(ref, user))
362
        return result
363
364
    async def try_add_references(self, refs, user=User(role=UserRole.Admin)):
365
        for ref in refs:
366
            ret = await self._add_reference(ref, user)
367
            if not ret.is_good():
368
                yield ref
369
370
    async def _add_reference(self, addref, user):
371
        sourcedata = self._aspace.get(addref.SourceNodeId)
372
        if sourcedata is None:
373
            return ua.StatusCode(ua.StatusCodes.BadSourceNodeIdInvalid)
374
        if addref.TargetNodeId not in self._aspace:
375
            return ua.StatusCode(ua.StatusCodes.BadTargetNodeIdInvalid)
376
        if user.role != UserRole.Admin:
377
            return ua.StatusCode(ua.StatusCodes.BadUserAccessDenied)
378
        return await self._add_reference_no_check(sourcedata, addref)
379
380
    async def _add_reference_no_check(self, sourcedata, addref):
381
        rdesc = ua.ReferenceDescription()
382
        rdesc.ReferenceTypeId = addref.ReferenceTypeId
383
        rdesc.IsForward = addref.IsForward
384
        rdesc.NodeId = addref.TargetNodeId
385
        if addref.TargetNodeClass == ua.NodeClass.Unspecified:
386
            rdesc.NodeClass = (await self._aspace.read_attribute_value(
387
                addref.TargetNodeId, ua.AttributeIds.NodeClass)).Value.Value
388
        else:
389
            rdesc.NodeClass = addref.TargetNodeClass
390
        bname = (await self._aspace.read_attribute_value(addref.TargetNodeId, ua.AttributeIds.BrowseName)).Value.Value
391
        if bname:
392
            rdesc.BrowseName = bname
393
        dname = (await self._aspace.read_attribute_value(addref.TargetNodeId, ua.AttributeIds.DisplayName)).Value.Value
394
        if dname:
395
            rdesc.DisplayUser = dname
396
        return self._add_unique_reference(sourcedata, rdesc)
397
398
    def delete_references(self, refs, user=User(role=UserRole.Admin)):
399
        result = []
400
        for ref in refs:
401
            result.append(self._delete_reference(ref, user))
402
        return result
403
404
    def _delete_unique_reference(self, item, invert=False):
405
        if invert:
406
            source, target, forward = item.TargetNodeId, item.SourceNodeId, not item.IsForward
407
        else:
408
            source, target, forward = item.SourceNodeId, item.TargetNodeId, item.IsForward
409
        for rdesc in self._aspace[source].references:
410
            if rdesc.NodeId == target and rdesc.ReferenceTypeId == item.ReferenceTypeId:
411
                if rdesc.IsForward == forward:
412
                    self._aspace[source].references.remove(rdesc)
413
                    return ua.StatusCode()
414
        return ua.StatusCode(ua.StatusCodes.BadNotFound)
415
416
    def _delete_reference(self, item, user):
417
        if item.SourceNodeId not in self._aspace:
418
            return ua.StatusCode(ua.StatusCodes.BadSourceNodeIdInvalid)
419
        if item.TargetNodeId not in self._aspace:
420
            return ua.StatusCode(ua.StatusCodes.BadTargetNodeIdInvalid)
421
        if item.ReferenceTypeId not in self._aspace:
422
            return ua.StatusCode(ua.StatusCodes.BadReferenceTypeIdInvalid)
423
        if user.role != UserRole.Admin:
424
            return ua.StatusCode(ua.StatusCodes.BadUserAccessDenied)
425
426
        if item.DeleteBidirectional:
427
            self._delete_unique_reference(item, True)
428
        return self._delete_unique_reference(item)
429
430
    def _add_node_attr(self, item, nodedata, name, vtype=None, add_timestamps=False):
431
        if item.SpecifiedAttributes & getattr(ua.NodeAttributesMask, name):
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable getattr does not seem to be defined.
Loading history...
432
            dv = ua.DataValue(ua.Variant(getattr(item, name), vtype))
433
            if add_timestamps:
434
                # dv.ServerTimestamp = datetime.utcnow()  # Disabled until someone explains us it should be there
435
                dv.SourceTimestamp = datetime.utcnow()
436
            nodedata.attributes[getattr(ua.AttributeIds, name)] = AttributeValue(dv)
437
438
    def _add_nodeattributes(self, item, nodedata, add_timestamps):
439
        self._add_node_attr(item, nodedata, "AccessLevel", ua.VariantType.Byte)
440
        self._add_node_attr(item, nodedata, "ArrayDimensions", ua.VariantType.UInt32)
441
        self._add_node_attr(item, nodedata, "BrowseName", ua.VariantType.QualifiedName)
442
        self._add_node_attr(item, nodedata, "ContainsNoLoops", ua.VariantType.Boolean)
443
        self._add_node_attr(item, nodedata, "DataType", ua.VariantType.NodeId)
444
        self._add_node_attr(item, nodedata, "Description", ua.VariantType.LocalizedText)
445
        self._add_node_attr(item, nodedata, "DisplayName", ua.VariantType.LocalizedText)
446
        self._add_node_attr(item, nodedata, "EventNotifier", ua.VariantType.Byte)
447
        self._add_node_attr(item, nodedata, "Executable", ua.VariantType.Boolean)
448
        self._add_node_attr(item, nodedata, "Historizing", ua.VariantType.Boolean)
449
        self._add_node_attr(item, nodedata, "InverseName", ua.VariantType.LocalizedText)
450
        self._add_node_attr(item, nodedata, "IsAbstract", ua.VariantType.Boolean)
451
        self._add_node_attr(item, nodedata, "MinimumSamplingInterval", ua.VariantType.Double)
452
        self._add_node_attr(item, nodedata, "NodeClass", ua.VariantType.Int32)
453
        self._add_node_attr(item, nodedata, "NodeId", ua.VariantType.NodeId)
454
        self._add_node_attr(item, nodedata, "Symmetric", ua.VariantType.Boolean)
455
        self._add_node_attr(item, nodedata, "UserAccessLevel", ua.VariantType.Byte)
456
        self._add_node_attr(item, nodedata, "UserExecutable", ua.VariantType.Boolean)
457
        self._add_node_attr(item, nodedata, "UserWriteMask", ua.VariantType.Byte)
458
        self._add_node_attr(item, nodedata, "ValueRank", ua.VariantType.Int32)
459
        self._add_node_attr(item, nodedata, "WriteMask", ua.VariantType.UInt32)
460
        self._add_node_attr(item, nodedata, "UserWriteMask", ua.VariantType.UInt32)
461
        self._add_node_attr(item, nodedata, "Value", add_timestamps=add_timestamps)
462
463
464
class MethodService:
465
466
    def __init__(self, aspace: "AddressSpace"):
467
        self.logger = logging.getLogger(__name__)
468
        self._aspace: "AddressSpace" = aspace
469
        self._pool = ThreadPoolExecutor()
470
471
    def stop(self):
472
        self._pool.shutdown()
473
474
    async def call(self, methods):
475
        results = []
476
        for method in methods:
477
            res = await self._call(method)
478
            results.append(res)
479
        return results
480
481
    async def _call(self, method):
482
        self.logger.info("Calling: %s", method)
483
        res = ua.CallMethodResult()
484
        if method.ObjectId not in self._aspace or method.MethodId not in self._aspace:
485
            res.StatusCode = ua.StatusCode(ua.StatusCodes.BadNodeIdInvalid)
486
        else:
487
            node = self._aspace[method.MethodId]
488
            if node.call is None:
489
                res.StatusCode = ua.StatusCode(ua.StatusCodes.BadNothingToDo)
490
            else:
491
                try:
492
                    result = await self._run_method(node.call, method.ObjectId, *method.InputArguments)
493
                except Exception:
494
                    self.logger.exception("Error executing method call %s, an exception was raised: ", method)
495
                    res.StatusCode = ua.StatusCode(ua.StatusCodes.BadUnexpectedError)
496
                else:
497
                    if isinstance(result, ua.CallMethodResult):
498
                        res = result
499
                    elif isinstance(result, ua.StatusCode):
500
                        res.StatusCode = result
501
                    else:
502
                        res.OutputArguments = result
503
                    while len(res.InputArgumentResults) < len(method.InputArguments):
504
                        res.InputArgumentResults.append(ua.StatusCode())
505
        return res
506
507
    async def _run_method(self, func, parent, *args):
508
        if asyncio.iscoroutinefunction(func):
509
            return await func(parent, *args)
510
        p = partial(func, parent, *args)
511
        res = await asyncio.get_event_loop().run_in_executor(self._pool, p)
512
        return res
513
514
515
class AddressSpace:
516
    """
517
    The address space object stores all the nodes of the OPC-UA server and helper methods.
518
    The methods are thread safe
519
    """
520
521
    def __init__(self):
522
        self.logger = logging.getLogger(__name__)
523
        self._nodes = {}
524
        self._datachange_callback_counter = 200
525
        self._handle_to_attribute_map = {}
526
        self._default_idx = 2
527
        self._nodeid_counter = {0: 20000, 1: 2000}
528
529
    def __getitem__(self, nodeid):
530
        return self._nodes.__getitem__(nodeid)
531
532
    def get(self, nodeid):
533
        return self._nodes.get(nodeid, None)
534
535
    def __setitem__(self, nodeid, value):
536
        return self._nodes.__setitem__(nodeid, value)
537
538
    def __contains__(self, nodeid):
539
        return self._nodes.__contains__(nodeid)
540
541
    def __delitem__(self, nodeid):
542
        self._nodes.__delitem__(nodeid)
543
544
    def generate_nodeid(self, idx=None):
545
        if idx is None:
546
            idx = self._default_idx
547
        if idx in self._nodeid_counter:
548
            self._nodeid_counter[idx] += 1
549
        else:
550
            # get the biggest identifier number from the existed nodes in address space
551
            identifier_list = sorted([
552
                nodeid.Identifier for nodeid in self._nodes.keys()
553
                if nodeid.NamespaceIndex == idx and nodeid.NodeIdType in (
554
                    ua.NodeIdType.Numeric, ua.NodeIdType.TwoByte, ua.NodeIdType.FourByte
555
                )
556
            ])
557
            if identifier_list:
558
                self._nodeid_counter[idx] = identifier_list[-1]
559
            else:
560
                self._nodeid_counter[idx] = 1
561
        nodeid = ua.NodeId(self._nodeid_counter[idx], idx)
562
        while True:
563
            if nodeid in self._nodes:
564
                nodeid = self.generate_nodeid(idx)
565
            else:
566
                return nodeid
567
568
    def keys(self):
569
        return self._nodes.keys()
570
571
    def empty(self):
572
        """Delete all nodes in address space"""
573
        self._nodes = {}
574
575
    def dump(self, path):
576
        """
577
        Dump address space as binary to file; note that server must be stopped for this method to work
578
        DO NOT DUMP AN ADDRESS SPACE WHICH IS USING A SHELF (load_aspace_shelf), ONLY CACHED NODES WILL GET DUMPED!
579
        """
580
        # prepare nodes in address space for being serialized
581
        for nodeid, ndata in self._nodes.items():
582
            # if the node has a reference to a method call, remove it so the object can be serialized
583
            if ndata.call is not None:
584
                self._nodes[nodeid].call = None
585
586
        with open(path, 'wb') as f:
587
            pickle.dump(self._nodes, f, pickle.HIGHEST_PROTOCOL)
588
589
    def load(self, path):
590
        """
591
        Load address space from a binary file, overwriting everything in the current address space
592
        """
593
        with open(path, 'rb') as f:
594
            self._nodes = pickle.load(f)
595
596
    def make_aspace_shelf(self, path):
597
        """
598
        Make a shelf for containing the nodes from the standard address space; this is typically only done on first
599
        start of the server. Subsequent server starts will load the shelf, nodes are then moved to a cache
600
        by the LazyLoadingDict class when they are accessed. Saving data back to the shelf
601
        is currently NOT supported, it is only used for the default OPC UA standard address space
602
603
        Note: Intended for slow devices, such as Raspberry Pi, to greatly improve start up time
604
        """
605
        with shelve.open(path, 'n', protocol=pickle.HIGHEST_PROTOCOL) as s:
606
            for nodeid, ndata in self._nodes.items():
607
                s[nodeid.to_string()] = ndata
608
609
    def load_aspace_shelf(self, path):
610
        """
611
        Load the standard address space nodes from a python shelve via LazyLoadingDict as needed.
612
        The dump() method can no longer be used if the address space is being loaded from a shelf
613
614
        Note: Intended for slow devices, such as Raspberry Pi, to greatly improve start up time
615
        """
616
        raise NotImplementedError
617
618
        # ToDo: async friendly implementation - load all at once?
619
        class LazyLoadingDict(collections.MutableMapping):
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable collections does not seem to be defined.
Loading history...
620
            """
621
            Special dict that only loads nodes as they are accessed. If a node is accessed it gets copied from the
622
            shelve to the cache dict. All user nodes are saved in the cache ONLY. Saving data back to the shelf
623
            is currently NOT supported
624
            """
625
626
            def __init__(self, source):
627
                self.source = source  # python shelf
628
                self.cache = {}  # internal dict
629
630
            def __getitem__(self, key):
631
                # try to get the item (node) from the cache, if it isn't there get it from the shelf
632
                try:
633
                    return self.cache[key]
634
                except KeyError:
635
                    node = self.cache[key] = self.source[key.to_string()]
636
                    return node
637
638
            def __setitem__(self, key, value):
639
                # add a new item to the cache; if this item is in the shelf it is not updated
640
                self.cache[key] = value
641
642
            def __contains__(self, key):
643
                return key in self.cache or key.to_string() in self.source
644
645
            def __delitem__(self, key):
646
                # only deleting items from the cache is allowed
647
                del self.cache[key]
648
649
            def __iter__(self):
650
                # only the cache can be iterated over
651
                return iter(self.cache.keys())
652
653
            def __len__(self):
654
                # only returns the length of items in the cache, not unaccessed items in the shelf
655
                return len(self.cache)
656
657
        self._nodes = LazyLoadingDict(shelve.open(path, "r"))
658
659
    async def read_attribute_value(self, nodeid, attr):
660
        # self.logger.debug("get attr val: %s %s", nodeid, attr)
661
        if nodeid not in self._nodes:
662
            dv = ua.DataValue()
663
            dv.StatusCode = ua.StatusCode(ua.StatusCodes.BadNodeIdUnknown)
664
            return dv
665
        node = self._nodes[nodeid]
666
        if attr not in node.attributes:
667
            dv = ua.DataValue()
668
            dv.StatusCode = ua.StatusCode(ua.StatusCodes.BadAttributeIdInvalid)
669
            return dv
670
        attval = node.attributes[attr]
671
        if attval.value_callback:
672
            if asyncio.iscoroutinefunction(attval.value_callback):
673
                await attval.value_callback()
674
            else:
675
                return attval.value_callback()
676
        return attval.value
677
678
    async def write_attribute_value(self, nodeid, attr, value):
679
        # self.logger.debug("set attr val: %s %s %s", nodeid, attr, value)
680
        node = self._nodes.get(nodeid, None)
681
        if node is None:
682
            return ua.StatusCode(ua.StatusCodes.BadNodeIdUnknown)
683
        attval = node.attributes.get(attr, None)
684
        if attval is None:
685
            return ua.StatusCode(ua.StatusCodes.BadAttributeIdInvalid)
686
687
        old = attval.value
688
        attval.value = value
689
        cbs = []
690
        if old.Value != value.Value:  # only send call callback when a value change has happend
691
            cbs = list(attval.datachange_callbacks.items())
692
693
        for k, v in cbs:
694
            try:
695
                await v(k, value)
696
            except Exception as ex:
697
                self.logger.exception("Error calling datachange callback %s, %s, %s", k, v, ex)
698
699
        return ua.StatusCode()
700
701
    def add_datachange_callback(self, nodeid, attr, callback):
702
        self.logger.debug("set attr callback: %s %s %s", nodeid, attr, callback)
703
        if nodeid not in self._nodes:
704
            return ua.StatusCode(ua.StatusCodes.BadNodeIdUnknown), 0
705
        node = self._nodes[nodeid]
706
        if attr not in node.attributes:
707
            return ua.StatusCode(ua.StatusCodes.BadAttributeIdInvalid), 0
708
        attval = node.attributes[attr]
709
        self._datachange_callback_counter += 1
710
        handle = self._datachange_callback_counter
711
        attval.datachange_callbacks[handle] = callback
712
        self._handle_to_attribute_map[handle] = (nodeid, attr)
713
        return ua.StatusCode(), handle
714
715
    def delete_datachange_callback(self, handle):
716
        if handle in self._handle_to_attribute_map:
717
            nodeid, attr = self._handle_to_attribute_map.pop(handle)
718
            self._nodes[nodeid].attributes[attr].datachange_callbacks.pop(handle)
719
720
    def add_method_callback(self, methodid, callback):
721
        node = self._nodes[methodid]
722
        node.call = callback
723
724
    def add_value_callback_to_node(self, nodeid, attr, callback):
725
        if nodeid not in self._nodes:
726
            return ua.StatusCode(ua.StatusCodes.BadNodeIdUnknown), 0
727
        node = self._nodes[nodeid]
728
        if attr not in node.attributes:
729
            return ua.StatusCode(ua.StatusCodes.BadAttributeIdInvalid), 0
730
        attval = self._nodes[nodeid].attributes[attr]
731
        attval.value_callback = callback
732
        return ua.StatusCode()
733