Passed
Pull Request — master (#144)
by
unknown
13:45
created

NodeManagementService._add_ref_from_parent()   A

Complexity

Conditions 1

Size

Total Lines 10
Code Lines 10

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 10
nop 4
dl 0
loc 10
rs 9.9
c 0
b 0
f 0
1
import asyncio
2
import pickle
3
import shelve
4
import logging
5
import collections
6
from datetime import datetime
7
from concurrent.futures import ThreadPoolExecutor
8
from functools import partial
9
10
from asyncua import ua
11
from .users import User
12
13
# Module-level logger; the service classes below each create their own
# `self.logger` with the same `__name__`.
_logger = logging.getLogger(__name__)
14
15
16
class AttributeValue(object):
    """
    Holder for a single node attribute: the stored value plus the callbacks
    attached to it.
    """

    def __init__(self, value):
        # The stored attribute value.
        self.value = value
        # Optional callable consulted instead of `value` on reads; None when unset.
        self.value_callback = None
        # Maps a subscription handle to the callback fired on value changes.
        self.datachange_callbacks = {}

    def __str__(self):
        return f"AttributeValue({self.value})"

    __repr__ = __str__
27
28
29
class NodeData:
    """
    In-memory record of one node: its id, its attributes, its references
    and, for method nodes, the Python callable implementing the method.
    """

    def __init__(self, nodeid):
        self.nodeid = nodeid
        # Maps an attribute id to the object holding that attribute's value.
        self.attributes = {}
        # Reference descriptions starting at this node.
        self.references = []
        # Callable executed when the node is called as a method; None otherwise.
        self.call = None

    def __str__(self):
        return f"NodeData(id:{self.nodeid}, attrs:{self.attributes}, refs:{self.references})"

    __repr__ = __str__
41
42
43
class AttributeService:
    """Read and Write services operating on an address space."""

    def __init__(self, aspace: "AddressSpace"):
        self.logger = logging.getLogger(__name__)
        self._aspace: "AddressSpace" = aspace

    async def read(self, params):
        """
        Read every attribute listed in `params.NodesToRead`.

        Returns one result per requested item, in request order.
        """
        return [
            await self._aspace.get_attribute_value(readvalue.NodeId, readvalue.AttributeId)
            for readvalue in params.NodesToRead
        ]

    async def write(self, params, user=User.Admin):
        """
        Write every value listed in `params.NodesToWrite`.

        Admin users may write any attribute. Other users may only write the
        Value attribute, and only when both AccessLevel and UserAccessLevel
        of the node have the CurrentWrite bit set.

        Returns one StatusCode per requested item, in request order.
        """
        res = []
        for writevalue in params.NodesToWrite:
            if user != User.Admin and not await self._user_may_write(writevalue):
                res.append(ua.StatusCode(ua.StatusCodes.BadUserAccessDenied))
                continue
            res.append(self._aspace.set_attribute_value(writevalue.NodeId, writevalue.AttributeId, writevalue.Value))
        return res

    async def _user_may_write(self, writevalue):
        """Return True when a non-admin user is allowed to perform `writevalue`."""
        if writevalue.AttributeId != ua.AttributeIds.Value:
            return False
        al = await self._aspace.get_attribute_value(writevalue.NodeId, ua.AttributeIds.AccessLevel)
        ual = await self._aspace.get_attribute_value(writevalue.NodeId, ua.AttributeIds.UserAccessLevel)
        # A failed read carries no usable value; testing bits on it would raise
        # instead of reporting a clean access-denied status.
        if not al.StatusCode.is_good() or not ual.StatusCode.is_good():
            return False
        if not ua.ua_binary.test_bit(al.Value.Value, ua.AccessLevel.CurrentWrite):
            return False
        if not ua.ua_binary.test_bit(ual.Value.Value, ua.AccessLevel.CurrentWrite):
            return False
        return True
73
74
75
class ViewService(object):
    """Browse and TranslateBrowsePathsToNodeIds services operating on an address space."""

    def __init__(self, aspace: "AddressSpace"):
        self.logger = logging.getLogger(__name__)
        self._aspace: "AddressSpace" = aspace

    def browse(self, params):
        """Return one BrowseResult per entry of `params.NodesToBrowse`, in request order."""
        return [self._browse(desc) for desc in params.NodesToBrowse]

    def _browse(self, desc):
        """Collect the references of `desc.NodeId` that match the browse description."""
        res = ua.BrowseResult()
        if desc.NodeId not in self._aspace:
            res.StatusCode = ua.StatusCode(ua.StatusCodes.BadNodeIdInvalid)
            return res
        node = self._aspace[desc.NodeId]
        for ref in node.references:
            if self._is_suitable_ref(desc, ref):
                res.References.append(ref)
        return res

    def _is_suitable_ref(self, desc, ref):
        """Return True when `ref` passes the direction, type and node-class filters of `desc`."""
        if not self._suitable_direction(desc.BrowseDirection, ref.IsForward):
            return False
        if not self._suitable_reftype(desc.ReferenceTypeId, ref.ReferenceTypeId, desc.IncludeSubtypes):
            return False
        # A NodeClassMask of 0 means "no filtering on node class".
        if desc.NodeClassMask and ((desc.NodeClassMask & ref.NodeClass) == 0):
            return False
        return True

    def _suitable_reftype(self, ref1, ref2, subtypes):
        """
        Return True when reference type `ref2` satisfies the requested type `ref1`.

        `ref1` is the type asked for in the browse description, `ref2` the type
        of the reference being examined, and `subtypes` the IncludeSubtypes flag.
        """
        if ref1 == ua.NodeId(ua.ObjectIds.Null):
            # If ReferenceTypeId is not specified in the BrowseDescription,
            # all References are returned and includeSubtypes is ignored.
            return True
        if not subtypes and ref2.Identifier == ua.ObjectIds.HasSubtype:
            return False
        if ref1.Identifier == ref2.Identifier:
            return True
        oktypes = self._get_sub_ref(ref1)
        if not subtypes and ua.NodeId(ua.ObjectIds.HasSubtype) in oktypes:
            oktypes.remove(ua.NodeId(ua.ObjectIds.HasSubtype))
        return ref2 in oktypes

    def _get_sub_ref(self, ref):
        """
        Return the node ids of all (transitive) subtypes of reference type `ref`.

        An unknown `ref` yields an empty list rather than raising, so browsing
        with a reference type that is not in the address space simply matches
        nothing.
        """
        res = []
        nodedata = self._aspace.get(ref)
        if nodedata is None:
            return res
        for rdesc in nodedata.references:
            if rdesc.ReferenceTypeId.Identifier == ua.ObjectIds.HasSubtype and rdesc.IsForward:
                res.append(rdesc.NodeId)
                res += self._get_sub_ref(rdesc.NodeId)
        return res

    def _suitable_direction(self, direction, isforward):
        """Return True when a reference with direction flag `isforward` matches `direction`."""
        if direction == ua.BrowseDirection.Both:
            return True
        if direction == ua.BrowseDirection.Forward and isforward:
            return True
        if direction == ua.BrowseDirection.Inverse and not isforward:
            return True
        return False

    def translate_browsepaths_to_nodeids(self, browsepaths):
        """Return one BrowsePathResult per entry of `browsepaths`, in request order."""
        return [self._translate_browsepath_to_nodeid(path) for path in browsepaths]

    def _translate_browsepath_to_nodeid(self, path):
        """Follow `path.RelativePath` from `path.StartingNode` and report the node reached."""
        res = ua.BrowsePathResult()
        if path.StartingNode not in self._aspace:
            res.StatusCode = ua.StatusCode(ua.StatusCodes.BadNodeIdInvalid)
            return res
        current = path.StartingNode
        for el in path.RelativePath.Elements:
            nodeid = self._find_element_in_node(el, current)
            if not nodeid:
                res.StatusCode = ua.StatusCode(ua.StatusCodes.BadNoMatch)
                return res
            current = nodeid
        target = ua.BrowsePathTarget()
        target.TargetId = current
        # 4294967295 == 2**32 - 1, the largest UInt32 value.
        target.RemainingPathIndex = 4294967295
        res.Targets = [target]
        return res

    def _find_element_in_node(self, el, nodeid):
        """Return the target of the first reference of `nodeid` whose browse name matches `el`, else None."""
        nodedata = self._aspace[nodeid]
        for ref in nodedata.references:
            # FIXME: here we should check other arguments!!
            if ref.BrowseName == el.TargetName:
                return ref.NodeId
        self.logger.info("element %s was not found in node %s", el, nodeid)
        return None
182
183
184
class NodeManagementService:
    """AddNodes, DeleteNodes, AddReferences and DeleteReferences services on an address space."""

    def __init__(self, aspace: "AddressSpace"):
        self.logger = logging.getLogger(__name__)
        self._aspace: "AddressSpace" = aspace

    async def add_nodes(self, addnodeitems, user=User.Admin):
        """Add every item of `addnodeitems`; return one AddNodesResult per item, in order."""
        return [await self._add_node(item, user) for item in addnodeitems]

    async def try_add_nodes(self, addnodeitems, user=User.Admin, check=True):
        """Async generator: try to add every item and yield the items that could not be added."""
        for item in addnodeitems:
            ret = await self._add_node(item, user, check=check)
            if not ret.StatusCode.is_good():
                yield item

    async def _add_node(self, item, user, check=True):
        """
        Add a single node described by `item`.

        With `check` False a null ParentNodeId is accepted and no source
        timestamp is put on the Value attribute.
        Returns an AddNodesResult carrying the status and the id of the new node.
        """
        result = ua.AddNodesResult()

        if user != User.Admin:
            result.StatusCode = ua.StatusCode(ua.StatusCodes.BadUserAccessDenied)
            return result

        if item.RequestedNewNodeId.has_null_identifier():
            # If Identifier of requested NodeId is null we generate a new NodeId using
            # the namespace of the nodeid. This is an extension of the spec that lets
            # a client ask the server to generate a new nodeid in a given namespace.
            item.RequestedNewNodeId = self._aspace.generate_nodeid(item.RequestedNewNodeId.NamespaceIndex)
        elif item.RequestedNewNodeId in self._aspace:
            self.logger.warning("AddNodesItem: Requested NodeId %s already exists", item.RequestedNewNodeId)
            result.StatusCode = ua.StatusCode(ua.StatusCodes.BadNodeIdExists)
            return result

        if item.ParentNodeId.is_null() and check:
            result.StatusCode = ua.StatusCode(ua.StatusCodes.BadParentNodeIdInvalid)
            return result

        parentdata = self._aspace.get(item.ParentNodeId)
        if parentdata is None and not item.ParentNodeId.is_null():
            self.logger.info("add_node: while adding node %s, requested parent node %s does not exists",
                item.RequestedNewNodeId, item.ParentNodeId)
            result.StatusCode = ua.StatusCode(ua.StatusCodes.BadParentNodeIdInvalid)
            return result

        nodedata = NodeData(item.RequestedNewNodeId)

        self._add_node_attributes(nodedata, item, add_timestamps=check)

        # now add our node to db
        self._aspace[nodedata.nodeid] = nodedata

        if parentdata is not None:
            self._add_ref_from_parent(nodedata, item, parentdata)
            await self._add_ref_to_parent(nodedata, item, parentdata)

        # add type definition
        if item.TypeDefinition != ua.NodeId():
            await self._add_type_definition(nodedata, item)

        result.StatusCode = ua.StatusCode()
        result.AddedNodeId = nodedata.nodeid

        return result

    def _add_node_attributes(self, nodedata, item, add_timestamps):
        """Fill `nodedata.attributes` with the common attributes, then the ones requested in `item`."""
        # add common attrs
        nodedata.attributes[ua.AttributeIds.NodeId] = AttributeValue(
            ua.DataValue(ua.Variant(nodedata.nodeid, ua.VariantType.NodeId))
        )
        nodedata.attributes[ua.AttributeIds.BrowseName] = AttributeValue(
            ua.DataValue(ua.Variant(item.BrowseName, ua.VariantType.QualifiedName))
        )
        nodedata.attributes[ua.AttributeIds.NodeClass] = AttributeValue(
            ua.DataValue(ua.Variant(item.NodeClass, ua.VariantType.Int32))
        )
        # add requested attrs
        self._add_nodeattributes(item.NodeAttributes, nodedata, add_timestamps)

    def _add_unique_reference(self, nodedata, desc):
        """
        Append `desc` to `nodedata.references` unless an equivalent reference exists.

        A reference with the same type and target but the opposite direction is
        rejected with BadReferenceNotAllowed; an identical one is silently kept.
        """
        for r in nodedata.references:
            if r.ReferenceTypeId == desc.ReferenceTypeId and r.NodeId == desc.NodeId:
                if r.IsForward != desc.IsForward:
                    self.logger.error("Cannot add conflicting reference %s ", str(desc))
                    return ua.StatusCode(ua.StatusCodes.BadReferenceNotAllowed)
                break  # ref already exists
        else:
            nodedata.references.append(desc)
        return ua.StatusCode()

    def _add_ref_from_parent(self, nodedata, item, parentdata):
        """Add the forward reference parent -> new node to `parentdata`."""
        desc = ua.ReferenceDescription()
        desc.ReferenceTypeId = item.ReferenceTypeId
        desc.NodeId = nodedata.nodeid
        desc.NodeClass = item.NodeClass
        desc.BrowseName = item.BrowseName
        desc.DisplayName = item.NodeAttributes.DisplayName
        desc.TypeDefinition = item.TypeDefinition
        desc.IsForward = True
        self._add_unique_reference(parentdata, desc)

    async def _add_ref_to_parent(self, nodedata, item, parentdata):
        """Add the inverse reference new node -> parent to `nodedata`."""
        addref = ua.AddReferencesItem()
        addref.ReferenceTypeId = item.ReferenceTypeId
        addref.SourceNodeId = nodedata.nodeid
        addref.TargetNodeId = item.ParentNodeId
        addref.TargetNodeClass = parentdata.attributes[ua.AttributeIds.NodeClass].value.Value.Value
        addref.IsForward = False
        await self._add_reference_no_check(nodedata, addref)

    async def _add_type_definition(self, nodedata, item):
        """Add the HasTypeDefinition reference new node -> `item.TypeDefinition`."""
        addref = ua.AddReferencesItem()
        addref.SourceNodeId = nodedata.nodeid
        addref.IsForward = True
        addref.ReferenceTypeId = ua.NodeId(ua.ObjectIds.HasTypeDefinition)
        addref.TargetNodeId = item.TypeDefinition
        addref.TargetNodeClass = ua.NodeClass.DataType
        await self._add_reference_no_check(nodedata, addref)

    def delete_nodes(self, deletenodeitems, user=User.Admin):
        """Delete every item of `deletenodeitems.NodesToDelete`; return one StatusCode per item."""
        return [self._delete_node(item, user) for item in deletenodeitems.NodesToDelete]

    def _delete_node(self, item, user):
        """Delete one node and, when requested, every reference that targets it."""
        if user != User.Admin:
            return ua.StatusCode(ua.StatusCodes.BadUserAccessDenied)

        if item.NodeId not in self._aspace:
            self.logger.warning("DeleteNodesItem: NodeId %s does not exists", item.NodeId)
            return ua.StatusCode(ua.StatusCodes.BadNodeIdUnknown)

        if item.DeleteTargetReferences:
            for elem in self._aspace.keys():
                refs = self._aspace[elem].references
                # Rebuild in place: removing entries from a list while iterating
                # over it skips the element following each removal.
                refs[:] = [rdesc for rdesc in refs if not (rdesc.NodeId == item.NodeId)]

        self._delete_node_callbacks(self._aspace[item.NodeId])

        del (self._aspace[item.NodeId])

        return ua.StatusCode()

    def _delete_node_callbacks(self, nodedata):
        """Notify and unregister every datachange callback attached to the Value attribute of `nodedata`."""
        if ua.AttributeIds.Value in nodedata.attributes:
            # Iterate over a snapshot: delete_datachange_callback mutates the dict.
            for handle, callback in list(nodedata.attributes[ua.AttributeIds.Value].datachange_callbacks.items()):
                try:
                    callback(handle, None, ua.StatusCode(ua.StatusCodes.BadNodeIdUnknown))
                    self._aspace.delete_datachange_callback(handle)
                except Exception as ex:
                    self.logger.exception("Error calling delete node callback callback %s, %s, %s", nodedata,
                        ua.AttributeIds.Value, ex)

    async def add_references(self, refs, user=User.Admin):
        """Add every reference of `refs`; return one StatusCode per item, in order."""
        return [await self._add_reference(ref, user) for ref in refs]

    async def try_add_references(self, refs, user=User.Admin):
        """Async generator: try to add every reference and yield the ones that could not be added."""
        for ref in refs:
            ret = await self._add_reference(ref, user)
            if not ret.is_good():
                yield ref

    async def _add_reference(self, addref, user):
        """Validate source, target and user, then add the reference."""
        sourcedata = self._aspace.get(addref.SourceNodeId)
        if sourcedata is None:
            return ua.StatusCode(ua.StatusCodes.BadSourceNodeIdInvalid)
        if addref.TargetNodeId not in self._aspace:
            return ua.StatusCode(ua.StatusCodes.BadTargetNodeIdInvalid)
        if user != User.Admin:
            return ua.StatusCode(ua.StatusCodes.BadUserAccessDenied)
        return await self._add_reference_no_check(sourcedata, addref)

    async def _add_reference_no_check(self, sourcedata, addref):
        """Build a ReferenceDescription from `addref` and add it to `sourcedata` without validation."""
        rdesc = ua.ReferenceDescription()
        rdesc.ReferenceTypeId = addref.ReferenceTypeId
        rdesc.IsForward = addref.IsForward
        rdesc.NodeId = addref.TargetNodeId
        if addref.TargetNodeClass == ua.NodeClass.Unspecified:
            # Node class not supplied by the caller: look it up on the target node.
            rdesc.NodeClass = (await self._aspace.get_attribute_value(
                addref.TargetNodeId, ua.AttributeIds.NodeClass)).Value.Value
        else:
            rdesc.NodeClass = addref.TargetNodeClass
        bname = (await self._aspace.get_attribute_value(addref.TargetNodeId, ua.AttributeIds.BrowseName)).Value.Value
        if bname:
            rdesc.BrowseName = bname
        dname = (await self._aspace.get_attribute_value(addref.TargetNodeId, ua.AttributeIds.DisplayName)).Value.Value
        if dname:
            rdesc.DisplayName = dname
        return self._add_unique_reference(sourcedata, rdesc)

    def delete_references(self, refs, user=User.Admin):
        """Delete every reference of `refs`; return one StatusCode per item, in order."""
        return [self._delete_reference(ref, user) for ref in refs]

    def _delete_unique_reference(self, item, invert=False):
        """
        Remove the first reference matching `item` from its source node.

        With `invert` True the mirrored reference (target -> source, opposite
        direction) is removed instead. Returns BadNotFound when nothing matches.
        """
        if invert:
            source, target, forward = item.TargetNodeId, item.SourceNodeId, not item.IsForward
        else:
            source, target, forward = item.SourceNodeId, item.TargetNodeId, item.IsForward
        references = self._aspace[source].references
        for rdesc in references:
            if rdesc.NodeId == target and rdesc.ReferenceTypeId == item.ReferenceTypeId:
                if rdesc.IsForward == forward:
                    # Safe despite the running iteration: we return right away.
                    references.remove(rdesc)
                    return ua.StatusCode()
        return ua.StatusCode(ua.StatusCodes.BadNotFound)

    def _delete_reference(self, item, user):
        """Validate the request, then delete the reference (and its mirror when DeleteBidirectional is set)."""
        if item.SourceNodeId not in self._aspace:
            return ua.StatusCode(ua.StatusCodes.BadSourceNodeIdInvalid)
        if item.TargetNodeId not in self._aspace:
            return ua.StatusCode(ua.StatusCodes.BadTargetNodeIdInvalid)
        if item.ReferenceTypeId not in self._aspace:
            return ua.StatusCode(ua.StatusCodes.BadReferenceTypeIdInvalid)
        if user != User.Admin:
            return ua.StatusCode(ua.StatusCodes.BadUserAccessDenied)

        if item.DeleteBidirectional:
            # The status of the mirrored deletion is deliberately not reported.
            self._delete_unique_reference(item, True)
        return self._delete_unique_reference(item)

    def _add_node_attr(self, item, nodedata, name, vtype=None, add_timestamps=False):
        """
        Copy attribute `name` from `item` into `nodedata` when `item` specifies it.

        `name` must be the attribute's name in ua.NodeAttributesMask,
        ua.AttributeIds and on `item` itself.
        """
        if item.SpecifiedAttributes & getattr(ua.NodeAttributesMask, name):
            dv = ua.DataValue(ua.Variant(getattr(item, name), vtype))
            if add_timestamps:
                # dv.ServerTimestamp = datetime.utcnow()  # Disabled until someone explains us it should be there
                dv.SourceTimestamp = datetime.utcnow()
            nodedata.attributes[getattr(ua.AttributeIds, name)] = AttributeValue(dv)

    def _add_nodeattributes(self, item, nodedata, add_timestamps):
        """Copy every attribute specified in `item` into `nodedata`."""
        typed_attributes = (
            ("AccessLevel", ua.VariantType.Byte),
            ("ArrayDimensions", ua.VariantType.UInt32),
            ("BrowseName", ua.VariantType.QualifiedName),
            ("ContainsNoLoops", ua.VariantType.Boolean),
            ("DataType", ua.VariantType.NodeId),
            ("Description", ua.VariantType.LocalizedText),
            ("DisplayName", ua.VariantType.LocalizedText),
            ("EventNotifier", ua.VariantType.Byte),
            ("Executable", ua.VariantType.Boolean),
            ("Historizing", ua.VariantType.Boolean),
            ("InverseName", ua.VariantType.LocalizedText),
            ("IsAbstract", ua.VariantType.Boolean),
            ("MinimumSamplingInterval", ua.VariantType.Double),
            ("NodeClass", ua.VariantType.Int32),
            ("NodeId", ua.VariantType.NodeId),
            ("Symmetric", ua.VariantType.Boolean),
            ("UserAccessLevel", ua.VariantType.Byte),
            ("UserExecutable", ua.VariantType.Boolean),
            # Previously written twice (Byte, then overwritten as UInt32);
            # only the UInt32 value ever survived, so it is written once.
            ("UserWriteMask", ua.VariantType.UInt32),
            ("ValueRank", ua.VariantType.Int32),
            ("WriteMask", ua.VariantType.UInt32),
        )
        for name, vtype in typed_attributes:
            self._add_node_attr(item, nodedata, name, vtype)
        # The Value attribute keeps the variant type of the supplied value and
        # is the only one that may receive a source timestamp.
        self._add_node_attr(item, nodedata, "Value", add_timestamps=add_timestamps)
450
451
452
class MethodService:
    """Call service: executes the Python callables attached to method nodes."""

    def __init__(self, aspace: "AddressSpace"):
        self.logger = logging.getLogger(__name__)
        self._aspace: "AddressSpace" = aspace
        # Pool used to run plain (non-coroutine) method implementations.
        self._pool = ThreadPoolExecutor()

    def stop(self):
        """Shut down the thread pool used for plain method implementations."""
        self._pool.shutdown()

    async def call(self, methods):
        """Call every method of `methods` one after the other; return one result per call."""
        return [await self._call(method) for method in methods]

    async def _call(self, method):
        """
        Execute one method call request and return a CallMethodResult.

        The implementation may return a CallMethodResult (used as is), a
        StatusCode (used as the result status) or anything else (used as the
        output arguments). An exception raised by the implementation is logged
        and reported as BadUnexpectedError.
        """
        self.logger.info("Calling: %s", method)
        res = ua.CallMethodResult()
        if method.ObjectId not in self._aspace or method.MethodId not in self._aspace:
            res.StatusCode = ua.StatusCode(ua.StatusCodes.BadNodeIdInvalid)
            return res

        node = self._aspace[method.MethodId]
        if node.call is None:
            res.StatusCode = ua.StatusCode(ua.StatusCodes.BadNothingToDo)
            return res

        try:
            result = await self._run_method(node.call, method.ObjectId, *method.InputArguments)
        except Exception:
            self.logger.exception("Error executing method call %s, an exception was raised: ", method)
            res.StatusCode = ua.StatusCode(ua.StatusCodes.BadUnexpectedError)
            return res

        if isinstance(result, ua.CallMethodResult):
            res = result
        elif isinstance(result, ua.StatusCode):
            res.StatusCode = result
        else:
            res.OutputArguments = result
        # Pad with default status codes so there is one entry per input argument.
        while len(res.InputArgumentResults) < len(method.InputArguments):
            res.InputArgumentResults.append(ua.StatusCode())
        return res

    async def _run_method(self, func, parent, *args):
        """
        Run `func(parent, *args)` and return its result.

        Coroutine functions are awaited directly; any other callable is run in
        the service's thread pool so that it does not block the event loop.
        """
        if asyncio.iscoroutinefunction(func):
            return await func(parent, *args)
        bound = partial(func, parent, *args)
        return await asyncio.get_event_loop().run_in_executor(self._pool, bound)
501
502
503
class AddressSpace:
    """
    The address space object stores all the nodes of the OPC-UA server and helper methods.

    NOTE(review): this docstring used to state that the methods are thread
    safe, but no locking is visible in this class - confirm before relying on it.
    """

    def __init__(self):
        self.logger = logging.getLogger(__name__)
        # Maps a node id to its node data.
        self._nodes = {}
        # Last handle handed out by add_datachange_callback.
        self._datachange_callback_counter = 200
        # Maps a callback handle to the (nodeid, attribute id) it is attached to.
        self._handle_to_attribute_map = {}
        # Namespace used by generate_nodeid when none is given.
        self._default_idx = 2
        # Maps a namespace index to the last numeric identifier generated in it.
        self._nodeid_counter = {0: 20000, 1: 2000}

    def __getitem__(self, nodeid):
        return self._nodes.__getitem__(nodeid)

    def get(self, nodeid):
        """Return the node data for `nodeid`, or None when the node is unknown."""
        return self._nodes.get(nodeid, None)

    def __setitem__(self, nodeid, value):
        return self._nodes.__setitem__(nodeid, value)

    def __contains__(self, nodeid):
        return self._nodes.__contains__(nodeid)

    def __delitem__(self, nodeid):
        self._nodes.__delitem__(nodeid)

    def generate_nodeid(self, idx=None):
        """
        Return a numeric NodeId in namespace `idx` that is not yet in use.

        `idx` defaults to the address space's default namespace index. The
        first call for a namespace without a counter starts from the largest
        numeric identifier already present in that namespace (or from 1).
        """
        if idx is None:
            idx = self._default_idx
        if idx in self._nodeid_counter:
            self._nodeid_counter[idx] += 1
        else:
            # get the biggest identifier number from the existing nodes in address space
            numeric_types = (ua.NodeIdType.Numeric, ua.NodeIdType.TwoByte, ua.NodeIdType.FourByte)
            identifiers = [
                nodeid.Identifier for nodeid in self._nodes.keys()
                if nodeid.NamespaceIndex == idx and nodeid.NodeIdType in numeric_types
            ]
            self._nodeid_counter[idx] = max(identifiers) if identifiers else 1
        nodeid = ua.NodeId(self._nodeid_counter[idx], idx)
        # Skip identifiers that are already taken. This is a loop rather than
        # a recursive call so a long run of occupied ids cannot exhaust the stack.
        while nodeid in self._nodes:
            self._nodeid_counter[idx] += 1
            nodeid = ua.NodeId(self._nodeid_counter[idx], idx)
        return nodeid

    def keys(self):
        """Return a view of the ids of all nodes held in the address space."""
        return self._nodes.keys()

    def empty(self):
        """Delete all nodes in address space"""
        self._nodes = {}

    def dump(self, path):
        """
        Dump address space as binary to file; note that server must be stopped for this method to work
        DO NOT DUMP AN ADDRESS SPACE WHICH IS USING A SHELF (load_aspace_shelf), ONLY CACHED NODES WILL GET DUMPED!

        Side effect: the method callable of every node is reset to None.
        """
        # prepare nodes in address space for being serialized
        for ndata in self._nodes.values():
            # if the node has a reference to a method call, remove it so the object can be serialized
            if ndata.call is not None:
                ndata.call = None

        with open(path, 'wb') as f:
            pickle.dump(self._nodes, f, pickle.HIGHEST_PROTOCOL)

    def load(self, path):
        """
        Load address space from a binary file, overwriting everything in the current address space

        SECURITY: the file is read with pickle, which can execute arbitrary
        code while loading. Only load files from a trusted source.
        """
        with open(path, 'rb') as f:
            self._nodes = pickle.load(f)

    def make_aspace_shelf(self, path):
        """
        Make a shelf for containing the nodes from the standard address space; this is typically only done on first
        start of the server. Subsequent server starts will load the shelf, nodes are then moved to a cache
        by the LazyLoadingDict class when they are accessed. Saving data back to the shelf
        is currently NOT supported, it is only used for the default OPC UA standard address space

        Note: Intended for slow devices, such as Raspberry Pi, to greatly improve start up time
        """
        with shelve.open(path, 'n', protocol=pickle.HIGHEST_PROTOCOL) as s:
            for nodeid, ndata in self._nodes.items():
                s[nodeid.to_string()] = ndata

    def load_aspace_shelf(self, path):
        """
        Load the standard address space nodes from a python shelve via LazyLoadingDict as needed.
        The dump() method can no longer be used if the address space is being loaded from a shelf

        Note: Intended for slow devices, such as Raspberry Pi, to greatly improve start up time

        Currently disabled: always raises NotImplementedError. Everything after
        the raise is unreachable and kept as the starting point for the ToDo.
        """
        raise NotImplementedError

        # ToDo: async friendly implementation - load all at once?
        # The ABC lives in collections.abc; the collections.MutableMapping
        # alias was removed in Python 3.10.
        from collections.abc import MutableMapping

        class LazyLoadingDict(MutableMapping):
            """
            Special dict that only loads nodes as they are accessed. If a node is accessed it gets copied from the
            shelve to the cache dict. All user nodes are saved in the cache ONLY. Saving data back to the shelf
            is currently NOT supported
            """

            def __init__(self, source):
                self.source = source  # python shelf
                self.cache = {}  # internal dict

            def __getitem__(self, key):
                # try to get the item (node) from the cache, if it isn't there get it from the shelf
                try:
                    return self.cache[key]
                except KeyError:
                    node = self.cache[key] = self.source[key.to_string()]
                    return node

            def __setitem__(self, key, value):
                # add a new item to the cache; if this item is in the shelf it is not updated
                self.cache[key] = value

            def __contains__(self, key):
                return key in self.cache or key.to_string() in self.source

            def __delitem__(self, key):
                # only deleting items from the cache is allowed
                del self.cache[key]

            def __iter__(self):
                # only the cache can be iterated over
                return iter(self.cache.keys())

            def __len__(self):
                # only returns the length of items in the cache, not unaccessed items in the shelf
                return len(self.cache)

        self._nodes = LazyLoadingDict(shelve.open(path, "r"))

    async def get_attribute_value(self, nodeid, attr):
        """
        Return the value of attribute `attr` of node `nodeid`.

        An unknown node or attribute yields a DataValue whose status is
        BadNodeIdUnknown or BadAttributeIdInvalid. When a value callback is
        registered on the attribute, its result (awaited if it is a coroutine
        function) is returned instead of the stored value.
        """
        if nodeid not in self._nodes:
            dv = ua.DataValue()
            dv.StatusCode = ua.StatusCode(ua.StatusCodes.BadNodeIdUnknown)
            return dv
        node = self._nodes[nodeid]
        if attr not in node.attributes:
            dv = ua.DataValue()
            dv.StatusCode = ua.StatusCode(ua.StatusCodes.BadAttributeIdInvalid)
            return dv
        attval = node.attributes[attr]
        if attval.value_callback:
            if asyncio.iscoroutinefunction(attval.value_callback):
                # Return the awaited result; it used to be discarded in favour
                # of the stored value, unlike the synchronous branch.
                return await attval.value_callback()
            return attval.value_callback()
        return attval.value

    def set_attribute_value(self, nodeid, attr, value):
        """
        Store `value` as attribute `attr` of node `nodeid` and fire datachange callbacks.

        Callbacks only run when the new `value.Value` differs from the old one;
        an exception raised by a callback is logged and does not abort the write.
        Returns a StatusCode.
        """
        node = self._nodes.get(nodeid, None)
        if node is None:
            return ua.StatusCode(ua.StatusCodes.BadNodeIdUnknown)
        attval = node.attributes.get(attr, None)
        if attval is None:
            return ua.StatusCode(ua.StatusCodes.BadAttributeIdInvalid)

        old = attval.value
        attval.value = value
        cbs = []
        if old.Value != value.Value:  # only call the callbacks when the value actually changed
            # Snapshot, so callbacks may (un)register callbacks while we iterate.
            cbs = list(attval.datachange_callbacks.items())

        for k, v in cbs:
            try:
                v(k, value)
            except Exception as ex:
                self.logger.exception("Error calling datachange callback %s, %s, %s", k, v, ex)

        return ua.StatusCode()

    def add_datachange_callback(self, nodeid, attr, callback):
        """
        Register `callback` for changes of attribute `attr` of node `nodeid`.

        Returns a `(StatusCode, handle)` tuple; the handle is 0 on failure.
        """
        self.logger.debug("set attr callback: %s %s %s", nodeid, attr, callback)
        if nodeid not in self._nodes:
            return ua.StatusCode(ua.StatusCodes.BadNodeIdUnknown), 0
        node = self._nodes[nodeid]
        if attr not in node.attributes:
            return ua.StatusCode(ua.StatusCodes.BadAttributeIdInvalid), 0
        attval = node.attributes[attr]
        self._datachange_callback_counter += 1
        handle = self._datachange_callback_counter
        attval.datachange_callbacks[handle] = callback
        self._handle_to_attribute_map[handle] = (nodeid, attr)
        return ua.StatusCode(), handle

    def delete_datachange_callback(self, handle):
        """Unregister the datachange callback identified by `handle`; unknown handles are ignored."""
        if handle in self._handle_to_attribute_map:
            nodeid, attr = self._handle_to_attribute_map.pop(handle)
            self._nodes[nodeid].attributes[attr].datachange_callbacks.pop(handle)

    def add_method_callback(self, methodid, callback):
        """Attach `callback` as the implementation of method node `methodid` (KeyError if unknown)."""
        node = self._nodes[methodid]
        node.call = callback

    def add_value_callback_to_node(self, nodeid, attr, callback):
        """
        Register `callback` as the value provider of attribute `attr` of node `nodeid`.

        NOTE(review): failures return a `(StatusCode, 0)` tuple while success
        returns a bare StatusCode; kept as is because callers may depend on it.
        """
        if nodeid not in self._nodes:
            return ua.StatusCode(ua.StatusCodes.BadNodeIdUnknown), 0
        node = self._nodes[nodeid]
        if attr not in node.attributes:
            return ua.StatusCode(ua.StatusCodes.BadAttributeIdInvalid), 0
        attval = node.attributes[attr]
        attval.value_callback = callback
        return ua.StatusCode()
721