Completed
Push — master ( e5fff1...bc270d )
by Olivier
03:39
created

Server.export_xml_by_ns()   A

Complexity

Conditions 1

Size

Total Lines 13

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 2

Importance

Changes 0
Metric Value
cc 1
dl 0
loc 13
ccs 0
cts 0
cp 0
crap 2
rs 9.4285
c 0
b 0
f 0
1
"""
2
High level interface to pure python OPC-UA server
3
"""
4
5 1
import logging
6 1
try:
7 1
    from urllib.parse import urlparse
8
except ImportError:
9
    from urlparse import urlparse
10
11
12 1
from opcua import ua
13
# from opcua.binary_server import BinaryServer
14 1
from opcua.server.binary_server_asyncio import BinaryServer
15 1
from opcua.server.internal_server import InternalServer
16 1
from opcua.server.event_generator import EventGenerator
17 1
from opcua.common.node import Node
18 1
from opcua.common.subscription import Subscription
19 1
from opcua.common import xmlimporter
20 1
from opcua.common.manage_nodes import delete_nodes
21 1
from opcua.client.client import Client
22 1
from opcua.crypto import security_policies
23 1
from opcua.common.event_objects import BaseEvent
24 1
from opcua.common.shortcuts import Shortcuts
25 1
from opcua.common.xmlexporter import XmlExporter
26
from opcua.common.ua_utils import get_nodes_of_namespace
27
use_crypto = True
28
try:
29
    from opcua.crypto import uacrypto
30
except ImportError:
31 1
    print("cryptography is not installed, use of crypto disabled")
32
    use_crypto = False
33
34
35
class Server(object):
36
37
    """
38
    High level Server class
39
40
    This class creates an opcua server with default values
41
42
    Create your own namespace and then populate your server address space
43
    using get_root_node() or get_objects_node() to get Node objects,
44
    and get_event_generator() to fire events.
45
    Then start server. See example_server.py
46
    All methods are threadsafe
47
48
    If you need more flexibility you call directly the Ua Service methods
49
    on the iserver or iserver.isession object members.
50
51
    During startup the standard address space will be constructed, which may be
52
    time-consuming when running a server on a less powerful device (e.g. a
53
    Raspberry Pi). In order to improve startup performance, an optional path to a
54
    cache file can be passed to the server constructor.
55
    If the parameter is defined, the address space will be loaded from the
56
    cache file or the file will be created if it does not exist yet.
57
    As a result the first startup will be even slower due to the cache file
58
    generation but all further startups will be significantly faster.
59
60
    :ivar application_uri:
61
    :vartype application_uri: uri
62
    :ivar product_uri:
63 1
    :vartype product_uri: uri
64 1
    :ivar name:
65 1
    :vartype name: string
66 1
    :ivar default_timeout: timeout in milliseconds for sessions and secure channel
67 1
    :vartype default_timeout: int
68 1
    :ivar iserver: internal server object
69 1
    :vartype iserver: InternalServer
70 1
    :ivar bserver: binary protocol server
71 1
    :vartype bserver: BinaryServer
72 1
    :ivar nodes: shortcuts to common nodes
73 1
    :vartype nodes: Shortcuts
74 1
75 1
    """
76 1
77 1
    def __init__(self, cacheFile=None, iserver=None):
        """
        Set up a server with default values.

        :param cacheFile: handed to InternalServer when no iserver is given
        :param iserver: already built internal server object to use instead
            of creating a new InternalServer
        """
        self.logger = logging.getLogger(__name__)

        # identity and endpoint defaults
        self.endpoint = urlparse("opc.tcp://0.0.0.0:4840/freeopcua/server/")
        self.application_uri = "urn:freeopcua:python:server"
        self.product_uri = "urn:freeopcua.github.no:python:server"
        self.name = "FreeOpcUa Python Server"
        self.application_type = ua.ApplicationType.ClientAndServer
        self.default_timeout = 3600000

        # reuse the supplied internal server, otherwise build our own
        self.iserver = InternalServer(cacheFile) if iserver is None else iserver
        self.bserver = None

        # discovery and security state
        self._discovery_clients = {}
        self._discovery_period = 60
        self.certificate = None
        self.private_key = None
        self._policies = []
        self.nodes = Shortcuts(self.iserver.isession)

        # setup some expected values
        self.register_namespace(self.application_uri)
        server_array = self.get_node(ua.NodeId(ua.ObjectIds.Server_ServerArray))
        server_array.set_value([self.application_uri])
101
102
    def __enter__(self):
103
        self.start()
104
        return self
105
106
    def __exit__(self, exc_type, exc_value, traceback):
107 1
        self.stop()
108
109
    def load_certificate(self, path):
        """
        Read the server certificate (pem or der file) found at path
        and keep it in self.certificate
        """
        loaded = uacrypto.load_certificate(path)
        self.certificate = loaded
114
115 1
    def load_private_key(self, path):
        """
        Read the server private key found at path and keep it
        in self.private_key
        """
        loaded = uacrypto.load_private_key(path)
        self.private_key = loaded
117 1
118
    def disable_clock(self, val=True):
119
        """
120
        for debugging you may want to disable clock that write every second
121 1
        to address space
122 1
        """
123 1
        self.iserver.disabled_clock = val
124 1
125 1
    def set_application_uri(self, uri):
126 1
        """
127
        Set application/server URI.
128 1
        This uri is supposed to be unique. If you intent to register
129
        your server to a discovery server, it really should be unique in
130
        your system!
131
        default is : "urn:freeopcua:python:server"
132
        """
133
        self.application_uri = uri
134
135
    def find_servers(self, uris=None):
        """
        find_servers, mainly implemented for symmetry with client.

        :param uris: server uris to look for, an empty list is used
            when nothing is given
        :return: whatever the internal server find_servers call returns
        """
        request = ua.FindServersParameters()
        request.EndpointUrl = self.endpoint.geturl()
        request.ServerUris = [] if uris is None else uris
        return self.iserver.find_servers(request)
145 1
146
    def register_to_discovery(self, url="opc.tcp://localhost:4840", period=60):
        """
        Register to an OPC-UA Discovery server. Registering must be renewed at
        least every 10 minutes, so this method will use our asyncio thread to
        re-register every period seconds.
        If period is 0 registration is not automatically renewed.
        """
        # FIXME: have a period per discovery
        clients = self._discovery_clients
        if url in clients:
            # drop the connection of a previous registration to the same url
            clients[url].disconnect()
        client = Client(url)
        clients[url] = client
        client.connect()
        client.register_server(self)
        self._discovery_period = period
        if period:
            self.iserver.loop.call_soon(self._renew_registration)
162
163
    def unregister_to_discovery(self, url="opc.tcp://localhost:4840"):
164
        """
165 1
        stop registration thread
166
        """
167
        # FIXME: is there really no way to deregister?
168
        self._discovery_clients[url].disconnect()
169
170
    def _renew_registration(self):
171 1
        for client in self._discovery_clients.values():
172 1
            client.register_server(self)
173
            self.iserver.loop.call_later(self._discovery_period, self._renew_registration)
174 1
175 1
    def get_client_to_discovery(self, url="opc.tcp://localhost:4840"):
        """
        Build a Client for the discovery server at url, connect it
        and return it
        """
        discovery = Client(url)
        discovery.connect()
        return discovery
182 1
183
    def allow_remote_admin(self, allow):
184 1
        """
185
        Enable or disable the builtin Admin user from network clients
186
        """
187
        self.iserver.allow_remote_admin = allow
188
189 1
    def set_endpoint(self, url):
190
        self.endpoint = urlparse(url)
191 1
192
    def get_endpoints(self):
193
        return self.iserver.get_endpoints()
194
195
    def _setup_server_nodes(self):
        """
        Declare the endpoints and security policies of the server.

        To be called just before starting the server since it needs all
        parameters to be set up. An endpoint using the default (no
        security) arguments is always added; the secured ones are only
        added when both a certificate and a private key are loaded.
        """
        self._set_endpoints()
        self._policies = [ua.SecurityPolicyFactory()]
        if not (self.certificate and self.private_key):
            return

        # (policy, mode) pairs, in the order they are declared
        secured = (
            (security_policies.SecurityPolicyBasic128Rsa15, ua.MessageSecurityMode.SignAndEncrypt),
            (security_policies.SecurityPolicyBasic128Rsa15, ua.MessageSecurityMode.Sign),
            (security_policies.SecurityPolicyBasic256, ua.MessageSecurityMode.SignAndEncrypt),
            (security_policies.SecurityPolicyBasic256, ua.MessageSecurityMode.Sign),
        )
        for policy, mode in secured:
            self._set_endpoints(policy, mode)
            factory = ua.SecurityPolicyFactory(policy, mode, self.certificate, self.private_key)
            self._policies.append(factory)
228 1
229 1
    def _set_endpoints(self, policy=ua.SecurityPolicy, mode=ua.MessageSecurityMode.None_):
        """
        Build an endpoint description for the given security policy and
        mode and add it to the internal server.

        :param policy: security policy class, its URI attribute is used
        :param mode: message security mode of the endpoint
        """
        # user identity tokens offered by the endpoint: (policy id, token type)
        token_specs = (
            ('anonymous', ua.UserTokenType.Anonymous),
            ('certificate_basic256', ua.UserTokenType.Certificate),
            ('certificate_basic128', ua.UserTokenType.Certificate),
            ('username', ua.UserTokenType.UserName),
        )
        tokens = []
        for policy_id, token_type in token_specs:
            token = ua.UserTokenPolicy()
            token.PolicyId = policy_id
            token.TokenType = token_type
            tokens.append(token)

        url = self.endpoint.geturl()

        description = ua.ApplicationDescription()
        description.ApplicationName = ua.LocalizedText(self.name)
        description.ApplicationUri = self.application_uri
        description.ApplicationType = self.application_type
        description.ProductUri = self.product_uri
        description.DiscoveryUrls.append(url)

        endpoint = ua.EndpointDescription()
        endpoint.EndpointUrl = url
        endpoint.Server = description
        if self.certificate:
            endpoint.ServerCertificate = uacrypto.der_from_x509(self.certificate)
        endpoint.SecurityMode = mode
        endpoint.SecurityPolicyUri = policy.URI
        endpoint.UserIdentityTokens = tokens
        endpoint.TransportProfileUri = 'http://opcfoundation.org/UA-Profile/Transport/uatcp-uasc-uabinary'
        endpoint.SecurityLevel = 0
        self.iserver.add_endpoint(endpoint)
264 1
265 1
    def set_server_name(self, name):
266 1
        self.name = name
267 1
268
    def start(self):
        """
        Start to listen on network.

        Endpoints and policies are set up first, then the internal server
        is started and finally a binary server is created on the endpoint
        host and port and started.
        """
        self._setup_server_nodes()
        self.iserver.start()
        host, port = self.endpoint.hostname, self.endpoint.port
        self.bserver = BinaryServer(self.iserver, host, port)
        self.bserver.set_policies(self._policies)
        self.bserver.start()
277
278
    def stop(self):
279 1
        """
280
        Stop server
281 1
        """
282
        for client in self._discovery_clients.values():
283
            client.disconnect()
284
        self.bserver.stop()
285 1
        self.iserver.stop()
286
287 1
    def get_root_node(self):
        """
        Return the Root folder of the server as a Node object
        """
        root_id = ua.TwoByteNodeId(ua.ObjectIds.RootFolder)
        return self.get_node(root_id)
292
293 1
    def get_objects_node(self):
        """
        Return the Objects folder of the server as a Node object
        """
        objects_id = ua.TwoByteNodeId(ua.ObjectIds.ObjectsFolder)
        return self.get_node(objects_id)
298
299 1
    def get_server_node(self):
        """
        Return the Server node of the server as a Node object
        """
        server_id = ua.TwoByteNodeId(ua.ObjectIds.Server)
        return self.get_node(server_id)
304 1
305 1
    def get_node(self, nodeid):
        """
        Return a Node bound to the internal session for nodeid, which is
        a NodeId object or a string representing a NodeId
        """
        session = self.iserver.isession
        return Node(session, nodeid)
310
311
    def create_subscription(self, period, handler):
        """
        Create a subscription on the internal session.

        :param period: requested publishing interval
        :param handler: object handed to the Subscription
        :return: a Subscription object which allows to subscribe to
            events or data on server
        """
        request = ua.CreateSubscriptionParameters()
        request.RequestedPublishingInterval = period
        request.RequestedLifetimeCount = 3000
        request.RequestedMaxKeepAliveCount = 10000
        request.MaxNotificationsPerPublish = 0
        request.PublishingEnabled = True
        request.Priority = 0
        session = self.iserver.isession
        return Subscription(session, request, handler)
325 1
326
    def get_namespace_array(self):
        """
        Return the value of the Server_NamespaceArray node, i.e. all
        namespaces defined in server
        """
        array_id = ua.NodeId(ua.ObjectIds.Server_NamespaceArray)
        return self.get_node(array_id).get_value()
332 1
333
    def register_namespace(self, uri):
        """
        Register a new namespace. Nodes should be in a custom namespace, not 0.

        :return: index of uri in the namespace array; an already known
            uri is not added a second time
        """
        array_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_NamespaceArray))
        known = array_node.get_value()
        if uri not in known:
            known.append(uri)
            array_node.set_value(known)
            return len(known) - 1
        return known.index(uri)
344 1
345 1
    def get_namespace_index(self, uri):
346 1
        """
347
        get index of a namespace using its uri
348 1
        """
349
        uries = self.get_namespace_array()
350 1
        return uries.index(uri)
351 1
352 1
    def get_event_generator(self, etype=None, source=ua.ObjectIds.Server):
        """
        Return an EventGenerator for an event type from address space,
        a BaseEvent is used when no event type is given.
        Use the returned object to fire events
        """
        event_type = etype if etype else BaseEvent()
        return EventGenerator(self.iserver.isession, event_type, source)
360 1
361 1
    def create_custom_data_type(self, idx, name, basetype=ua.ObjectIds.BaseDataType, properties=None):
        """
        Create a custom type below basetype with the given properties,
        without variables or methods.

        :param properties: sequence of property definitions as consumed
            by _create_custom_type, nothing is added when omitted
        """
        # None instead of a shared mutable [] default
        if properties is None:
            properties = []
        return self._create_custom_type(idx, name, basetype, properties, [], [])
363 1
364 1
    def create_custom_event_type(self, idx, name, basetype=ua.ObjectIds.BaseEventType, properties=None):
        """
        Create a custom type below basetype with the given properties,
        without variables or methods.

        :param properties: sequence of property definitions as consumed
            by _create_custom_type, nothing is added when omitted
        """
        # None instead of a shared mutable [] default
        if properties is None:
            properties = []
        return self._create_custom_type(idx, name, basetype, properties, [], [])
366 1
367
    def create_custom_object_type(self, idx, name, basetype=ua.ObjectIds.BaseObjectType, properties=None, variables=None, methods=None):
        """
        Create a custom type below basetype with the given properties,
        variables and methods.

        :param properties: property definitions, none when omitted
        :param variables: variable definitions, none when omitted
        :param methods: method definitions, none when omitted
        """
        # None instead of shared mutable [] defaults
        if properties is None:
            properties = []
        if variables is None:
            variables = []
        if methods is None:
            methods = []
        return self._create_custom_type(idx, name, basetype, properties, variables, methods)
369 1
370
    # def create_custom_reference_type(self, idx, name, basetype=ua.ObjectIds.BaseReferenceType, properties=[]):
371
        # return self._create_custom_type(idx, name, basetype, properties)
372
373
    def create_custom_variable_type(self, idx, name, basetype=ua.ObjectIds.BaseVariableType, properties=None, variables=None, methods=None):
        """
        Create a custom type below basetype with the given properties,
        variables and methods.

        :param properties: property definitions, none when omitted
        :param variables: variable definitions, none when omitted
        :param methods: method definitions, none when omitted
        """
        # None instead of shared mutable [] defaults
        if properties is None:
            properties = []
        if variables is None:
            variables = []
        if methods is None:
            methods = []
        return self._create_custom_type(idx, name, basetype, properties, variables, methods)
375
376
    def _create_custom_type(self, idx, name, basetype, properties, variables, methods):
        """
        Add an object type called name under basetype and populate it.

        :param basetype: a Node, a NodeId, or a value accepted by ua.NodeId
        :param properties: items of (name, variant type[, datatype])
        :param variables: items of (name, variant type[, datatype])
        :param methods: items whose first four elements are passed to add_method
        :return: the node of the new type
        """
        session = self.iserver.isession
        if isinstance(basetype, Node):
            parent = basetype
        else:
            parent_id = basetype if isinstance(basetype, ua.NodeId) else ua.NodeId(basetype)
            parent = Node(session, parent_id)

        new_type = parent.add_object_type(idx, name)

        for spec in properties:
            # the third element, when present, is the datatype
            datatype = spec[2] if len(spec) > 2 else None
            new_type.add_property(idx, spec[0], ua.get_default_value(spec[1]), varianttype=spec[1], datatype=datatype)

        for spec in variables:
            datatype = spec[2] if len(spec) > 2 else None
            new_type.add_variable(idx, spec[0], ua.get_default_value(spec[1]), varianttype=spec[1], datatype=datatype)

        for spec in methods:
            new_type.add_method(idx, spec[0], spec[1], spec[2], spec[3])

        return new_type
399
400
    def import_xml(self, path):
        """
        Import the nodes defined in the xml file at path and return
        the result of the importer
        """
        return xmlimporter.XmlImporter(self).import_xml(path)
406
407
    def export_xml(self, nodes, path):
        """
        Export the given nodes to the xml file at path and return
        the result of the exporter write
        """
        exporter = XmlExporter(self)
        exporter.build_etree(nodes)
        return exporter.write_xml(path)
414
415
    def export_xml_by_ns(self, path, namespaces=None):
        """
        Export nodes of one or more namespaces to an XML file.
        Namespaces used by nodes are always exported for consistency.

        Args:
            path: name of the xml file to write
            namespaces: list of string uris or int indexes of the namespaces
                to export; an empty list is passed to get_nodes_of_namespace
                when not provided

        Returns:
            the result of export_xml
        """
        # None instead of a shared mutable [] default
        if namespaces is None:
            namespaces = []
        nodes = get_nodes_of_namespace(self, namespaces)
        return self.export_xml(nodes, path)
428
429
    def delete_nodes(self, nodes, recursive=False):
        """
        Delete nodes through the internal session using the module level
        delete_nodes helper and return its result
        """
        session = self.iserver.isession
        return delete_nodes(session, nodes, recursive)
431
432
    def historize_node(self, node):
433
        self.iserver.enable_history(node)
434
435
    def dehistorize_node(self, node):
436
        self.iserver.disable_history(node)
437
438
    def subscribe_server_callback(self, event, handle):
439
        self.iserver.subscribe_server_callback(event, handle)
440
441
    def unsubscribe_server_callback(self, event, handle):
442
        self.iserver.unsubscribe_server_callback(event, handle)
443
444
    def link_method(self, node, callback):
445
        self.iserver.isession.add_method_callback(node.nodeid, callback)
446