Passed
Push — dev ( b34770...12545c )
by Olivier
02:36
created

opcua.server.Server   B

Complexity

Total Complexity 37

Size/Duplication

Total Lines 302
Duplicated Lines 0 %

Test Coverage

Coverage 84.67%
Metric Value
dl 0
loc 302
ccs 127
cts 150
cp 0.8467
rs 8.6
wmc 37

29 Methods

Rating   Name   Duplication   Size   Complexity  
A delete_nodes() 0 2 1
A get_endpoints() 0 2 1
B _setup_server_nodes() 0 29 3
A set_server_name() 0 2 1
A start() 0 9 1
A __init__() 0 20 1
A import_xml() 0 6 1
A get_objects_node() 0 5 1
A load_certificate() 0 5 1
A create_subscription() 0 14 1
A find_servers() 0 10 2
A get_server_node() 0 5 1
B _set_endpoints() 0 35 2
A unregister_to_discovery() 0 6 1
A get_event_object() 0 6 1
A _renew_registration() 0 4 2
A load_private_key() 0 2 1
A stop() 0 8 2
A register_namespace() 0 9 1
A get_namespace_index() 0 6 1
A allow_remote_admin() 0 5 1
A get_node() 0 5 1
A get_client_to_discovery() 0 7 1
A disable_clock() 0 6 1
A register_to_discovery() 0 16 3
A get_namespace_array() 0 6 1
A set_endpoint() 0 2 1
A set_application_uri() 0 9 1
A get_root_node() 0 5 1
1
"""
2
High level interface to pure python OPC-UA server
3
"""
4
5 1
import logging
6 1
try:
7 1
    from urllib.parse import urlparse
8
except ImportError:
9
    from urlparse import urlparse
10
11
12 1
from opcua import ua
13
#from opcua.binary_server import BinaryServer
14 1
from opcua.server.binary_server_asyncio import BinaryServer
15 1
from opcua.server.internal_server import InternalServer
16 1
from opcua.common.node import Node
17 1
from opcua.common.event import Event
18 1
from opcua.common.subscription import Subscription
19 1
from opcua.common import xmlimporter
20 1
from opcua.common.manage_nodes import delete_nodes
21 1
from opcua.client.client import Client
22 1
from opcua.crypto import security_policies
23 1
use_crypto = True
24 1
try:
25 1
    from opcua.crypto import uacrypto
26 1
except ImportError:
27 1
    print("cryptography is not installed, use of crypto disabled")
28 1
    use_crypto = False
29
30
31 1
class Server(object):
32
33
    """
34
    High level Server class
35
    Create an opcua server with default values
36
    The class is very short. Users are adviced to read the code.
37
    Create your own namespace and then populate your server address space
38
    using use the get_root() or get_objects() to get Node objects.
39
    and get_event_object() to fire events.
40
    Then start server. See example_server.py
41
    All methods are threadsafe
42
43
44
    :ivar application_uri:
45
    :vartype application_uri: uri
46
    :ivar product_uri:
47
    :vartype product_uri: uri
48
    :ivar name:
49
    :vartype name: string
50
    :ivar default_timeout: timout in milliseconds for sessions and secure channel
51
    :vartype default_timeout: int
52
    :ivar iserver: internal server object
53
    :vartype default_timeout: InternalServer
54
    :ivar bserver: binary protocol server
55
    :vartype bserver: BinaryServer
56
57
    """
58
59 1
    def __init__(self):
60 1
        self.logger = logging.getLogger(__name__)
61 1
        self.endpoint = urlparse("opc.tcp://0.0.0.0:4840/freeopcua/server/")
62 1
        self.application_uri = "urn:freeopcua:python:server"
63 1
        self.product_uri = "urn:freeopcua.github.no:python:server"
64 1
        self.name = "FreeOpcUa Python Server"
65 1
        self.application_type = ua.ApplicationType.ClientAndServer
66 1
        self.default_timeout = 3600000
67 1
        self.iserver = InternalServer()
68 1
        self.bserver = None
69 1
        self._discovery_clients = {}
70 1
        self._discovery_period = 60
71 1
        self.certificate = None
72 1
        self.private_key = None
73 1
        self._policies = []
74
75
        # setup some expected values
76 1
        self.register_namespace(self.application_uri)
77 1
        sa_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_ServerArray))
78 1
        sa_node.set_value([self.application_uri])
79
80 1
    def load_certificate(self, path):
81
        """
82
        load server certificate from file, either pem or der
83
        """
84
        self.certificate = uacrypto.load_certificate(path)
85
86 1
    def load_private_key(self, path):
87
        self.private_key = uacrypto.load_private_key(path)
88
89 1
    def disable_clock(self, val=True):
90
        """
91
        for debugging you may want to disable clock that write every second
92
        to address space
93
        """
94
        self.iserver.disabled_clock = val
95
96 1
    def set_application_uri(self, uri):
97
        """
98
        Set application/server URI.
99
        This uri is supposed to be unique. If you intent to register
100
        your server to a discovery server, it really should be unique in
101
        your system!
102
        default is : "urn:freeopcua:python:server"
103
        """
104 1
        self.application_uri = uri
105
106 1
    def find_servers(self, uris=None):
107
        """
108
        find_servers. mainly implemented for simmetry with client
109
        """
110 1
        if uris is None:
111 1
            uris = []
112 1
        params = ua.FindServersParameters()
113 1
        params.EndpointUrl = self.endpoint.geturl()
114 1
        params.ServerUris = uris
115 1
        return self.iserver.find_servers(params)
116
117 1
    def register_to_discovery(self, url="opc.tcp://localhost:4840", period=60):
118
        """
119
        Register to an OPC-UA Discovery server. Registering must be renewed at
120
        least every 10 minutes, so this method will use our asyncio thread to
121
        re-register every period seconds
122
        if period is 0 registration is not automatically renewed
123
        """
124
        # FIXME: habe a period per discovery 
125 1
        if url in self._discovery_clients:
126 1
            self._discovery_clients[url].disconnect()
127 1
        self._discovery_clients[url] = Client(url)
128 1
        self._discovery_clients[url].connect()
129 1
        self._discovery_clients[url].register_server(self)
130 1
        self._discovery_period = period
131 1
        if period:
132
            self.iserver.loop.call_soon(self._renew_registration)
133
134 1
    def unregister_to_discovery(self, url="opc.tcp://localhost:4840"):
135
        """
136
        stop registration thread
137
        """
138
        # FIXME: is there really no way to deregister?
139
        self._discovery_clients[url].disconnect()
140
141 1
    def _renew_registration(self):
142
        for client in self._discovery_clients.values():
143
            client.register_server(self)
144
            self.iserver.loop.call_later(self._discovery_period, self._renew_registration)
145
146 1
    def get_client_to_discovery(self, url="opc.tcp://localhost:4840"):
147
        """
148
        Create a client to discovery server and return it
149
        """
150
        client = Client(url)
151
        client.connect()
152
        return client
153
154 1
    def allow_remote_admin(self, allow):
155
        """
156
        Enable or disable the builtin Admin user from network clients
157
        """
158
        self.iserver.allow_remote_admin = allow
159
160 1
    def set_endpoint(self, url):
161 1
        self.endpoint = urlparse(url)
162
163 1
    def get_endpoints(self):
164 1
        return self.iserver.get_endpoints()
165
166 1
    def _setup_server_nodes(self):
167
        # to be called just before starting server since it needs all parameters to be setup
168 1
        self._set_endpoints()
169 1
        self._policies = [ua.SecurityPolicyFactory()]
170 1
        if self.certificate and self.private_key:
171
            self._set_endpoints(security_policies.SecurityPolicyBasic128Rsa15,
172
                                ua.MessageSecurityMode.SignAndEncrypt)
173
            self._policies.append(ua.SecurityPolicyFactory(
174
                                security_policies.SecurityPolicyBasic128Rsa15,
175
                                ua.MessageSecurityMode.SignAndEncrypt,
176
                                self.certificate, self.private_key))
177
            self._set_endpoints(security_policies.SecurityPolicyBasic128Rsa15,
178
                                ua.MessageSecurityMode.Sign)
179
            self._policies.append(ua.SecurityPolicyFactory(
180
                                security_policies.SecurityPolicyBasic128Rsa15,
181
                                ua.MessageSecurityMode.Sign,
182
                                self.certificate, self.private_key))
183
            self._set_endpoints(security_policies.SecurityPolicyBasic256,
184
                                ua.MessageSecurityMode.SignAndEncrypt)
185
            self._policies.append(ua.SecurityPolicyFactory(
186
                                security_policies.SecurityPolicyBasic256,
187
                                ua.MessageSecurityMode.SignAndEncrypt,
188
                                self.certificate, self.private_key))
189
            self._set_endpoints(security_policies.SecurityPolicyBasic256,
190
                                ua.MessageSecurityMode.Sign)
191
            self._policies.append(ua.SecurityPolicyFactory(
192
                                security_policies.SecurityPolicyBasic256,
193
                                ua.MessageSecurityMode.Sign,
194
                                self.certificate, self.private_key))
195
196 1
    def _set_endpoints(self, policy=ua.SecurityPolicy, mode=ua.MessageSecurityMode.None_):
197 1
        idtoken = ua.UserTokenPolicy()
198 1
        idtoken.PolicyId = 'anonymous'
199 1
        idtoken.TokenType = ua.UserTokenType.Anonymous
200
201 1
        idtoken2 = ua.UserTokenPolicy()
202 1
        idtoken2.PolicyId = 'certificate_basic256'
203 1
        idtoken2.TokenType = ua.UserTokenType.Certificate
204
205 1
        idtoken3 = ua.UserTokenPolicy()
206 1
        idtoken3.PolicyId = 'certificate_basic128'
207 1
        idtoken3.TokenType = ua.UserTokenType.Certificate
208
209 1
        idtoken4 = ua.UserTokenPolicy()
210 1
        idtoken4.PolicyId = 'username'
211 1
        idtoken4.TokenType = ua.UserTokenType.UserName
212
213 1
        appdesc = ua.ApplicationDescription()
214 1
        appdesc.ApplicationName = ua.LocalizedText(self.name)
215 1
        appdesc.ApplicationUri = self.application_uri
216 1
        appdesc.ApplicationType = self.application_type
217 1
        appdesc.ProductUri = self.product_uri
218 1
        appdesc.DiscoveryUrls.append(self.endpoint.geturl())
219
220 1
        edp = ua.EndpointDescription()
221 1
        edp.EndpointUrl = self.endpoint.geturl()
222 1
        edp.Server = appdesc
223 1
        if self.certificate:
224
            edp.ServerCertificate = uacrypto.der_from_x509(self.certificate)
225 1
        edp.SecurityMode = mode
226 1
        edp.SecurityPolicyUri = policy.URI
227 1
        edp.UserIdentityTokens = [idtoken, idtoken2, idtoken3]
228 1
        edp.TransportProfileUri = 'http://opcfoundation.org/UA-Profile/Transport/uatcp-uasc-uabinary'
229 1
        edp.SecurityLevel = 0
230 1
        self.iserver.add_endpoint(edp)
231
232 1
    def set_server_name(self, name):
233
        self.name = name
234
235 1
    def start(self):
236
        """
237
        Start to listen on network
238
        """
239 1
        self._setup_server_nodes()
240 1
        self.iserver.start()
241 1
        self.bserver = BinaryServer(self.iserver, self.endpoint.hostname, self.endpoint.port)
242 1
        self.bserver.set_policies(self._policies)
243 1
        self.bserver.start()
244
245 1
    def stop(self):
246
        """
247
        Stop server
248
        """
249 1
        for client in self._discovery_clients.values():
250 1
            client.disconnect()
251 1
        self.bserver.stop()
252 1
        self.iserver.stop()
253
254 1
    def get_root_node(self):
255
        """
256
        Get Root node of server. Returns a Node object.
257
        """
258 1
        return self.get_node(ua.TwoByteNodeId(ua.ObjectIds.RootFolder))
259
260 1
    def get_objects_node(self):
261
        """
262
        Get Objects node of server. Returns a Node object.
263
        """
264 1
        return self.get_node(ua.TwoByteNodeId(ua.ObjectIds.ObjectsFolder))
265
266 1
    def get_server_node(self):
267
        """
268
        Get Server node of server. Returns a Node object.
269
        """
270 1
        return self.get_node(ua.TwoByteNodeId(ua.ObjectIds.Server))
271
272 1
    def get_node(self, nodeid):
273
        """
274
        Get a specific node using NodeId object or a string representing a NodeId
275
        """
276 1
        return Node(self.iserver.isession, nodeid)
277
278 1
    def create_subscription(self, period, handler):
279
        """
280
        Create a subscription.
281
        returns a Subscription object which allow
282
        to subscribe to events or data on server
283
        """
284 1
        params = ua.CreateSubscriptionParameters()
285 1
        params.RequestedPublishingInterval = period
286 1
        params.RequestedLifetimeCount = 3000
287 1
        params.RequestedMaxKeepAliveCount = 10000
288 1
        params.MaxNotificationsPerPublish = 0
289 1
        params.PublishingEnabled = True
290 1
        params.Priority = 0
291 1
        return Subscription(self.iserver.isession, params, handler)
292
293 1
    def get_namespace_array(self):
294
        """
295
        get all namespace defined in server
296
        """
297 1
        ns_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_NamespaceArray))
298 1
        return ns_node.get_value()
299
300 1
    def register_namespace(self, uri):
301
        """
302
        Register a new namespace. Nodes should in custom namespace, not 0.
303
        """
304 1
        ns_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_NamespaceArray))
305 1
        uries = ns_node.get_value()
306 1
        uries.append(uri)
307 1
        ns_node.set_value(uries)
308 1
        return (len(uries) - 1)
309
310 1
    def get_namespace_index(self, uri):
311
        """
312
        get index of a namespace using its uri
313
        """
314 1
        uries = self.get_namespace_array()
315 1
        return uries.index(uri)
316
317 1
    def get_event_object(self, etype=ua.ObjectIds.BaseEventType, source=ua.ObjectIds.Server):
318
        """
319
        Returns an event object using an event type from address space.
320
        Use this object to fire events
321
        """
322
        return Event(self.iserver.isession, etype, source)
323
324 1
    def import_xml(self, path):
325
        """
326
        import nodes defined in xml
327
        """
328 1
        importer = xmlimporter.XmlImporter(self.iserver.node_mgt_service)
329 1
        importer.import_xml(path)
330
331 1
    def delete_nodes(self, nodes, recursive=False):
332 1
        return delete_nodes(self.iserver.isession, nodes, recursive)
333
 
334