Total Complexity | 8 |
Total Lines | 44 |
Duplicated Lines | 0 % |
Changes | 0 |
1 | from dataclasses import dataclass, field |
||
2 | from typing import Any, Iterable, Optional, Set |
||
3 | from asyncua import ua |
||
4 | from sortedcontainers import SortedDict |
||
5 | |||
6 | |||
# Type of the handler object stored on a VirtualSubscription.
# Deliberately left as ``Any``: this module places no constraint on it.
TypeSubHandler = Any
||
8 | |||
9 | |||
@dataclass(frozen=True)
class NodeAttr:
    """Immutable settings recorded for a single subscribed node.

    Instances are created by ``VirtualSubscription.subscribe_data_change``
    and stored as the values of its ``nodes`` mapping. Being frozen, an
    instance cannot be modified after construction.
    """

    # Attribute id the subscription targets; ``None`` when none was given.
    attr: Optional[ua.AttributeIds] = None

    # Queue size supplied when the node was subscribed; defaults to 0.
    queuesize: int = 0
||
14 | |||
15 | |||
@dataclass
class VirtualSubscription:
    """In-memory record of a subscription's settings and its subscribed nodes.

    The object only stores state: every method below mutates or reads the
    dataclass fields and performs no I/O.
    """

    # Subscription period as supplied by the caller (unit not defined here).
    period: int
    # Handler object associated with the subscription.
    handler: TypeSubHandler
    # Current publishing flag; updated by ``set_publishing_mode``.
    publishing: bool
    # Current monitoring mode; updated by ``set_monitoring_mode``.
    monitoring: ua.MonitoringMode
    # full type: SortedDict[str, NodeAttr]
    nodes: SortedDict = field(default_factory=SortedDict)

    def subscribe_data_change(
        self, nodes: Iterable[str], attr: ua.AttributeIds, queuesize: int
    ) -> None:
        """Record ``attr``/``queuesize`` for every node id in ``nodes``.

        A node that is already present has its previous entry overwritten.

        Args:
            nodes: Node identifiers to register.
            attr: Attribute id to store for each node.
            queuesize: Queue size to store for each node.
        """
        # NodeAttr is frozen, so one instance can safely be shared by all
        # nodes registered in this call.
        node_attr = NodeAttr(attr, queuesize)
        for node in nodes:
            self.nodes[node] = node_attr

    def unsubscribe(self, nodes: Iterable[str]) -> None:
        """Remove every node id in ``nodes`` from the subscription.

        Identifiers that are not currently subscribed are silently ignored.

        Args:
            nodes: Node identifiers to remove.
        """
        for node in nodes:
            # pop() with a default is a single lookup and removes the entry
            # based on key presence, not on the truthiness of the stored value.
            self.nodes.pop(node, None)

    def set_monitoring_mode(self, mode: ua.MonitoringMode) -> None:
        """Replace the stored monitoring mode with ``mode``."""
        self.monitoring = mode

    def set_publishing_mode(self, mode: bool) -> None:
        """Replace the stored publishing flag with ``mode``."""
        self.publishing = mode

    def get_nodes(self) -> Set[str]:
        """Return a new set containing the ids of all subscribed nodes."""
        return set(self.nodes)
||
44 |