Passed
Push — master ( e09c16...3cc1cc )
by Olivier
02:23
created

asyncua.server.address_space.MethodService.stop()   A

Complexity

Conditions 1

Size

Total Lines 2
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 2
nop 1
dl 0
loc 2
rs 10
c 0
b 0
f 0
1
import asyncio
2
import pickle
3
import shelve
4
import logging
5
import collections
6
from datetime import datetime
7
from concurrent.futures import ThreadPoolExecutor
8
from functools import partial
9
10
from asyncua import ua
11
from .users import User
12
13
# Module-level logger. The service classes in this module call
# logging.getLogger(__name__) themselves, which yields this same logger object.
_logger = logging.getLogger(__name__)
14
15
16
class AttributeValue(object):
    """
    Container for one attribute of a node.

    Holds the stored value, an optional callable used instead of the stored
    value when reading, and the data-change callbacks keyed by handle.
    """

    def __init__(self, value):
        # handle -> callback, filled in by AddressSpace.add_datachange_callback
        self.datachange_callbacks = dict()
        # when set, AddressSpace.get_attribute_value returns value_callback()
        self.value_callback = None
        self.value = value

    def __repr__(self):
        stored = self.value
        return f"AttributeValue({stored})"

    __str__ = __repr__
27
28
29
class NodeData:
    """
    Everything the address space stores for a single node: its id, its
    attributes (keyed by attribute id), its references and, for method
    nodes, the callable to run.
    """

    def __init__(self, nodeid):
        self.nodeid = nodeid
        self.attributes = dict()
        self.references = list()
        # set through AddressSpace.add_method_callback
        self.call = None

    def __repr__(self):
        ident, attrs, refs = self.nodeid, self.attributes, self.references
        return f"NodeData(id:{ident}, attrs:{attrs}, refs:{refs})"

    __str__ = __repr__
41
42
43
class AttributeService:
    """Read and write node attributes held in an address space."""

    def __init__(self, aspace: "AddressSpace"):
        self.logger = logging.getLogger(__name__)
        self._aspace: "AddressSpace" = aspace

    def read(self, params):
        """
        Return the attribute value for every entry of ``params.NodesToRead``,
        in request order, as returned by ``AddressSpace.get_attribute_value``.
        """
        return [
            self._aspace.get_attribute_value(readvalue.NodeId, readvalue.AttributeId)
            for readvalue in params.NodesToRead
        ]

    def write(self, params, user=User.Admin):
        """
        Apply every entry of ``params.NodesToWrite`` and return one status
        code per entry, in request order.

        ``User.Admin`` may write any attribute. Any other user may only write
        the Value attribute, and only when both AccessLevel and
        UserAccessLevel of the node carry the CurrentWrite bit; otherwise the
        entry is answered with BadUserAccessDenied and nothing is written.
        """
        res = []
        for writevalue in params.NodesToWrite:
            if user != User.Admin and not self._user_may_write(writevalue):
                res.append(ua.StatusCode(ua.StatusCodes.BadUserAccessDenied))
                continue
            res.append(self._aspace.set_attribute_value(writevalue.NodeId, writevalue.AttributeId, writevalue.Value))
        return res

    def _user_may_write(self, writevalue):
        """Tell whether a non-admin user is allowed to perform ``writevalue``."""
        if writevalue.AttributeId != ua.AttributeIds.Value:
            return False
        al = self._aspace.get_attribute_value(writevalue.NodeId, ua.AttributeIds.AccessLevel)
        ual = self._aspace.get_attribute_value(writevalue.NodeId, ua.AttributeIds.UserAccessLevel)
        # Both reads must have succeeded before their payload is inspected:
        # a failed read carries no usable access-level value, and testing a bit
        # on it would raise instead of denying the write.
        if not al.StatusCode.is_good() or not ual.StatusCode.is_good():
            return False
        write_bit = ua.AccessLevel.CurrentWrite
        if not ua.ua_binary.test_bit(al.Value.Value, write_bit):
            return False
        return bool(ua.ua_binary.test_bit(ual.Value.Value, write_bit))
73
74
75
class ViewService(object):
    """Browse and browse-path translation over the nodes of an address space."""

    def __init__(self, aspace: "AddressSpace"):
        self.logger = logging.getLogger(__name__)
        self._aspace: "AddressSpace" = aspace

    def browse(self, params):
        """Return one browse result per entry of ``params.NodesToBrowse``."""
        return [self._browse(desc) for desc in params.NodesToBrowse]

    def _browse(self, desc):
        """
        Collect the references of ``desc.NodeId`` that match the browse
        description. An unknown node yields a result with BadNodeIdInvalid.
        """
        res = ua.BrowseResult()
        if desc.NodeId not in self._aspace:
            res.StatusCode = ua.StatusCode(ua.StatusCodes.BadNodeIdInvalid)
            return res
        node = self._aspace[desc.NodeId]
        for ref in node.references:
            if self._is_suitable_ref(desc, ref):
                res.References.append(ref)
        return res

    def _is_suitable_ref(self, desc, ref):
        """Check direction, reference type and node class mask, in that order."""
        if not self._suitable_direction(desc.BrowseDirection, ref.IsForward):
            return False
        if not self._suitable_reftype(desc.ReferenceTypeId, ref.ReferenceTypeId, desc.IncludeSubtypes):
            return False
        # a zero mask means "no filtering on node class"
        if desc.NodeClassMask and ((desc.NodeClassMask & ref.NodeClass) == 0):
            return False
        return True

    def _suitable_reftype(self, ref1, ref2, subtypes):
        """
        Tell whether a reference of type ``ref2`` matches the requested
        reference type ``ref1``.

        NOTE(review): with ``subtypes`` false this still accepts every
        (transitive) subtype of ``ref1`` except HasSubtype itself, and rejects
        HasSubtype references outright. That looks different from the usual
        meaning of IncludeSubtypes; left unchanged because callers may rely
        on it - confirm before altering.
        """
        if ref1 == ua.NodeId(ua.ObjectIds.Null):
            # If ReferenceTypeId is not specified in the BrowseDescription,
            # all References are returned and includeSubtypes is ignored.
            return True
        if not subtypes and ref2.Identifier == ua.ObjectIds.HasSubtype:
            return False
        if ref1.Identifier == ref2.Identifier:
            return True
        oktypes = self._get_sub_ref(ref1)
        if not subtypes and ua.NodeId(ua.ObjectIds.HasSubtype) in oktypes:
            oktypes.remove(ua.NodeId(ua.ObjectIds.HasSubtype))
        return ref2 in oktypes

    def _get_sub_ref(self, ref):
        """
        Return the node ids reachable from ``ref`` through forward HasSubtype
        references, depth first. A ``ref`` that is not in the address space
        has no subtypes, so an empty list is returned for it.
        """
        res = []
        # .get (not []) so that an unknown reference type gives "no subtypes"
        # instead of a KeyError escaping from a browse request
        nodedata = self._aspace.get(ref)
        if nodedata is None:
            return res
        for ref_desc in nodedata.references:
            if ref_desc.ReferenceTypeId.Identifier == ua.ObjectIds.HasSubtype and ref_desc.IsForward:
                res.append(ref_desc.NodeId)
                res += self._get_sub_ref(ref_desc.NodeId)
        return res

    def _suitable_direction(self, desc, isforward):
        """Tell whether a reference direction matches the requested browse direction."""
        if desc == ua.BrowseDirection.Both:
            return True
        if desc == ua.BrowseDirection.Forward and isforward:
            return True
        if desc == ua.BrowseDirection.Inverse and not isforward:
            return True
        return False

    def translate_browsepaths_to_nodeids(self, browsepaths):
        """Return one browse-path result per entry of ``browsepaths``."""
        return [self._translate_browsepath_to_nodeid(path) for path in browsepaths]

    def _translate_browsepath_to_nodeid(self, path):
        """
        Walk ``path.RelativePath.Elements`` starting at ``path.StartingNode``.

        The result carries BadNodeIdInvalid when the starting node is unknown,
        BadNoMatch when an element cannot be resolved, and otherwise a single
        target holding the node id reached at the end of the path.
        """
        res = ua.BrowsePathResult()
        if path.StartingNode not in self._aspace:
            res.StatusCode = ua.StatusCode(ua.StatusCodes.BadNodeIdInvalid)
            return res
        current = path.StartingNode
        for el in path.RelativePath.Elements:
            nodeid = self._find_element_in_node(el, current)
            if not nodeid:
                res.StatusCode = ua.StatusCode(ua.StatusCodes.BadNoMatch)
                return res
            current = nodeid
        target = ua.BrowsePathTarget()
        target.TargetId = current
        # 4294967295 == 0xFFFFFFFF; presumably the "whole path was processed"
        # marker of the protocol - TODO confirm against the specification
        target.RemainingPathIndex = 4294967295
        res.Targets = [target]
        return res

    def _find_element_in_node(self, el, nodeid):
        """
        Return the node id of the first reference of ``nodeid`` whose browse
        name equals ``el.TargetName``, or None when there is none.
        """
        nodedata = self._aspace[nodeid]
        for ref in nodedata.references:
            # FIXME: here we should check other arguments!!
            if ref.BrowseName == el.TargetName:
                return ref.NodeId
        self.logger.info("element %s was not found in node %s", el, nodeid)
        return None
182
183
184
class NodeManagementService:
    """Add and delete nodes and references in an address space."""

    def __init__(self, aspace: "AddressSpace"):
        self.logger = logging.getLogger(__name__)
        self._aspace: "AddressSpace" = aspace

    def add_nodes(self, addnodeitems, user=User.Admin):
        """Add every item and return one add-node result per item, in order."""
        return [self._add_node(item, user) for item in addnodeitems]

    def try_add_nodes(self, addnodeitems, user=User.Admin, check=True):
        """
        Try to add every item and yield the items that could not be added.

        This is a generator: nothing is added until it is iterated.
        """
        for item in addnodeitems:
            ret = self._add_node(item, user, check=check)
            if not ret.StatusCode.is_good():
                yield item

    def _add_node(self, item, user, check=True):
        """
        Add one node described by ``item``.

        Only ``User.Admin`` may add nodes. With ``check`` true a null parent
        node id is rejected and timestamps are added to the Value attribute;
        with ``check`` false a null parent is accepted and the node is stored
        without parent references.
        """
        result = ua.AddNodesResult()

        if user != User.Admin:
            result.StatusCode = ua.StatusCode(ua.StatusCodes.BadUserAccessDenied)
            return result

        if item.RequestedNewNodeId.has_null_identifier():
            # If Identifier of requested NodeId is null we generate a new NodeId using
            # the namespace of the nodeid, this is an extension of the spec to allow
            # to request the server to generate a new nodeid in a specified namespace
            item.RequestedNewNodeId = self._aspace.generate_nodeid(item.RequestedNewNodeId.NamespaceIndex)
        elif item.RequestedNewNodeId in self._aspace:
            self.logger.warning("AddNodesItem: Requested NodeId %s already exists", item.RequestedNewNodeId)
            result.StatusCode = ua.StatusCode(ua.StatusCodes.BadNodeIdExists)
            return result

        if item.ParentNodeId.is_null() and check:
            result.StatusCode = ua.StatusCode(ua.StatusCodes.BadParentNodeIdInvalid)
            return result

        parentdata = self._aspace.get(item.ParentNodeId)
        if parentdata is None and not item.ParentNodeId.is_null():
            self.logger.info("add_node: while adding node %s, requested parent node %s does not exists",
                item.RequestedNewNodeId, item.ParentNodeId)
            result.StatusCode = ua.StatusCode(ua.StatusCodes.BadParentNodeIdInvalid)
            return result

        nodedata = NodeData(item.RequestedNewNodeId)

        self._add_node_attributes(nodedata, item, add_timestamps=check)

        # now add our node to db
        self._aspace[nodedata.nodeid] = nodedata

        if parentdata is not None:
            self._add_ref_from_parent(nodedata, item, parentdata)
            self._add_ref_to_parent(nodedata, item, parentdata)

        # add type definition
        if item.TypeDefinition != ua.NodeId():
            self._add_type_definition(nodedata, item)

        result.StatusCode = ua.StatusCode()
        result.AddedNodeId = nodedata.nodeid

        return result

    def _add_node_attributes(self, nodedata, item, add_timestamps):
        """Store the attributes every node has, then the ones requested in ``item.NodeAttributes``."""
        # add common attrs
        nodedata.attributes[ua.AttributeIds.NodeId] = AttributeValue(
            ua.DataValue(ua.Variant(nodedata.nodeid, ua.VariantType.NodeId))
        )
        nodedata.attributes[ua.AttributeIds.BrowseName] = AttributeValue(
            ua.DataValue(ua.Variant(item.BrowseName, ua.VariantType.QualifiedName))
        )
        nodedata.attributes[ua.AttributeIds.NodeClass] = AttributeValue(
            ua.DataValue(ua.Variant(item.NodeClass, ua.VariantType.Int32))
        )
        # add requested attrs
        self._add_nodeattributes(item.NodeAttributes, nodedata, add_timestamps)

    def _add_unique_reference(self, nodedata, desc):
        """
        Append ``desc`` to the references of ``nodedata`` unless a reference
        with the same type and target already exists.

        An existing reference with the opposite direction is a conflict and
        yields BadReferenceNotAllowed; an identical one is silently kept.
        """
        for r in nodedata.references:
            if r.ReferenceTypeId == desc.ReferenceTypeId and r.NodeId == desc.NodeId:
                if r.IsForward != desc.IsForward:
                    self.logger.error("Cannot add conflicting reference %s ", str(desc))
                    return ua.StatusCode(ua.StatusCodes.BadReferenceNotAllowed)
                break  # ref already exists
        else:
            nodedata.references.append(desc)
        return ua.StatusCode()

    def _add_ref_from_parent(self, nodedata, item, parentdata):
        """Add the forward reference parent -> new node on the parent."""
        desc = ua.ReferenceDescription()
        desc.ReferenceTypeId = item.ReferenceTypeId
        desc.NodeId = nodedata.nodeid
        desc.NodeClass = item.NodeClass
        desc.BrowseName = item.BrowseName
        desc.DisplayName = item.NodeAttributes.DisplayName
        desc.TypeDefinition = item.TypeDefinition
        desc.IsForward = True
        self._add_unique_reference(parentdata, desc)

    def _add_ref_to_parent(self, nodedata, item, parentdata):
        """Add the inverse reference new node -> parent on the new node."""
        addref = ua.AddReferencesItem()
        addref.ReferenceTypeId = item.ReferenceTypeId
        addref.SourceNodeId = nodedata.nodeid
        addref.TargetNodeId = item.ParentNodeId
        addref.TargetNodeClass = parentdata.attributes[ua.AttributeIds.NodeClass].value.Value.Value
        addref.IsForward = False
        self._add_reference_no_check(nodedata, addref)

    def _add_type_definition(self, nodedata, item):
        """Add the forward HasTypeDefinition reference new node -> ``item.TypeDefinition``."""
        addref = ua.AddReferencesItem()
        addref.SourceNodeId = nodedata.nodeid
        addref.IsForward = True
        addref.ReferenceTypeId = ua.NodeId(ua.ObjectIds.HasTypeDefinition)
        addref.TargetNodeId = item.TypeDefinition
        addref.TargetNodeClass = ua.NodeClass.DataType
        self._add_reference_no_check(nodedata, addref)

    def delete_nodes(self, deletenodeitems, user=User.Admin):
        """Delete every entry of ``deletenodeitems.NodesToDelete``; one status code per entry."""
        return [self._delete_node(item, user) for item in deletenodeitems.NodesToDelete]

    def _delete_node(self, item, user):
        """
        Delete one node. Only ``User.Admin`` may delete; an unknown node id
        yields BadNodeIdUnknown. With ``item.DeleteTargetReferences`` set,
        every reference pointing at the node is dropped from all nodes first.
        """
        if user != User.Admin:
            return ua.StatusCode(ua.StatusCodes.BadUserAccessDenied)

        if item.NodeId not in self._aspace:
            self.logger.warning("DeleteNodesItem: NodeId %s does not exists", item.NodeId)
            return ua.StatusCode(ua.StatusCodes.BadNodeIdUnknown)

        if item.DeleteTargetReferences:
            for elem in self._aspace.keys():
                references = self._aspace[elem].references
                # Rebuild the list in place rather than calling remove() while
                # iterating it: removing during iteration skips the element
                # that follows each removed one, leaving stale references.
                references[:] = [rdesc for rdesc in references if not (rdesc.NodeId == item.NodeId)]

        self._delete_node_callbacks(self._aspace[item.NodeId])

        del self._aspace[item.NodeId]

        return ua.StatusCode()

    def _delete_node_callbacks(self, nodedata):
        """
        Notify every data-change callback registered on the Value attribute
        of ``nodedata`` with BadNodeIdUnknown, then unregister it. A failing
        callback is logged and does not stop the remaining ones.
        """
        if ua.AttributeIds.Value not in nodedata.attributes:
            return
        callbacks = nodedata.attributes[ua.AttributeIds.Value].datachange_callbacks
        # iterate over a snapshot: delete_datachange_callback mutates the dict
        for handle, callback in list(callbacks.items()):
            try:
                callback(handle, None, ua.StatusCode(ua.StatusCodes.BadNodeIdUnknown))
                self._aspace.delete_datachange_callback(handle)
            except Exception as ex:
                self.logger.exception("Error calling delete node callback callback %s, %s, %s", nodedata,
                    ua.AttributeIds.Value, ex)

    def add_references(self, refs, user=User.Admin):
        """Add every reference and return one status code per reference, in order."""
        return [self._add_reference(ref, user) for ref in refs]

    def try_add_references(self, refs, user=User.Admin):
        """
        Try to add every reference and yield the ones that could not be added.

        This is a generator: nothing is added until it is iterated.
        """
        for ref in refs:
            if not self._add_reference(ref, user).is_good():
                yield ref

    def _add_reference(self, addref, user):
        """Validate source node, target node and user, then add the reference."""
        sourcedata = self._aspace.get(addref.SourceNodeId)
        if sourcedata is None:
            return ua.StatusCode(ua.StatusCodes.BadSourceNodeIdInvalid)
        if addref.TargetNodeId not in self._aspace:
            return ua.StatusCode(ua.StatusCodes.BadTargetNodeIdInvalid)
        if user != User.Admin:
            return ua.StatusCode(ua.StatusCodes.BadUserAccessDenied)
        return self._add_reference_no_check(sourcedata, addref)

    def _add_reference_no_check(self, sourcedata, addref):
        """
        Build a reference description from ``addref`` and add it to
        ``sourcedata``. Node class (when unspecified), browse name and display
        name are looked up on the target node in the address space.
        """
        rdesc = ua.ReferenceDescription()
        rdesc.ReferenceTypeId = addref.ReferenceTypeId
        rdesc.IsForward = addref.IsForward
        rdesc.NodeId = addref.TargetNodeId
        if addref.TargetNodeClass == ua.NodeClass.Unspecified:
            rdesc.NodeClass = self._aspace.get_attribute_value(
                addref.TargetNodeId, ua.AttributeIds.NodeClass).Value.Value
        else:
            rdesc.NodeClass = addref.TargetNodeClass
        bname = self._aspace.get_attribute_value(addref.TargetNodeId, ua.AttributeIds.BrowseName).Value.Value
        if bname:
            rdesc.BrowseName = bname
        dname = self._aspace.get_attribute_value(addref.TargetNodeId, ua.AttributeIds.DisplayName).Value.Value
        if dname:
            rdesc.DisplayName = dname
        return self._add_unique_reference(sourcedata, rdesc)

    def delete_references(self, refs, user=User.Admin):
        """Delete every reference and return one status code per reference, in order."""
        return [self._delete_reference(ref, user) for ref in refs]

    def _delete_unique_reference(self, item, invert=False):
        """
        Remove the first reference matching ``item`` from its source node.

        With ``invert`` true the mirrored reference (target -> source, opposite
        direction) is removed instead. Returns BadNotFound when nothing matches.
        """
        if invert:
            source, target, forward = item.TargetNodeId, item.SourceNodeId, not item.IsForward
        else:
            source, target, forward = item.SourceNodeId, item.TargetNodeId, item.IsForward
        for rdesc in self._aspace[source].references:
            if rdesc.NodeId == target and rdesc.ReferenceTypeId == item.ReferenceTypeId:
                if rdesc.IsForward == forward:
                    # safe although we are iterating: we return right after
                    self._aspace[source].references.remove(rdesc)
                    return ua.StatusCode()
        return ua.StatusCode(ua.StatusCodes.BadNotFound)

    def _delete_reference(self, item, user):
        """Validate source, target, reference type and user, then delete the reference."""
        if item.SourceNodeId not in self._aspace:
            return ua.StatusCode(ua.StatusCodes.BadSourceNodeIdInvalid)
        if item.TargetNodeId not in self._aspace:
            return ua.StatusCode(ua.StatusCodes.BadTargetNodeIdInvalid)
        if item.ReferenceTypeId not in self._aspace:
            return ua.StatusCode(ua.StatusCodes.BadReferenceTypeIdInvalid)
        if user != User.Admin:
            return ua.StatusCode(ua.StatusCodes.BadUserAccessDenied)

        if item.DeleteBidirectional:
            # the status of the mirrored deletion is intentionally not reported
            self._delete_unique_reference(item, True)
        return self._delete_unique_reference(item)

    def _add_node_attr(self, item, nodedata, name, vtype=None, add_timestamps=False):
        """
        Copy attribute ``name`` from ``item`` into ``nodedata`` when the
        matching bit is set in ``item.SpecifiedAttributes``.
        """
        if item.SpecifiedAttributes & getattr(ua.NodeAttributesMask, name):
            dv = ua.DataValue(ua.Variant(getattr(item, name), vtype))
            if add_timestamps:
                # dv.ServerTimestamp = datetime.utcnow()  # Disabled until someone explains us it should be there
                dv.SourceTimestamp = datetime.utcnow()
            nodedata.attributes[getattr(ua.AttributeIds, name)] = AttributeValue(dv)

    def _add_nodeattributes(self, item, nodedata, add_timestamps):
        """Copy every specified attribute of ``item`` into ``nodedata``."""
        # UserWriteMask used to be registered twice (Byte, then UInt32); the
        # UInt32 registration ran last and won, so only that one is kept.
        typed_attributes = (
            ("AccessLevel", ua.VariantType.Byte),
            ("ArrayDimensions", ua.VariantType.UInt32),
            ("BrowseName", ua.VariantType.QualifiedName),
            ("ContainsNoLoops", ua.VariantType.Boolean),
            ("DataType", ua.VariantType.NodeId),
            ("Description", ua.VariantType.LocalizedText),
            ("DisplayName", ua.VariantType.LocalizedText),
            ("EventNotifier", ua.VariantType.Byte),
            ("Executable", ua.VariantType.Boolean),
            ("Historizing", ua.VariantType.Boolean),
            ("InverseName", ua.VariantType.LocalizedText),
            ("IsAbstract", ua.VariantType.Boolean),
            ("MinimumSamplingInterval", ua.VariantType.Double),
            ("NodeClass", ua.VariantType.Int32),
            ("NodeId", ua.VariantType.NodeId),
            ("Symmetric", ua.VariantType.Boolean),
            ("UserAccessLevel", ua.VariantType.Byte),
            ("UserExecutable", ua.VariantType.Boolean),
            ("ValueRank", ua.VariantType.Int32),
            ("WriteMask", ua.VariantType.UInt32),
            ("UserWriteMask", ua.VariantType.UInt32),
        )
        for name, vtype in typed_attributes:
            self._add_node_attr(item, nodedata, name, vtype)
        # Value has no fixed variant type and is the only attribute that may
        # receive a source timestamp
        self._add_node_attr(item, nodedata, "Value", add_timestamps=add_timestamps)
449
450
451
class MethodService:
    """
    Runs the callables registered on method nodes. Coroutine functions are
    awaited directly; plain functions are executed in a thread pool.
    """

    def __init__(self, aspace: "AddressSpace"):
        self.logger = logging.getLogger(__name__)
        self._aspace: "AddressSpace" = aspace
        self._pool = ThreadPoolExecutor()

    def stop(self):
        """Shut the thread pool down."""
        self._pool.shutdown()

    async def call(self, methods):
        """Run the requests one after the other and return their results in order."""
        return [await self._call(request) for request in methods]

    async def _call(self, method):
        """
        Run one method request and return its call result.

        The result carries BadNodeIdInvalid when object or method node is
        unknown, BadNothingToDo when no callable is registered on the method
        node and BadUnexpectedError when the callable raises.
        """
        self.logger.info("Calling: %s", method)
        res = ua.CallMethodResult()

        both_known = method.ObjectId in self._aspace and method.MethodId in self._aspace
        if not both_known:
            res.StatusCode = ua.StatusCode(ua.StatusCodes.BadNodeIdInvalid)
            return res

        handler = self._aspace[method.MethodId].call
        if handler is None:
            res.StatusCode = ua.StatusCode(ua.StatusCodes.BadNothingToDo)
            return res

        try:
            outcome = await self._run_method(handler, method.ObjectId, *method.InputArguments)
        except Exception:
            self.logger.exception("Error executing method call %s, an exception was raised: ", method)
            res.StatusCode = ua.StatusCode(ua.StatusCodes.BadUnexpectedError)
            return res

        # the callable may hand back a complete result, a bare status code,
        # or just the output arguments
        if isinstance(outcome, ua.CallMethodResult):
            res = outcome
        elif isinstance(outcome, ua.StatusCode):
            res.StatusCode = outcome
        else:
            res.OutputArguments = outcome

        # pad so that there is one status per input argument
        missing = len(method.InputArguments) - len(res.InputArgumentResults)
        for _ in range(missing):
            res.InputArgumentResults.append(ua.StatusCode())
        return res

    async def _run_method(self, func, parent, *args):
        """Invoke ``func(parent, *args)`` and return what it returns."""
        if not asyncio.iscoroutinefunction(func):
            bound = partial(func, parent, *args)
            loop = asyncio.get_event_loop()
            return await loop.run_in_executor(self._pool, bound)
        return await func(parent, *args)
500
501
502
class AddressSpace:
    """
    The address space object stores all the nodes of the OPC-UA server and helper methods.

    NOTE(review): this docstring used to state that the methods are thread
    safe, but no lock is taken anywhere in this class. Treat it as not thread
    safe unless callers serialise access themselves.
    """

    def __init__(self):
        self.logger = logging.getLogger(__name__)
        self._nodes = {}
        # handles returned by add_datachange_callback start above this value
        self._datachange_callback_counter = 200
        self._handle_to_attribute_map = {}
        self._default_idx = 2
        # last numeric identifier handed out, per namespace index
        self._nodeid_counter = {0: 20000, 1: 2000}

    def __getitem__(self, nodeid):
        return self._nodes.__getitem__(nodeid)

    def get(self, nodeid):
        """Return the node data for ``nodeid``, or None when it is unknown."""
        return self._nodes.get(nodeid, None)

    def __setitem__(self, nodeid, value):
        return self._nodes.__setitem__(nodeid, value)

    def __contains__(self, nodeid):
        return self._nodes.__contains__(nodeid)

    def __delitem__(self, nodeid):
        self._nodes.__delitem__(nodeid)

    def generate_nodeid(self, idx=None):
        """
        Return a numeric node id in namespace ``idx`` (default namespace when
        None) that is not used by any node of the address space.

        The per-namespace counter is advanced past every identifier that is
        already taken. For a namespace seen for the first time the counter
        starts at the largest numeric identifier present, or at 1.
        """
        if idx is None:
            idx = self._default_idx
        if idx in self._nodeid_counter:
            self._nodeid_counter[idx] += 1
        else:
            # get the biggest identifier number from the existing nodes in address space
            numeric_types = (ua.NodeIdType.Numeric, ua.NodeIdType.TwoByte, ua.NodeIdType.FourByte)
            identifiers = [
                nodeid.Identifier for nodeid in self._nodes.keys()
                if nodeid.NamespaceIndex == idx and nodeid.NodeIdType in numeric_types
            ]
            self._nodeid_counter[idx] = max(identifiers) if identifiers else 1
        nodeid = ua.NodeId(self._nodeid_counter[idx], idx)
        # Plain loop instead of one recursive call per collision: a long run
        # of already-used identifiers must not exhaust the recursion limit.
        while nodeid in self._nodes:
            self._nodeid_counter[idx] += 1
            nodeid = ua.NodeId(self._nodeid_counter[idx], idx)
        return nodeid

    def keys(self):
        """Return the node ids currently stored."""
        return self._nodes.keys()

    def empty(self):
        """Delete all nodes in address space"""
        self._nodes = {}

    def dump(self, path):
        """
        Dump address space as binary to file; note that server must be stopped for this method to work
        DO NOT DUMP AN ADDRESS SPACE WHICH IS USING A SHELF (load_aspace_shelf), ONLY CACHED NODES WILL GET DUMPED!

        Side effect: the method callback of every node is reset to None in the
        live address space, not only in the dumped copy.
        """
        # prepare nodes in address space for being serialized
        for nodeid, ndata in self._nodes.items():
            # if the node has a reference to a method call, remove it so the object can be serialized
            if ndata.call is not None:
                self._nodes[nodeid].call = None

        with open(path, 'wb') as f:
            pickle.dump(self._nodes, f, pickle.HIGHEST_PROTOCOL)

    def load(self, path):
        """
        Load address space from a binary file, overwriting everything in the current address space
        """
        # SECURITY: pickle.load executes arbitrary code embedded in the file;
        # only load dumps that come from a trusted source.
        with open(path, 'rb') as f:
            self._nodes = pickle.load(f)

    def make_aspace_shelf(self, path):
        """
        Make a shelf for containing the nodes from the standard address space; this is typically only done on first
        start of the server. Subsequent server starts will load the shelf, nodes are then moved to a cache
        by the LazyLoadingDict class when they are accessed. Saving data back to the shelf
        is currently NOT supported, it is only used for the default OPC UA standard address space

        Note: Intended for slow devices, such as Raspberry Pi, to greatly improve start up time
        """
        with shelve.open(path, 'n', protocol=pickle.HIGHEST_PROTOCOL) as s:
            for nodeid, ndata in self._nodes.items():
                s[nodeid.to_string()] = ndata

    def load_aspace_shelf(self, path):
        """
        Load the standard address space nodes from a python shelve via LazyLoadingDict as needed.
        The dump() method can no longer be used if the address space is being loaded from a shelf

        Note: Intended for slow devices, such as Raspberry Pi, to greatly improve start up time

        Currently disabled: always raises NotImplementedError.
        """
        raise NotImplementedError

        # Everything below is unreachable and kept as a reference for a future
        # implementation.
        # NOTE: collections.MutableMapping was removed in Python 3.10; this
        # must become collections.abc.MutableMapping before being re-enabled.
        # ToDo: async friendly implementation - load all at once?
        class LazyLoadingDict(collections.MutableMapping):
            """
            Special dict that only loads nodes as they are accessed. If a node is accessed it gets copied from the
            shelve to the cache dict. All user nodes are saved in the cache ONLY. Saving data back to the shelf
            is currently NOT supported
            """

            def __init__(self, source):
                self.source = source  # python shelf
                self.cache = {}  # internal dict

            def __getitem__(self, key):
                # try to get the item (node) from the cache, if it isn't there get it from the shelf
                try:
                    return self.cache[key]
                except KeyError:
                    node = self.cache[key] = self.source[key.to_string()]
                    return node

            def __setitem__(self, key, value):
                # add a new item to the cache; if this item is in the shelf it is not updated
                self.cache[key] = value

            def __contains__(self, key):
                return key in self.cache or key.to_string() in self.source

            def __delitem__(self, key):
                # only deleting items from the cache is allowed
                del self.cache[key]

            def __iter__(self):
                # only the cache can be iterated over
                return iter(self.cache.keys())

            def __len__(self):
                # only returns the length of items in the cache, not unaccessed items in the shelf
                return len(self.cache)

        self._nodes = LazyLoadingDict(shelve.open(path, "r"))

    def get_attribute_value(self, nodeid, attr):
        """
        Return the value of attribute ``attr`` of node ``nodeid``.

        An unknown node or attribute yields a data value whose status is
        BadNodeIdUnknown or BadAttributeIdInvalid. When the attribute has a
        value callback, the callback's return value is returned instead of
        the stored value.
        """
        if nodeid not in self._nodes:
            dv = ua.DataValue()
            dv.StatusCode = ua.StatusCode(ua.StatusCodes.BadNodeIdUnknown)
            return dv
        node = self._nodes[nodeid]
        if attr not in node.attributes:
            dv = ua.DataValue()
            dv.StatusCode = ua.StatusCode(ua.StatusCodes.BadAttributeIdInvalid)
            return dv
        attval = node.attributes[attr]
        if attval.value_callback:
            return attval.value_callback()
        return attval.value

    def set_attribute_value(self, nodeid, attr, value):
        """
        Store ``value`` as attribute ``attr`` of node ``nodeid`` and return a
        status code (BadNodeIdUnknown / BadAttributeIdInvalid on failure).

        Data-change callbacks are invoked as ``callback(handle, value)``, and
        only when the new ``value.Value`` differs from the previous one. A
        failing callback is logged and does not affect the returned status.
        """
        node = self._nodes.get(nodeid, None)
        if node is None:
            return ua.StatusCode(ua.StatusCodes.BadNodeIdUnknown)
        attval = node.attributes.get(attr, None)
        if attval is None:
            return ua.StatusCode(ua.StatusCodes.BadAttributeIdInvalid)

        old = attval.value
        attval.value = value
        cbs = []
        if old.Value != value.Value:  # only call the callbacks when a value change has happened
            # snapshot, so callbacks may register/unregister while we iterate
            cbs = list(attval.datachange_callbacks.items())

        for k, v in cbs:
            try:
                v(k, value)
            except Exception as ex:
                self.logger.exception("Error calling datachange callback %s, %s, %s", k, v, ex)

        return ua.StatusCode()

    def add_datachange_callback(self, nodeid, attr, callback):
        """
        Register ``callback`` on attribute ``attr`` of node ``nodeid``.

        Returns ``(status, handle)``; the handle is 0 when the node or the
        attribute is unknown.
        """
        self.logger.debug("set attr callback: %s %s %s", nodeid, attr, callback)
        if nodeid not in self._nodes:
            return ua.StatusCode(ua.StatusCodes.BadNodeIdUnknown), 0
        node = self._nodes[nodeid]
        if attr not in node.attributes:
            return ua.StatusCode(ua.StatusCodes.BadAttributeIdInvalid), 0
        attval = node.attributes[attr]
        self._datachange_callback_counter += 1
        handle = self._datachange_callback_counter
        attval.datachange_callbacks[handle] = callback
        self._handle_to_attribute_map[handle] = (nodeid, attr)
        return ua.StatusCode(), handle

    def delete_datachange_callback(self, handle):
        """Unregister the callback known under ``handle``; unknown handles are ignored."""
        if handle in self._handle_to_attribute_map:
            nodeid, attr = self._handle_to_attribute_map.pop(handle)
            self._nodes[nodeid].attributes[attr].datachange_callbacks.pop(handle)

    def add_method_callback(self, methodid, callback):
        """Attach ``callback`` as the callable of method node ``methodid`` (KeyError when unknown)."""
        node = self._nodes[methodid]
        node.call = callback
707