import asyncio
import collections
import logging
import pickle
import shelve
from collections.abc import MutableMapping
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
from functools import partial

from asyncua import ua

from .users import User, UserRole
||
12 | |||
# Module-level logger named after this module.
_logger = logging.getLogger(__name__)
||
14 | |||
15 | |||
class AttributeValue:
    """
    Storage cell for one attribute of a node.

    Holds the stored value, an optional ``value_callback`` (initially None)
    and a dict of data-change callbacks (initially empty).
    """

    def __init__(self, value):
        self.value = value
        self.value_callback = None
        self.datachange_callbacks = dict()

    def __str__(self):
        return "AttributeValue({})".format(self.value)

    # repr and str share one implementation
    __repr__ = __str__
||
27 | |||
28 | |||
class NodeData:
    """
    Record describing one node: its id, an attribute dict, a list of
    references and an optional ``call`` slot (None until assigned).
    """

    def __init__(self, nodeid):
        self.nodeid = nodeid
        self.attributes = dict()
        self.references = list()
        self.call = None

    def __str__(self):
        return "NodeData(id:{}, attrs:{}, refs:{})".format(
            self.nodeid, self.attributes, self.references
        )

    # repr and str share one implementation
    __repr__ = __str__
||
41 | |||
42 | |||
class AttributeService:
    """
    Read and write node attributes through an address space.

    Reads are forwarded unchanged; writes are subject to an access check for
    every user whose role is not ``UserRole.Admin``.
    """

    def __init__(self, aspace: "AddressSpace"):
        self.logger = logging.getLogger(__name__)
        self._aspace: "AddressSpace" = aspace

    def read(self, params):
        """
        Read every entry of ``params.NodesToRead``.

        :return: list with the result of ``read_attribute_value`` for each
                 entry, in request order
        """
        # self.logger.debug("read %s", params)
        return [
            self._aspace.read_attribute_value(readvalue.NodeId, readvalue.AttributeId)
            for readvalue in params.NodesToRead
        ]

    async def write(self, params, user=User(role=UserRole.Admin)):
        """
        Write every entry of ``params.NodesToWrite``.

        Admin users write unconditionally. Any other user may only write the
        Value attribute, and only when both AccessLevel and UserAccessLevel of
        the node have the CurrentWrite bit set; otherwise the entry is answered
        with BadUserAccessDenied and nothing is written.

        :return: list with one status code per entry, in request order
        """
        # self.logger.debug("write %s as user %s", params, user)
        res = []
        for writevalue in params.NodesToWrite:
            if user.role != UserRole.Admin and not self._user_may_write(writevalue):
                res.append(ua.StatusCode(ua.StatusCodes.BadUserAccessDenied))
                continue
            res.append(
                await self._aspace.write_attribute_value(
                    writevalue.NodeId, writevalue.AttributeId, writevalue.Value
                )
            )
        return res

    def _user_may_write(self, writevalue):
        """Return True when a non-admin user is allowed to perform this write."""
        if writevalue.AttributeId != ua.AttributeIds.Value:
            return False
        al = self._aspace.read_attribute_value(writevalue.NodeId, ua.AttributeIds.AccessLevel)
        ual = self._aspace.read_attribute_value(writevalue.NodeId, ua.AttributeIds.UserAccessLevel)
        # Both reads must have succeeded before their payload is inspected: a
        # failed read carries no usable bit mask, so it is treated as "denied"
        # instead of being handed to test_bit.
        if not al.StatusCode.is_good() or not ual.StatusCode.is_good():
            return False
        return bool(
            ua.ua_binary.test_bit(al.Value.Value, ua.AccessLevel.CurrentWrite)
            and ua.ua_binary.test_bit(ual.Value.Value, ua.AccessLevel.CurrentWrite)
        )
||
74 | |||
75 | |||
class ViewService(object):
    """
    Browse and browse-path translation on top of the references stored in an
    address space.
    """

    def __init__(self, aspace: "AddressSpace"):
        self.logger = logging.getLogger(__name__)
        self._aspace: "AddressSpace" = aspace

    def browse(self, params):
        """Return one browse result per entry of ``params.NodesToBrowse``, in order."""
        # self.logger.debug("browse %s", params)
        return [self._browse(desc) for desc in params.NodesToBrowse]

    def _browse(self, desc):
        """
        Collect the references of ``desc.NodeId`` that match the description.

        The result carries BadNodeIdInvalid when the node is not in the
        address space.
        """
        res = ua.BrowseResult()
        if desc.NodeId not in self._aspace:
            res.StatusCode = ua.StatusCode(ua.StatusCodes.BadNodeIdInvalid)
            return res
        node = self._aspace[desc.NodeId]
        for ref in node.references:
            if self._is_suitable_ref(desc, ref):
                res.References.append(ref)
        return res

    def _is_suitable_ref(self, desc, ref):
        """Return True when ``ref`` passes the direction, type and class filters of ``desc``."""
        if not self._suitable_direction(desc.BrowseDirection, ref.IsForward):
            # self.logger.debug("%s is not suitable due to direction", ref)
            return False
        if not self._suitable_reftype(desc.ReferenceTypeId, ref.ReferenceTypeId, desc.IncludeSubtypes):
            # self.logger.debug("%s is not suitable due to type", ref)
            return False
        # A zero mask means "no node class filter".
        if desc.NodeClassMask and ((desc.NodeClassMask & ref.NodeClass) == 0):
            # self.logger.debug("%s is not suitable due to class", ref)
            return False
        # self.logger.debug("%s is a suitable ref for desc %s", ref, desc)
        return True

    def _suitable_reftype(self, ref1, ref2, subtypes):
        """
        Decide whether reference type ``ref2`` satisfies the requested type ``ref1``.

        :param ref1: reference type id requested by the browse description
        :param ref2: reference type id of the candidate reference
        :param subtypes: the IncludeSubtypes flag of the browse description
        """
        if ref1 == ua.NodeId(ua.ObjectIds.Null):
            # If ReferenceTypeId is not specified in the BrowseDescription,
            # all References are returned and includeSubtypes is ignored.
            return True
        if not subtypes and ref2.Identifier == ua.ObjectIds.HasSubtype:
            return False
        if ref1.Identifier == ref2.Identifier:
            return True
        oktypes = self._get_sub_ref(ref1)
        if not subtypes and ua.NodeId(ua.ObjectIds.HasSubtype) in oktypes:
            oktypes.remove(ua.NodeId(ua.ObjectIds.HasSubtype))
        return ref2 in oktypes

    def _get_sub_ref(self, ref):
        """
        Return the node ids reachable from ``ref`` through forward HasSubtype
        references, recursively (depth first).

        An id that is not present in the address space yields an empty list.
        """
        res = []
        # .get() instead of indexing: the None check below is only meaningful
        # if a missing node does not raise.
        nodedata = self._aspace.get(ref)
        if nodedata is not None:
            for subref in nodedata.references:
                if subref.ReferenceTypeId.Identifier == ua.ObjectIds.HasSubtype and subref.IsForward:
                    res.append(subref.NodeId)
                    res += self._get_sub_ref(subref.NodeId)
        return res

    def _suitable_direction(self, direction, isforward):
        """Return True when a reference with the given ``isforward`` flag matches ``direction``."""
        if direction == ua.BrowseDirection.Both:
            return True
        if direction == ua.BrowseDirection.Forward and isforward:
            return True
        if direction == ua.BrowseDirection.Inverse and not isforward:
            return True
        return False

    def translate_browsepaths_to_nodeids(self, browsepaths):
        """Return one browse-path result per entry of ``browsepaths``, in order."""
        # self.logger.debug("translate browsepath: %s", browsepaths)
        return [self._translate_browsepath_to_nodeid(path) for path in browsepaths]

    def _translate_browsepath_to_nodeid(self, path):
        """
        Follow ``path.RelativePath.Elements`` starting at ``path.StartingNode``.

        Status of the result:
        - BadNothingToDo when the relative path has no elements
        - BadBrowseNameInvalid when the last element has no TargetName
        - BadNodeIdInvalid when the starting node is unknown
        - BadNoMatch when an element cannot be resolved
        On success the result has a single target holding the final node id.
        """
        # self.logger.debug("looking at path: %s", path)
        res = ua.BrowsePathResult()
        elements = path.RelativePath.Elements
        if not elements:
            # Nothing to follow; also avoids indexing an empty list below.
            res.StatusCode = ua.StatusCode(ua.StatusCodes.BadNothingToDo)
            return res
        if not elements[-1].TargetName:
            # OPC UA Part 4: Services, 5.8.4 TranslateBrowsePathsToNodeIds
            # it's unclear if this check should also handle empty strings
            res.StatusCode = ua.StatusCode(ua.StatusCodes.BadBrowseNameInvalid)
            return res
        if path.StartingNode not in self._aspace:
            res.StatusCode = ua.StatusCode(ua.StatusCodes.BadNodeIdInvalid)
            return res
        current = path.StartingNode
        for el in elements:
            nodeid = self._find_element_in_node(el, current)
            if not nodeid:
                res.StatusCode = ua.StatusCode(ua.StatusCodes.BadNoMatch)
                return res
            current = nodeid
        target = ua.BrowsePathTarget()
        target.TargetId = current
        target.RemainingPathIndex = 4294967295  # largest UInt32 value
        res.Targets = [target]
        return res

    def _find_element_in_node(self, el, nodeid):
        """
        Return the target node id of the first reference of ``nodeid`` that
        matches the path element ``el``, or None (after logging) when no
        reference matches.
        """
        nodedata = self._aspace[nodeid]
        for ref in nodedata.references:
            if ref.BrowseName != el.TargetName:
                continue
            if ref.IsForward == el.IsInverse:
                continue
            if ref.ReferenceTypeId != el.ReferenceTypeId:
                # A different type is acceptable only as a subtype, and only
                # when the element asks for subtypes.
                if not el.IncludeSubtypes:
                    continue
                if ref.ReferenceTypeId not in self._get_sub_ref(el.ReferenceTypeId):
                    continue
            return ref.NodeId
        self.logger.info("element %s was not found in node %s", el, nodeid)
        return None
||
195 | |||
196 | |||
class NodeManagementService:
    """
    Add and delete nodes and references in an address space.

    Every mutating operation is refused with BadUserAccessDenied for users
    whose role is not ``UserRole.Admin``.
    """

    def __init__(self, aspace: "AddressSpace"):
        self.logger = logging.getLogger(__name__)
        self._aspace: "AddressSpace" = aspace

    def add_nodes(self, addnodeitems, user=User(role=UserRole.Admin)):
        """Add every item and return one AddNodesResult per item, in order."""
        return [self._add_node(item, user) for item in addnodeitems]

    def try_add_nodes(self, addnodeitems, user=User(role=UserRole.Admin), check=True):
        """
        Generator: try to add every item and yield the items that failed.

        ``check`` is forwarded to ``_add_node``.
        """
        for item in addnodeitems:
            ret = self._add_node(item, user, check=check)
            if not ret.StatusCode.is_good():
                yield item

    def _add_node(self, item, user, check=True):
        """
        Add a single node described by ``item``.

        :param check: when True a null parent id is rejected and the Value
                      attribute gets a source timestamp
        :return: ua.AddNodesResult with the status and, on success, the node id
        """
        # self.logger.debug("Adding node %s %s", item.RequestedNewNodeId, item.BrowseName)
        result = ua.AddNodesResult()

        if not user.role == UserRole.Admin:
            result.StatusCode = ua.StatusCode(ua.StatusCodes.BadUserAccessDenied)
            return result

        if item.RequestedNewNodeId.has_null_identifier():
            # If Identifier of requested NodeId is null we generate a new NodeId using
            # the namespace of the nodeid, this is an extension of the spec to allow
            # requesting the server to generate a new nodeid in a specified namespace
            # self.logger.debug("RequestedNewNodeId has null identifier, generating Identifier")
            item.RequestedNewNodeId = self._aspace.generate_nodeid(item.RequestedNewNodeId.NamespaceIndex)
        elif item.RequestedNewNodeId in self._aspace:
            self.logger.warning("AddNodesItem: Requested NodeId %s already exists", item.RequestedNewNodeId)
            result.StatusCode = ua.StatusCode(ua.StatusCodes.BadNodeIdExists)
            return result

        if item.ParentNodeId.is_null():
            # self.logger.info("add_node: while adding node %s, requested parent node is null %s %s",
            # item.RequestedNewNodeId, item.ParentNodeId, item.ParentNodeId.is_null())
            if check:
                result.StatusCode = ua.StatusCode(ua.StatusCodes.BadParentNodeIdInvalid)
                return result

        parentdata = self._aspace.get(item.ParentNodeId)
        if parentdata is None and not item.ParentNodeId.is_null():
            self.logger.info("add_node: while adding node %s, requested parent node %s does not exists",
                             item.RequestedNewNodeId, item.ParentNodeId)
            result.StatusCode = ua.StatusCode(ua.StatusCodes.BadParentNodeIdInvalid)
            return result

        if item.ParentNodeId in self._aspace:
            for ref in self._aspace[item.ParentNodeId].references:
                # Check if the Parent has a "HasChild" Reference (or subtype of it) with the Node
                if ref.ReferenceTypeId.Identifier in [ua.ObjectIds.HasChild, ua.ObjectIds.HasComponent,
                                                      ua.ObjectIds.HasProperty, ua.ObjectIds.HasSubtype,
                                                      ua.ObjectIds.HasOrderedComponent] and ref.IsForward:
                    if item.BrowseName.Name == ref.BrowseName.Name:
                        self.logger.warning(f"AddNodesItem: Requested Browsename {item.BrowseName.Name}"
                                            f" already exists in Parent Node. ParentID:{item.ParentNodeId} --- "
                                            f"ItemId:{item.RequestedNewNodeId}")
                        result.StatusCode = ua.StatusCode(ua.StatusCodes.BadBrowseNameDuplicated)
                        return result

        nodedata = NodeData(item.RequestedNewNodeId)

        self._add_node_attributes(nodedata, item, add_timestamps=check)

        # now add our node to db
        self._aspace[nodedata.nodeid] = nodedata

        if parentdata is not None:
            self._add_ref_from_parent(nodedata, item, parentdata)
            self._add_ref_to_parent(nodedata, item, parentdata)

        # add type definition
        if item.TypeDefinition != ua.NodeId():
            self._add_type_definition(nodedata, item)

        result.StatusCode = ua.StatusCode()
        result.AddedNodeId = nodedata.nodeid

        return result

    def _add_node_attributes(self, nodedata, item, add_timestamps):
        """Store NodeId, BrowseName and NodeClass, then the attributes requested in ``item.NodeAttributes``."""
        # add common attrs
        nodedata.attributes[ua.AttributeIds.NodeId] = AttributeValue(
            ua.DataValue(ua.Variant(nodedata.nodeid, ua.VariantType.NodeId))
        )
        nodedata.attributes[ua.AttributeIds.BrowseName] = AttributeValue(
            ua.DataValue(ua.Variant(item.BrowseName, ua.VariantType.QualifiedName))
        )
        nodedata.attributes[ua.AttributeIds.NodeClass] = AttributeValue(
            ua.DataValue(ua.Variant(item.NodeClass, ua.VariantType.Int32))
        )
        # add requested attrs
        self._add_nodeattributes(item.NodeAttributes, nodedata, add_timestamps)

    def _add_unique_reference(self, nodedata, desc):
        """
        Append ``desc`` to the references of ``nodedata`` unless a reference
        with the same type and target already exists.

        :return: BadReferenceNotAllowed when such a reference exists with the
                 opposite direction, a default (good) status code otherwise
        """
        for r in nodedata.references:
            if r.ReferenceTypeId == desc.ReferenceTypeId and r.NodeId == desc.NodeId:
                if r.IsForward != desc.IsForward:
                    self.logger.error("Cannot add conflicting reference %s ", str(desc))
                    return ua.StatusCode(ua.StatusCodes.BadReferenceNotAllowed)
                break  # ref already exists
        else:
            nodedata.references.append(desc)
        return ua.StatusCode()

    def _add_ref_from_parent(self, nodedata, item, parentdata):
        """Add the forward reference from the parent to the new node."""
        desc = ua.ReferenceDescription()
        desc.ReferenceTypeId = item.ReferenceTypeId
        desc.NodeId = nodedata.nodeid
        desc.NodeClass = item.NodeClass
        desc.BrowseName = item.BrowseName
        desc.DisplayName = item.NodeAttributes.DisplayName
        desc.TypeDefinition = item.TypeDefinition
        desc.IsForward = True
        self._add_unique_reference(parentdata, desc)

    def _add_ref_to_parent(self, nodedata, item, parentdata):
        """Add the inverse reference from the new node back to its parent."""
        addref = ua.AddReferencesItem()
        addref.ReferenceTypeId = item.ReferenceTypeId
        addref.SourceNodeId = nodedata.nodeid
        addref.TargetNodeId = item.ParentNodeId
        addref.TargetNodeClass = parentdata.attributes[ua.AttributeIds.NodeClass].value.Value.Value
        addref.IsForward = False
        self._add_reference_no_check(nodedata, addref)

    def _add_type_definition(self, nodedata, item):
        """Add a forward HasTypeDefinition reference from the new node to ``item.TypeDefinition``."""
        addref = ua.AddReferencesItem()
        addref.SourceNodeId = nodedata.nodeid
        addref.IsForward = True
        addref.ReferenceTypeId = ua.NodeId(ua.ObjectIds.HasTypeDefinition)
        addref.TargetNodeId = item.TypeDefinition
        addref.TargetNodeClass = ua.NodeClass.DataType
        self._add_reference_no_check(nodedata, addref)

    def delete_nodes(self, deletenodeitems, user=User(role=UserRole.Admin)):
        """Delete every entry of ``deletenodeitems.NodesToDelete``; return one status code per entry."""
        return [self._delete_node(item, user) for item in deletenodeitems.NodesToDelete]

    def _delete_node(self, item, user):
        """
        Delete one node.

        :return: BadUserAccessDenied for non-admin users, BadNodeIdUnknown when
                 the node does not exist, a default (good) status code otherwise
        """
        if user.role != UserRole.Admin:
            return ua.StatusCode(ua.StatusCodes.BadUserAccessDenied)

        if item.NodeId not in self._aspace:
            self.logger.warning("DeleteNodesItem: NodeId %s does not exists", item.NodeId)
            return ua.StatusCode(ua.StatusCodes.BadNodeIdUnknown)

        if item.DeleteTargetReferences:
            for elem in self._aspace.keys():
                # iterate over a copy because matching entries are removed
                for rdesc in self._aspace[elem].references[:]:
                    if rdesc.NodeId == item.NodeId:
                        self._aspace[elem].references.remove(rdesc)

        self._delete_node_callbacks(self._aspace[item.NodeId])

        del (self._aspace[item.NodeId])

        return ua.StatusCode()

    def _delete_node_callbacks(self, nodedata):
        """
        Notify and unregister the data-change callbacks on the Value attribute
        of a node that is about to be deleted.

        Exceptions raised by a callback are logged and do not stop the loop.
        """
        if ua.AttributeIds.Value in nodedata.attributes:
            for handle, callback in list(nodedata.attributes[ua.AttributeIds.Value].datachange_callbacks.items()):
                try:
                    # NOTE(review): AddressSpace.write_attribute_value awaits these
                    # callbacks, here they are called without await - if they are
                    # coroutine functions the notification never runs. TODO confirm.
                    callback(handle, None, ua.StatusCode(ua.StatusCodes.BadNodeIdUnknown))
                    self._aspace.delete_datachange_callback(handle)
                except Exception as ex:
                    self.logger.exception("Error calling delete node callback callback %s, %s, %s", nodedata,
                                          ua.AttributeIds.Value, ex)

    def add_references(self, refs, user=User(role=UserRole.Admin)):
        """Add every reference and return one status code per reference, in order."""
        return [self._add_reference(ref, user) for ref in refs]

    def try_add_references(self, refs, user=User(role=UserRole.Admin)):
        """Generator: try to add every reference and yield the ones that failed."""
        for ref in refs:
            if not self._add_reference(ref, user).is_good():
                yield ref

    def _add_reference(self, addref, user):
        """Validate source node, target node and user role, then add the reference."""
        sourcedata = self._aspace.get(addref.SourceNodeId)
        if sourcedata is None:
            return ua.StatusCode(ua.StatusCodes.BadSourceNodeIdInvalid)
        if addref.TargetNodeId not in self._aspace:
            return ua.StatusCode(ua.StatusCodes.BadTargetNodeIdInvalid)
        if user.role != UserRole.Admin:
            return ua.StatusCode(ua.StatusCodes.BadUserAccessDenied)
        return self._add_reference_no_check(sourcedata, addref)

    def _add_reference_no_check(self, sourcedata, addref):
        """
        Build a reference description from ``addref`` and add it to ``sourcedata``.

        NodeClass (when unspecified), BrowseName and DisplayName are read from
        the target node; the names are only set when the read value is truthy.
        """
        rdesc = ua.ReferenceDescription()
        rdesc.ReferenceTypeId = addref.ReferenceTypeId
        rdesc.IsForward = addref.IsForward
        rdesc.NodeId = addref.TargetNodeId
        if addref.TargetNodeClass == ua.NodeClass.Unspecified:
            rdesc.NodeClass = self._aspace.read_attribute_value(
                addref.TargetNodeId, ua.AttributeIds.NodeClass).Value.Value
        else:
            rdesc.NodeClass = addref.TargetNodeClass
        bname = self._aspace.read_attribute_value(addref.TargetNodeId, ua.AttributeIds.BrowseName).Value.Value
        if bname:
            rdesc.BrowseName = bname
        dname = self._aspace.read_attribute_value(addref.TargetNodeId, ua.AttributeIds.DisplayName).Value.Value
        if dname:
            rdesc.DisplayName = dname
        return self._add_unique_reference(sourcedata, rdesc)

    def delete_references(self, refs, user=User(role=UserRole.Admin)):
        """Delete every reference and return one status code per reference, in order."""
        return [self._delete_reference(ref, user) for ref in refs]

    def _delete_unique_reference(self, item, invert=False):
        """
        Remove the first reference matching ``item`` from its source node.

        With ``invert`` the opposite reference (stored on the target node, with
        flipped direction) is removed instead.

        :return: a default (good) status code when a reference was removed,
                 BadNotFound otherwise
        """
        if invert:
            source, target, forward = item.TargetNodeId, item.SourceNodeId, not item.IsForward
        else:
            source, target, forward = item.SourceNodeId, item.TargetNodeId, item.IsForward
        references = self._aspace[source].references
        for rdesc in references:
            if rdesc.NodeId == target and rdesc.ReferenceTypeId == item.ReferenceTypeId:
                if rdesc.IsForward == forward:
                    # safe although we are iterating: we return right away
                    references.remove(rdesc)
                    return ua.StatusCode()
        return ua.StatusCode(ua.StatusCodes.BadNotFound)

    def _delete_reference(self, item, user):
        """Validate source, target, reference type and user role, then delete the reference."""
        if item.SourceNodeId not in self._aspace:
            return ua.StatusCode(ua.StatusCodes.BadSourceNodeIdInvalid)
        if item.TargetNodeId not in self._aspace:
            return ua.StatusCode(ua.StatusCodes.BadTargetNodeIdInvalid)
        if item.ReferenceTypeId not in self._aspace:
            return ua.StatusCode(ua.StatusCodes.BadReferenceTypeIdInvalid)
        if user.role != UserRole.Admin:
            return ua.StatusCode(ua.StatusCodes.BadUserAccessDenied)

        if item.DeleteBidirectional:
            # result of the inverse removal is deliberately not reported
            self._delete_unique_reference(item, True)
        return self._delete_unique_reference(item)

    def _add_node_attr(self, item, nodedata, name, vtype=None, add_timestamps=False):
        """
        Copy attribute ``name`` from ``item`` to ``nodedata`` when its bit is
        set in ``item.SpecifiedAttributes``.

        :param vtype: variant type used to wrap the value (None lets
                      ua.Variant decide)
        :param add_timestamps: when True the stored DataValue gets a
                               SourceTimestamp of "now" (naive UTC)
        """
        if item.SpecifiedAttributes & getattr(ua.NodeAttributesMask, name):
            dv = ua.DataValue(ua.Variant(getattr(item, name), vtype))
            if add_timestamps:
                # dv.ServerTimestamp = datetime.utcnow()  # Disabled until someone explains us it should be there
                dv.SourceTimestamp = datetime.utcnow()
            nodedata.attributes[getattr(ua.AttributeIds, name)] = AttributeValue(dv)

    def _add_nodeattributes(self, item, nodedata, add_timestamps):
        """
        Copy every specified attribute of ``item`` to ``nodedata``.

        Only the Value attribute receives ``add_timestamps``.
        """
        typed_attributes = (
            ("AccessLevel", ua.VariantType.Byte),
            ("ArrayDimensions", ua.VariantType.UInt32),
            ("BrowseName", ua.VariantType.QualifiedName),
            ("ContainsNoLoops", ua.VariantType.Boolean),
            ("DataType", ua.VariantType.NodeId),
            ("Description", ua.VariantType.LocalizedText),
            ("DisplayName", ua.VariantType.LocalizedText),
            ("EventNotifier", ua.VariantType.Byte),
            ("Executable", ua.VariantType.Boolean),
            ("Historizing", ua.VariantType.Boolean),
            ("InverseName", ua.VariantType.LocalizedText),
            ("IsAbstract", ua.VariantType.Boolean),
            ("MinimumSamplingInterval", ua.VariantType.Double),
            ("NodeClass", ua.VariantType.Int32),
            ("NodeId", ua.VariantType.NodeId),
            ("Symmetric", ua.VariantType.Boolean),
            ("UserAccessLevel", ua.VariantType.Byte),
            ("UserExecutable", ua.VariantType.Boolean),
            # Was registered twice (Byte, then UInt32 overwriting it); only the
            # UInt32 registration ever took effect, so it is the one kept.
            ("UserWriteMask", ua.VariantType.UInt32),
            ("ValueRank", ua.VariantType.Int32),
            ("WriteMask", ua.VariantType.UInt32),
            ("DataTypeDefinition", ua.VariantType.ExtensionObject),
        )
        for name, vtype in typed_attributes:
            self._add_node_attr(item, nodedata, name, vtype)
        self._add_node_attr(item, nodedata, "Value", add_timestamps=add_timestamps)
||
476 | |||
477 | |||
class MethodService:
    """
    Executes method calls against the callables attached to method nodes.

    Plain functions are run in a thread pool owned by the service, coroutine
    functions are awaited directly.
    """

    def __init__(self, aspace: "AddressSpace"):
        self.logger = logging.getLogger(__name__)
        self._aspace: "AddressSpace" = aspace
        self._pool = ThreadPoolExecutor()

    def stop(self):
        """Shut the thread pool down."""
        self._pool.shutdown()

    async def call(self, methods):
        """Run the requested calls one after another and return their results in order."""
        return [await self._call(request) for request in methods]

    async def _call(self, method):
        """Execute one call request and return its ua.CallMethodResult."""
        self.logger.info("Calling: %s", method)
        res = ua.CallMethodResult()

        object_known = method.ObjectId in self._aspace
        if not object_known or method.MethodId not in self._aspace:
            res.StatusCode = ua.StatusCode(ua.StatusCodes.BadNodeIdInvalid)
            return res

        node = self._aspace[method.MethodId]
        if node.call is None:
            # node exists but no callable was attached to it
            res.StatusCode = ua.StatusCode(ua.StatusCodes.BadNothingToDo)
            return res

        try:
            result = await self._run_method(node.call, method.ObjectId, *method.InputArguments)
        except Exception:
            self.logger.exception("Error executing method call %s, an exception was raised: ", method)
            res.StatusCode = ua.StatusCode(ua.StatusCodes.BadUnexpectedError)
            return res

        # The callable may hand back a full result, just a status, or the
        # output arguments.
        if isinstance(result, ua.CallMethodResult):
            res = result
        elif isinstance(result, ua.StatusCode):
            res.StatusCode = result
        else:
            res.OutputArguments = result
        # one status entry per input argument
        while len(res.InputArgumentResults) < len(method.InputArguments):
            res.InputArgumentResults.append(ua.StatusCode())
        return res

    async def _run_method(self, func, parent, *args):
        """Call ``func(parent, *args)``: awaited if it is a coroutine function, in the pool otherwise."""
        if not asyncio.iscoroutinefunction(func):
            bound = partial(func, parent, *args)
            loop = asyncio.get_event_loop()
            return await loop.run_in_executor(self._pool, bound)
        return await func(parent, *args)
||
527 | |||
528 | |||
class AddressSpace:
    """
    The address space object stores all the nodes of the OPC-UA server and helper methods.

    Nodes live in a mapping from node id to node data; the class exposes the
    usual container protocol (``[]``, ``in``, ``del``, ``get``, ``keys``).

    NOTE(review): this docstring used to state that the methods are thread
    safe, but no locking is visible in this class - confirm before relying on it.
    """

    def __init__(self):
        self.logger = logging.getLogger(__name__)
        self._nodes = {}
        # last handle handed out by add_datachange_callback
        self._datachange_callback_counter = 200
        # handle -> (nodeid, attribute id)
        self._handle_to_attribute_map = {}
        self._default_idx = 2
        # namespace index -> last numeric identifier handed out
        self._nodeid_counter = {0: 20000, 1: 2000}

    def __getitem__(self, nodeid):
        return self._nodes.__getitem__(nodeid)

    def get(self, nodeid):
        """Return the node data for ``nodeid`` or None when it is unknown."""
        return self._nodes.get(nodeid, None)

    def __setitem__(self, nodeid, value):
        return self._nodes.__setitem__(nodeid, value)

    def __contains__(self, nodeid):
        return self._nodes.__contains__(nodeid)

    def __delitem__(self, nodeid):
        self._nodes.__delitem__(nodeid)

    def generate_nodeid(self, idx=None):
        """
        Return a numeric node id in namespace ``idx`` that is not in use yet.

        :param idx: namespace index; the default index is used when None
        """
        if idx is None:
            idx = self._default_idx
        if idx in self._nodeid_counter:
            self._nodeid_counter[idx] += 1
        else:
            # First request for this namespace: start from the biggest numeric
            # identifier among the existing nodes (or 1 when there is none).
            numeric_types = (ua.NodeIdType.Numeric, ua.NodeIdType.TwoByte, ua.NodeIdType.FourByte)
            self._nodeid_counter[idx] = max(
                (
                    nodeid.Identifier for nodeid in self._nodes.keys()
                    if nodeid.NamespaceIndex == idx and nodeid.NodeIdType in numeric_types
                ),
                default=1,
            )
        nodeid = ua.NodeId(self._nodeid_counter[idx], idx)
        # Probe iteratively; a long run of occupied ids must not be able to
        # exhaust the recursion limit.
        while nodeid in self._nodes:
            self._nodeid_counter[idx] += 1
            nodeid = ua.NodeId(self._nodeid_counter[idx], idx)
        return nodeid

    def keys(self):
        return self._nodes.keys()

    def empty(self):
        """Delete all nodes in address space"""
        self._nodes = {}

    def dump(self, path):
        """
        Dump address space as binary to file; note that server must be stopped for this method to work
        DO NOT DUMP AN ADDRESS SPACE WHICH IS USING A SHELF (load_aspace_shelf), ONLY CACHED NODES WILL GET DUMPED!

        Side effect: the ``call`` slot of every node is reset to None.
        """
        # prepare nodes in address space for being serialized
        for nodeid, ndata in self._nodes.items():
            # if the node has a reference to a method call, remove it so the object can be serialized
            if ndata.call is not None:
                self._nodes[nodeid].call = None

        with open(path, 'wb') as f:
            pickle.dump(self._nodes, f, pickle.HIGHEST_PROTOCOL)

    def load(self, path):
        """
        Load address space from a binary file, overwriting everything in the current address space
        """
        # SECURITY: unpickling can execute arbitrary code; only load files
        # that come from a trusted source.
        with open(path, 'rb') as f:
            self._nodes = pickle.load(f)

    def make_aspace_shelf(self, path):
        """
        Make a shelf for containing the nodes from the standard address space; this is typically only done on first
        start of the server. Subsequent server starts will load the shelf, nodes are then moved to a cache
        by the LazyLoadingDict class when they are accessed. Saving data back to the shelf
        is currently NOT supported, it is only used for the default OPC UA standard address space

        Note: Intended for slow devices, such as Raspberry Pi, to greatly improve start up time
        """
        with shelve.open(path, 'n', protocol=pickle.HIGHEST_PROTOCOL) as s:
            for nodeid, ndata in self._nodes.items():
                s[nodeid.to_string()] = ndata

    def load_aspace_shelf(self, path):
        """
        Load the standard address space nodes from a python shelve via LazyLoadingDict as needed.
        The dump() method can no longer be used if the address space is being loaded from a shelf

        Note: Intended for slow devices, such as Raspberry Pi, to greatly improve start up time

        Currently disabled: always raises NotImplementedError. Everything
        below the raise is unreachable and kept as the reference implementation.
        """
        raise NotImplementedError

        # ToDo: async friendly implementation - load all at once?
        class LazyLoadingDict(MutableMapping):
            """
            Special dict that only loads nodes as they are accessed. If a node is accessed it gets copied from the
            shelve to the cache dict. All user nodes are saved in the cache ONLY. Saving data back to the shelf
            is currently NOT supported
            """

            def __init__(self, source):
                self.source = source  # python shelf
                self.cache = {}  # internal dict

            def __getitem__(self, key):
                # try to get the item (node) from the cache, if it isn't there get it from the shelf
                try:
                    return self.cache[key]
                except KeyError:
                    node = self.cache[key] = self.source[key.to_string()]
                    return node

            def __setitem__(self, key, value):
                # add a new item to the cache; if this item is in the shelf it is not updated
                self.cache[key] = value

            def __contains__(self, key):
                return key in self.cache or key.to_string() in self.source

            def __delitem__(self, key):
                # only deleting items from the cache is allowed
                del self.cache[key]

            def __iter__(self):
                # only the cache can be iterated over
                return iter(self.cache.keys())

            def __len__(self):
                # only returns the length of items in the cache, not unaccessed items in the shelf
                return len(self.cache)

        self._nodes = LazyLoadingDict(shelve.open(path, "r"))

    def read_attribute_value(self, nodeid, attr):
        """
        Return the value stored for attribute ``attr`` of ``nodeid``.

        An empty ua.DataValue with status BadNodeIdUnknown or
        BadAttributeIdInvalid is returned when the node or the attribute is
        missing. When the attribute has a ``value_callback`` its result is
        returned instead of the stored value.
        """
        # self.logger.debug("get attr val: %s %s", nodeid, attr)
        if nodeid not in self._nodes:
            dv = ua.DataValue()
            dv.StatusCode = ua.StatusCode(ua.StatusCodes.BadNodeIdUnknown)
            return dv
        node = self._nodes[nodeid]
        if attr not in node.attributes:
            dv = ua.DataValue()
            dv.StatusCode = ua.StatusCode(ua.StatusCodes.BadAttributeIdInvalid)
            return dv
        attval = node.attributes[attr]
        if attval.value_callback:
            return attval.value_callback()
        return attval.value

    async def write_attribute_value(self, nodeid, attr, value):
        """
        Store ``value`` for attribute ``attr`` of ``nodeid`` and notify listeners.

        The data-change callbacks of the attribute are awaited one by one, but
        only when the new ``value.Value`` differs from the old one; exceptions
        raised by a callback are logged and ignored.

        :return: BadNodeIdUnknown / BadAttributeIdInvalid when the node or
                 attribute is missing, a default (good) status code otherwise
        """
        # self.logger.debug("set attr val: %s %s %s", nodeid, attr, value)
        node = self._nodes.get(nodeid, None)
        if node is None:
            return ua.StatusCode(ua.StatusCodes.BadNodeIdUnknown)
        attval = node.attributes.get(attr, None)
        if attval is None:
            return ua.StatusCode(ua.StatusCodes.BadAttributeIdInvalid)

        old = attval.value
        attval.value = value
        cbs = []
        if old.Value != value.Value:  # only call the callbacks when a value change has happened
            # snapshot, so callbacks may (un)register callbacks while we iterate
            cbs = list(attval.datachange_callbacks.items())

        for k, v in cbs:
            try:
                await v(k, value)
            except Exception as ex:
                self.logger.exception("Error calling datachange callback %s, %s, %s", k, v, ex)

        return ua.StatusCode()

    def add_datachange_callback(self, nodeid, attr, callback):
        """
        Register ``callback`` on attribute ``attr`` of ``nodeid``.

        :return: tuple (status code, handle); the handle is 0 when the node or
                 the attribute does not exist
        """
        self.logger.debug("set attr callback: %s %s %s", nodeid, attr, callback)
        if nodeid not in self._nodes:
            return ua.StatusCode(ua.StatusCodes.BadNodeIdUnknown), 0
        node = self._nodes[nodeid]
        if attr not in node.attributes:
            return ua.StatusCode(ua.StatusCodes.BadAttributeIdInvalid), 0
        attval = node.attributes[attr]
        self._datachange_callback_counter += 1
        handle = self._datachange_callback_counter
        attval.datachange_callbacks[handle] = callback
        self._handle_to_attribute_map[handle] = (nodeid, attr)
        return ua.StatusCode(), handle

    def delete_datachange_callback(self, handle):
        """
        Unregister the callback identified by ``handle``.

        Unknown handles are ignored, and so is a handle whose node or
        attribute no longer exists (for instance because the node was deleted).
        """
        if handle not in self._handle_to_attribute_map:
            return
        nodeid, attr = self._handle_to_attribute_map.pop(handle)
        node = self._nodes.get(nodeid, None)
        if node is None:
            return
        attval = node.attributes.get(attr, None)
        if attval is not None:
            attval.datachange_callbacks.pop(handle, None)

    def add_method_callback(self, methodid, callback):
        """Attach ``callback`` as the callable of node ``methodid`` (KeyError when the node is unknown)."""
        node = self._nodes[methodid]
        node.call = callback
||
734 |