| Total Complexity | 8 |
| Total Lines | 44 |
| Duplicated Lines | 0 % |
| Changes | 0 | ||
| 1 | from dataclasses import dataclass, field |
||
| 2 | from typing import Any, Iterable, Optional, Set |
||
| 3 | from asyncua import ua |
||
| 4 | from sortedcontainers import SortedDict |
||
| 5 | |||
| 6 | |||
# Type of the handler object stored on a subscription. It is aliased to Any,
# so no particular handler interface is enforced by the type checker.
TypeSubHandler = Any
||
| 8 | |||
| 9 | |||
@dataclass(frozen=True)
class NodeAttr:
    """Immutable pair of per-node subscription parameters.

    Instances are frozen, so they cannot be modified after creation.
    """

    # Attribute of the node this entry refers to; None when not specified.
    attr: Optional[ua.AttributeIds] = None
    # Queue size requested for the node; defaults to 0.
    # NOTE(review): presumably the monitored-item queue size -- confirm.
    queuesize: int = 0
||
| 14 | |||
| 15 | |||
@dataclass
class VirtualSubscription:
    """Record of one subscription's settings and the nodes registered on it.

    The methods below only update this object's own fields; none of them
    performs any I/O.

    Attributes:
        period: Period of the subscription (unit is not visible here;
            presumably milliseconds -- TODO confirm against the caller).
        handler: Handler object associated with the subscription.
        publishing: Current publishing flag (see set_publishing_mode).
        monitoring: Current monitoring mode (see set_monitoring_mode).
        nodes: Mapping of node identifier string to its NodeAttr, held in a
            SortedDict so iteration follows key order.
    """

    period: int
    handler: TypeSubHandler
    publishing: bool
    monitoring: ua.MonitoringMode
    # full type: SortedDict[str, NodeAttr]
    nodes: SortedDict = field(default_factory=SortedDict)

    def subscribe_data_change(
        self, nodes: Iterable[str], attr: ua.AttributeIds, queuesize: int
    ) -> None:
        """Register every node in *nodes* with the given attribute and queue size.

        A node that is already registered has its entry overwritten.
        """
        # NodeAttr is frozen, so a single instance can safely be shared by
        # every node registered in this call.
        entry = NodeAttr(attr, queuesize)
        for node in nodes:
            self.nodes[node] = entry

    def unsubscribe(self, nodes: Iterable[str]) -> None:
        """Remove every node in *nodes*; unknown nodes are silently ignored."""
        for node in nodes:
            # pop() with a default removes the key whenever it is present,
            # independent of the truthiness of the stored value, and needs
            # only one lookup.
            self.nodes.pop(node, None)

    def set_monitoring_mode(self, mode: ua.MonitoringMode) -> None:
        """Store *mode* as the subscription's monitoring mode."""
        self.monitoring = mode

    def set_publishing_mode(self, mode: bool) -> None:
        """Store *mode* as the subscription's publishing flag."""
        self.publishing = mode

    def get_nodes(self) -> Set[str]:
        """Return a new set containing the identifiers of all registered nodes."""
        return set(self.nodes)
||
| 44 |