Completed
Pull Request — master (#607)
by
unknown
12:31 queued 06:51
created

Server._setup_server_nodes()   D

Complexity

Conditions 8

Size

Total Lines 40

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 17
CRAP Score 8

Importance

Changes 2
Bugs 0 Features 0
Metric Value
cc 8
dl 0
loc 40
ccs 17
cts 17
cp 1
crap 8
rs 4
c 2
b 0
f 0
1
"""
2
High level interface to pure python OPC-UA server
3
"""
4
5 1
import logging
6 1
from datetime import timedelta, datetime
7 1
try:
8 1
    from urllib.parse import urlparse
9
except ImportError:
10
    from urlparse import urlparse
11
12
13 1
from opcua import ua
14
# from opcua.binary_server import BinaryServer
15 1
from opcua.server.binary_server_asyncio import BinaryServer
16 1
from opcua.server.internal_server import InternalServer
17 1
from opcua.server.event_generator import EventGenerator
18 1
from opcua.common.node import Node
19 1
from opcua.common.subscription import Subscription
20 1
from opcua.common.manage_nodes import delete_nodes
21 1
from opcua.client.client import Client
22 1
from opcua.crypto import security_policies
23 1
from opcua.common.event_objects import BaseEvent
24 1
from opcua.common.shortcuts import Shortcuts
25 1
from opcua.common.structures import load_type_definitions
26 1
from opcua.common.xmlexporter import XmlExporter
27 1
from opcua.common.xmlimporter import XmlImporter
28 1
from opcua.common.ua_utils import get_nodes_of_namespace
29 1
use_crypto = True
30 1
try:
31 1
    from opcua.crypto import uacrypto
32
except ImportError:
33
    logging.getLogger(__name__).warning("cryptography is not installed, use of crypto disabled")
34
    use_crypto = False
35
36
37 1
class Server(object):
38
39
    """
40
    High level Server class
41
42
    This class creates an opcua server with default values
43
44
    Create your own namespace and then populate your server address space
45
    using the get_root_node() or get_objects_node() methods to get Node objects.
46
    and get_event_generator() to fire events.
47
    Then start server. See example_server.py
48
    All methods are threadsafe
49
50
    If you need more flexibility you call directly the Ua Service methods
51
    on the iserver  or iserver.isession object members.
52
53
    During startup the standard address space will be constructed, which may be
54
    time-consuming when running a server on a less powerful device (e.g. a
55
    Raspberry Pi). In order to improve startup performance, an optional path to a
56
    cache file can be passed to the server constructor.
57
    If the parameter is defined, the address space will be loaded from the
58
    cache file or the file will be created if it does not exist yet.
59
    As a result the first startup will be even slower due to the cache file
60
    generation but all further start ups will be significantly faster.
61
62
    :ivar product_uri:
63
    :vartype product_uri: uri
64
    :ivar name:
65
    :vartype name: string
66
    :ivar default_timeout: timeout in milliseconds for sessions and secure channel
67
    :vartype default_timeout: int
68
    :ivar iserver: internal server object
69
    :vartype iserver: InternalServer
70
    :ivar bserver: binary protocol server
71
    :vartype bserver: BinaryServer
72
    :ivar nodes: shortcuts to common nodes
73
    :vartype nodes: Shortcuts
74
75
    """
76
77 1
    def __init__(self, shelffile=None, iserver=None, nosecurity=True):
        """
        Create the server with its default identity and fill in the
        server status nodes of the address space.

        :param shelffile: handed to InternalServer when no ``iserver`` is given
        :param iserver: existing internal server object to use instead of
            creating a new one
        :param nosecurity: stored as ``self.nosecurity``; read by
            ``_setup_server_nodes`` to decide whether an endpoint without
            security is published
        """
        self.logger = logging.getLogger(__name__)
        self.endpoint = urlparse("opc.tcp://0.0.0.0:4840/freeopcua/server/")
        self._application_uri = "urn:freeopcua:python:server"
        self.product_uri = "urn:freeopcua.github.io:python:server"
        self.name = "FreeOpcUa Python Server"
        self.manufacturer_name = "FreeOpcUa"
        self.application_type = ua.ApplicationType.ClientAndServer
        self.default_timeout = 3600000
        self.iserver = InternalServer(shelffile) if iserver is None else iserver
        self.bserver = None
        self._discovery_clients = {}
        self._discovery_period = 60
        self.certificate = None
        self.private_key = None
        self._policies = []
        self.nodes = Shortcuts(self.iserver.isession)

        # publish the values clients expect to find in the Server object
        self.set_application_uri(self._application_uri)
        server_array = self.get_node(ua.NodeId(ua.ObjectIds.Server_ServerArray))
        server_array.set_value([self._application_uri])

        status = ua.ServerStatusDataType()
        build_info = status.BuildInfo
        build_info.ProductUri = self.product_uri
        build_info.ManufacturerName = self.manufacturer_name
        build_info.ProductName = self.name
        build_info.SoftwareVersion = "1.0pre"
        build_info.BuildNumber = "0"
        build_info.BuildDate = datetime.now()
        status.SecondsTillShutdown = 0

        status_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_ServerStatus))
        build_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_ServerStatus_BuildInfo))
        status_node.set_value(status)
        build_node.set_value(status.BuildInfo)

        # names of the secured endpoints considered by _setup_server_nodes
        self.security_endpoints = [
            "Basic128Rsa15_Sign",
            "Basic128Rsa15_SignAndEncrypt",
            "Basic256_Sign",
            "Basic256_SignAndEncrypt",
        ]
        self.nosecurity = nosecurity
        # user token policies considered by _set_endpoints
        self.policyIDs = ["Anonymous", "Basic256", "Basic128", "Username"]
122
123
124 1
    def __enter__(self):
125 1
        self.start()
126 1
        return self
127
128 1
    def __exit__(self, exc_type, exc_value, traceback):
129 1
        self.stop()
130
131 1
    def load_certificate(self, path):
        """
        Load the server certificate from a file (pem or der) and keep it
        in ``self.certificate``.
        """
        loaded = uacrypto.load_certificate(path)
        self.certificate = loaded
136
137 1
    def load_private_key(self, path):
        """
        Load the server private key from a file and keep it in
        ``self.private_key``.
        """
        loaded = uacrypto.load_private_key(path)
        self.private_key = loaded
139
140 1
    def disable_clock(self, val=True):
141
        """
142
        for debugging you may want to disable clock that write every second
143
        to address space
144
        """
145
        self.iserver.disabled_clock = val
146
147 1
    def set_application_uri(self, uri):
        """
        Set application/server URI.

        This uri is supposed to be unique. If you intend to register
        your server to a discovery server, it really should be unique in
        your system!
        default is : "urn:freeopcua:python:server"
        """
        self._application_uri = uri
        namespace_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_NamespaceArray))
        namespaces = namespace_node.get_value()
        if len(namespaces) <= 1:
            namespaces.append(uri)
        else:
            # the application uri always sits at namespace index 1
            namespaces[1] = uri
        namespace_node.set_value(namespaces)
163
164 1
    def find_servers(self, uris=None):
        """
        Query the internal server for known servers; mainly implemented
        for symmetry with the client.

        :param uris: server uris placed in the request, empty list when omitted
        """
        request = ua.FindServersParameters()
        request.EndpointUrl = self.endpoint.geturl()
        request.ServerUris = [] if uris is None else uris
        return self.iserver.find_servers(request)
174
175 1
    def register_to_discovery(self, url="opc.tcp://localhost:4840", period=60):
        """
        Register to an OPC-UA Discovery server. Registering must be renewed at
        least every 10 minutes, so this method will use our asyncio thread to
        re-register every period seconds
        if period is 0 registration is not automatically renewed

        :param url: url of the discovery server
        :param period: renewal period in seconds, shared by all discovery
            servers
        """
        # FIXME: have a period per discovery
        previous = self._discovery_clients.pop(url, None)
        if previous is not None:
            previous.disconnect()

        client = Client(url)
        client.connect()
        try:
            client.register_server(self)
        except Exception:
            # do not leave a connection open for a registration that failed
            client.disconnect()
            raise
        # only remember clients that are connected and registered, so that
        # later renewal and stop() never operate on a dead client
        self._discovery_clients[url] = client

        self._discovery_period = period
        if period:
            self.iserver.loop.call_soon(self._renew_registration)
191
192 1
    def unregister_to_discovery(self, url="opc.tcp://localhost:4840"):
193
        """
194
        stop registration thread
195
        """
196
        # FIXME: is there really no way to deregister?
197
        self._discovery_clients[url].disconnect()
198
199 1
    def _renew_registration(self):
200
        for client in self._discovery_clients.values():
201
            client.register_server(self)
202
            self.iserver.loop.call_later(self._discovery_period, self._renew_registration)
203
204 1
    def get_client_to_discovery(self, url="opc.tcp://localhost:4840"):
        """
        Build a client for the discovery server at ``url``, connect it and
        return it.
        """
        discovery = Client(url)
        discovery.connect()
        return discovery
211
212 1
    def allow_remote_admin(self, allow):
213
        """
214
        Enable or disable the builtin Admin user from network clients
215
        """
216
        self.iserver.allow_remote_admin = allow
217
218 1
    def set_endpoint(self, url):
219 1
        self.endpoint = urlparse(url)
220
221 1
    def get_endpoints(self):
222 1
        return self.iserver.get_endpoints()
223
224 1
    def _setup_server_nodes(self):
        """
        Publish the endpoints and build the list of security policies.

        To be called just before starting the server since it needs all
        parameters (certificate, private key, endpoint, ...) to be set up.
        Secured endpoints are only created when both a certificate and a
        private key are loaded.
        """
        # start from an empty list so that calling this again (restart)
        # does not accumulate duplicated policies
        self._policies = []
        if self.nosecurity:
            self._set_endpoints()
            self._policies.append(ua.SecurityPolicyFactory())

        if not (self.certificate and self.private_key):
            return

        # name in self.security_endpoints -> (policy class, security mode).
        # Each name selects the mode it spells out; the order matches the
        # order in which the endpoints were historically published.
        secured = (
            ("Basic128Rsa15_SignAndEncrypt",
             security_policies.SecurityPolicyBasic128Rsa15,
             ua.MessageSecurityMode.SignAndEncrypt),
            ("Basic128Rsa15_Sign",
             security_policies.SecurityPolicyBasic128Rsa15,
             ua.MessageSecurityMode.Sign),
            ("Basic256_SignAndEncrypt",
             security_policies.SecurityPolicyBasic256,
             ua.MessageSecurityMode.SignAndEncrypt),
            ("Basic256_Sign",
             security_policies.SecurityPolicyBasic256,
             ua.MessageSecurityMode.Sign),
        )
        for endpoint_name, policy, mode in secured:
            if endpoint_name not in self.security_endpoints:
                continue
            self._set_endpoints(policy, mode)
            self._policies.append(
                ua.SecurityPolicyFactory(policy, mode, self.certificate, self.private_key)
            )
265 1
    def _set_endpoints(self, policy=ua.SecurityPolicy, mode=ua.MessageSecurityMode.None_):
        """
        Describe one endpoint for the given security policy and mode and
        add it to the internal server.

        The user identity tokens offered on the endpoint are selected by
        the entries present in ``self.policyIDs``.
        """
        # (entry in self.policyIDs, published PolicyId, token type)
        token_specs = (
            ("Anonymous", 'anonymous', ua.UserTokenType.Anonymous),
            ("Basic256", 'certificate_basic256', ua.UserTokenType.Certificate),
            ("Basic128", 'certificate_basic128', ua.UserTokenType.Certificate),
            ("Username", 'username', ua.UserTokenType.UserName),
        )
        user_tokens = []
        for key, policy_id, token_type in token_specs:
            if key not in self.policyIDs:
                continue
            token = ua.UserTokenPolicy()
            token.PolicyId = policy_id
            token.TokenType = token_type
            user_tokens.append(token)

        endpoint_url = self.endpoint.geturl()

        application = ua.ApplicationDescription()
        application.ApplicationName = ua.LocalizedText(self.name)
        application.ApplicationUri = self._application_uri
        application.ApplicationType = self.application_type
        application.ProductUri = self.product_uri
        application.DiscoveryUrls.append(endpoint_url)

        description = ua.EndpointDescription()
        description.EndpointUrl = endpoint_url
        description.Server = application
        if self.certificate:
            description.ServerCertificate = uacrypto.der_from_x509(self.certificate)
        description.SecurityMode = mode
        description.SecurityPolicyUri = policy.URI
        description.UserIdentityTokens = user_tokens
        description.TransportProfileUri = 'http://opcfoundation.org/UA-Profile/Transport/uatcp-uasc-uabinary'
        description.SecurityLevel = 0
        self.iserver.add_endpoint(description)
309
310 1
    def set_server_name(self, name):
311
        self.name = name
312
313 1
    def start(self):
        """
        Start to listen on network.

        Sets up endpoints and policies, starts the internal server and then
        the binary server. If the binary server cannot be created or
        started, the internal server is stopped again and the error is
        propagated to the caller.
        """
        self._setup_server_nodes()
        self.iserver.start()
        try:
            self.bserver = BinaryServer(self.iserver, self.endpoint.hostname, self.endpoint.port)
            self.bserver.set_policies(self._policies)
            self.bserver.start()
        except Exception:
            self.iserver.stop()
            # bare raise keeps the original traceback intact
            raise
326
327
328 1
    def stop(self):
329
        """
330
        Stop server
331
        """
332 1
        for client in self._discovery_clients.values():
333 1
            client.disconnect()
334 1
        self.bserver.stop()
335 1
        self.iserver.stop()
336
337 1
    def get_root_node(self):
        """
        Return the Root folder of the server as a Node object.
        """
        root_id = ua.TwoByteNodeId(ua.ObjectIds.RootFolder)
        return self.get_node(root_id)
342
343 1
    def get_objects_node(self):
        """
        Return the Objects folder of the server as a Node object.
        """
        objects_id = ua.TwoByteNodeId(ua.ObjectIds.ObjectsFolder)
        return self.get_node(objects_id)
348
349 1
    def get_server_node(self):
        """
        Return the Server node of the server as a Node object.
        """
        server_id = ua.TwoByteNodeId(ua.ObjectIds.Server)
        return self.get_node(server_id)
354
355 1
    def get_node(self, nodeid):
        """
        Return a Node for ``nodeid``, given as a NodeId object or as a
        string representing a NodeId. The node is bound to the internal
        session of the server.
        """
        session = self.iserver.isession
        return Node(session, nodeid)
360
361 1
    def create_subscription(self, period, handler):
        """
        Create a subscription on the internal session.

        Returns a Subscription object which allows subscribing to events
        or data changes on the server.

        :param period: publishing interval in milliseconds
        :param handler: python object with the following methods:
            def datachange_notification(self, node, val, data):
            def event_notification(self, event):
            def status_change_notification(self, status):
        """
        request = ua.CreateSubscriptionParameters()
        request.RequestedPublishingInterval = period
        request.RequestedLifetimeCount = 3000
        request.RequestedMaxKeepAliveCount = 10000
        request.MaxNotificationsPerPublish = 0
        request.PublishingEnabled = True
        request.Priority = 0
        session = self.iserver.isession
        return Subscription(session, request, handler)
380
381 1
    def get_namespace_array(self):
        """
        Return the value of the NamespaceArray node, i.e. all namespaces
        defined in the server.
        """
        array_id = ua.NodeId(ua.ObjectIds.Server_NamespaceArray)
        return self.get_node(array_id).get_value()
387
388 1
    def register_namespace(self, uri):
        """
        Register a new namespace and return its index. Nodes should be in
        a custom namespace, not 0.

        If ``uri`` is already registered, its existing index is returned
        and the namespace array is left untouched.
        """
        array_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_NamespaceArray))
        namespaces = array_node.get_value()
        if uri not in namespaces:
            namespaces.append(uri)
            array_node.set_value(namespaces)
            return len(namespaces) - 1
        return namespaces.index(uri)
399
400 1
    def get_namespace_index(self, uri):
401
        """
402
        get index of a namespace using its uri
403
        """
404 1
        uries = self.get_namespace_array()
405 1
        return uries.index(uri)
406
407 1
    def get_event_generator(self, etype=None, source=ua.ObjectIds.Server):
        """
        Return an event generator for an event type from the address space.
        Use the returned object to fire events.

        :param etype: event type; a BaseEvent is used when it is falsy
        :param source: source passed to the generator
        """
        event_type = etype or BaseEvent()
        return EventGenerator(self.iserver.isession, event_type, source)
415
416 1
    def create_custom_data_type(self, idx, name, basetype=ua.ObjectIds.BaseDataType, properties=None):
417 1
        if properties is None:
418
            properties = []
419 1
        return self._create_custom_type(idx, name, basetype, properties, [], [])
420
421 1
    def create_custom_event_type(self, idx, name, basetype=ua.ObjectIds.BaseEventType, properties=None):
422 1
        if properties is None:
423
            properties = []
424 1
        return self._create_custom_type(idx, name, basetype, properties, [], [])
425
426 1
    def create_custom_object_type(self, idx, name, basetype=ua.ObjectIds.BaseObjectType, properties=None, variables=None, methods=None):
427 1
        if properties is None:
428
            properties = []
429 1
        if variables is None:
430
            variables = []
431 1
        if methods is None:
432
            methods = []
433 1
        return self._create_custom_type(idx, name, basetype, properties, variables, methods)
434
435
    # def create_custom_reference_type(self, idx, name, basetype=ua.ObjectIds.BaseReferenceType, properties=[]):
436
        # return self._create_custom_type(idx, name, basetype, properties)
437
438 1
    def create_custom_variable_type(self, idx, name, basetype=ua.ObjectIds.BaseVariableType, properties=None, variables=None, methods=None):
439 1
        if properties is None:
440
            properties = []
441 1
        if variables is None:
442 1
            variables = []
443 1
        if methods is None:
444 1
            methods = []
445 1
        return self._create_custom_type(idx, name, basetype, properties, variables, methods)
446
447 1
    def _create_custom_type(self, idx, name, basetype, properties, variables, methods):
        """
        Add a new type named ``name`` below ``basetype`` and populate it.

        :param basetype: a Node, a NodeId, or a value accepted by ua.NodeId()
        :param properties: sequences of (name, variant type[, datatype])
        :param variables: sequences of (name, variant type[, datatype])
        :param methods: sequences whose first four items are passed to
            add_method after the namespace index
        :return: the node of the new type
        """
        if isinstance(basetype, Node):
            parent_type = basetype
        else:
            parent_id = basetype if isinstance(basetype, ua.NodeId) else ua.NodeId(basetype)
            parent_type = Node(self.iserver.isession, parent_id)

        new_type = parent_type.add_object_type(idx, name)

        for prop in properties:
            # the third item, when present, is the explicit datatype
            prop_datatype = prop[2] if len(prop) > 2 else None
            new_type.add_property(idx, prop[0], ua.get_default_value(prop[1]), varianttype=prop[1], datatype=prop_datatype)

        for variable in variables:
            var_datatype = variable[2] if len(variable) > 2 else None
            new_type.add_variable(idx, variable[0], ua.get_default_value(variable[1]), varianttype=variable[1], datatype=var_datatype)

        for method in methods:
            new_type.add_method(idx, method[0], method[1], method[2], method[3])

        return new_type
470
471 1
    def import_xml(self, path=None, xmlstring=None):
        """
        Import nodes defined in xml, given as a file path or as a string,
        and return the importer's result.
        """
        return XmlImporter(self).import_xml(path, xmlstring)
477
478 1
    def export_xml(self, nodes, path):
        """
        Export the given nodes to the xml file at ``path`` and return the
        result of the write.
        """
        exporter = XmlExporter(self)
        exporter.build_etree(nodes)
        return exporter.write_xml(path)
485
486 1
    def export_xml_by_ns(self, path, namespaces=None):
        """
        Export nodes of one or more namespaces to an XML file.
        Namespaces used by nodes are always exported for consistency.

        Args:
            path: name of the xml file to write
            namespaces: list of string uris or int indexes of the namespaces
                to export; if not provided all namespaces are used except 0

        Returns:
            None
        """
        selected = [] if namespaces is None else namespaces
        self.export_xml(get_nodes_of_namespace(self, selected), path)
501
502 1
    def delete_nodes(self, nodes, recursive=False):
        """
        Delete ``nodes`` through the internal session, optionally
        recursively, and return the result of the module level
        delete_nodes helper.
        """
        session = self.iserver.isession
        return delete_nodes(session, nodes, recursive)
504
505 1
    def historize_node_data_change(self, node, period=timedelta(days=7), count=0):
506
        """
507
        Start historizing supplied nodes; see history module
508
        Args:
509
            node: node or list of nodes that can be historized (variables/properties)
510
            period: time delta to store the history; older data will be deleted from the storage
511
            count: number of changes to store in the history
512
513
        Returns:
514
        """
515 1
        nodes = node if isinstance(node, (list, tuple)) else [node]
516 1
        for node in nodes:
517 1
            self.iserver.enable_history_data_change(node, period, count)
518
519 1
    def dehistorize_node_data_change(self, node):
520
        """
521
        Stop historizing supplied nodes; see history module
522
        Args:
523
            node: node or list of nodes that can be historized (UA variables/properties)
524
525
        Returns:
526
        """
527
        nodes = node if isinstance(node, (list, tuple)) else [node]
528
        for node in nodes:
529
            self.iserver.disable_history_data_change(node)
530
531 1
    def historize_node_event(self, node, period=timedelta(days=7), count=0):
532
        """
533
        Start historizing events from node (typically a UA object); see history module
534
        Args:
535
            node: node or list of nodes that can be historized (UA objects)
536
            period: time delta to store the history; older data will be deleted from the storage
537
            count: number of events to store in the history
538
539
        Returns:
540
        """
541 1
        nodes = node if isinstance(node, (list, tuple)) else [node]
542 1
        for node in nodes:
543 1
            self.iserver.enable_history_event(node, period, count)
544
545 1
    def dehistorize_node_event(self, node):
546
        """
547
        Stop historizing events from node (typically a UA object); see history module
548
        Args:
549
           node: node or list of nodes that can be historized (UA objects)
550
551
        Returns:
552
        """
553
        nodes = node if isinstance(node, (list, tuple)) else [node]
554
        for node in nodes:
555
            self.iserver.disable_history_event(node)
556
557 1
    def subscribe_server_callback(self, event, handle):
558
        self.iserver.subscribe_server_callback(event, handle)
559
560 1
    def unsubscribe_server_callback(self, event, handle):
561
        self.iserver.unsubscribe_server_callback(event, handle)
562
563 1
    def link_method(self, node, callback):
564
        """
565
        Link a python function to a UA method in the address space; required when a UA method has been imported
566
        to the address space via XML; the python executable must be linked manually
567
        Args:
568
            node: UA method node
569
            callback: python function that the UA method will call
570
571
        Returns:
572
        """
573
        self.iserver.isession.add_method_callback(node.nodeid, callback)
574
575 1
    def load_type_definitions(self, nodes=None):
        """
        Delegate to the module level load_type_definitions helper, passing
        this server and ``nodes``, and return its result.
        """
        result = load_type_definitions(self, nodes)
        return result
577