UaClient.__init__()   A
last analyzed

Complexity

Conditions 1

Size

Total Lines 12

Duplication

Lines 0
Ratio 0 %

Importance

Changes 8
Bugs 0 Features 2
Metric Value
cc 1
c 8
b 0
f 2
dl 0
loc 12
rs 9.8
1
import logging
2
3
from PyQt5.QtCore import QSettings
4
5
from opcua import ua
6
from opcua import Client
7
from opcua import Node
8
from opcua import crypto
9
from opcua.tools import endpoint_to_strings
10
11
12
logger = logging.getLogger(__name__)
13
14
15
class UaClient(object):
16
    """
17
    OPC-Ua client specialized for the need of GUI client
18
    return exactly what GUI needs, no customization possible
19
    """
20
21
    def __init__(self):
22
        self.settings = QSettings()
23
        self.client = None
24
        self._connected = False
25
        self._datachange_sub = None
26
        self._event_sub = None
27
        self._subs_dc = {}
28
        self._subs_ev = {}
29
        self.security_mode = None
30
        self.security_policy = None
31
        self.certificate_path = None
32
        self.private_key_path = None
33
34
    def _reset(self):
35
        self.client = None
36
        self._connected = False
37
        self._datachange_sub = None
38
        self._event_sub = None
39
        self._subs_dc = {}
40
        self._subs_ev = {}
41
42
    @staticmethod
43
    def get_endpoints(uri):
44
        client = Client(uri, timeout=2)
45
        client.connect_and_get_server_endpoints()
46
        edps = client.connect_and_get_server_endpoints()
47
        for i, ep in enumerate(edps, start=1):
48
            logger.info('Endpoint %s:', i)
49
            for (n, v) in endpoint_to_strings(ep):
50
                logger.info('  %s: %s', n, v)
51
            logger.info('')
52
        return edps
53
54
    def load_security_settings(self, uri):
55
        self.security_mode = None
56
        self.security_policy = None
57
        self.certificate_path = None
58
        self.private_key_path = None
59
60
        mysettings = self.settings.value("security_settings", None)
61
        if mysettings is None:
62
            return
63
        if uri in mysettings:
64
            mode, policy, cert, key = mysettings[uri]
65
            self.security_mode = mode
66
            self.security_policy = policy
67
            self.certificate_path = cert
68
            self.private_key_path = key
69
70
    def save_security_settings(self, uri):
71
        mysettings = self.settings.value("security_settings", None)
72
        if mysettings is None:
73
            mysettings = {}
74
        mysettings[uri] = [self.security_mode,
75
                           self.security_policy,
76
                           self.certificate_path,
77
                           self.private_key_path]
78
        self.settings.setValue("security_settings", mysettings)
79
80
    def get_node(self, nodeid):
81
        return self.client.get_node(nodeid)
82
    
83
    def connect(self, uri):
84
        self.disconnect()
85
        logger.info("Connecting to %s with parameters %s, %s, %s, %s", uri, self.security_mode, self.security_policy, self.certificate_path, self.private_key_path)
86
        self.client = Client(uri)
87
        if self.security_mode is not None and self.security_policy is not None:
88
            self.client.set_security(
89
                getattr(crypto.security_policies, 'SecurityPolicy' + self.security_policy),
90
                self.certificate_path,
91
                self.private_key_path,
92
                mode=getattr(ua.MessageSecurityMode, self.security_mode)
93
            )
94
        self.client.connect()
95
        self._connected = True
96
        self.save_security_settings(uri)
97
98
    def disconnect(self):
99
        if self._connected:
100
            print("Disconnecting from server")
101
            self._connected = False
102
            try:
103
                self.client.disconnect()
104
            finally:
105
                self._reset()
106
107
    def subscribe_datachange(self, node, handler):
108
        if not self._datachange_sub:
109
            self._datachange_sub = self.client.create_subscription(500, handler)
110
        handle = self._datachange_sub.subscribe_data_change(node)
111
        self._subs_dc[node.nodeid] = handle
112
        return handle
113
114
    def unsubscribe_datachange(self, node):
115
        self._datachange_sub.unsubscribe(self._subs_dc[node.nodeid])
116
117
    def subscribe_events(self, node, handler):
118
        if not self._event_sub:
119
            print("subscirbing with handler: ", handler, dir(handler))
120
            self._event_sub = self.client.create_subscription(500, handler)
121
        handle = self._event_sub.subscribe_events(node)
122
        self._subs_ev[node.nodeid] = handle
123
        return handle
124
125
    def unsubscribe_events(self, node):
126
        self._event_sub.unsubscribe(self._subs_ev[node.nodeid])
127
128
    def get_node_attrs(self, node):
129
        if not isinstance(node, Node):
130
            node = self.client.get_node(node)
131
        attrs = node.get_attributes([ua.AttributeIds.DisplayName, ua.AttributeIds.BrowseName, ua.AttributeIds.NodeId])
132
        return node, [attr.Value.Value.to_string() for attr in attrs]
133
134
    @staticmethod
135
    def get_children(node):
136
        descs = node.get_children_descriptions()
137
        descs.sort(key=lambda x: x.BrowseName)
138
        return descs
139
140