Completed
Pull Request — master (#51)
by Olivier
121:54 queued 119:28
created

asyncua.server.address_space.MethodService._call()   C

Complexity

Conditions 9

Size

Total Lines 25
Code Lines 21

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 9
eloc 21
nop 2
dl 0
loc 25
rs 6.6666
c 0
b 0
f 0
1
import asyncio
2
import pickle
3
import shelve
4
import logging
5
import collections
6
from datetime import datetime
7
from concurrent.futures import ThreadPoolExecutor
8
from functools import partial
9
10
from asyncua import ua
11
from .users import User
12
13
# Module-level logger named after this module (the classes below also create
# their own ``self.logger`` with the same name).
_logger = logging.getLogger(__name__)
14
15
16
class AttributeValue(object):
    """
    Container for a single attribute of a node.

    Holds the current value, an optional ``value_callback`` (initially None)
    and the data-change callbacks registered for it, keyed by handle.
    """

    def __init__(self, value):
        self.datachange_callbacks = {}
        self.value_callback = None
        self.value = value

    def __repr__(self):
        return f"AttributeValue({self.value})"

    # str() and repr() deliberately share one textual form
    __str__ = __repr__
27
28
29
class NodeData:
    """
    Everything stored for one node: its node id, its attributes (a dict),
    its references (a list) and an optional ``call`` callback (initially None).
    """

    def __init__(self, nodeid):
        self.call = None
        self.references = []
        self.attributes = {}
        self.nodeid = nodeid

    def __repr__(self):
        return f"NodeData(id:{self.nodeid}, attrs:{self.attributes}, refs:{self.references})"

    # str() and repr() deliberately share one textual form
    __str__ = __repr__
41
42
43
class AttributeService:
    """Read and write service working on the attributes stored in an address space."""

    def __init__(self, aspace: "AddressSpace"):
        self.logger = logging.getLogger(__name__)
        self._aspace: "AddressSpace" = aspace

    def read(self, params):
        """Return one attribute value per entry of ``params.NodesToRead``, in request order."""
        return [
            self._aspace.get_attribute_value(item.NodeId, item.AttributeId)
            for item in params.NodesToRead
        ]

    def write(self, params, user=User.Admin):
        """
        Write every entry of ``params.NodesToWrite`` and return one status per entry.

        ``User.Admin`` writes unconditionally. Any other user may only write the
        Value attribute, and only when the CurrentWrite bit is set in both the
        AccessLevel and the UserAccessLevel of the node; otherwise the entry is
        answered with BadUserAccessDenied and nothing is written.
        """
        statuses = []
        for item in params.NodesToWrite:
            if user != User.Admin:
                denied = item.AttributeId != ua.AttributeIds.Value
                if not denied:
                    # both levels are fetched before either one is inspected
                    access = self._aspace.get_attribute_value(item.NodeId, ua.AttributeIds.AccessLevel)
                    user_access = self._aspace.get_attribute_value(item.NodeId, ua.AttributeIds.UserAccessLevel)
                    writable = (
                        access.StatusCode.is_good()
                        and ua.ua_binary.test_bit(access.Value.Value, ua.AccessLevel.CurrentWrite)
                        and ua.ua_binary.test_bit(user_access.Value.Value, ua.AccessLevel.CurrentWrite)
                    )
                    denied = not writable
                if denied:
                    statuses.append(ua.StatusCode(ua.StatusCodes.BadUserAccessDenied))
                    continue
            statuses.append(self._aspace.set_attribute_value(item.NodeId, item.AttributeId, item.Value))
        return statuses
73
74
75
class ViewService(object):
    """
    Browse and browse-path translation working on the references stored
    in an address space.
    """

    def __init__(self, aspace: "AddressSpace"):
        self.logger = logging.getLogger(__name__)
        self._aspace: "AddressSpace" = aspace

    def browse(self, params):
        """Return one BrowseResult per entry of ``params.NodesToBrowse``, in request order."""
        return [self._browse(desc) for desc in params.NodesToBrowse]

    def _browse(self, desc):
        """
        Collect the references of ``desc.NodeId`` that match the browse description.

        The result carries BadNodeIdInvalid when the node is not in the address space.
        """
        res = ua.BrowseResult()
        if desc.NodeId not in self._aspace:
            res.StatusCode = ua.StatusCode(ua.StatusCodes.BadNodeIdInvalid)
            return res
        node = self._aspace[desc.NodeId]
        for ref in node.references:
            if self._is_suitable_ref(desc, ref):
                res.References.append(ref)
        return res

    def _is_suitable_ref(self, desc, ref):
        """Return True when ``ref`` passes the direction, reference type and node class filters of ``desc``."""
        if not self._suitable_direction(desc.BrowseDirection, ref.IsForward):
            return False
        if not self._suitable_reftype(desc.ReferenceTypeId, ref.ReferenceTypeId, desc.IncludeSubtypes):
            return False
        # a NodeClassMask of 0 means "no filtering on node class"
        if desc.NodeClassMask and ((desc.NodeClassMask & ref.NodeClass) == 0):
            return False
        return True

    def _suitable_reftype(self, ref1, ref2, subtypes):
        """
        Return True when a reference of type ``ref2`` satisfies the requested type ``ref1``.

        ``subtypes`` is the IncludeSubtypes flag of the browse description.
        """
        if ref1 == ua.NodeId(ua.ObjectIds.Null):
            # If ReferenceTypeId is not specified in the BrowseDescription,
            # all References are returned and includeSubtypes is ignored.
            return True
        if not subtypes and ref2.Identifier == ua.ObjectIds.HasSubtype:
            return False
        if ref1.Identifier == ref2.Identifier:
            return True
        oktypes = self._get_sub_ref(ref1)
        if not subtypes and ua.NodeId(ua.ObjectIds.HasSubtype) in oktypes:
            oktypes.remove(ua.NodeId(ua.ObjectIds.HasSubtype))
        return ref2 in oktypes

    def _get_sub_ref(self, ref):
        """
        Return the node ids reachable from ``ref`` through forward HasSubtype
        references, recursively. An unknown ``ref`` yields an empty list.
        """
        res = []
        # .get() instead of indexing: a reference type that is not in the
        # address space must give "no subtypes", not a KeyError
        nodedata = self._aspace.get(ref)
        if nodedata is None:
            return res
        for subref in nodedata.references:
            if subref.ReferenceTypeId.Identifier == ua.ObjectIds.HasSubtype and subref.IsForward:
                res.append(subref.NodeId)
                res += self._get_sub_ref(subref.NodeId)
        return res

    def _suitable_direction(self, desc, isforward):
        """Return True when a reference with direction ``isforward`` matches browse direction ``desc``."""
        if desc == ua.BrowseDirection.Both:
            return True
        if desc == ua.BrowseDirection.Forward and isforward:
            return True
        if desc == ua.BrowseDirection.Inverse and not isforward:
            return True
        return False

    def translate_browsepaths_to_nodeids(self, browsepaths):
        """Return one BrowsePathResult per browse path, in request order."""
        return [self._translate_browsepath_to_nodeid(path) for path in browsepaths]

    def _translate_browsepath_to_nodeid(self, path):
        """
        Follow the elements of ``path.RelativePath`` starting at ``path.StartingNode``.

        The result carries BadNodeIdInvalid when the starting node is unknown,
        BadNoMatch when an element cannot be resolved, and otherwise a single
        target holding the node id that was reached.
        """
        res = ua.BrowsePathResult()
        if path.StartingNode not in self._aspace:
            res.StatusCode = ua.StatusCode(ua.StatusCodes.BadNodeIdInvalid)
            return res
        current = path.StartingNode
        for el in path.RelativePath.Elements:
            nodeid = self._find_element_in_node(el, current)
            if not nodeid:
                res.StatusCode = ua.StatusCode(ua.StatusCodes.BadNoMatch)
                return res
            current = nodeid
        target = ua.BrowsePathTarget()
        target.TargetId = current
        target.RemainingPathIndex = 4294967295  # 2**32 - 1
        res.Targets = [target]
        return res

    def _find_element_in_node(self, el, nodeid):
        """
        Return the target node id of the first reference of ``nodeid`` whose
        browse name equals ``el.TargetName``, or None when there is none
        (including when ``nodeid`` itself is not in the address space).
        """
        # .get(): a previous path element may have led to a dangling reference target
        nodedata = self._aspace.get(nodeid)
        if nodedata is not None:
            for ref in nodedata.references:
                # FIXME: here we should check other arguments!!
                if ref.BrowseName == el.TargetName:
                    return ref.NodeId
        self.logger.info("element %s was not found in node %s", el, nodeid)
        return None
182
183
184
class NodeManagementService:
    """
    Add/delete nodes and references in an address space.

    The service entry points only modify the address space for ``User.Admin``;
    other users get BadUserAccessDenied.
    """

    def __init__(self, aspace: "AddressSpace"):
        self.logger = logging.getLogger(__name__)
        self._aspace: "AddressSpace" = aspace

    def add_nodes(self, addnodeitems, user=User.Admin):
        """Add every item and return one AddNodesResult per item, in request order."""
        return [self._add_node(item, user) for item in addnodeitems]

    def try_add_nodes(self, addnodeitems, user=User.Admin, check=True):
        """Generator: try to add every item and yield the items that could not be added."""
        for item in addnodeitems:
            ret = self._add_node(item, user, check=check)
            if not ret.StatusCode.is_good():
                yield item

    def _add_node(self, item, user, check=True):
        """
        Add a single node described by ``item`` and return an AddNodesResult.

        With ``check`` False a null parent node id is accepted and no source
        timestamp is put on the Value attribute.
        """
        result = ua.AddNodesResult()

        if user != User.Admin:
            result.StatusCode = ua.StatusCode(ua.StatusCodes.BadUserAccessDenied)
            return result

        if item.RequestedNewNodeId.has_null_identifier():
            # If Identifier of requested NodeId is null we generate a new NodeId using
            # the namespace of the nodeid, this is an extension of the spec to allow
            # requesting the server to generate a new nodeid in a specified namespace
            item.RequestedNewNodeId = self._aspace.generate_nodeid(item.RequestedNewNodeId.NamespaceIndex)
        elif item.RequestedNewNodeId in self._aspace:
            self.logger.warning("AddNodesItem: Requested NodeId %s already exists", item.RequestedNewNodeId)
            result.StatusCode = ua.StatusCode(ua.StatusCodes.BadNodeIdExists)
            return result

        if item.ParentNodeId.is_null() and check:
            result.StatusCode = ua.StatusCode(ua.StatusCodes.BadParentNodeIdInvalid)
            return result

        parentdata = self._aspace.get(item.ParentNodeId)
        if parentdata is None and not item.ParentNodeId.is_null():
            self.logger.info("add_node: while adding node %s, requested parent node %s does not exists",
                item.RequestedNewNodeId, item.ParentNodeId)
            result.StatusCode = ua.StatusCode(ua.StatusCodes.BadParentNodeIdInvalid)
            return result

        nodedata = NodeData(item.RequestedNewNodeId)

        self._add_node_attributes(nodedata, item, add_timestamps=check)

        # now add our node to db
        self._aspace[nodedata.nodeid] = nodedata

        if parentdata is not None:
            self._add_ref_from_parent(nodedata, item, parentdata)
            self._add_ref_to_parent(nodedata, item, parentdata)

        # add type definition
        if item.TypeDefinition != ua.NodeId():
            self._add_type_definition(nodedata, item)

        result.StatusCode = ua.StatusCode()
        result.AddedNodeId = nodedata.nodeid

        return result

    def _add_node_attributes(self, nodedata, item, add_timestamps):
        """Store the NodeId, BrowseName and NodeClass attributes, then the attributes requested in ``item``."""
        # add common attrs
        nodedata.attributes[ua.AttributeIds.NodeId] = AttributeValue(
            ua.DataValue(ua.Variant(nodedata.nodeid, ua.VariantType.NodeId))
        )
        nodedata.attributes[ua.AttributeIds.BrowseName] = AttributeValue(
            ua.DataValue(ua.Variant(item.BrowseName, ua.VariantType.QualifiedName))
        )
        nodedata.attributes[ua.AttributeIds.NodeClass] = AttributeValue(
            ua.DataValue(ua.Variant(item.NodeClass, ua.VariantType.Int32))
        )
        # add requested attrs
        self._add_nodeattributes(item.NodeAttributes, nodedata, add_timestamps)

    def _add_unique_reference(self, nodedata, desc):
        """
        Append ``desc`` to the references of ``nodedata`` unless a reference with
        the same type and target already exists.

        Returns BadReferenceNotAllowed when the existing reference has the
        opposite direction, a good status otherwise.
        """
        for r in nodedata.references:
            if r.ReferenceTypeId == desc.ReferenceTypeId and r.NodeId == desc.NodeId:
                if r.IsForward != desc.IsForward:
                    self.logger.error("Cannot add conflicting reference %s ", str(desc))
                    return ua.StatusCode(ua.StatusCodes.BadReferenceNotAllowed)
                break  # ref already exists
        else:
            # loop ended without break: no matching reference yet
            nodedata.references.append(desc)
        return ua.StatusCode()

    def _add_ref_from_parent(self, nodedata, item, parentdata):
        """Add the forward reference parent -> new node to ``parentdata``."""
        desc = ua.ReferenceDescription()
        desc.ReferenceTypeId = item.ReferenceTypeId
        desc.NodeId = nodedata.nodeid
        desc.NodeClass = item.NodeClass
        desc.BrowseName = item.BrowseName
        desc.DisplayName = item.NodeAttributes.DisplayName
        desc.TypeDefinition = item.TypeDefinition
        desc.IsForward = True
        self._add_unique_reference(parentdata, desc)

    def _add_ref_to_parent(self, nodedata, item, parentdata):
        """Add the inverse reference new node -> parent to ``nodedata``."""
        addref = ua.AddReferencesItem()
        addref.ReferenceTypeId = item.ReferenceTypeId
        addref.SourceNodeId = nodedata.nodeid
        addref.TargetNodeId = item.ParentNodeId
        addref.TargetNodeClass = parentdata.attributes[ua.AttributeIds.NodeClass].value.Value.Value
        addref.IsForward = False
        self._add_reference_no_check(nodedata, addref)

    def _add_type_definition(self, nodedata, item):
        """Add a forward HasTypeDefinition reference from the new node to ``item.TypeDefinition``."""
        addref = ua.AddReferencesItem()
        addref.SourceNodeId = nodedata.nodeid
        addref.IsForward = True
        addref.ReferenceTypeId = ua.NodeId(ua.ObjectIds.HasTypeDefinition)
        addref.TargetNodeId = item.TypeDefinition
        addref.TargetNodeClass = ua.NodeClass.DataType
        self._add_reference_no_check(nodedata, addref)

    def delete_nodes(self, deletenodeitems, user=User.Admin):
        """Delete every entry of ``deletenodeitems.NodesToDelete``; return one status per entry."""
        return [self._delete_node(item, user) for item in deletenodeitems.NodesToDelete]

    def _delete_node(self, item, user):
        """
        Delete the node ``item.NodeId`` and return a status code.

        When ``item.DeleteTargetReferences`` is set, every reference in the
        address space that targets the node is removed as well. Data-change
        callbacks registered on the node's Value attribute are notified first.
        """
        if user != User.Admin:
            return ua.StatusCode(ua.StatusCodes.BadUserAccessDenied)

        if item.NodeId not in self._aspace:
            self.logger.warning("DeleteNodesItem: NodeId %s does not exists", item.NodeId)
            return ua.StatusCode(ua.StatusCodes.BadNodeIdUnknown)

        if item.DeleteTargetReferences:
            for elem in self._aspace.keys():
                references = self._aspace[elem].references
                # Filter in place instead of calling remove() while iterating:
                # removing during iteration skips the element that follows each
                # removed one, so adjacent references to the node used to survive.
                references[:] = [rdesc for rdesc in references if rdesc.NodeId != item.NodeId]

        self._delete_node_callbacks(self._aspace[item.NodeId])

        del (self._aspace[item.NodeId])

        return ua.StatusCode()

    def _delete_node_callbacks(self, nodedata):
        """
        Call every data-change callback of the Value attribute with
        BadNodeIdUnknown and unregister it. Errors are logged, not raised.
        """
        if ua.AttributeIds.Value in nodedata.attributes:
            # iterate over a copy: delete_datachange_callback mutates the dict
            for handle, callback in list(nodedata.attributes[ua.AttributeIds.Value].datachange_callbacks.items()):
                try:
                    callback(handle, None, ua.StatusCode(ua.StatusCodes.BadNodeIdUnknown))
                    self._aspace.delete_datachange_callback(handle)
                except Exception as ex:
                    self.logger.exception("Error calling delete node callback callback %s, %s, %s", nodedata,
                        ua.AttributeIds.Value, ex)

    def add_references(self, refs, user=User.Admin):
        """Add every reference and return one status code per reference, in request order."""
        return [self._add_reference(ref, user) for ref in refs]

    def try_add_references(self, refs, user=User.Admin):
        """Generator: try to add every reference and yield the ones that could not be added."""
        for ref in refs:
            if not self._add_reference(ref, user).is_good():
                yield ref

    def _add_reference(self, addref, user):
        """Validate source node, target node and user, then add the reference to the source node."""
        sourcedata = self._aspace.get(addref.SourceNodeId)
        if sourcedata is None:
            return ua.StatusCode(ua.StatusCodes.BadSourceNodeIdInvalid)
        if addref.TargetNodeId not in self._aspace:
            return ua.StatusCode(ua.StatusCodes.BadTargetNodeIdInvalid)
        if user != User.Admin:
            return ua.StatusCode(ua.StatusCodes.BadUserAccessDenied)
        return self._add_reference_no_check(sourcedata, addref)

    def _add_reference_no_check(self, sourcedata, addref):
        """
        Build a ReferenceDescription from ``addref`` and add it to ``sourcedata``
        without validating nodes or user.

        Node class (when unspecified), browse name and display name are looked
        up on the target node.
        """
        rdesc = ua.ReferenceDescription()
        rdesc.ReferenceTypeId = addref.ReferenceTypeId
        rdesc.IsForward = addref.IsForward
        rdesc.NodeId = addref.TargetNodeId
        if addref.TargetNodeClass == ua.NodeClass.Unspecified:
            rdesc.NodeClass = self._aspace.get_attribute_value(
                addref.TargetNodeId, ua.AttributeIds.NodeClass).Value.Value
        else:
            rdesc.NodeClass = addref.TargetNodeClass
        bname = self._aspace.get_attribute_value(addref.TargetNodeId, ua.AttributeIds.BrowseName).Value.Value
        if bname:
            rdesc.BrowseName = bname
        dname = self._aspace.get_attribute_value(addref.TargetNodeId, ua.AttributeIds.DisplayName).Value.Value
        if dname:
            rdesc.DisplayName = dname
        return self._add_unique_reference(sourcedata, rdesc)

    def delete_references(self, refs, user=User.Admin):
        """Delete every reference and return one status code per reference, in request order."""
        return [self._delete_reference(ref, user) for ref in refs]

    def _delete_unique_reference(self, item, invert=False):
        """
        Remove the first reference matching ``item`` from its source node.

        With ``invert`` True the opposite reference (stored on the target node,
        pointing back with inverted direction) is removed instead.
        Returns BadNotFound when no reference matches.
        """
        if invert:
            source, target, forward = item.TargetNodeId, item.SourceNodeId, not item.IsForward
        else:
            source, target, forward = item.SourceNodeId, item.TargetNodeId, item.IsForward
        for rdesc in self._aspace[source].references:
            if rdesc.NodeId == target and rdesc.ReferenceTypeId == item.ReferenceTypeId:
                if rdesc.IsForward == forward:
                    # safe: we return right after mutating the list being iterated
                    self._aspace[source].references.remove(rdesc)
                    return ua.StatusCode()
        return ua.StatusCode(ua.StatusCodes.BadNotFound)

    def _delete_reference(self, item, user):
        """Validate nodes, reference type and user, then delete the reference (both ways if requested)."""
        if item.SourceNodeId not in self._aspace:
            return ua.StatusCode(ua.StatusCodes.BadSourceNodeIdInvalid)
        if item.TargetNodeId not in self._aspace:
            return ua.StatusCode(ua.StatusCodes.BadTargetNodeIdInvalid)
        if item.ReferenceTypeId not in self._aspace:
            return ua.StatusCode(ua.StatusCodes.BadReferenceTypeIdInvalid)
        if user != User.Admin:
            return ua.StatusCode(ua.StatusCodes.BadUserAccessDenied)

        if item.DeleteBidirectional:
            self._delete_unique_reference(item, True)
        return self._delete_unique_reference(item)

    def _add_node_attr(self, item, nodedata, name, vtype=None, add_timestamps=False):
        """
        Copy attribute ``name`` from ``item`` to ``nodedata`` when the matching
        bit is set in ``item.SpecifiedAttributes``.
        """
        if item.SpecifiedAttributes & getattr(ua.NodeAttributesMask, name):
            dv = ua.DataValue(ua.Variant(getattr(item, name), vtype))
            if add_timestamps:
                # dv.ServerTimestamp = datetime.utcnow()  # Disabled until someone explains us it should be there
                dv.SourceTimestamp = datetime.utcnow()
            nodedata.attributes[getattr(ua.AttributeIds, name)] = AttributeValue(dv)

    def _add_nodeattributes(self, item, nodedata, add_timestamps):
        """Copy every specified attribute of ``item`` to ``nodedata``; only Value may get a timestamp."""
        typed_attributes = (
            ("AccessLevel", ua.VariantType.Byte),
            ("ArrayDimensions", ua.VariantType.UInt32),
            ("BrowseName", ua.VariantType.QualifiedName),
            ("ContainsNoLoops", ua.VariantType.Boolean),
            ("DataType", ua.VariantType.NodeId),
            ("Description", ua.VariantType.LocalizedText),
            ("DisplayName", ua.VariantType.LocalizedText),
            ("EventNotifier", ua.VariantType.Byte),
            ("Executable", ua.VariantType.Boolean),
            ("Historizing", ua.VariantType.Boolean),
            ("InverseName", ua.VariantType.LocalizedText),
            ("IsAbstract", ua.VariantType.Boolean),
            ("MinimumSamplingInterval", ua.VariantType.Double),
            ("NodeClass", ua.VariantType.Int32),
            ("NodeId", ua.VariantType.NodeId),
            ("Symmetric", ua.VariantType.Boolean),
            ("UserAccessLevel", ua.VariantType.Byte),
            ("UserExecutable", ua.VariantType.Boolean),
            # UserWriteMask used to be registered twice (Byte here, UInt32 after
            # WriteMask); the later UInt32 entry always overwrote this one, so
            # only the effective type is kept, at the original position.
            ("UserWriteMask", ua.VariantType.UInt32),
            ("ValueRank", ua.VariantType.Int32),
            ("WriteMask", ua.VariantType.UInt32),
        )
        for name, vtype in typed_attributes:
            self._add_node_attr(item, nodedata, name, vtype)
        self._add_node_attr(item, nodedata, "Value", add_timestamps=add_timestamps)
449
450
451
class MethodService:
    """
    Call service: runs the callbacks attached to method nodes.

    Coroutine functions are awaited directly; plain functions are executed
    in a thread pool owned by this service.
    """

    def __init__(self, aspace: "AddressSpace"):
        self.logger = logging.getLogger(__name__)
        self._aspace: "AddressSpace" = aspace
        self._pool = ThreadPoolExecutor()

    def stop(self):
        """Shut down the thread pool used for synchronous callbacks."""
        self._pool.shutdown()

    async def call(self, methods):
        """Call the methods one after the other and return one result per method, in request order."""
        results = []
        for method in methods:
            res = await self._call(method)
            results.append(res)
        return results

    async def _call(self, method):
        """
        Run a single method call and return a CallMethodResult.

        Status is BadNodeIdInvalid when the object or method node is unknown,
        BadNothingToDo when the method node has no callback and
        BadUnexpectedError when the callback raises. A callback may return a
        CallMethodResult (used as is), a StatusCode, or the output arguments.
        """
        self.logger.info("Calling: %s", method)
        res = ua.CallMethodResult()
        if method.ObjectId not in self._aspace or method.MethodId not in self._aspace:
            res.StatusCode = ua.StatusCode(ua.StatusCodes.BadNodeIdInvalid)
            return res
        node = self._aspace[method.MethodId]
        if node.call is None:
            res.StatusCode = ua.StatusCode(ua.StatusCodes.BadNothingToDo)
            return res
        try:
            result = await self._run_method(node.call, method.ObjectId, *method.InputArguments)
        except Exception:
            self.logger.exception("Error executing method call %s, an exception was raised: ", method)
            res.StatusCode = ua.StatusCode(ua.StatusCodes.BadUnexpectedError)
            return res
        if isinstance(result, ua.CallMethodResult):
            res = result
        elif isinstance(result, ua.StatusCode):
            res.StatusCode = result
        else:
            res.OutputArguments = result
        # report one (good) status per input argument the callback did not report on
        while len(res.InputArgumentResults) < len(method.InputArguments):
            res.InputArgumentResults.append(ua.StatusCode())
        return res

    async def _run_method(self, func, parent, *args):
        """
        Execute ``func(parent, *args)`` and return its result.

        ``func`` is awaited when it is a coroutine function, otherwise it is
        run in the thread pool so it cannot block the event loop.
        """
        import inspect

        # iscoroutinefunction, not iscoroutine: ``func`` is the callable itself,
        # never an already created coroutine object. With iscoroutine() async
        # callbacks ended up in the executor and were never awaited.
        if inspect.iscoroutinefunction(func):
            self.logger.debug("func %s is a coroutine, awaiting with args: %s", func, args)
            return await func(parent, *args)
        p = partial(func, parent, *args)
        self.logger.debug("func %s is a sync function, awaiting in executor %s with args: %s", func, self._pool, args)
        res = await asyncio.get_event_loop().run_in_executor(self._pool, p)
        return res
502
503
504
505
class AddressSpace:
    """
    The address space object stores all the nodes of the OPC-UA server and helper methods.

    Nodes are kept in a mapping from node id to node data. No locking is done
    in this class, so callers must not mutate it concurrently from several threads.
    """

    def __init__(self):
        self.logger = logging.getLogger(__name__)
        self._nodes = {}
        self._datachange_callback_counter = 200
        self._handle_to_attribute_map = {}
        self._default_idx = 2
        self._nodeid_counter = {0: 20000, 1: 2000}

    def __getitem__(self, nodeid):
        return self._nodes.__getitem__(nodeid)

    def get(self, nodeid):
        """Return the node data stored for ``nodeid``, or None when it is unknown."""
        return self._nodes.get(nodeid, None)

    def __setitem__(self, nodeid, value):
        return self._nodes.__setitem__(nodeid, value)

    def __contains__(self, nodeid):
        return self._nodes.__contains__(nodeid)

    def __delitem__(self, nodeid):
        self._nodes.__delitem__(nodeid)

    def generate_nodeid(self, idx=None):
        """
        Return a numeric node id in namespace ``idx`` (the default namespace
        when None) that is not yet used in the address space.
        """
        if idx is None:
            idx = self._default_idx
        if idx in self._nodeid_counter:
            self._nodeid_counter[idx] += 1
        else:
            # start from the biggest numeric identifier already used in that
            # namespace, or from 1 when the namespace has no numeric ids yet
            self._nodeid_counter[idx] = max(
                (
                    nodeid.Identifier for nodeid in self._nodes.keys()
                    if nodeid.NamespaceIndex == idx and nodeid.NodeIdType in (
                        ua.NodeIdType.Numeric, ua.NodeIdType.TwoByte, ua.NodeIdType.FourByte
                    )
                ),
                default=1,
            )
        nodeid = ua.NodeId(self._nodeid_counter[idx], idx)
        # Plain loop instead of one recursive call per taken id, so a long run
        # of used identifiers cannot exhaust the recursion limit.
        while nodeid in self._nodes:
            self._nodeid_counter[idx] += 1
            nodeid = ua.NodeId(self._nodeid_counter[idx], idx)
        return nodeid

    def keys(self):
        """Return the node ids currently stored."""
        return self._nodes.keys()

    def empty(self):
        """Delete all nodes in address space"""
        self._nodes = {}

    def dump(self, path):
        """
        Dump address space as binary to file; note that server must be stopped for this method to work
        DO NOT DUMP AN ADDRESS SPACE WHICH IS USING A SHELF (load_aspace_shelf), ONLY CACHED NODES WILL GET DUMPED!

        Method callbacks attached to the nodes are removed from the live
        address space by this call.
        """
        # prepare nodes in address space for being serialized: a reference to
        # a method callback would prevent the object from being pickled
        for ndata in self._nodes.values():
            if ndata.call is not None:
                ndata.call = None

        with open(path, 'wb') as f:
            pickle.dump(self._nodes, f, pickle.HIGHEST_PROTOCOL)

    def load(self, path):
        """
        Load address space from a binary file, overwriting everything in the current address space

        SECURITY: the file is unpickled, which can execute arbitrary code.
        Only load files written by dump() from a trusted location.
        """
        with open(path, 'rb') as f:
            self._nodes = pickle.load(f)

    def make_aspace_shelf(self, path):
        """
        Make a shelf for containing the nodes from the standard address space; this is typically only done on first
        start of the server. Subsequent server starts will load the shelf, nodes are then moved to a cache
        by the LazyLoadingDict class when they are accessed. Saving data back to the shelf
        is currently NOT supported, it is only used for the default OPC UA standard address space

        Note: Intended for slow devices, such as Raspberry Pi, to greatly improve start up time
        """
        with shelve.open(path, 'n', protocol=pickle.HIGHEST_PROTOCOL) as s:
            for nodeid, ndata in self._nodes.items():
                s[nodeid.to_string()] = ndata

    def load_aspace_shelf(self, path):
        """
        Load the standard address space nodes from a python shelve via LazyLoadingDict as needed.
        The dump() method can no longer be used if the address space is being loaded from a shelf

        Currently disabled: always raises NotImplementedError.

        Note: Intended for slow devices, such as Raspberry Pi, to greatly improve start up time
        """
        raise NotImplementedError

        # Everything below is unreachable; it is kept as the starting point
        # for the future implementation.
        # ToDo: async friendly implementation - load all at once?
        # The ABC lives in collections.abc; the collections.MutableMapping
        # alias was removed in Python 3.10.
        from collections.abc import MutableMapping

        class LazyLoadingDict(MutableMapping):
            """
            Special dict that only loads nodes as they are accessed. If a node is accessed it gets copied from the
            shelve to the cache dict. All user nodes are saved in the cache ONLY. Saving data back to the shelf
            is currently NOT supported
            """

            def __init__(self, source):
                self.source = source  # python shelf
                self.cache = {}  # internal dict

            def __getitem__(self, key):
                # try to get the item (node) from the cache, if it isn't there get it from the shelf
                try:
                    return self.cache[key]
                except KeyError:
                    node = self.cache[key] = self.source[key.to_string()]
                    return node

            def __setitem__(self, key, value):
                # add a new item to the cache; if this item is in the shelf it is not updated
                self.cache[key] = value

            def __contains__(self, key):
                return key in self.cache or key.to_string() in self.source

            def __delitem__(self, key):
                # only deleting items from the cache is allowed
                del self.cache[key]

            def __iter__(self):
                # only the cache can be iterated over
                return iter(self.cache.keys())

            def __len__(self):
                # only returns the length of items in the cache, not unaccessed items in the shelf
                return len(self.cache)

        self._nodes = LazyLoadingDict(shelve.open(path, "r"))

    def get_attribute_value(self, nodeid, attr):
        """
        Return the value stored for attribute ``attr`` of node ``nodeid``.

        A DataValue with status BadNodeIdUnknown or BadAttributeIdInvalid is
        returned when the node or the attribute does not exist. When the
        attribute has a ``value_callback`` its return value is used instead of
        the stored value.
        """
        if nodeid not in self._nodes:
            dv = ua.DataValue()
            dv.StatusCode = ua.StatusCode(ua.StatusCodes.BadNodeIdUnknown)
            return dv
        node = self._nodes[nodeid]
        if attr not in node.attributes:
            dv = ua.DataValue()
            dv.StatusCode = ua.StatusCode(ua.StatusCodes.BadAttributeIdInvalid)
            return dv
        attval = node.attributes[attr]
        if attval.value_callback:
            return attval.value_callback()
        return attval.value

    def set_attribute_value(self, nodeid, attr, value):
        """
        Store ``value`` for attribute ``attr`` of node ``nodeid`` and return a status code.

        Data-change callbacks of the attribute are called with ``(handle, value)``
        only when the contained ``Value`` differs from the previous one; errors
        raised by a callback are logged and do not abort the write.
        """
        node = self._nodes.get(nodeid, None)
        if node is None:
            return ua.StatusCode(ua.StatusCodes.BadNodeIdUnknown)
        attval = node.attributes.get(attr, None)
        if attval is None:
            return ua.StatusCode(ua.StatusCodes.BadAttributeIdInvalid)

        old = attval.value
        attval.value = value
        cbs = []
        if old.Value != value.Value:  # only call the callbacks when the value has actually changed
            # copy, so a callback may (un)register callbacks while we iterate
            cbs = list(attval.datachange_callbacks.items())

        for k, v in cbs:
            try:
                v(k, value)
            except Exception as ex:
                self.logger.exception("Error calling datachange callback %s, %s, %s", k, v, ex)

        return ua.StatusCode()

    def add_datachange_callback(self, nodeid, attr, callback):
        """
        Register ``callback`` for changes of attribute ``attr`` of node ``nodeid``.

        Returns ``(status, handle)``; the handle is 0 when the node or the
        attribute does not exist.
        """
        self.logger.debug("set attr callback: %s %s %s", nodeid, attr, callback)
        if nodeid not in self._nodes:
            return ua.StatusCode(ua.StatusCodes.BadNodeIdUnknown), 0
        node = self._nodes[nodeid]
        if attr not in node.attributes:
            return ua.StatusCode(ua.StatusCodes.BadAttributeIdInvalid), 0
        attval = node.attributes[attr]
        self._datachange_callback_counter += 1
        handle = self._datachange_callback_counter
        attval.datachange_callbacks[handle] = callback
        self._handle_to_attribute_map[handle] = (nodeid, attr)
        return ua.StatusCode(), handle

    def delete_datachange_callback(self, handle):
        """
        Unregister the data-change callback identified by ``handle``.

        Unknown handles are ignored, and so is a handle whose node or attribute
        is no longer present (for instance after empty() or load()).
        """
        if handle not in self._handle_to_attribute_map:
            return
        nodeid, attr = self._handle_to_attribute_map.pop(handle)
        node = self._nodes.get(nodeid)
        if node is None:
            return
        attval = node.attributes.get(attr)
        if attval is not None:
            attval.datachange_callbacks.pop(handle, None)

    def add_method_callback(self, methodid, callback):
        """Attach ``callback`` to the method node ``methodid`` (KeyError when the node is unknown)."""
        node = self._nodes[methodid]
        node.call = callback
710