Passed
Branch master (6083f5)
by Olivier
03:32
created

Server._set_endpoints()   B

Complexity

Conditions 2

Size

Total Lines 35

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 30
CRAP Score 2

Importance

Changes 0
Metric Value
cc 2
c 0
b 0
f 0
dl 0
loc 35
ccs 30
cts 30
cp 1
crap 2
rs 8.8571
1
"""
2
High level interface to pure python OPC-UA server
3
"""
4
5 1
import logging
6 1
try:
7 1
    from urllib.parse import urlparse
8
except ImportError:
9
    from urlparse import urlparse
10
11
12 1
from opcua import ua
13
# from opcua.binary_server import BinaryServer
14 1
from opcua.server.binary_server_asyncio import BinaryServer
15 1
from opcua.server.internal_server import InternalServer
16 1
from opcua.server.event_generator import EventGenerator
17 1
from opcua.common.node import Node
18 1
from opcua.common.subscription import Subscription
19 1
from opcua.common import xmlimporter
20 1
from opcua.common.manage_nodes import delete_nodes
21 1
from opcua.client.client import Client
22 1
from opcua.crypto import security_policies
23 1
from opcua.common.event_objects import BaseEvent
24 1
from opcua.common.shortcuts import Shortcuts
25 1
from opcua.common.xmlexporter import XmlExporter
26 1
from opcua.common.ua_utils import get_nodes_of_namespace
27 1
use_crypto = True
28 1
try:
29 1
    from opcua.crypto import uacrypto
30
except ImportError:
31
    print("cryptography is not installed, use of crypto disabled")
32
    use_crypto = False
33
34
35 1
class Server(object):
36
37
    """
38
    High level Server class
39
40
    This class creates an opcua server with default values
41
42
    Create your own namespace and then populate your server address space
43
    using get_root_node() or get_objects_node() to get Node objects.
44
    and get_event_generator() to fire events.
45
    Then start server. See example_server.py
46
    All methods are threadsafe
47
48
    If you need more flexibility you call directly the Ua Service methods
49
    on the iserver or iserver.isession object members.
50
51
    During startup the standard address space will be constructed, which may be
52
    time-consuming when running a server on a less powerful device (e.g. a
53
    Raspberry Pi). In order to improve startup performance, an optional path to a
54
    cache file can be passed to the server constructor.
55
    If the parameter is defined, the address space will be loaded from the
56
    cache file or the file will be created if it does not exist yet.
57
    As a result the first startup will be even slower due to the cache file
58
    generation but all further startups will be significantly faster.
59
60
    :ivar application_uri:
61
    :vartype application_uri: uri
62
    :ivar product_uri:
63
    :vartype product_uri: uri
64
    :ivar name:
65
    :vartype name: string
66
    :ivar default_timeout: timeout in milliseconds for sessions and secure channel
67
    :vartype default_timeout: int
68
    :ivar iserver: internal server object
69
    :vartype iserver: InternalServer
70
    :ivar bserver: binary protocol server
71
    :vartype bserver: BinaryServer
72
    :ivar nodes: shortcuts to common nodes
73
    :vartype nodes: Shortcuts
74
75
    """
76
77 1
    def __init__(self, cacheFile=None, iserver=None):
        """
        Create a server object populated with default values.

        :param cacheFile: passed to InternalServer when no iserver is given
        :param iserver: existing internal server to use instead of
            creating a new InternalServer
        """
        self.logger = logging.getLogger(__name__)

        # identity and connection defaults
        self.name = "FreeOpcUa Python Server"
        self.application_uri = "urn:freeopcua:python:server"
        self.product_uri = "urn:freeopcua.github.no:python:server"
        self.application_type = ua.ApplicationType.ClientAndServer
        self.endpoint = urlparse("opc.tcp://0.0.0.0:4840/freeopcua/server/")
        self.default_timeout = 3600000

        # use the supplied internal server, otherwise build one
        self.iserver = InternalServer(cacheFile) if iserver is None else iserver
        self.bserver = None

        # discovery registration state
        self._discovery_clients = {}
        self._discovery_period = 60

        # security material, filled in by load_certificate/load_private_key
        self.certificate = None
        self.private_key = None
        self._policies = []

        self.nodes = Shortcuts(self.iserver.isession)

        # setup some expected values
        self.register_namespace(self.application_uri)
        server_array_id = ua.NodeId(ua.ObjectIds.Server_ServerArray)
        self.get_node(server_array_id).set_value([self.application_uri])
101
102 1
    def __enter__(self):
103 1
        self.start()
104 1
        return self
105
106 1
    def __exit__(self, exc_type, exc_value, traceback):
107 1
        self.stop()
108
109 1
    def load_certificate(self, path):
        """
        Load the server certificate (pem or der file) found at path and
        store it in self.certificate.
        """
        loaded = uacrypto.load_certificate(path)
        self.certificate = loaded
114
115 1
    def load_private_key(self, path):
        """
        Load the private key found at path and store it in self.private_key.
        """
        loaded = uacrypto.load_private_key(path)
        self.private_key = loaded
117
118 1
    def disable_clock(self, val=True):
119
        """
120
        for debugging you may want to disable clock that write every second
121
        to address space
122
        """
123
        self.iserver.disabled_clock = val
124
125 1
    def set_application_uri(self, uri):
126
        """
127
        Set application/server URI.
128
        This uri is supposed to be unique. If you intent to register
129
        your server to a discovery server, it really should be unique in
130
        your system!
131
        default is : "urn:freeopcua:python:server"
132
        """
133 1
        self.application_uri = uri
134
135 1
    def find_servers(self, uris=None):
        """
        Ask the internal server for the servers it knows about.
        Mainly implemented for symmetry with the client.

        :param uris: server uris placed in the request; None is replaced
            by an empty list
        """
        request = ua.FindServersParameters()
        request.EndpointUrl = self.endpoint.geturl()
        request.ServerUris = [] if uris is None else uris
        return self.iserver.find_servers(request)
145
146 1
    def register_to_discovery(self, url="opc.tcp://localhost:4840", period=60):
        """
        Register this server to the OPC-UA Discovery server at url.

        Registration must be renewed at least every 10 minutes, so the
        renewal is scheduled on the internal server loop to run again
        every period seconds. With a period of 0 nothing is scheduled.
        A client already stored for url is disconnected and replaced.
        """
        # FIXME: have a period per discovery
        clients = self._discovery_clients
        if url in clients:
            clients[url].disconnect()
        client = clients[url] = Client(url)
        client.connect()
        client.register_server(self)
        self._discovery_period = period
        if not period:
            return
        self.iserver.loop.call_soon(self._renew_registration)
162
163 1
    def unregister_to_discovery(self, url="opc.tcp://localhost:4840"):
164
        """
165
        stop registration thread
166
        """
167
        # FIXME: is there really no way to deregister?
168
        self._discovery_clients[url].disconnect()
169
170 1
    def _renew_registration(self):
171
        for client in self._discovery_clients.values():
172
            client.register_server(self)
173
            self.iserver.loop.call_later(self._discovery_period, self._renew_registration)
174
175 1
    def get_client_to_discovery(self, url="opc.tcp://localhost:4840"):
        """
        Build a Client for the discovery server at url, connect it and
        return it.
        """
        discovery = Client(url)
        discovery.connect()
        return discovery
182
183 1
    def allow_remote_admin(self, allow):
184
        """
185
        Enable or disable the builtin Admin user from network clients
186
        """
187
        self.iserver.allow_remote_admin = allow
188
189 1
    def set_endpoint(self, url):
190 1
        self.endpoint = urlparse(url)
191
192 1
    def get_endpoints(self):
193 1
        return self.iserver.get_endpoints()
194
195 1
    def _setup_server_nodes(self):
        """
        Register the endpoints and build the list of security policies.

        To be called just before starting the server since it needs all
        parameters to be set up. An endpoint with the default policy and
        mode is always added. When both a certificate and a private key
        are set, one endpoint and one policy factory are added for each
        policy/mode pair listed below, in that order.
        """
        self._set_endpoints()
        self._policies = [ua.SecurityPolicyFactory()]
        if not (self.certificate and self.private_key):
            return

        secured = (
            (security_policies.SecurityPolicyBasic128Rsa15, ua.MessageSecurityMode.SignAndEncrypt),
            (security_policies.SecurityPolicyBasic128Rsa15, ua.MessageSecurityMode.Sign),
            (security_policies.SecurityPolicyBasic256, ua.MessageSecurityMode.SignAndEncrypt),
            (security_policies.SecurityPolicyBasic256, ua.MessageSecurityMode.Sign),
        )
        for policy, mode in secured:
            self._set_endpoints(policy, mode)
            factory = ua.SecurityPolicyFactory(policy, mode, self.certificate, self.private_key)
            self._policies.append(factory)
228
229 1
    def _set_endpoints(self, policy=ua.SecurityPolicy, mode=ua.MessageSecurityMode.None_):
        """
        Build one endpoint description and add it to the internal server.

        The endpoint carries the application description of this server,
        the given security policy URI and mode, and four user identity
        token policies (anonymous, two certificate ones and username).

        :param policy: object whose URI attribute is used as the
            SecurityPolicyUri
        :param mode: value stored as the endpoint SecurityMode
        """
        token_specs = (
            ('anonymous', ua.UserTokenType.Anonymous),
            ('certificate_basic256', ua.UserTokenType.Certificate),
            ('certificate_basic128', ua.UserTokenType.Certificate),
            ('username', ua.UserTokenType.UserName),
        )
        tokens = []
        for policy_id, token_type in token_specs:
            token = ua.UserTokenPolicy()
            token.PolicyId = policy_id
            token.TokenType = token_type
            tokens.append(token)

        url = self.endpoint.geturl()

        application = ua.ApplicationDescription()
        application.ApplicationName = ua.LocalizedText(self.name)
        application.ApplicationUri = self.application_uri
        application.ApplicationType = self.application_type
        application.ProductUri = self.product_uri
        application.DiscoveryUrls.append(url)

        description = ua.EndpointDescription()
        description.EndpointUrl = url
        description.Server = application
        if self.certificate:
            description.ServerCertificate = uacrypto.der_from_x509(self.certificate)
        description.SecurityMode = mode
        description.SecurityPolicyUri = policy.URI
        description.UserIdentityTokens = tokens
        description.TransportProfileUri = 'http://opcfoundation.org/UA-Profile/Transport/uatcp-uasc-uabinary'
        description.SecurityLevel = 0
        self.iserver.add_endpoint(description)
264
265 1
    def set_server_name(self, name):
266
        self.name = name
267
268 1
    def start(self):
        """
        Start to listen on network.

        Sets up endpoints and policies, starts the internal server, then
        creates and starts the binary server on the host and port of
        self.endpoint. If creating or starting the binary server fails,
        the internal server is stopped again before the error is
        re-raised, so a failed start does not leave it running.
        """
        self._setup_server_nodes()
        self.iserver.start()
        try:
            self.bserver = BinaryServer(self.iserver, self.endpoint.hostname, self.endpoint.port)
            self.bserver.set_policies(self._policies)
            self.bserver.start()
        except Exception:
            # undo the half-finished start, then let the caller see the error
            self.iserver.stop()
            raise
277
278 1
    def stop(self):
279
        """
280
        Stop server
281
        """
282 1
        for client in self._discovery_clients.values():
283 1
            client.disconnect()
284 1
        self.bserver.stop()
285 1
        self.iserver.stop()
286
287 1
    def get_root_node(self):
        """
        Return the Root node of the server as a Node object.
        """
        root_id = ua.TwoByteNodeId(ua.ObjectIds.RootFolder)
        return self.get_node(root_id)
292
293 1
    def get_objects_node(self):
        """
        Return the Objects node of the server as a Node object.
        """
        objects_id = ua.TwoByteNodeId(ua.ObjectIds.ObjectsFolder)
        return self.get_node(objects_id)
298
299 1
    def get_server_node(self):
        """
        Return the Server node of the server as a Node object.
        """
        server_id = ua.TwoByteNodeId(ua.ObjectIds.Server)
        return self.get_node(server_id)
304
305 1
    def get_node(self, nodeid):
        """
        Return a Node bound to the internal session for nodeid, given as
        a NodeId object or a string representing a NodeId.
        """
        session = self.iserver.isession
        return Node(session, nodeid)
310
311 1
    def create_subscription(self, period, handler):
        """
        Create a subscription on the internal session.

        :param period: value used as the requested publishing interval
        :param handler: object handed to the Subscription
        :return: a Subscription object which allows subscribing to events
            or data on the server
        """
        settings = (
            ("RequestedPublishingInterval", period),
            ("RequestedLifetimeCount", 3000),
            ("RequestedMaxKeepAliveCount", 10000),
            ("MaxNotificationsPerPublish", 0),
            ("PublishingEnabled", True),
            ("Priority", 0),
        )
        request = ua.CreateSubscriptionParameters()
        for field, value in settings:
            setattr(request, field, value)
        return Subscription(self.iserver.isession, request, handler)
325
326 1
    def get_namespace_array(self):
        """
        Return the value of the server's NamespaceArray node, i.e. all
        namespaces defined in the server.
        """
        array_id = ua.NodeId(ua.ObjectIds.Server_NamespaceArray)
        return self.get_node(array_id).get_value()
332
333 1
    def register_namespace(self, uri):
        """
        Register a new namespace and return its index.
        Nodes should be created in a custom namespace, not in 0.

        If uri is already in the namespace array its existing index is
        returned and the array is left untouched.
        """
        array_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_NamespaceArray))
        known = array_node.get_value()
        if uri not in known:
            known.append(uri)
            array_node.set_value(known)
            return len(known) - 1
        return known.index(uri)
344
345 1
    def get_namespace_index(self, uri):
346
        """
347
        get index of a namespace using its uri
348
        """
349 1
        uries = self.get_namespace_array()
350 1
        return uries.index(uri)
351
352 1
    def get_event_generator(self, etype=None, source=ua.ObjectIds.Server):
        """
        Return an EventGenerator for the given event type and source.
        Use the returned object to fire events.

        :param etype: event type; a BaseEvent is used when it is falsy
        :param source: passed unchanged to the EventGenerator
        """
        event_type = etype if etype else BaseEvent()
        return EventGenerator(self.iserver.isession, event_type, source)
360
361 1
    def create_custom_data_type(self, idx, name, basetype=ua.ObjectIds.BaseDataType, properties=None):
        """
        Create a custom type under basetype with the given properties.

        :param idx: namespace index used for the new type
        :param name: name of the new type
        :param basetype: parent type, forwarded to _create_custom_type
        :param properties: sequence of property definitions; None (the
            default) means no properties
        :return: the node returned by _create_custom_type
        """
        # None instead of a shared mutable default list
        if properties is None:
            properties = []
        return self._create_custom_type(idx, name, basetype, properties, [], [])
363
364 1
    def create_custom_event_type(self, idx, name, basetype=ua.ObjectIds.BaseEventType, properties=None):
        """
        Create a custom type under basetype with the given properties.

        :param idx: namespace index used for the new type
        :param name: name of the new type
        :param basetype: parent type, forwarded to _create_custom_type
        :param properties: sequence of property definitions; None (the
            default) means no properties
        :return: the node returned by _create_custom_type
        """
        # None instead of a shared mutable default list
        if properties is None:
            properties = []
        return self._create_custom_type(idx, name, basetype, properties, [], [])
366
367 1
    def create_custom_object_type(self, idx, name, basetype=ua.ObjectIds.BaseObjectType, properties=None, variables=None, methods=None):
        """
        Create a custom type under basetype with the given properties,
        variables and methods.

        :param idx: namespace index used for the new type
        :param name: name of the new type
        :param basetype: parent type, forwarded to _create_custom_type
        :param properties: sequence of property definitions, None for none
        :param variables: sequence of variable definitions, None for none
        :param methods: sequence of method definitions, None for none
        :return: the node returned by _create_custom_type
        """
        # None instead of shared mutable default lists
        properties = [] if properties is None else properties
        variables = [] if variables is None else variables
        methods = [] if methods is None else methods
        return self._create_custom_type(idx, name, basetype, properties, variables, methods)
369
370
    # def create_custom_reference_type(self, idx, name, basetype=ua.ObjectIds.BaseReferenceType, properties=[]):
371
        # return self._create_custom_type(idx, name, basetype, properties)
372
373 1
    def create_custom_variable_type(self, idx, name, basetype=ua.ObjectIds.BaseVariableType, properties=None, variables=None, methods=None):
        """
        Create a custom type under basetype with the given properties,
        variables and methods.

        :param idx: namespace index used for the new type
        :param name: name of the new type
        :param basetype: parent type, forwarded to _create_custom_type
        :param properties: sequence of property definitions, None for none
        :param variables: sequence of variable definitions, None for none
        :param methods: sequence of method definitions, None for none
        :return: the node returned by _create_custom_type
        """
        # None instead of shared mutable default lists
        properties = [] if properties is None else properties
        variables = [] if variables is None else variables
        methods = [] if methods is None else methods
        return self._create_custom_type(idx, name, basetype, properties, variables, methods)
375
376 1
    def _create_custom_type(self, idx, name, basetype, properties, variables, methods):
        """
        Add an object type named name under basetype and populate it.

        :param basetype: a Node, a ua.NodeId, or a value accepted by
            ua.NodeId()
        :param properties: items indexed as (name, variant type
            [, data type]); the default value comes from
            ua.get_default_value
        :param variables: items with the same layout as properties
        :param methods: items whose first four elements are passed to
            add_method after idx
        :return: the newly created type node
        """
        if isinstance(basetype, Node):
            parent = basetype
        else:
            parent_id = basetype if isinstance(basetype, ua.NodeId) else ua.NodeId(basetype)
            parent = Node(self.iserver.isession, parent_id)

        new_type = parent.add_object_type(idx, name)

        for prop in properties:
            # the third element, when present, is the explicit data type
            datatype = prop[2] if len(prop) > 2 else None
            new_type.add_property(idx, prop[0], ua.get_default_value(prop[1]), varianttype=prop[1], datatype=datatype)

        for var in variables:
            datatype = var[2] if len(var) > 2 else None
            new_type.add_variable(idx, var[0], ua.get_default_value(var[1]), varianttype=var[1], datatype=datatype)

        for meth in methods:
            new_type.add_method(idx, meth[0], meth[1], meth[2], meth[3])

        return new_type
399
400 1
    def import_xml(self, path):
        """
        Import the nodes defined in the xml file at path and return the
        importer's result.
        """
        return xmlimporter.XmlImporter(self).import_xml(path)
406
407 1
    def export_xml(self, nodes, path):
        """
        Export the given nodes to the xml file at path and return the
        exporter's write result.
        """
        exporter = XmlExporter(self)
        exporter.build_etree(nodes)
        return exporter.write_xml(path)
414
415 1
    def export_xml_by_ns(self, path, namespaces=None):
        """
        Export nodes of one or more namespaces to an XML file.
        Namespaces used by nodes are always exported for consistency.
        Args:
            path: name of the xml file to write
            namespaces: list of string uris or int indexes of the namespace to export, if not provided all ns are used except 0

        Returns:
            None
        """
        # None instead of a shared mutable default; an empty list is still
        # what gets passed on when no namespaces are given
        if namespaces is None:
            namespaces = []
        nodes = get_nodes_of_namespace(self, namespaces)
        self.export_xml(nodes, path)
428
429 1
    def delete_nodes(self, nodes, recursive=False):
        """
        Delete nodes through the internal session using the module-level
        delete_nodes helper, and return its result.
        """
        session = self.iserver.isession
        return delete_nodes(session, nodes, recursive)
431
432 1
    def historize_node(self, node):
433
        self.iserver.enable_history(node)
434
435 1
    def dehistorize_node(self, node):
436
        self.iserver.disable_history(node)
437
438 1
    def subscribe_server_callback(self, event, handle):
439
        self.iserver.subscribe_server_callback(event, handle)
440
441 1
    def unsubscribe_server_callback(self, event, handle):
442
        self.iserver.unsubscribe_server_callback(event, handle)
443
444 1
    def link_method(self, node, callback):
445
        self.iserver.isession.add_method_callback(node.nodeid, callback)
446