Completed
Pull Request — master (#91)
by Olivier
05:25
created

NodeManagementService.try_add_references()   A

Complexity

Conditions 3

Size

Total Lines 4
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 3
eloc 4
nop 3
dl 0
loc 4
rs 10
c 0
b 0
f 0
import asyncio
import collections
import collections.abc
import logging
import pickle
import shelve
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
from functools import partial

from asyncua import ua

from .users import User

# Module-level logger; the service classes below create their own via the same name.
_logger = logging.getLogger(__name__)


class AttributeValue(object):
    """Storage cell for a single attribute of a node.

    Attributes:
        value: the stored value passed to the constructor.
        value_callback: optional callable; when set, AddressSpace.get_attribute_value()
            returns its result instead of ``value``.
        datachange_callbacks: mapping of handle -> callback invoked on value changes.
    """

    def __init__(self, value):
        self.value = value
        self.value_callback = None
        self.datachange_callbacks = {}

    def __str__(self):
        return f"AttributeValue({self.value})"

    __repr__ = __str__


class NodeData:
    """In-memory record of one node stored in the address space.

    Attributes:
        nodeid: identifier of the node.
        attributes: mapping of attribute id -> AttributeValue.
        references: list of reference descriptions starting at this node.
        call: callable attached to method nodes (None when no method is linked).
    """

    def __init__(self, nodeid):
        self.nodeid = nodeid
        self.attributes = {}
        self.references = []
        self.call = None

    def __str__(self):
        return f"NodeData(id:{self.nodeid}, attrs:{self.attributes}, refs:{self.references})"

    __repr__ = __str__


class AttributeService:
    """Read and write node attributes held by an address space."""

    def __init__(self, aspace: "AddressSpace"):
        self.logger = logging.getLogger(__name__)
        self._aspace: "AddressSpace" = aspace

    def read(self, params):
        """Return one attribute value per entry of ``params.NodesToRead``, in order."""
        return [
            self._aspace.get_attribute_value(readvalue.NodeId, readvalue.AttributeId)
            for readvalue in params.NodesToRead
        ]

    def write(self, params, user=User.Admin):
        """Write every entry of ``params.NodesToWrite`` and return one status code per entry.

        Admin users may write any attribute. Other users may only write the Value
        attribute, and only when both AccessLevel and UserAccessLevel of the node
        carry the CurrentWrite bit; otherwise BadUserAccessDenied is reported.
        """
        res = []
        for writevalue in params.NodesToWrite:
            if user != User.Admin and not self._user_may_write(writevalue):
                res.append(ua.StatusCode(ua.StatusCodes.BadUserAccessDenied))
                continue
            res.append(self._aspace.set_attribute_value(writevalue.NodeId, writevalue.AttributeId, writevalue.Value))
        return res

    def _user_may_write(self, writevalue):
        """Tell whether a non-admin user is allowed to perform ``writevalue``."""
        if writevalue.AttributeId != ua.AttributeIds.Value:
            return False
        al = self._aspace.get_attribute_value(writevalue.NodeId, ua.AttributeIds.AccessLevel)
        ual = self._aspace.get_attribute_value(writevalue.NodeId, ua.AttributeIds.UserAccessLevel)
        # Both lookups must have succeeded: a failed lookup carries no usable value,
        # and bit-testing it would raise instead of denying access.
        if not al.StatusCode.is_good() or not ual.StatusCode.is_good():
            return False
        write_bit = ua.AccessLevel.CurrentWrite
        return bool(
            ua.ua_binary.test_bit(al.Value.Value, write_bit)
            and ua.ua_binary.test_bit(ual.Value.Value, write_bit)
        )


class ViewService(object):
    """Browse references and resolve browse paths inside an address space."""

    def __init__(self, aspace: "AddressSpace"):
        self.logger = logging.getLogger(__name__)
        self._aspace: "AddressSpace" = aspace

    def browse(self, params):
        """Return one BrowseResult per entry of ``params.NodesToBrowse``, in order."""
        return [self._browse(desc) for desc in params.NodesToBrowse]

    def _browse(self, desc):
        """Collect the references of ``desc.NodeId`` that match the browse description."""
        res = ua.BrowseResult()
        if desc.NodeId not in self._aspace:
            res.StatusCode = ua.StatusCode(ua.StatusCodes.BadNodeIdInvalid)
            return res
        node = self._aspace[desc.NodeId]
        for ref in node.references:
            if self._is_suitable_ref(desc, ref):
                res.References.append(ref)
        return res

    def _is_suitable_ref(self, desc, ref):
        """Check direction, reference type and node-class mask of ``ref`` against ``desc``."""
        if not self._suitable_direction(desc.BrowseDirection, ref.IsForward):
            return False
        if not self._suitable_reftype(desc.ReferenceTypeId, ref.ReferenceTypeId, desc.IncludeSubtypes):
            return False
        # A zero mask means "no filtering on node class".
        if desc.NodeClassMask and ((desc.NodeClassMask & ref.NodeClass) == 0):
            return False
        return True

    def _suitable_reftype(self, ref1, ref2, subtypes):
        """Tell whether reference type ``ref2`` satisfies the requested type ``ref1``.

        ``subtypes`` is the IncludeSubtypes flag of the browse description.
        """
        if ref1 == ua.NodeId(ua.ObjectIds.Null):
            # If ReferenceTypeId is not specified in the BrowseDescription,
            # all References are returned and includeSubtypes is ignored.
            return True
        if not subtypes and ref2.Identifier == ua.ObjectIds.HasSubtype:
            return False
        if ref1.Identifier == ref2.Identifier:
            return True
        oktypes = self._get_sub_ref(ref1)
        if not subtypes and ua.NodeId(ua.ObjectIds.HasSubtype) in oktypes:
            oktypes.remove(ua.NodeId(ua.ObjectIds.HasSubtype))
        return ref2 in oktypes

    def _get_sub_ref(self, ref):
        """Return the node ids reachable from ``ref`` through forward HasSubtype references.

        An unknown ``ref`` yields an empty list.
        """
        res = []
        # .get() rather than indexing: indexing raises KeyError for an unknown id,
        # which made the None check below unreachable.
        nodedata = self._aspace.get(ref)
        if nodedata is not None:
            for subref in nodedata.references:
                if subref.ReferenceTypeId.Identifier == ua.ObjectIds.HasSubtype and subref.IsForward:
                    res.append(subref.NodeId)
                    res += self._get_sub_ref(subref.NodeId)
        return res

    def _suitable_direction(self, desc, isforward):
        """Tell whether a reference with direction ``isforward`` matches browse direction ``desc``."""
        if desc == ua.BrowseDirection.Both:
            return True
        if desc == ua.BrowseDirection.Forward and isforward:
            return True
        if desc == ua.BrowseDirection.Inverse and not isforward:
            return True
        return False

    def translate_browsepaths_to_nodeids(self, browsepaths):
        """Return one BrowsePathResult per browse path, in order."""
        return [self._translate_browsepath_to_nodeid(path) for path in browsepaths]

    def _translate_browsepath_to_nodeid(self, path):
        """Walk ``path.RelativePath.Elements`` from ``path.StartingNode`` and report the target."""
        res = ua.BrowsePathResult()
        if path.StartingNode not in self._aspace:
            res.StatusCode = ua.StatusCode(ua.StatusCodes.BadNodeIdInvalid)
            return res
        current = path.StartingNode
        for el in path.RelativePath.Elements:
            nodeid = self._find_element_in_node(el, current)
            if not nodeid:
                res.StatusCode = ua.StatusCode(ua.StatusCodes.BadNoMatch)
                return res
            current = nodeid
        target = ua.BrowsePathTarget()
        target.TargetId = current
        # 0xFFFFFFFF (max UInt32); presumably the OPC UA marker for "path fully
        # processed" -- confirm against the specification.
        target.RemainingPathIndex = 4294967295
        res.Targets = [target]
        return res

    def _find_element_in_node(self, el, nodeid):
        """Return the target of the first reference of ``nodeid`` whose BrowseName equals ``el.TargetName``.

        Returns None when the node is unknown or no reference matches.
        """
        nodedata = self._aspace.get(nodeid)
        if nodedata is not None:
            for ref in nodedata.references:
                # FIXME: here we should check other arguments!!
                if ref.BrowseName == el.TargetName:
                    return ref.NodeId
        self.logger.info("element %s was not found in node %s", el, nodeid)
        return None


class NodeManagementService:
    """Add and delete nodes and references in an address space."""

    def __init__(self, aspace: "AddressSpace"):
        self.logger = logging.getLogger(__name__)
        self._aspace: "AddressSpace" = aspace

    def add_nodes(self, addnodeitems, user=User.Admin):
        """Add every item and return one AddNodesResult per item, in order."""
        return [self._add_node(item, user) for item in addnodeitems]

    def try_add_nodes(self, addnodeitems, user=User.Admin, check=True):
        """Add every item, lazily yielding the items that could not be added."""
        for item in addnodeitems:
            ret = self._add_node(item, user, check=check)
            if not ret.StatusCode.is_good():
                yield item

    def _add_node(self, item, user, check=True):
        """Create the node described by ``item`` and link it to its parent and type.

        With ``check`` False a null parent node id is accepted; ``check`` is also
        forwarded as the ``add_timestamps`` flag for the node's attributes.
        """
        result = ua.AddNodesResult()

        if user != User.Admin:
            result.StatusCode = ua.StatusCode(ua.StatusCodes.BadUserAccessDenied)
            return result

        if item.RequestedNewNodeId.has_null_identifier():
            # If Identifier of requested NodeId is null we generate a new NodeId using
            # the namespace of the nodeid, this is an extension of the spec to allow
            # requesting the server to generate a new nodeid in a specified namespace
            item.RequestedNewNodeId = self._aspace.generate_nodeid(item.RequestedNewNodeId.NamespaceIndex)
        elif item.RequestedNewNodeId in self._aspace:
            self.logger.warning("AddNodesItem: Requested NodeId %s already exists", item.RequestedNewNodeId)
            result.StatusCode = ua.StatusCode(ua.StatusCodes.BadNodeIdExists)
            return result

        if item.ParentNodeId.is_null() and check:
            result.StatusCode = ua.StatusCode(ua.StatusCodes.BadParentNodeIdInvalid)
            return result

        parentdata = self._aspace.get(item.ParentNodeId)
        if parentdata is None and not item.ParentNodeId.is_null():
            self.logger.info("add_node: while adding node %s, requested parent node %s does not exists",
                item.RequestedNewNodeId, item.ParentNodeId)
            result.StatusCode = ua.StatusCode(ua.StatusCodes.BadParentNodeIdInvalid)
            return result

        nodedata = NodeData(item.RequestedNewNodeId)

        self._add_node_attributes(nodedata, item, add_timestamps=check)

        # now add our node to db
        self._aspace[nodedata.nodeid] = nodedata

        if parentdata is not None:
            self._add_ref_from_parent(nodedata, item, parentdata)
            self._add_ref_to_parent(nodedata, item, parentdata)

        # add type definition
        if item.TypeDefinition != ua.NodeId():
            self._add_type_definition(nodedata, item)

        result.StatusCode = ua.StatusCode()
        result.AddedNodeId = nodedata.nodeid

        return result

    def _add_node_attributes(self, nodedata, item, add_timestamps):
        """Populate ``nodedata.attributes`` with the common and the requested attributes."""
        # add common attrs
        nodedata.attributes[ua.AttributeIds.NodeId] = AttributeValue(
            ua.DataValue(ua.Variant(nodedata.nodeid, ua.VariantType.NodeId))
        )
        nodedata.attributes[ua.AttributeIds.BrowseName] = AttributeValue(
            ua.DataValue(ua.Variant(item.BrowseName, ua.VariantType.QualifiedName))
        )
        nodedata.attributes[ua.AttributeIds.NodeClass] = AttributeValue(
            ua.DataValue(ua.Variant(item.NodeClass, ua.VariantType.Int32))
        )
        # add requested attrs
        self._add_nodeattributes(item.NodeAttributes, nodedata, add_timestamps)

    def _add_unique_reference(self, nodedata, desc):
        """Append ``desc`` to ``nodedata.references`` unless an equivalent reference exists.

        A reference with the same type and target but the opposite direction is
        rejected with BadReferenceNotAllowed.
        """
        for r in nodedata.references:
            if r.ReferenceTypeId == desc.ReferenceTypeId and r.NodeId == desc.NodeId:
                if r.IsForward != desc.IsForward:
                    self.logger.error("Cannot add conflicting reference %s ", str(desc))
                    return ua.StatusCode(ua.StatusCodes.BadReferenceNotAllowed)
                break  # ref already exists
        else:
            nodedata.references.append(desc)
        return ua.StatusCode()

    def _add_ref_from_parent(self, nodedata, item, parentdata):
        """Add the forward reference parent -> new node."""
        desc = ua.ReferenceDescription()
        desc.ReferenceTypeId = item.ReferenceTypeId
        desc.NodeId = nodedata.nodeid
        desc.NodeClass = item.NodeClass
        desc.BrowseName = item.BrowseName
        desc.DisplayName = item.NodeAttributes.DisplayName
        desc.TypeDefinition = item.TypeDefinition
        desc.IsForward = True
        self._add_unique_reference(parentdata, desc)

    def _add_ref_to_parent(self, nodedata, item, parentdata):
        """Add the inverse reference new node -> parent."""
        addref = ua.AddReferencesItem()
        addref.ReferenceTypeId = item.ReferenceTypeId
        addref.SourceNodeId = nodedata.nodeid
        addref.TargetNodeId = item.ParentNodeId
        addref.TargetNodeClass = parentdata.attributes[ua.AttributeIds.NodeClass].value.Value.Value
        addref.IsForward = False
        self._add_reference_no_check(nodedata, addref)

    def _add_type_definition(self, nodedata, item):
        """Add the forward HasTypeDefinition reference new node -> ``item.TypeDefinition``."""
        addref = ua.AddReferencesItem()
        addref.SourceNodeId = nodedata.nodeid
        addref.IsForward = True
        addref.ReferenceTypeId = ua.NodeId(ua.ObjectIds.HasTypeDefinition)
        addref.TargetNodeId = item.TypeDefinition
        addref.TargetNodeClass = ua.NodeClass.DataType
        self._add_reference_no_check(nodedata, addref)

    def delete_nodes(self, deletenodeitems, user=User.Admin):
        """Delete every entry of ``deletenodeitems.NodesToDelete``; return one status per entry."""
        return [self._delete_node(item, user) for item in deletenodeitems.NodesToDelete]

    def _delete_node(self, item, user):
        """Delete one node, optionally dropping every reference that targets it."""
        if user != User.Admin:
            return ua.StatusCode(ua.StatusCodes.BadUserAccessDenied)

        if item.NodeId not in self._aspace:
            self.logger.warning("DeleteNodesItem: NodeId %s does not exists", item.NodeId)
            return ua.StatusCode(ua.StatusCodes.BadNodeIdUnknown)

        if item.DeleteTargetReferences:
            for elem in self._aspace.keys():
                refs = self._aspace[elem].references
                # Rebuild in place: removing entries while iterating the same list
                # skips the element that follows each removal.
                refs[:] = [rdesc for rdesc in refs if rdesc.NodeId != item.NodeId]

        self._delete_node_callbacks(self._aspace[item.NodeId])

        del (self._aspace[item.NodeId])

        return ua.StatusCode()

    def _delete_node_callbacks(self, nodedata):
        """Notify and unregister every data-change callback on the node's Value attribute."""
        if ua.AttributeIds.Value not in nodedata.attributes:
            return
        callbacks = nodedata.attributes[ua.AttributeIds.Value].datachange_callbacks
        # Iterate over a snapshot: unregistering mutates the dict.
        for handle, callback in list(callbacks.items()):
            try:
                callback(handle, None, ua.StatusCode(ua.StatusCodes.BadNodeIdUnknown))
            except Exception as ex:
                self.logger.exception("Error calling delete node callback callback %s, %s, %s", nodedata,
                    ua.AttributeIds.Value, ex)
            # Unregister even when the callback failed, otherwise the handle would
            # keep pointing at a node that is about to be deleted.
            try:
                self._aspace.delete_datachange_callback(handle)
            except Exception as ex:
                self.logger.exception("Error removing datachange callback %s of node %s, %s", handle,
                    nodedata, ex)

    def add_references(self, refs, user=User.Admin):
        """Add every reference and return one status code per reference, in order."""
        return [self._add_reference(ref, user) for ref in refs]

    def try_add_references(self, refs, user=User.Admin):
        """Add every reference, lazily yielding the ones that could not be added."""
        for ref in refs:
            if not self._add_reference(ref, user).is_good():
                yield ref

    def _add_reference(self, addref, user):
        """Validate source, target and user, then add the reference."""
        sourcedata = self._aspace.get(addref.SourceNodeId)
        if sourcedata is None:
            return ua.StatusCode(ua.StatusCodes.BadSourceNodeIdInvalid)
        if addref.TargetNodeId not in self._aspace:
            return ua.StatusCode(ua.StatusCodes.BadTargetNodeIdInvalid)
        if user != User.Admin:
            return ua.StatusCode(ua.StatusCodes.BadUserAccessDenied)
        return self._add_reference_no_check(sourcedata, addref)

    def _add_reference_no_check(self, sourcedata, addref):
        """Build a ReferenceDescription from ``addref`` and attach it to ``sourcedata``."""
        rdesc = ua.ReferenceDescription()
        rdesc.ReferenceTypeId = addref.ReferenceTypeId
        rdesc.IsForward = addref.IsForward
        rdesc.NodeId = addref.TargetNodeId
        if addref.TargetNodeClass == ua.NodeClass.Unspecified:
            # Node class not supplied by the caller: look it up on the target node.
            rdesc.NodeClass = self._aspace.get_attribute_value(
                addref.TargetNodeId, ua.AttributeIds.NodeClass).Value.Value
        else:
            rdesc.NodeClass = addref.TargetNodeClass
        bname = self._aspace.get_attribute_value(addref.TargetNodeId, ua.AttributeIds.BrowseName).Value.Value
        if bname:
            rdesc.BrowseName = bname
        dname = self._aspace.get_attribute_value(addref.TargetNodeId, ua.AttributeIds.DisplayName).Value.Value
        if dname:
            rdesc.DisplayName = dname
        return self._add_unique_reference(sourcedata, rdesc)

    def delete_references(self, refs, user=User.Admin):
        """Delete every reference and return one status code per reference, in order."""
        return [self._delete_reference(ref, user) for ref in refs]

    def _delete_unique_reference(self, item, invert=False):
        """Remove the first stored reference matching ``item``.

        With ``invert`` True the opposite reference (stored on the target node,
        pointing back to the source, with flipped direction) is removed instead.
        Returns BadNotFound when nothing matches.
        """
        if invert:
            source, target, forward = item.TargetNodeId, item.SourceNodeId, not item.IsForward
        else:
            source, target, forward = item.SourceNodeId, item.TargetNodeId, item.IsForward
        references = self._aspace[source].references
        for rdesc in references:
            if (rdesc.NodeId == target and rdesc.ReferenceTypeId == item.ReferenceTypeId
                    and rdesc.IsForward == forward):
                # Safe although we are iterating: we return right after the removal.
                references.remove(rdesc)
                return ua.StatusCode()
        return ua.StatusCode(ua.StatusCodes.BadNotFound)

    def _delete_reference(self, item, user):
        """Validate the request, then delete the reference (and its inverse when requested)."""
        if item.SourceNodeId not in self._aspace:
            return ua.StatusCode(ua.StatusCodes.BadSourceNodeIdInvalid)
        if item.TargetNodeId not in self._aspace:
            return ua.StatusCode(ua.StatusCodes.BadTargetNodeIdInvalid)
        if item.ReferenceTypeId not in self._aspace:
            return ua.StatusCode(ua.StatusCodes.BadReferenceTypeIdInvalid)
        if user != User.Admin:
            return ua.StatusCode(ua.StatusCodes.BadUserAccessDenied)

        if item.DeleteBidirectional:
            # The outcome of removing the inverse reference is not reported.
            self._delete_unique_reference(item, True)
        return self._delete_unique_reference(item)

    def _add_node_attr(self, item, nodedata, name, vtype=None, add_timestamps=False):
        """Copy attribute ``name`` from ``item`` to ``nodedata`` if ``item`` specifies it."""
        if item.SpecifiedAttributes & getattr(ua.NodeAttributesMask, name):
            dv = ua.DataValue(ua.Variant(getattr(item, name), vtype))
            if add_timestamps:
                # dv.ServerTimestamp = datetime.utcnow()  # Disabled until someone explains us it should be there
                dv.SourceTimestamp = datetime.utcnow()
            nodedata.attributes[getattr(ua.AttributeIds, name)] = AttributeValue(dv)

    def _add_nodeattributes(self, item, nodedata, add_timestamps):
        """Copy every attribute specified by ``item`` to ``nodedata``."""
        # UserWriteMask used to be registered twice (Byte, then UInt32); the later
        # UInt32 registration always won, so only that one is kept, at the position
        # of the first registration to preserve attribute insertion order.
        typed_attrs = (
            ("AccessLevel", ua.VariantType.Byte),
            ("ArrayDimensions", ua.VariantType.UInt32),
            ("BrowseName", ua.VariantType.QualifiedName),
            ("ContainsNoLoops", ua.VariantType.Boolean),
            ("DataType", ua.VariantType.NodeId),
            ("Description", ua.VariantType.LocalizedText),
            ("DisplayName", ua.VariantType.LocalizedText),
            ("EventNotifier", ua.VariantType.Byte),
            ("Executable", ua.VariantType.Boolean),
            ("Historizing", ua.VariantType.Boolean),
            ("InverseName", ua.VariantType.LocalizedText),
            ("IsAbstract", ua.VariantType.Boolean),
            ("MinimumSamplingInterval", ua.VariantType.Double),
            ("NodeClass", ua.VariantType.Int32),
            ("NodeId", ua.VariantType.NodeId),
            ("Symmetric", ua.VariantType.Boolean),
            ("UserAccessLevel", ua.VariantType.Byte),
            ("UserExecutable", ua.VariantType.Boolean),
            ("UserWriteMask", ua.VariantType.UInt32),
            ("ValueRank", ua.VariantType.Int32),
            ("WriteMask", ua.VariantType.UInt32),
        )
        for name, vtype in typed_attrs:
            self._add_node_attr(item, nodedata, name, vtype)
        # Value is the only attribute that may receive a source timestamp; no
        # variant type is passed for it.
        self._add_node_attr(item, nodedata, "Value", add_timestamps=add_timestamps)


class MethodService:
    """Execute the callables attached to method nodes of an address space."""

    def __init__(self, aspace: "AddressSpace"):
        self.logger = logging.getLogger(__name__)
        self._aspace: "AddressSpace" = aspace
        # Pool used to run synchronous method implementations off the event loop.
        self._pool = ThreadPoolExecutor()

    def stop(self):
        """Shut down the thread pool used for synchronous methods."""
        self._pool.shutdown()

    async def call(self, methods):
        """Call every method request sequentially and return one result per request."""
        results = []
        for method in methods:
            res = await self._call(method)
            results.append(res)
        return results

    async def _call(self, method):
        """Run one method request and wrap its outcome in a CallMethodResult."""
        self.logger.info("Calling: %s", method)
        res = ua.CallMethodResult()
        if method.ObjectId not in self._aspace or method.MethodId not in self._aspace:
            res.StatusCode = ua.StatusCode(ua.StatusCodes.BadNodeIdInvalid)
            return res

        node = self._aspace[method.MethodId]
        if node.call is None:
            res.StatusCode = ua.StatusCode(ua.StatusCodes.BadNothingToDo)
            return res

        try:
            result = await self._run_method(node.call, method.ObjectId, *method.InputArguments)
        except Exception:
            self.logger.exception("Error executing method call %s, an exception was raised: ", method)
            res.StatusCode = ua.StatusCode(ua.StatusCodes.BadUnexpectedError)
            return res

        # The implementation may return a full result, a bare status code, or
        # the output arguments themselves.
        if isinstance(result, ua.CallMethodResult):
            res = result
        elif isinstance(result, ua.StatusCode):
            res.StatusCode = result
        else:
            res.OutputArguments = result
        # Pad with default status codes so there is one entry per input argument.
        while len(res.InputArgumentResults) < len(method.InputArguments):
            res.InputArgumentResults.append(ua.StatusCode())
        return res

    async def _run_method(self, func, parent, *args):
        """Await ``func(parent, *args)``; synchronous functions run in the thread pool."""
        if asyncio.iscoroutinefunction(func):
            self.logger.warning("func %s is a coroutine, awaiting with args: %s", func, args)
            return await func(parent, *args)
        p = partial(func, parent, *args)
        self.logger.warning("func %s is a sync function, awaiting in executor %s with args: %s", func, self._pool, args)
        res = await asyncio.get_event_loop().run_in_executor(self._pool, p)
        return res


class AddressSpace:
    """
    The address space object stores all the nodes of the OPC-UA server and helper methods.

    NOTE(review): an earlier version of this docstring stated that the methods are
    thread safe, but no locking is visible in this class -- confirm before relying on it.
    """

    def __init__(self):
        self.logger = logging.getLogger(__name__)
        self._nodes = {}
        # Last handle handed out by add_datachange_callback().
        self._datachange_callback_counter = 200
        # handle -> (nodeid, attribute id)
        self._handle_to_attribute_map = {}
        # Namespace index used by generate_nodeid() when none is given.
        self._default_idx = 2
        # namespace index -> last numeric identifier generated for it
        self._nodeid_counter = {0: 20000, 1: 2000}

    def __getitem__(self, nodeid):
        return self._nodes[nodeid]

    def get(self, nodeid):
        """Return the node stored under ``nodeid``, or None when it is unknown."""
        return self._nodes.get(nodeid, None)

    def __setitem__(self, nodeid, value):
        self._nodes[nodeid] = value

    def __contains__(self, nodeid):
        return nodeid in self._nodes

    def __delitem__(self, nodeid):
        del self._nodes[nodeid]

    def generate_nodeid(self, idx=None):
        """Return a numeric NodeId in namespace ``idx`` that is not yet in the address space.

        For a namespace without a counter, counting starts from the largest
        numeric identifier already stored in that namespace (or 1 if none).
        """
        if idx is None:
            idx = self._default_idx
        if idx in self._nodeid_counter:
            self._nodeid_counter[idx] += 1
        else:
            # get the biggest identifier number from the existing nodes in address space
            numeric_types = (ua.NodeIdType.Numeric, ua.NodeIdType.TwoByte, ua.NodeIdType.FourByte)
            self._nodeid_counter[idx] = max(
                (
                    nodeid.Identifier for nodeid in self._nodes.keys()
                    if nodeid.NamespaceIndex == idx and nodeid.NodeIdType in numeric_types
                ),
                default=1,
            )
        nodeid = ua.NodeId(self._nodeid_counter[idx], idx)
        # Advance iteratively past identifiers already in use; recursing once per
        # collision could exhaust the stack on a densely populated namespace.
        while nodeid in self._nodes:
            self._nodeid_counter[idx] += 1
            nodeid = ua.NodeId(self._nodeid_counter[idx], idx)
        return nodeid

    def keys(self):
        """Return a view of the node ids currently stored."""
        return self._nodes.keys()

    def empty(self):
        """Delete all nodes in address space"""
        self._nodes = {}

    def dump(self, path):
        """
        Dump address space as binary to file; note that server must be stopped for this method to work
        DO NOT DUMP AN ADDRESS SPACE WHICH IS USING A SHELF (load_aspace_shelf), ONLY CACHED NODES WILL GET DUMPED!

        Side effect: method callbacks attached to the live nodes are cleared.
        """
        # prepare nodes in address space for being serialized
        for nodeid, ndata in self._nodes.items():
            # if the node has a reference to a method call, remove it so the object can be serialized
            if ndata.call is not None:
                self._nodes[nodeid].call = None

        with open(path, 'wb') as f:
            pickle.dump(self._nodes, f, pickle.HIGHEST_PROTOCOL)

    def load(self, path):
        """
        Load address space from a binary file, overwriting everything in the current address space
        """
        # SECURITY: unpickling can execute arbitrary code; only load files from a trusted source.
        with open(path, 'rb') as f:
            self._nodes = pickle.load(f)

    def make_aspace_shelf(self, path):
        """
        Make a shelf for containing the nodes from the standard address space; this is typically only done on first
        start of the server. Subsequent server starts will load the shelf, nodes are then moved to a cache
        by the LazyLoadingDict class when they are accessed. Saving data back to the shelf
        is currently NOT supported, it is only used for the default OPC UA standard address space

        Note: Intended for slow devices, such as Raspberry Pi, to greatly improve start up time
        """
        with shelve.open(path, 'n', protocol=pickle.HIGHEST_PROTOCOL) as s:
            for nodeid, ndata in self._nodes.items():
                s[nodeid.to_string()] = ndata

    def load_aspace_shelf(self, path):
        """
        Load the standard address space nodes from a python shelve via LazyLoadingDict as needed.
        The dump() method can no longer be used if the address space is being loaded from a shelf

        Currently disabled: always raises NotImplementedError.

        Note: Intended for slow devices, such as Raspberry Pi, to greatly improve start up time
        """
        raise NotImplementedError

        # Everything below is unreachable; it is kept as the starting point for a
        # future implementation.
        # ToDo: async friendly implementation - load all at once?
        class LazyLoadingDict(collections.abc.MutableMapping):
            """
            Special dict that only loads nodes as they are accessed. If a node is accessed it gets copied from the
            shelve to the cache dict. All user nodes are saved in the cache ONLY. Saving data back to the shelf
            is currently NOT supported
            """

            def __init__(self, source):
                self.source = source  # python shelf
                self.cache = {}  # internal dict

            def __getitem__(self, key):
                # try to get the item (node) from the cache, if it isn't there get it from the shelf
                try:
                    return self.cache[key]
                except KeyError:
                    node = self.cache[key] = self.source[key.to_string()]
                    return node

            def __setitem__(self, key, value):
                # add a new item to the cache; if this item is in the shelf it is not updated
                self.cache[key] = value

            def __contains__(self, key):
                return key in self.cache or key.to_string() in self.source

            def __delitem__(self, key):
                # only deleting items from the cache is allowed
                del self.cache[key]

            def __iter__(self):
                # only the cache can be iterated over
                return iter(self.cache.keys())

            def __len__(self):
                # only returns the length of items in the cache, not unaccessed items in the shelf
                return len(self.cache)

        self._nodes = LazyLoadingDict(shelve.open(path, "r"))

    def get_attribute_value(self, nodeid, attr):
        """Return the value of attribute ``attr`` of node ``nodeid``.

        Unknown nodes or attributes yield a DataValue carrying BadNodeIdUnknown
        or BadAttributeIdInvalid. If the attribute has a value callback, the
        callback's result is returned instead of the stored value.
        """
        if nodeid not in self._nodes:
            dv = ua.DataValue()
            dv.StatusCode = ua.StatusCode(ua.StatusCodes.BadNodeIdUnknown)
            return dv
        node = self._nodes[nodeid]
        if attr not in node.attributes:
            dv = ua.DataValue()
            dv.StatusCode = ua.StatusCode(ua.StatusCodes.BadAttributeIdInvalid)
            return dv
        attval = node.attributes[attr]
        if attval.value_callback:
            return attval.value_callback()
        return attval.value

    def set_attribute_value(self, nodeid, attr, value):
        """Store ``value`` and notify data-change callbacks when the inner value changed.

        Returns a status code: BadNodeIdUnknown or BadAttributeIdInvalid when the
        target does not exist, a default (good) status otherwise.
        """
        node = self._nodes.get(nodeid, None)
        if node is None:
            return ua.StatusCode(ua.StatusCodes.BadNodeIdUnknown)
        attval = node.attributes.get(attr, None)
        if attval is None:
            return ua.StatusCode(ua.StatusCodes.BadAttributeIdInvalid)

        old = attval.value
        attval.value = value
        cbs = []
        if old.Value != value.Value:  # only call the callbacks when the value actually changed
            # snapshot, so callbacks may register or unregister handles safely
            cbs = list(attval.datachange_callbacks.items())

        for k, v in cbs:
            try:
                v(k, value)
            except Exception as ex:
                self.logger.exception("Error calling datachange callback %s, %s, %s", k, v, ex)

        return ua.StatusCode()

    def add_datachange_callback(self, nodeid, attr, callback):
        """Register ``callback`` on an attribute; return ``(status, handle)`` (handle 0 on failure)."""
        self.logger.debug("set attr callback: %s %s %s", nodeid, attr, callback)
        if nodeid not in self._nodes:
            return ua.StatusCode(ua.StatusCodes.BadNodeIdUnknown), 0
        node = self._nodes[nodeid]
        if attr not in node.attributes:
            return ua.StatusCode(ua.StatusCodes.BadAttributeIdInvalid), 0
        attval = node.attributes[attr]
        self._datachange_callback_counter += 1
        handle = self._datachange_callback_counter
        attval.datachange_callbacks[handle] = callback
        self._handle_to_attribute_map[handle] = (nodeid, attr)
        return ua.StatusCode(), handle

    def delete_datachange_callback(self, handle):
        """Unregister the callback identified by ``handle``; unknown handles are ignored."""
        if handle not in self._handle_to_attribute_map:
            return
        nodeid, attr = self._handle_to_attribute_map.pop(handle)
        # The node or attribute may already be gone; the handle is forgotten either way.
        node = self._nodes.get(nodeid)
        if node is None:
            return
        attval = node.attributes.get(attr)
        if attval is not None:
            attval.datachange_callbacks.pop(handle, None)

    def add_method_callback(self, methodid, callback):
        """Attach ``callback`` as the implementation of method node ``methodid``.

        Raises KeyError when ``methodid`` is not in the address space.
        """
        node = self._nodes[methodid]
        node.call = callback
709