Test Failed
Pull Request — master (#607)
by
unknown
08:41
created

Server._setup_server_nodes()   D

Complexity

Conditions 8

Size

Total Lines 40

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 17
CRAP Score 8

Importance

Changes 2
Bugs 0 Features 0
Metric Value
cc 8
dl 0
loc 40
ccs 17
cts 17
cp 1
crap 8
rs 4
c 2
b 0
f 0
1
"""
2
High level interface to pure python OPC-UA server
3
"""
4
5 1
import logging
6 1
from datetime import timedelta, datetime
7 1
try:
8 1
    from urllib.parse import urlparse
9
except ImportError:
10
    from urlparse import urlparse
11
12
13 1
from opcua import ua
14
# from opcua.binary_server import BinaryServer
15 1
from opcua.server.binary_server_asyncio import BinaryServer
16 1
from opcua.server.internal_server import InternalServer
17 1
from opcua.server.event_generator import EventGenerator
18 1
from opcua.common.node import Node
19 1
from opcua.common.subscription import Subscription
20 1
from opcua.common.manage_nodes import delete_nodes
21 1
from opcua.client.client import Client
22 1
from opcua.crypto import security_policies
23 1
from opcua.common.event_objects import BaseEvent
24 1
from opcua.common.shortcuts import Shortcuts
25 1
from opcua.common.structures import load_type_definitions
26 1
from opcua.common.xmlexporter import XmlExporter
27 1
from opcua.common.xmlimporter import XmlImporter
28 1
from opcua.common.ua_utils import get_nodes_of_namespace
29 1
use_crypto = True
30 1
try:
31 1
    from opcua.crypto import uacrypto
32
except ImportError:
33
    logging.getLogger(__name__).warning("cryptography is not installed, use of crypto disabled")
34
    use_crypto = False
35
36
37 1
class Server(object):
38
39
    """
40
    High level Server class
41
42
    This class creates an opcua server with default values
43
44
    Create your own namespace and then populate your server address space
45
    using use the get_root() or get_objects() to get Node objects.
46
    and get_event_object() to fire events.
47
    Then start server. See example_server.py
48
    All methods are threadsafe
49
50
    If you need more flexibility you call directly the Ua Service methods
51
    on the iserver  or iserver.isession object members.
52
53
    During startup the standard address space will be constructed, which may be
54
    time-consuming when running a server on a less powerful device (e.g. a
55
    Raspberry Pi). In order to improve startup performance, a optional path to a
56
    cache file can be passed to the server constructor.
57
    If the parameter is defined, the address space will be loaded from the
58
    cache file or the file will be created if it does not exist yet.
59
    As a result the first startup will be even slower due to the cache file
60
    generation but all further start ups will be significantly faster.
61
62
    :ivar product_uri:
63
    :vartype product_uri: uri
64
    :ivar name:
65
    :vartype name: string
66
    :ivar default_timeout: timeout in milliseconds for sessions and secure channel
67
    :vartype default_timeout: int
68
    :ivar iserver: internal server object
69
    :vartype default_timeout: InternalServer
70
    :ivar bserver: binary protocol server
71
    :vartype bserver: BinaryServer
72
    :ivar nodes: shortcuts to common nodes
73
    :vartype nodes: Shortcuts
74
75
    """
76
77 1
    def __init__(self, shelffile=None, iserver=None, nosecurity=True):
78 1
        self.logger = logging.getLogger(__name__)
79 1
        self.endpoint = urlparse("opc.tcp://0.0.0.0:4840/freeopcua/server/")
80 1
        self._application_uri = "urn:freeopcua:python:server"
81 1
        self.product_uri = "urn:freeopcua.github.io:python:server"
82 1
        self.name = "FreeOpcUa Python Server"
83 1
        self.manufacturer_name = "FreeOpcUa"
84 1
        self.application_type = ua.ApplicationType.ClientAndServer
85 1
        self.default_timeout = 3600000
86 1
        if iserver is not None:
87
            self.iserver = iserver
88
        else:
89 1
            self.iserver = InternalServer(shelffile)
90 1
        self.bserver = None
91 1
        self._discovery_clients = {}
92 1
        self._discovery_period = 60
93 1
        self.certificate = None
94 1
        self.private_key = None
95 1
        self._policies = []
96 1
        self.nodes = Shortcuts(self.iserver.isession)
97
98
        # setup some expected values
99 1
        self.set_application_uri(self._application_uri)
100 1
        sa_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_ServerArray))
101 1
        sa_node.set_value([self._application_uri])
102 1
        status_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_ServerStatus))
103 1
        build_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_ServerStatus_BuildInfo))
104 1
        status = ua.ServerStatusDataType()
105 1
        status.BuildInfo.ProductUri = self.product_uri
106 1
        status.BuildInfo.ManufacturerName = self.manufacturer_name
107 1
        status.BuildInfo.ProductName = self.name
108 1
        status.BuildInfo.SoftwareVersion = "1.0pre"
109 1
        status.BuildInfo.BuildNumber = "0"
110 1
        status.BuildInfo.BuildDate = datetime.now()
111 1
        status.SecondsTillShutdown = 0
112 1
        status_node.set_value(status)
113 1
        build_node.set_value(status.BuildInfo)
114
115
116 1
        # kyp
117 1
        #self.security_endpoints = ["Basic128Rsa15_Sign", "Basic128Rsa15_SignAndEncrypt", "Basic256_Sign", "Basic256_SignAndEncrypt"]
118 1
        self.security_endpoints = ["Basic256_Sign"," Basic256_SignAndEncrypt"]
119
        self.nosecurity = nosecurity
120 1
        #self.policyIDs = ["Anonymous", "Basic256", "Basic128", "Username"]
121 1
        self.policyIDs = ["Basic256"]
122
123 1
124
    def __enter__(self):
125
        self.start()
126
        return self
127 1
128
    def __exit__(self, exc_type, exc_value, traceback):
129 1
        self.stop()
130 1
131
    def load_certificate(self, path):
132 1
        """
133
        load server certificate from file, either pem or der
134
        """
135
        self.certificate = uacrypto.load_certificate(path)
136
137
    def load_private_key(self, path):
138
        self.private_key = uacrypto.load_private_key(path)
139 1
140
    def disable_clock(self, val=True):
141
        """
142
        for debugging you may want to disable clock that write every second
143
        to address space
144
        """
145
        self.iserver.disabled_clock = val
146
147 1
    def set_application_uri(self, uri):
148 1
        """
149 1
        Set application/server URI.
150 1
        This uri is supposed to be unique. If you intent to register
151 1
        your server to a discovery server, it really should be unique in
152
        your system!
153 1
        default is : "urn:freeopcua:python:server"
154 1
        """
155
        self._application_uri = uri
156 1
        ns_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_NamespaceArray))
157
        uries = ns_node.get_value()
158
        if len(uries) > 1:
159
            uries[1] = uri  # application uri is always namespace 1
160 1
        else:
161 1
            uries.append(uri)
162 1
        ns_node.set_value(uries)
163 1
164 1
    def find_servers(self, uris=None):
165 1
        """
166
        find_servers. mainly implemented for symmetry with client
167 1
        """
168
        if uris is None:
169
            uris = []
170
        params = ua.FindServersParameters()
171
        params.EndpointUrl = self.endpoint.geturl()
172
        params.ServerUris = uris
173
        return self.iserver.find_servers(params)
174
175 1
    def register_to_discovery(self, url="opc.tcp://localhost:4840", period=60):
176 1
        """
177 1
        Register to an OPC-UA Discovery server. Registering must be renewed at
178 1
        least every 10 minutes, so this method will use our asyncio thread to
179 1
        re-register every period seconds
180 1
        if period is 0 registration is not automatically renewed
181 1
        """
182
        # FIXME: have a period per discovery
183
        if url in self._discovery_clients:
184 1
            self._discovery_clients[url].disconnect()
185
        self._discovery_clients[url] = Client(url)
186
        self._discovery_clients[url].connect()
187
        self._discovery_clients[url].register_server(self)
188
        self._discovery_period = period
189
        if period:
190
            self.iserver.loop.call_soon(self._renew_registration)
191 1
192
    def unregister_to_discovery(self, url="opc.tcp://localhost:4840"):
193
        """
194
        stop registration thread
195
        """
196 1
        # FIXME: is there really no way to deregister?
197
        self._discovery_clients[url].disconnect()
198
199
    def _renew_registration(self):
200
        for client in self._discovery_clients.values():
201
            client.register_server(self)
202
            self.iserver.loop.call_later(self._discovery_period, self._renew_registration)
203
204 1
    def get_client_to_discovery(self, url="opc.tcp://localhost:4840"):
205
        """
206
        Create a client to discovery server and return it
207
        """
208
        client = Client(url)
209
        client.connect()
210 1
        return client
211 1
212
    def allow_remote_admin(self, allow):
213 1
        """
214 1
        Enable or disable the builtin Admin user from network clients
215
        """
216 1
        self.iserver.allow_remote_admin = allow
217
218 1
    def set_endpoint(self, url):
219 1
        self.endpoint = urlparse(url)
220 1
221 1
    def get_endpoints(self):
222
        return self.iserver.get_endpoints()
223 1
224
    def _setup_server_nodes(self):
225
        # to be called just before starting server since it needs all parameters to be setup
226
        # comment out kyp
227
        if self.nosecurity:
228 1
            #pdb.set_trace()
229
            self._set_endpoints()
230 1
            self._policies = [ua.SecurityPolicyFactory()]
231
232
        if self.certificate and self.private_key:
233
            if "Basic128Rsa15_Sign" in self.security_endpoints:
234
                self._set_endpoints(security_policies.SecurityPolicyBasic128Rsa15,
235 1
                                    ua.MessageSecurityMode.SignAndEncrypt)
236
                self._policies.append(ua.SecurityPolicyFactory(security_policies.SecurityPolicyBasic128Rsa15,
237 1
                                                           ua.MessageSecurityMode.SignAndEncrypt,
238
                                                           self.certificate,
239
                                                           self.private_key)
240
                                 )
241
            if "Basic128Rsa15_SignAndEncrypt" in self.security_endpoints:
242 1
                self._set_endpoints(security_policies.SecurityPolicyBasic128Rsa15,
243
                                    ua.MessageSecurityMode.Sign)
244 1
                self._policies.append(ua.SecurityPolicyFactory(security_policies.SecurityPolicyBasic128Rsa15,
245
                                                           ua.MessageSecurityMode.Sign,
246
                                                           self.certificate,
247
                                                           self.private_key)
248
                                 )
249
            if "Basic256_Sign" in self.security_endpoints:
250 1
                self._set_endpoints(security_policies.SecurityPolicyBasic256,
251 1
                                    ua.MessageSecurityMode.SignAndEncrypt)
252 1
                self._policies.append(ua.SecurityPolicyFactory(security_policies.SecurityPolicyBasic256,
253 1
                                                           ua.MessageSecurityMode.SignAndEncrypt,
254
                                                           self.certificate,
255 1
                                                           self.private_key)
256 1
                                 )
257 1
            if "Basic256_SignAndEncrypt" in self.security_endpoints:
258
                self._set_endpoints(security_policies.SecurityPolicyBasic256,
259 1
                                    ua.MessageSecurityMode.Sign)
260 1
                self._policies.append(ua.SecurityPolicyFactory(security_policies.SecurityPolicyBasic256,
261 1
                                                           ua.MessageSecurityMode.Sign,
262
                                                           self.certificate,
263 1
                                                           self.private_key)
264 1
                                 )
265 1
    def _set_endpoints(self, policy=ua.SecurityPolicy, mode=ua.MessageSecurityMode.None_):
266
        idtokens = []
267 1
        if "Anonymous" in self.policyIDs:
268 1
            idtoken1 = ua.UserTokenPolicy()
269 1
            idtoken1.PolicyId = 'anonymous'
270 1
            idtoken1.TokenType = ua.UserTokenType.Anonymous
271 1
            idtokens.append(idtoken1)
272 1
273
        if "Basic256" in self.policyIDs:
274 1
            idtoken2 = ua.UserTokenPolicy()
275 1
            idtoken2.PolicyId = 'certificate_basic256'
276 1
            idtoken2.TokenType = ua.UserTokenType.Certificate
277 1
            idtokens.append(idtoken2)
278 1
279 1
        if "Basic128" in self.policyIDs:
280 1
            idtoken3 = ua.UserTokenPolicy()
281 1
            idtoken3.PolicyId = 'certificate_basic128'
282 1
            idtoken3.TokenType = ua.UserTokenType.Certificate
283 1
            idtokens.append(idtoken3)
284 1
285
        if "Username" in self.policyIDs:
286 1
            idtoken4 = ua.UserTokenPolicy()
287
            idtoken4.PolicyId = 'username'
288
            idtoken4.TokenType = ua.UserTokenType.UserName
289 1
            idtokens.append(idtoken4)
290
291
        appdesc = ua.ApplicationDescription()
292
        appdesc.ApplicationName = ua.LocalizedText(self.name)
293 1
        appdesc.ApplicationUri = self._application_uri
294 1
        appdesc.ApplicationType = self.application_type
295 1
        appdesc.ProductUri = self.product_uri
296 1
        appdesc.DiscoveryUrls.append(self.endpoint.geturl())
297 1
298 1
        edp = ua.EndpointDescription()
299 1
        edp.EndpointUrl = self.endpoint.geturl()
300 1
        edp.Server = appdesc
301 1
        if self.certificate:
302
            edp.ServerCertificate = uacrypto.der_from_x509(self.certificate)
303
        edp.SecurityMode = mode
304 1
        edp.SecurityPolicyUri = policy.URI
305
        edp.UserIdentityTokens = idtokens
306
        edp.TransportProfileUri = 'http://opcfoundation.org/UA-Profile/Transport/uatcp-uasc-uabinary'
307
        edp.SecurityLevel = 0
308 1
        self.iserver.add_endpoint(edp)
309 1
310 1
    def set_server_name(self, name):
311 1
        self.name = name
312
313 1
    def start(self):
314
        """
315
        Start to listen on network
316
        """
317 1
        self._setup_server_nodes()
318
        self.iserver.start()
319 1
        try:
320
            self.bserver = BinaryServer(self.iserver, self.endpoint.hostname, self.endpoint.port)
321
            self.bserver.set_policies(self._policies)
322
            self.bserver.start()
323 1
        except Exception as exp:
324
            self.iserver.stop()
325 1
            raise exp
326
327
328
    def stop(self):
329 1
        """
330
        Stop server
331 1
        """
332
        for client in self._discovery_clients.values():
333
            client.disconnect()
334
        self.bserver.stop()
335 1
        self.iserver.stop()
336
337 1
    def get_root_node(self):
338
        """
339
        Get Root node of server. Returns a Node object.
340
        """
341
        return self.get_node(ua.TwoByteNodeId(ua.ObjectIds.RootFolder))
342
343
    def get_objects_node(self):
344
        """
345
        Get Objects node of server. Returns a Node object.
346
        """
347
        return self.get_node(ua.TwoByteNodeId(ua.ObjectIds.ObjectsFolder))
348 1
349 1
    def get_server_node(self):
350 1
        """
351 1
        Get Server node of server. Returns a Node object.
352 1
        """
353 1
        return self.get_node(ua.TwoByteNodeId(ua.ObjectIds.Server))
354 1
355 1
    def get_node(self, nodeid):
356
        """
357 1
        Get a specific node using NodeId object or a string representing a NodeId
358
        """
359
        return Node(self.iserver.isession, nodeid)
360
361 1
    def create_subscription(self, period, handler):
362 1
        """
363
        Create a subscription.
364 1
        returns a Subscription object which allow
365
        to subscribe to events or data on server
366
        period is in milliseconds
367
        handler is a python object with following methods:
368 1
            def datachange_notification(self, node, val, data):
369 1
            def event_notification(self, event):
370 1
            def status_change_notification(self, status):
371 1
        """
372 1
        params = ua.CreateSubscriptionParameters()
373 1
        params.RequestedPublishingInterval = period
374 1
        params.RequestedLifetimeCount = 3000
375
        params.RequestedMaxKeepAliveCount = 10000
376 1
        params.MaxNotificationsPerPublish = 0
377
        params.PublishingEnabled = True
378
        params.Priority = 0
379
        return Subscription(self.iserver.isession, params, handler)
380 1
381 1
    def get_namespace_array(self):
382
        """
383 1
        get all namespace defined in server
384
        """
385
        ns_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_NamespaceArray))
386
        return ns_node.get_value()
387
388 1
    def register_namespace(self, uri):
389 1
        """
390 1
        Register a new namespace. Nodes should in custom namespace, not 0.
391
        """
392 1
        ns_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_NamespaceArray))
393 1
        uries = ns_node.get_value()
394
        if uri in uries:
395 1
            return uries.index(uri)
396
        uries.append(uri)
397 1
        ns_node.set_value(uries)
398 1
        return len(uries) - 1
399
400 1
    def get_namespace_index(self, uri):
401
        """
402 1
        get index of a namespace using its uri
403 1
        """
404
        uries = self.get_namespace_array()
405 1
        return uries.index(uri)
406
407 1
    def get_event_generator(self, etype=None, source=ua.ObjectIds.Server):
408
        """
409 1
        Returns an event object using an event type from address space.
410
        Use this object to fire events
411
        """
412
        if not etype:
413
            etype = BaseEvent()
414 1
        return EventGenerator(self.iserver.isession, etype, source)
415 1
416
    def create_custom_data_type(self, idx, name, basetype=ua.ObjectIds.BaseDataType, properties=None):
417 1
        if properties is None:
418 1
            properties = []
419 1
        return self._create_custom_type(idx, name, basetype, properties, [], [])
420 1
421 1
    def create_custom_event_type(self, idx, name, basetype=ua.ObjectIds.BaseEventType, properties=None):
422
        if properties is None:
423 1
            properties = []
424 1
        return self._create_custom_type(idx, name, basetype, properties, [], [])
425 1
426 1
    def create_custom_object_type(self, idx, name, basetype=ua.ObjectIds.BaseObjectType, properties=None, variables=None, methods=None):
427 1
        if properties is None:
428
            properties = []
429 1
        if variables is None:
430
            variables = []
431 1
        if methods is None:
432 1
            methods = []
433 1
        return self._create_custom_type(idx, name, basetype, properties, variables, methods)
434 1
435
    # def create_custom_reference_type(self, idx, name, basetype=ua.ObjectIds.BaseReferenceType, properties=[]):
436 1
        # return self._create_custom_type(idx, name, basetype, properties)
437 1
438 1
    def create_custom_variable_type(self, idx, name, basetype=ua.ObjectIds.BaseVariableType, properties=None, variables=None, methods=None):
439 1
        if properties is None:
440 1
            properties = []
441 1
        if variables is None:
442 1
            variables = []
443 1
        if methods is None:
444
            methods = []
445 1
        return self._create_custom_type(idx, name, basetype, properties, variables, methods)
446
447 1
    def _create_custom_type(self, idx, name, basetype, properties, variables, methods):
448
        if isinstance(basetype, Node):
449
            base_t = basetype
450
        elif isinstance(basetype, ua.NodeId):
451 1
            base_t = Node(self.iserver.isession, basetype)
452 1
        else:
453
            base_t = Node(self.iserver.isession, ua.NodeId(basetype))
454 1
455
        custom_t = base_t.add_object_type(idx, name)
456
        for prop in properties:
457
            datatype = None
458 1
            if len(prop) > 2:
459 1
                datatype = prop[2]
460 1
            custom_t.add_property(idx, prop[0], ua.get_default_value(prop[1]), varianttype=prop[1], datatype=datatype)
461
        for variable in variables:
462 1
            datatype = None
463
            if len(variable) > 2:
464
                datatype = variable[2]
465
            custom_t.add_variable(idx, variable[0], ua.get_default_value(variable[1]), varianttype=variable[1], datatype=datatype)
466
        for method in methods:
467
            custom_t.add_method(idx, method[0], method[1], method[2], method[3])
468
469
        return custom_t
470
471
    def import_xml(self, path=None, xmlstring=None):
472
        """
473
        Import nodes defined in xml
474
        """
475
        importer = XmlImporter(self)
476
        return importer.import_xml(path, xmlstring)
477
478 1
    def export_xml(self, nodes, path):
479 1
        """
480
        Export defined nodes to xml
481 1
        """
482
        exp = XmlExporter(self)
483
        exp.build_etree(nodes)
484
        return exp.write_xml(path)
485
486
    def export_xml_by_ns(self, path, namespaces=None):
487
        """
488
        Export nodes of one or more namespaces to an XML file.
489
        Namespaces used by nodes are always exported for consistency.
490
        Args:
491 1
            server: opc ua server to use
492 1
            path: name of the xml file to write
493 1
            namespaces: list of string uris or int indexes of the namespace to export, if not provide all ns are used except 0
494
495 1
        Returns:
496
        """
497
        if namespaces is None:
498
            namespaces = []
499
        nodes = get_nodes_of_namespace(self, namespaces)
500
        self.export_xml(nodes, path)
501
502
    def delete_nodes(self, nodes, recursive=False):
503
        return delete_nodes(self.iserver.isession, nodes, recursive)
504
505
    def historize_node_data_change(self, node, period=timedelta(days=7), count=0):
506
        """
507 1
        Start historizing supplied nodes; see history module
508
        Args:
509
            node: node or list of nodes that can be historized (variables/properties)
510
            period: time delta to store the history; older data will be deleted from the storage
511
            count: number of changes to store in the history
512
513
        Returns:
514
        """
515
        nodes = node if isinstance(node, (list, tuple)) else [node]
516
        for node in nodes:
517 1
            self.iserver.enable_history_data_change(node, period, count)
518 1
519 1
    def dehistorize_node_data_change(self, node):
520
        """
521 1
        Stop historizing supplied nodes; see history module
522
        Args:
523
            node: node or list of nodes that can be historized (UA variables/properties)
524
525
        Returns:
526
        """
527
        nodes = node if isinstance(node, (list, tuple)) else [node]
528
        for node in nodes:
529
            self.iserver.disable_history_data_change(node)
530
531
    def historize_node_event(self, node, period=timedelta(days=7), count=0):
532
        """
533 1
        Start historizing events from node (typically a UA object); see history module
534
        Args:
535
            node: node or list of nodes that can be historized (UA objects)
536 1
            period: time delta to store the history; older data will be deleted from the storage
537
            count: number of events to store in the history
538
539 1
        Returns:
540
        """
541
        nodes = node if isinstance(node, (list, tuple)) else [node]
542
        for node in nodes:
543
            self.iserver.enable_history_event(node, period, count)
544
545
    def dehistorize_node_event(self, node):
546
        """
547
        Stop historizing events from node (typically a UA object); see history module
548
        Args:
549
           node: node or list of nodes that can be historized (UA objects)
550
551 1
        Returns:
552
        """
553
        nodes = node if isinstance(node, (list, tuple)) else [node]
554
        for node in nodes:
555
            self.iserver.disable_history_event(node)
556
557
    def subscribe_server_callback(self, event, handle):
558
        self.iserver.subscribe_server_callback(event, handle)
559
560
    def unsubscribe_server_callback(self, event, handle):
561
        self.iserver.unsubscribe_server_callback(event, handle)
562
563
    def link_method(self, node, callback):
564
        """
565
        Link a python function to a UA method in the address space; required when a UA method has been imported
566
        to the address space via XML; the python executable must be linked manually
567
        Args:
568
            node: UA method node
569
            callback: python function that the UA method will call
570
571
        Returns:
572
        """
573
        self.iserver.isession.add_method_callback(node.nodeid, callback)
574
575
    def load_type_definitions(self, nodes=None):
576
        return load_type_definitions(self, nodes)
577