Passed
Pull Request — master (#607)
by
unknown
05:33
created

Server._setup_server_nodes()   D

Complexity

Conditions 8

Size

Total Lines 38

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 17
CRAP Score 8

Importance

Changes 3
Bugs 0 Features 0
Metric Value
cc 8
c 3
b 0
f 0
dl 0
loc 38
ccs 17
cts 17
cp 1
crap 8
rs 4
1
"""
2
High level interface to pure python OPC-UA server
3
"""
4
5 1
import logging
6 1
from datetime import timedelta, datetime
7 1
try:
8 1
    from urllib.parse import urlparse
9
except ImportError:
10
    from urlparse import urlparse
11
12
13 1
from opcua import ua
14
# from opcua.binary_server import BinaryServer
15 1
from opcua.server.binary_server_asyncio import BinaryServer
16 1
from opcua.server.internal_server import InternalServer
17 1
from opcua.server.event_generator import EventGenerator
18 1
from opcua.common.node import Node
19 1
from opcua.common.subscription import Subscription
20 1
from opcua.common.manage_nodes import delete_nodes
21 1
from opcua.client.client import Client
22 1
from opcua.crypto import security_policies
23 1
from opcua.common.event_objects import BaseEvent
24 1
from opcua.common.shortcuts import Shortcuts
25 1
from opcua.common.structures import load_type_definitions
26 1
from opcua.common.xmlexporter import XmlExporter
27 1
from opcua.common.xmlimporter import XmlImporter
28 1
from opcua.common.ua_utils import get_nodes_of_namespace
29 1
use_crypto = True
30 1
try:
31 1
    from opcua.crypto import uacrypto
32
except ImportError:
33
    logging.getLogger(__name__).warning("cryptography is not installed, use of crypto disabled")
34
    use_crypto = False
35
36
37 1
class Server(object):
38
39
    """
40
    High level Server class
41
42
    This class creates an opcua server with default values
43
44
    Create your own namespace and then populate your server address space
45
    using get_root_node() or get_objects_node() to get Node objects
46
    and get_event_generator() to fire events.
47
    Then start server. See example_server.py
48
    All methods are threadsafe
49
50
    If you need more flexibility you call directly the Ua Service methods
51
    on the iserver  or iserver.isession object members.
52
53
    During startup the standard address space will be constructed, which may be
54
    time-consuming when running a server on a less powerful device (e.g. a
55
    Raspberry Pi). In order to improve startup performance, an optional path to a
56
    cache file can be passed to the server constructor.
57
    If the parameter is defined, the address space will be loaded from the
58
    cache file or the file will be created if it does not exist yet.
59
    As a result the first startup will be even slower due to the cache file
60
    generation but all further start ups will be significantly faster.
61
62
    :ivar product_uri:
63
    :vartype product_uri: uri
64
    :ivar name:
65
    :vartype name: string
66
    :ivar default_timeout: timeout in milliseconds for sessions and secure channel
67
    :vartype default_timeout: int
68
    :ivar iserver: internal server object
69
    :vartype iserver: InternalServer
70
    :ivar bserver: binary protocol server
71
    :vartype bserver: BinaryServer
72
    :ivar nodes: shortcuts to common nodes
73
    :vartype nodes: Shortcuts
74
75
    """
76
77 1
    def __init__(self, shelffile=None, iserver=None, nosecurity=True):
        """
        Set up default identity, endpoint and security settings and write
        the server description into the address space.

        :param shelffile: handed to InternalServer when no iserver is supplied
        :param iserver: already built internal server to use instead of creating one
        :param nosecurity: accepted in the signature; not read in this constructor
        """
        self.logger = logging.getLogger(__name__)
        self.endpoint = urlparse("opc.tcp://0.0.0.0:4840/freeopcua/server/")

        # application identity
        self._application_uri = "urn:freeopcua:python:server"
        self.product_uri = "urn:freeopcua.github.io:python:server"
        self.name = "FreeOpcUa Python Server"
        self.manufacturer_name = "FreeOpcUa"
        self.application_type = ua.ApplicationType.ClientAndServer
        self.default_timeout = 3600000

        # only build an internal server when the caller did not bring one
        self.iserver = InternalServer(shelffile) if iserver is None else iserver
        self.bserver = None

        # discovery and security state, filled in later by the setters
        self._discovery_clients = {}
        self._discovery_period = 60
        self.certificate = None
        self.private_key = None
        self._policies = []
        self.nodes = Shortcuts(self.iserver.isession)

        # setup some expected values
        self.set_application_uri(self._application_uri)
        server_array = self.get_node(ua.NodeId(ua.ObjectIds.Server_ServerArray))
        server_array.set_value([self._application_uri])

        server_status = ua.ServerStatusDataType()
        server_status.BuildInfo.ProductUri = self.product_uri
        server_status.BuildInfo.ManufacturerName = self.manufacturer_name
        server_status.BuildInfo.ProductName = self.name
        server_status.BuildInfo.SoftwareVersion = "1.0pre"
        server_status.BuildInfo.BuildNumber = "0"
        server_status.BuildInfo.BuildDate = datetime.now()
        server_status.SecondsTillShutdown = 0
        self.get_node(ua.NodeId(ua.ObjectIds.Server_ServerStatus)).set_value(server_status)
        self.get_node(ua.NodeId(ua.ObjectIds.Server_ServerStatus_BuildInfo)).set_value(server_status.BuildInfo)

        # names checked by _setup_server_nodes() and _set_endpoints()
        self.security_endpoints = [
            "None",
            "Basic128Rsa15_Sign",
            "Basic128Rsa15_SignAndEncrypt",
            "Basic256_Sign",
            "Basic256_SignAndEncrypt",
        ]
        self.policyIDs = ["Anonymous", "Basic256", "Basic128", "Username"]
120
121
122 1
    def __enter__(self):
123 1
        self.start()
124 1
        return self
125
126 1
    def __exit__(self, exc_type, exc_value, traceback):
127 1
        self.stop()
128
129 1
    def load_certificate(self, path):
        """
        Read the server certificate (pem or der file) at path via uacrypto
        and keep it in self.certificate
        """
        loaded = uacrypto.load_certificate(path)
        self.certificate = loaded
134
135 1
    def load_private_key(self, path):
        """
        Read the server private key at path via uacrypto and keep it in
        self.private_key
        """
        loaded = uacrypto.load_private_key(path)
        self.private_key = loaded
137
138 1
    def disable_clock(self, val=True):
139
        """
140
        for debugging you may want to disable clock that write every second
141
        to address space
142
        """
143
        self.iserver.disabled_clock = val
144
145 1
    def set_application_uri(self, uri):
        """
        Set application/server URI.

        The uri should be unique; this matters in particular when the server
        is registered to a discovery server.
        default is : "urn:freeopcua:python:server"

        The uri is also written to slot 1 of the server namespace array.
        """
        self._application_uri = uri
        ns_array = self.get_node(ua.NodeId(ua.ObjectIds.Server_NamespaceArray))
        namespaces = ns_array.get_value()
        if len(namespaces) <= 1:
            # slot 1 does not exist yet
            namespaces.append(uri)
        else:
            # application uri is always namespace 1
            namespaces[1] = uri
        ns_array.set_value(namespaces)
161
162 1
    def find_servers(self, uris=None):
        """
        Ask the internal server for known servers; mainly here for symmetry
        with the client.

        :param uris: server uris to pass as filter, an empty list when omitted
        """
        request = ua.FindServersParameters()
        request.EndpointUrl = self.endpoint.geturl()
        request.ServerUris = [] if uris is None else uris
        return self.iserver.find_servers(request)
172
173 1
    def register_to_discovery(self, url="opc.tcp://localhost:4840", period=60):
        """
        Register to an OPC-UA Discovery server. Registering must be renewed at
        least every 10 minutes, so renewal is scheduled on the internal
        server loop every period seconds.
        if period is 0 registration is not automatically renewed

        An existing client for the same url is disconnected and replaced.
        """
        # FIXME: have a period per discovery
        clients = self._discovery_clients
        if url in clients:
            clients[url].disconnect()
        discovery = Client(url)
        # stored before connecting, as the bookkeeping is keyed on url
        clients[url] = discovery
        discovery.connect()
        discovery.register_server(self)
        self._discovery_period = period
        if not period:
            return
        self.iserver.loop.call_soon(self._renew_registration)
189
190 1
    def unregister_to_discovery(self, url="opc.tcp://localhost:4840"):
191
        """
192
        stop registration thread
193
        """
194
        # FIXME: is there really no way to deregister?
195
        self._discovery_clients[url].disconnect()
196
197 1
    def _renew_registration(self):
198
        for client in self._discovery_clients.values():
199
            client.register_server(self)
200
            self.iserver.loop.call_later(self._discovery_period, self._renew_registration)
201
202 1
    def get_client_to_discovery(self, url="opc.tcp://localhost:4840"):
        """
        Return a new, already connected Client for the discovery server at url.

        The client is not stored on the server object.
        """
        discovery = Client(url)
        discovery.connect()
        return discovery
209
210 1
    def allow_remote_admin(self, allow):
211
        """
212
        Enable or disable the builtin Admin user from network clients
213
        """
214
        self.iserver.allow_remote_admin = allow
215
216 1
    def set_endpoint(self, url):
217 1
        self.endpoint = urlparse(url)
218
219 1
    def get_endpoints(self):
220 1
        return self.iserver.get_endpoints()
221
222 1
    def _setup_server_nodes(self):
        """
        Create the endpoints and security policies selected by
        self.security_endpoints.

        To be called just before starting server since it needs all
        parameters to be setup.

        The unsecured endpoint is created when "None" is listed. The secured
        ones are created only when both a certificate and a private key have
        been loaded. Each "<policy>_Sign" entry creates a Sign endpoint and
        each "<policy>_SignAndEncrypt" entry a SignAndEncrypt endpoint.

        self._policies is rebuilt from scratch on every call so that policies
        are not duplicated when this method runs more than once.
        """
        self._policies = []
        if "None" in self.security_endpoints:
            self._set_endpoints()
            self._policies.append(ua.SecurityPolicyFactory())

        if not (self.certificate and self.private_key):
            # secured endpoints cannot be offered without key material
            return

        # (name in security_endpoints, security policy class, message mode)
        secured = (
            ("Basic128Rsa15_Sign",
             security_policies.SecurityPolicyBasic128Rsa15,
             ua.MessageSecurityMode.Sign),
            ("Basic128Rsa15_SignAndEncrypt",
             security_policies.SecurityPolicyBasic128Rsa15,
             ua.MessageSecurityMode.SignAndEncrypt),
            ("Basic256_Sign",
             security_policies.SecurityPolicyBasic256,
             ua.MessageSecurityMode.Sign),
            ("Basic256_SignAndEncrypt",
             security_policies.SecurityPolicyBasic256,
             ua.MessageSecurityMode.SignAndEncrypt),
        )
        for name, policy, mode in secured:
            if name not in self.security_endpoints:
                continue
            self._set_endpoints(policy, mode)
            self._policies.append(
                ua.SecurityPolicyFactory(policy, mode, self.certificate, self.private_key)
            )
261 1
    def _set_endpoints(self, policy=ua.SecurityPolicy, mode=ua.MessageSecurityMode.None_):
        """
        Build one endpoint description for the given security policy and
        message mode and add it to the internal server.

        The user token policies offered on the endpoint are the ones named
        in self.policyIDs.
        """
        # (name in policyIDs, advertised PolicyId, token type), in offer order
        token_table = (
            ("Anonymous", 'anonymous', ua.UserTokenType.Anonymous),
            ("Basic256", 'certificate_basic256', ua.UserTokenType.Certificate),
            ("Basic128", 'certificate_basic128', ua.UserTokenType.Certificate),
            ("Username", 'username', ua.UserTokenType.UserName),
        )
        idtokens = []
        for enabled_name, policy_id, token_type in token_table:
            if enabled_name not in self.policyIDs:
                continue
            token = ua.UserTokenPolicy()
            token.PolicyId = policy_id
            token.TokenType = token_type
            idtokens.append(token)

        description = ua.ApplicationDescription()
        description.ApplicationName = ua.LocalizedText(self.name)
        description.ApplicationUri = self._application_uri
        description.ApplicationType = self.application_type
        description.ProductUri = self.product_uri
        description.DiscoveryUrls.append(self.endpoint.geturl())

        endpoint = ua.EndpointDescription()
        endpoint.EndpointUrl = self.endpoint.geturl()
        endpoint.Server = description
        if self.certificate:
            # the certificate is published in DER form
            endpoint.ServerCertificate = uacrypto.der_from_x509(self.certificate)
        endpoint.SecurityMode = mode
        endpoint.SecurityPolicyUri = policy.URI
        endpoint.UserIdentityTokens = idtokens
        endpoint.TransportProfileUri = 'http://opcfoundation.org/UA-Profile/Transport/uatcp-uasc-uabinary'
        endpoint.SecurityLevel = 0
        self.iserver.add_endpoint(endpoint)
305
306 1
    def set_server_name(self, name):
307
        self.name = name
308
309 1
    def start(self):
        """
        Start to listen on network

        Endpoints and policies are set up first, then the internal server is
        started and finally the binary server is created on the host and port
        of self.endpoint. If creating or starting the binary server fails,
        the internal server is stopped again and the error is re-raised.
        """
        self._setup_server_nodes()
        self.iserver.start()
        try:
            self.bserver = BinaryServer(self.iserver, self.endpoint.hostname, self.endpoint.port)
            self.bserver.set_policies(self._policies)
            self.bserver.start()
        except Exception:
            # do not leave the internal server running behind a failed start
            self.iserver.stop()
            # bare raise keeps the original traceback, also on python 2
            raise
322
323
324 1
    def stop(self):
325
        """
326
        Stop server
327
        """
328 1
        for client in self._discovery_clients.values():
329 1
            client.disconnect()
330 1
        self.bserver.stop()
331 1
        self.iserver.stop()
332
333 1
    def get_root_node(self):
        """
        Return a Node object for the RootFolder of the server
        """
        root_id = ua.TwoByteNodeId(ua.ObjectIds.RootFolder)
        return self.get_node(root_id)
338
339 1
    def get_objects_node(self):
        """
        Return a Node object for the ObjectsFolder of the server
        """
        objects_id = ua.TwoByteNodeId(ua.ObjectIds.ObjectsFolder)
        return self.get_node(objects_id)
344
345 1
    def get_server_node(self):
        """
        Return a Node object for the Server node of the server
        """
        server_id = ua.TwoByteNodeId(ua.ObjectIds.Server)
        return self.get_node(server_id)
350
351 1
    def get_node(self, nodeid):
        """
        Return a Node bound to the internal session of this server.

        nodeid is a NodeId object or a string representing a NodeId.
        """
        session = self.iserver.isession
        return Node(session, nodeid)
356
357 1
    def create_subscription(self, period, handler):
        """
        Create a subscription on the internal session and return the
        Subscription object, which allows subscribing to events or data
        changes on the server.

        period is the requested publishing interval in milliseconds
        handler is a python object with following methods:
            def datachange_notification(self, node, val, data):
            def event_notification(self, event):
            def status_change_notification(self, status):
        """
        request = ua.CreateSubscriptionParameters()
        request.RequestedPublishingInterval = period
        # fixed settings used for every server-side subscription
        request.PublishingEnabled = True
        request.Priority = 0
        request.MaxNotificationsPerPublish = 0
        request.RequestedLifetimeCount = 3000
        request.RequestedMaxKeepAliveCount = 10000
        session = self.iserver.isession
        return Subscription(session, request, handler)
376
377 1
    def get_namespace_array(self):
        """
        Return the value of the server NamespaceArray node, i.e. all
        namespaces defined in server
        """
        ns_array_id = ua.NodeId(ua.ObjectIds.Server_NamespaceArray)
        return self.get_node(ns_array_id).get_value()
383
384 1
    def register_namespace(self, uri):
        """
        Register a new namespace and return its index. Nodes should be
        created in a custom namespace, not 0.

        When uri is already registered its existing index is returned and
        the namespace array is left untouched.
        """
        ns_array = self.get_node(ua.NodeId(ua.ObjectIds.Server_NamespaceArray))
        known = ns_array.get_value()
        if uri not in known:
            known.append(uri)
            ns_array.set_value(known)
            # the new uri sits at the end of the array
            return len(known) - 1
        return known.index(uri)
395
396 1
    def get_namespace_index(self, uri):
397
        """
398
        get index of a namespace using its uri
399
        """
400 1
        uries = self.get_namespace_array()
401 1
        return uries.index(uri)
402
403 1
    def get_event_generator(self, etype=None, source=ua.ObjectIds.Server):
        """
        Return an EventGenerator bound to the internal session.
        Use the returned object to fire events.

        :param etype: event type; a new BaseEvent is used when it is falsy
        :param source: passed through to EventGenerator as event source
        """
        event_type = etype or BaseEvent()
        return EventGenerator(self.iserver.isession, event_type, source)
411
412 1
    def create_custom_data_type(self, idx, name, basetype=ua.ObjectIds.BaseDataType, properties=None):
413 1
        if properties is None:
414
            properties = []
415 1
        return self._create_custom_type(idx, name, basetype, properties, [], [])
416
417 1
    def create_custom_event_type(self, idx, name, basetype=ua.ObjectIds.BaseEventType, properties=None):
418 1
        if properties is None:
419
            properties = []
420 1
        return self._create_custom_type(idx, name, basetype, properties, [], [])
421
422 1
    def create_custom_object_type(self, idx, name, basetype=ua.ObjectIds.BaseObjectType, properties=None, variables=None, methods=None):
423 1
        if properties is None:
424
            properties = []
425 1
        if variables is None:
426
            variables = []
427 1
        if methods is None:
428
            methods = []
429 1
        return self._create_custom_type(idx, name, basetype, properties, variables, methods)
430
431
    # def create_custom_reference_type(self, idx, name, basetype=ua.ObjectIds.BaseReferenceType, properties=[]):
432
        # return self._create_custom_type(idx, name, basetype, properties)
433
434 1
    def create_custom_variable_type(self, idx, name, basetype=ua.ObjectIds.BaseVariableType, properties=None, variables=None, methods=None):
435 1
        if properties is None:
436
            properties = []
437 1
        if variables is None:
438 1
            variables = []
439 1
        if methods is None:
440 1
            methods = []
441 1
        return self._create_custom_type(idx, name, basetype, properties, variables, methods)
442
443 1
    def _create_custom_type(self, idx, name, basetype, properties, variables, methods):
        """
        Add a new type node called name below basetype and populate it.

        basetype may be a Node, a NodeId or anything ua.NodeId() accepts.
        Each property/variable entry is indexed as (name, variant type) with
        an optional data type in third position; each method entry supplies
        the four arguments passed on to add_method after idx.
        Returns the newly created type node.
        """
        if isinstance(basetype, Node):
            parent = basetype
        else:
            # wrap raw ids into a NodeId before building the Node
            parent_id = basetype if isinstance(basetype, ua.NodeId) else ua.NodeId(basetype)
            parent = Node(self.iserver.isession, parent_id)

        new_type = parent.add_object_type(idx, name)

        for entry in properties:
            dtype = entry[2] if len(entry) > 2 else None
            new_type.add_property(idx, entry[0], ua.get_default_value(entry[1]), varianttype=entry[1], datatype=dtype)

        for entry in variables:
            dtype = entry[2] if len(entry) > 2 else None
            new_type.add_variable(idx, entry[0], ua.get_default_value(entry[1]), varianttype=entry[1], datatype=dtype)

        for entry in methods:
            new_type.add_method(idx, entry[0], entry[1], entry[2], entry[3])

        return new_type
466
467 1
    def import_xml(self, path=None, xmlstring=None):
        """
        Import nodes defined in xml, given either as file path or as string.

        Returns the result of XmlImporter.import_xml.
        """
        return XmlImporter(self).import_xml(path, xmlstring)
473
474 1
    def export_xml(self, nodes, path):
        """
        Export the given nodes to the xml file at path.

        Returns the result of XmlExporter.write_xml.
        """
        exporter = XmlExporter(self)
        exporter.build_etree(nodes)
        result = exporter.write_xml(path)
        return result
481
482 1
    def export_xml_by_ns(self, path, namespaces=None):
        """
        Export nodes of one or more namespaces to an XML file.
        Namespaces used by nodes are always exported for consistency.
        Args:
            path: name of the xml file to write
            namespaces: list of string uris or int indexes of the namespace to export, if not provide all ns are used except 0

        Returns:
            None
        """
        selected = [] if namespaces is None else namespaces
        self.export_xml(get_nodes_of_namespace(self, selected), path)
497
498 1
    def delete_nodes(self, nodes, recursive=False):
        """
        Delete nodes through the internal session by delegating to the
        module level delete_nodes helper; returns its result.
        """
        session = self.iserver.isession
        return delete_nodes(session, nodes, recursive)
500
501 1
    def historize_node_data_change(self, node, period=timedelta(days=7), count=0):
502
        """
503
        Start historizing supplied nodes; see history module
504
        Args:
505
            node: node or list of nodes that can be historized (variables/properties)
506
            period: time delta to store the history; older data will be deleted from the storage
507
            count: number of changes to store in the history
508
509
        Returns:
510
        """
511 1
        nodes = node if isinstance(node, (list, tuple)) else [node]
512 1
        for node in nodes:
513 1
            self.iserver.enable_history_data_change(node, period, count)
514
515 1
    def dehistorize_node_data_change(self, node):
516
        """
517
        Stop historizing supplied nodes; see history module
518
        Args:
519
            node: node or list of nodes that can be historized (UA variables/properties)
520
521
        Returns:
522
        """
523
        nodes = node if isinstance(node, (list, tuple)) else [node]
524
        for node in nodes:
525
            self.iserver.disable_history_data_change(node)
526
527 1
    def historize_node_event(self, node, period=timedelta(days=7), count=0):
528
        """
529
        Start historizing events from node (typically a UA object); see history module
530
        Args:
531
            node: node or list of nodes that can be historized (UA objects)
532
            period: time delta to store the history; older data will be deleted from the storage
533
            count: number of events to store in the history
534
535
        Returns:
536
        """
537 1
        nodes = node if isinstance(node, (list, tuple)) else [node]
538 1
        for node in nodes:
539 1
            self.iserver.enable_history_event(node, period, count)
540
541 1
    def dehistorize_node_event(self, node):
542
        """
543
        Stop historizing events from node (typically a UA object); see history module
544
        Args:
545
           node: node or list of nodes that can be historized (UA objects)
546
547
        Returns:
548
        """
549
        nodes = node if isinstance(node, (list, tuple)) else [node]
550
        for node in nodes:
551
            self.iserver.disable_history_event(node)
552
553 1
    def subscribe_server_callback(self, event, handle):
554
        self.iserver.subscribe_server_callback(event, handle)
555
556 1
    def unsubscribe_server_callback(self, event, handle):
557
        self.iserver.unsubscribe_server_callback(event, handle)
558
559 1
    def link_method(self, node, callback):
560
        """
561
        Link a python function to a UA method in the address space; required when a UA method has been imported
562
        to the address space via XML; the python executable must be linked manually
563
        Args:
564
            node: UA method node
565
            callback: python function that the UA method will call
566
567
        Returns:
568
        """
569
        self.iserver.isession.add_method_callback(node.nodeid, callback)
570
571 1
    def load_type_definitions(self, nodes=None):
        """
        Delegate to the module level load_type_definitions helper with this
        server and the optional nodes; returns its result.
        """
        definitions = load_type_definitions(self, nodes)
        return definitions
573