Completed
Pull Request — master (#369)
by Olivier
03:28
created

Server.export_xml_by_ns()   A

Complexity

Conditions 2

Size

Total Lines 15

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 1
CRAP Score 4.048

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 2
c 1
b 0
f 0
dl 0
loc 15
ccs 1
cts 5
cp 0.2
crap 4.048
rs 9.4285
1
"""
2
High level interface to pure python OPC-UA server
3
"""
4
5 1
import logging
6 1
from datetime import timedelta
7 1
try:
8 1
    from urllib.parse import urlparse
9
except ImportError:
10
    from urlparse import urlparse
11
12
13 1
from opcua import ua
14
# from opcua.binary_server import BinaryServer
15 1
from opcua.server.binary_server_asyncio import BinaryServer
16 1
from opcua.server.internal_server import InternalServer
17 1
from opcua.server.event_generator import EventGenerator
18 1
from opcua.common.node import Node
19 1
from opcua.common.subscription import Subscription
20 1
from opcua.common import xmlimporter
21 1
from opcua.common.manage_nodes import delete_nodes
22 1
from opcua.client.client import Client
23 1
from opcua.crypto import security_policies
24 1
from opcua.common.event_objects import BaseEvent
25 1
from opcua.common.shortcuts import Shortcuts
26 1
from opcua.common.xmlexporter import XmlExporter
27 1
from opcua.common.ua_utils import get_nodes_of_namespace
28 1
use_crypto = True
29 1
try:
30 1
    from opcua.crypto import uacrypto
31
except ImportError:
32
    print("cryptography is not installed, use of crypto disabled")
33
    use_crypto = False
34
35
36 1
class Server(object):
37
38
    """
39
    High level Server class
40
41
    This class creates an opcua server with default values
42
43
    Create your own namespace and then populate your server address space
44
    using the get_root_node() or get_objects_node() methods to get Node objects.
45
    and get_event_generator() to fire events.
46
    Then start server. See example_server.py
47
    All methods are threadsafe
48
49
    If you need more flexibility you call directly the Ua Service methods
50
    on the iserver or iserver.isession object members.
51
52
    During startup the standard address space will be constructed, which may be
53
    time-consuming when running a server on a less powerful device (e.g. a
54
    Raspberry Pi). In order to improve startup performance, an optional path to a
55
    cache file can be passed to the server constructor.
56
    If the parameter is defined, the address space will be loaded from the
57
    cache file or the file will be created if it does not exist yet.
58
    As a result the first startup will be even slower due to the cache file
59
    generation but all further start ups will be significantly faster.
60
61
    :ivar application_uri:
62
    :vartype application_uri: uri
63
    :ivar product_uri:
64
    :vartype product_uri: uri
65
    :ivar name:
66
    :vartype name: string
67
    :ivar default_timeout: timeout in milliseconds for sessions and secure channel
68
    :vartype default_timeout: int
69
    :ivar iserver: internal server object
70
    :vartype iserver: InternalServer
71
    :ivar bserver: binary protocol server
72
    :vartype bserver: BinaryServer
73
    :ivar nodes: shortcuts to common nodes
74
    :vartype nodes: Shortcuts
75
76
    """
77
78 1
    def __init__(self, shelffile=None, iserver=None):
        """
        Build a server object populated with default settings.

        :param shelffile: forwarded to InternalServer when no iserver is
            given (presumably the cache file described in the class
            docstring -- confirm against InternalServer)
        :param iserver: ready-made internal server to use instead of
            creating a new InternalServer
        """
        self.logger = logging.getLogger(__name__)

        # identity advertised by this application
        self.name = "FreeOpcUa Python Server"
        self.application_uri = "urn:freeopcua:python:server"
        self.product_uri = "urn:freeopcua.github.no:python:server"
        self.application_type = ua.ApplicationType.ClientAndServer

        # network defaults
        self.endpoint = urlparse("opc.tcp://0.0.0.0:4840/freeopcua/server/")
        self.default_timeout = 3600000

        # a caller-supplied internal server wins over building our own
        self.iserver = InternalServer(shelffile) if iserver is None else iserver
        self.bserver = None  # created in start()

        # discovery bookkeeping
        self._discovery_clients = {}
        self._discovery_period = 60

        # security material, filled by load_certificate()/load_private_key()
        self.certificate = None
        self.private_key = None
        self._policies = []

        self.nodes = Shortcuts(self.iserver.isession)

        # setup some expected values
        self.register_namespace(self.application_uri)
        server_array = self.get_node(ua.NodeId(ua.ObjectIds.Server_ServerArray))
        server_array.set_value([self.application_uri])
102
103 1
    def __enter__(self):
104 1
        self.start()
105 1
        return self
106
107 1
    def __exit__(self, exc_type, exc_value, traceback):
108 1
        self.stop()
109
110 1
    def load_certificate(self, path):
        """
        Load the server certificate from a file, either pem or der,
        and keep it in self.certificate
        """
        certificate = uacrypto.load_certificate(path)
        self.certificate = certificate
115
116 1
    def load_private_key(self, path):
        """
        Load the server private key from a file and keep it in
        self.private_key
        """
        private_key = uacrypto.load_private_key(path)
        self.private_key = private_key
118
119 1
    def disable_clock(self, val=True):
120
        """
121
        for debugging you may want to disable clock that write every second
122
        to address space
123
        """
124
        self.iserver.disabled_clock = val
125
126 1
    def set_application_uri(self, uri):
127
        """
128
        Set application/server URI.
129
        This uri is supposed to be unique. If you intent to register
130
        your server to a discovery server, it really should be unique in
131
        your system!
132
        default is : "urn:freeopcua:python:server"
133
        """
134 1
        self.application_uri = uri
135
136 1
    def find_servers(self, uris=None):
        """
        Query the internal server for known servers; mainly implemented
        for symmetry with the client.

        :param uris: server uris to put in the request, an empty list
            when omitted
        """
        request = ua.FindServersParameters()
        request.EndpointUrl = self.endpoint.geturl()
        request.ServerUris = [] if uris is None else uris
        return self.iserver.find_servers(request)
146
147 1
    def register_to_discovery(self, url="opc.tcp://localhost:4840", period=60):
        """
        Register to an OPC-UA Discovery server. Registering must be renewed
        at least every 10 minutes, so this method uses our asyncio thread
        to re-register every period seconds.
        If period is 0 registration is not automatically renewed.
        """
        # FIXME: have a period per discovery
        clients = self._discovery_clients
        if url in clients:
            # drop the previous connection to the same discovery server
            clients[url].disconnect()
        discovery = Client(url)
        # stored before connecting, so the entry exists even if connect() raises
        clients[url] = discovery
        discovery.connect()
        discovery.register_server(self)
        self._discovery_period = period
        if period:
            self.iserver.loop.call_soon(self._renew_registration)
163
164 1
    def unregister_to_discovery(self, url="opc.tcp://localhost:4840"):
165
        """
166
        stop registration thread
167
        """
168
        # FIXME: is there really no way to deregister?
169
        self._discovery_clients[url].disconnect()
170
171 1
    def _renew_registration(self):
172
        for client in self._discovery_clients.values():
173
            client.register_server(self)
174
            self.iserver.loop.call_later(self._discovery_period, self._renew_registration)
175
176 1
    def get_client_to_discovery(self, url="opc.tcp://localhost:4840"):
        """
        Create a client connected to the discovery server at url and
        return it
        """
        discovery = Client(url)
        discovery.connect()
        return discovery
183
184 1
    def allow_remote_admin(self, allow):
185
        """
186
        Enable or disable the builtin Admin user from network clients
187
        """
188
        self.iserver.allow_remote_admin = allow
189
190 1
    def set_endpoint(self, url):
191 1
        self.endpoint = urlparse(url)
192
193 1
    def get_endpoints(self):
194 1
        return self.iserver.get_endpoints()
195
196 1
    def _setup_server_nodes(self):
        """
        Create the endpoints and the matching security policy factories.

        To be called just before starting the server since it needs all
        parameters to be set up. An unsecured endpoint is always created;
        the Basic128Rsa15 and Basic256 endpoints (SignAndEncrypt and Sign)
        are only added when both a certificate and a private key are
        loaded.
        """
        self._set_endpoints()
        self._policies = [ua.SecurityPolicyFactory()]
        if not (self.certificate and self.private_key):
            return

        # same order as endpoints are advertised
        secured = (
            (security_policies.SecurityPolicyBasic128Rsa15, ua.MessageSecurityMode.SignAndEncrypt),
            (security_policies.SecurityPolicyBasic128Rsa15, ua.MessageSecurityMode.Sign),
            (security_policies.SecurityPolicyBasic256, ua.MessageSecurityMode.SignAndEncrypt),
            (security_policies.SecurityPolicyBasic256, ua.MessageSecurityMode.Sign),
        )
        for policy, mode in secured:
            self._set_endpoints(policy, mode)
            factory = ua.SecurityPolicyFactory(policy, mode, self.certificate, self.private_key)
            self._policies.append(factory)
229
230 1
    def _set_endpoints(self, policy=ua.SecurityPolicy, mode=ua.MessageSecurityMode.None_):
        """
        Build one endpoint description for the given security policy and
        message security mode and add it to the internal server.

        Every endpoint advertises the same four user identity token
        policies: anonymous, two certificate policies and username.
        """
        def make_token(policy_id, token_type):
            # UserTokenPolicy with its id and type filled in
            token = ua.UserTokenPolicy()
            token.PolicyId = policy_id
            token.TokenType = token_type
            return token

        tokens = [
            make_token('anonymous', ua.UserTokenType.Anonymous),
            make_token('certificate_basic256', ua.UserTokenType.Certificate),
            make_token('certificate_basic128', ua.UserTokenType.Certificate),
            make_token('username', ua.UserTokenType.UserName),
        ]

        url = self.endpoint.geturl()

        application = ua.ApplicationDescription()
        application.ApplicationName = ua.LocalizedText(self.name)
        application.ApplicationUri = self.application_uri
        application.ApplicationType = self.application_type
        application.ProductUri = self.product_uri
        application.DiscoveryUrls.append(url)

        description = ua.EndpointDescription()
        description.EndpointUrl = url
        description.Server = application
        if self.certificate:
            # the certificate is published in DER form
            description.ServerCertificate = uacrypto.der_from_x509(self.certificate)
        description.SecurityMode = mode
        description.SecurityPolicyUri = policy.URI
        description.UserIdentityTokens = tokens
        description.TransportProfileUri = 'http://opcfoundation.org/UA-Profile/Transport/uatcp-uasc-uabinary'
        description.SecurityLevel = 0
        self.iserver.add_endpoint(description)
265
266 1
    def set_server_name(self, name):
267
        self.name = name
268
269 1
    def start(self):
        """
        Start to listen on network: set up the endpoints, start the
        internal server, then create and start the binary server on the
        host and port of the configured endpoint
        """
        self._setup_server_nodes()
        self.iserver.start()
        host = self.endpoint.hostname
        port = self.endpoint.port
        binary = BinaryServer(self.iserver, host, port)
        self.bserver = binary
        binary.set_policies(self._policies)
        binary.start()
278
279 1
    def stop(self):
280
        """
281
        Stop server
282
        """
283 1
        for client in self._discovery_clients.values():
284 1
            client.disconnect()
285 1
        self.bserver.stop()
286 1
        self.iserver.stop()
287
288 1
    def get_root_node(self):
        """
        Return the Root folder of the server as a Node object.
        """
        root_id = ua.TwoByteNodeId(ua.ObjectIds.RootFolder)
        return self.get_node(root_id)
293
294 1
    def get_objects_node(self):
        """
        Return the Objects folder of the server as a Node object.
        """
        objects_id = ua.TwoByteNodeId(ua.ObjectIds.ObjectsFolder)
        return self.get_node(objects_id)
299
300 1
    def get_server_node(self):
        """
        Return the Server node of the server as a Node object.
        """
        server_id = ua.TwoByteNodeId(ua.ObjectIds.Server)
        return self.get_node(server_id)
305
306 1
    def get_node(self, nodeid):
        """
        Return a Node bound to the internal session for the given NodeId
        object or string representing a NodeId
        """
        session = self.iserver.isession
        return Node(session, nodeid)
311
312 1
    def create_subscription(self, period, handler):
        """
        Create a subscription on the internal session.
        Returns a Subscription object which allows subscribing to events
        or data changes on the server.

        :param period: requested publishing interval
        :param handler: object handed to Subscription to receive
            notifications
        """
        request = ua.CreateSubscriptionParameters()
        settings = (
            ("RequestedPublishingInterval", period),
            ("RequestedLifetimeCount", 3000),
            ("RequestedMaxKeepAliveCount", 10000),
            ("MaxNotificationsPerPublish", 0),
            ("PublishingEnabled", True),
            ("Priority", 0),
        )
        for field, value in settings:
            setattr(request, field, value)
        return Subscription(self.iserver.isession, request, handler)
326
327 1
    def get_namespace_array(self):
        """
        Return all namespaces defined in the server (value of the
        Server_NamespaceArray node)
        """
        array_id = ua.NodeId(ua.ObjectIds.Server_NamespaceArray)
        return self.get_node(array_id).get_value()
333
334 1
    def register_namespace(self, uri):
        """
        Register a new namespace and return its index. Nodes should be
        created in a custom namespace, not 0.
        An already registered uri is not added again; its existing index
        is returned.
        """
        array_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_NamespaceArray))
        known = array_node.get_value()
        if uri not in known:
            known.append(uri)
            array_node.set_value(known)
            # the new uri sits at the end of the array
            return len(known) - 1
        return known.index(uri)
345
346 1
    def get_namespace_index(self, uri):
347
        """
348
        get index of a namespace using its uri
349
        """
350 1
        uries = self.get_namespace_array()
351 1
        return uries.index(uri)
352
353 1
    def get_event_generator(self, etype=None, source=ua.ObjectIds.Server):
        """
        Return an event generator for an event type from the address
        space; a BaseEvent is used when no (or a falsy) etype is given.
        Use the returned object to fire events.
        """
        event_type = etype if etype else BaseEvent()
        return EventGenerator(self.iserver.isession, event_type, source)
361
362 1
    def create_custom_data_type(self, idx, name, basetype=ua.ObjectIds.BaseDataType, properties=None):
        """
        Create a custom type below basetype (BaseDataType by default)
        with the given properties and no variables or methods
        """
        props = [] if properties is None else properties
        return self._create_custom_type(idx, name, basetype, props, [], [])
366
367 1
    def create_custom_event_type(self, idx, name, basetype=ua.ObjectIds.BaseEventType, properties=None):
        """
        Create a custom type below basetype (BaseEventType by default)
        with the given properties and no variables or methods
        """
        props = [] if properties is None else properties
        return self._create_custom_type(idx, name, basetype, props, [], [])
371
372 1
    def create_custom_object_type(self, idx, name, basetype=ua.ObjectIds.BaseObjectType, properties=None, variables=None, methods=None):
        """
        Create a custom type below basetype (BaseObjectType by default)
        with the given properties, variables and methods; each omitted
        list defaults to empty
        """
        props = [] if properties is None else properties
        varis = [] if variables is None else variables
        meths = [] if methods is None else methods
        return self._create_custom_type(idx, name, basetype, props, varis, meths)
380
381
    # def create_custom_reference_type(self, idx, name, basetype=ua.ObjectIds.BaseReferenceType, properties=[]):
382
        # return self._create_custom_type(idx, name, basetype, properties)
383
384 1
    def create_custom_variable_type(self, idx, name, basetype=ua.ObjectIds.BaseVariableType, properties=None, variables=None, methods=None):
        """
        Create a custom type below basetype (BaseVariableType by default)
        with the given properties, variables and methods; each omitted
        list defaults to empty
        """
        props = [] if properties is None else properties
        varis = [] if variables is None else variables
        meths = [] if methods is None else methods
        return self._create_custom_type(idx, name, basetype, props, varis, meths)
392
393 1
    def _create_custom_type(self, idx, name, basetype, properties, variables, methods):
        """
        Add a new type named name under basetype and populate it.

        :param idx: namespace index used for the type and its children
        :param basetype: a Node, a ua.NodeId, or a value accepted by
            ua.NodeId()
        :param properties: sequences (name, varianttype[, datatype])
        :param variables: sequences (name, varianttype[, datatype])
        :param methods: sequences whose first four items are passed to
            add_method after idx
        :return: the node of the newly created type
        """
        if isinstance(basetype, Node):
            parent = basetype
        else:
            base_id = basetype if isinstance(basetype, ua.NodeId) else ua.NodeId(basetype)
            parent = Node(self.iserver.isession, base_id)

        new_type = parent.add_object_type(idx, name)

        for prop in properties:
            # the third item, when present, is an explicit datatype
            datatype = prop[2] if len(prop) > 2 else None
            new_type.add_property(idx, prop[0], ua.get_default_value(prop[1]), varianttype=prop[1], datatype=datatype)

        for variable in variables:
            datatype = variable[2] if len(variable) > 2 else None
            new_type.add_variable(idx, variable[0], ua.get_default_value(variable[1]), varianttype=variable[1], datatype=datatype)

        for method in methods:
            new_type.add_method(idx, method[0], method[1], method[2], method[3])

        return new_type
416
417 1
    def import_xml(self, path):
        """
        Import the nodes defined in the xml file at path and return the
        importer's result
        """
        return xmlimporter.XmlImporter(self).import_xml(path)
423
424 1
    def export_xml(self, nodes, path):
        """
        Export the given nodes to the xml file at path and return the
        result of the exporter's write_xml()
        """
        exporter = XmlExporter(self)
        exporter.build_etree(nodes)
        return exporter.write_xml(path)
431
432 1
    def export_xml_by_ns(self, path, namespaces=None):
        """
        Export nodes of one or more namespaces to an XML file.
        Namespaces used by nodes are always exported for consistency.
        Args:
            path: name of the xml file to write
            namespaces: list of string uris or int indexes of the namespace to export, if not provide all ns are used except 0

        Returns: the result of export_xml() for the selected nodes
        """
        if namespaces is None:
            # an empty list is handed to get_nodes_of_namespace
            namespaces = []
        nodes = get_nodes_of_namespace(self, namespaces)
        return self.export_xml(nodes, path)
447
448 1
    def delete_nodes(self, nodes, recursive=False):
        """
        Delete nodes through the internal session by calling the
        module-level delete_nodes helper and return its result
        """
        session = self.iserver.isession
        return delete_nodes(session, nodes, recursive)
450
451 1
    def historize_node_data_change(self, node, period=timedelta(days=7), count=0):
452
        """
453
        Start historizing supplied nodes; see history module
454
        Args:
455
            node: node or list of nodes that can be historized (variables/properties)
456
            period: time delta to store the history; older data will be deleted from the storage
457
            count: number of changes to store in the history
458
459
        Returns:
460
        """
461 1
        nodes = [node]
462 1
        for node in nodes:
463 1
            self.iserver.enable_history_data_change(node, period, count)
464
465 1
    def dehistorize_node_data_change(self, node):
466
        """
467
        Stop historizing supplied nodes; see history module
468
        Args:
469
            node: node or list of nodes that can be historized (UA variables/properties)
470
471
        Returns:
472
        """
473
        nodes = [node]
474
        for node in nodes:
475
            self.iserver.disable_history_data_change(node)
476
477 1
    def historize_node_event(self, node, period=timedelta(days=7), count=0):
478
        """
479
        Start historizing events from node (typically a UA object); see history module
480
        Args:
481
            node: node or list of nodes that can be historized (UA objects)
482
            period: time delta to store the history; older data will be deleted from the storage
483
            count: number of events to store in the history
484
485
        Returns:
486
        """
487 1
        nodes = [node]
488 1
        for node in nodes:
489 1
            self.iserver.enable_history_event(node, period, count)
490
491 1
    def dehistorize_node_event(self, node):
492
        """
493
        Stop historizing events from node (typically a UA object); see history module
494
        Args:
495
           node: node or list of nodes that can be historized (UA objects)
496
497
        Returns:
498
        """
499
        nodes = [node]
500
        for node in nodes:
501
            self.iserver.disable_history_event(node)
502
503 1
    def subscribe_server_callback(self, event, handle):
504
        self.iserver.subscribe_server_callback(event, handle)
505
506 1
    def unsubscribe_server_callback(self, event, handle):
507
        self.iserver.unsubscribe_server_callback(event, handle)
508
509 1
    def link_method(self, node, callback):
510
        """
511
        Link a python function to a UA method in the address space; required when a UA method has been imported
512
        to the address space via XML; the python executable must be linked manually
513
        Args:
514
            node: UA method node
515
            callback: python function that the UA method will call
516
517
        Returns:
518
        """
519
        self.iserver.isession.add_method_callback(node.nodeid, callback)
520