Passed
Push — master ( 30b2c1...d285f3 )
by Olivier
02:02
created

asyncua.server.server.Server.set_endpoint()   A

Complexity

Conditions 1

Size

Total Lines 2
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 2
nop 2
dl 0
loc 2
rs 10
c 0
b 0
f 0
1
"""
2
High level interface to pure python OPC-UA server
3
"""
4
5
import asyncio
6
import logging
7
from datetime import timedelta, datetime
8
from urllib.parse import urlparse
9
from typing import Coroutine
10
11
from asyncua import ua
12
# from asyncua.binary_server import BinaryServer
13
from .binary_server_asyncio import BinaryServer
14
from .internal_server import InternalServer
15
from .event_generator import EventGenerator
16
from ..client import Client
17
from ..common.node import Node
18
from ..common.subscription import Subscription
19
from ..common.xmlimporter import XmlImporter
20
from ..common.xmlexporter import XmlExporter
21
from ..common.manage_nodes import delete_nodes
22
from ..common.event_objects import BaseEvent
23
from ..common.shortcuts import Shortcuts
24
from ..common.structures import load_type_definitions, load_enums
25
from ..common.ua_utils import get_nodes_of_namespace
26
27
from ..crypto import security_policies, uacrypto
28
29
# Module-level logger named after this module.
_logger = logging.getLogger(__name__)
30
31
32
class Server:
33
    """
34
    High level Server class

    This class creates an asyncua server with default values

    Create your own namespace and then populate your server address space
    using get_root_node() or get_objects_node() to get Node objects,
    and get_event_generator() to fire events.
    Then start server. See example_server.py
    All methods are threadsafe

    If you need more flexibility you can directly call the Ua Service methods
    on the iserver or iserver.isession object members.

    During startup the standard address space will be constructed, which may be
    time-consuming when running a server on a less powerful device (e.g. a
    Raspberry Pi). In order to improve startup performance, an optional path to a
    cache file can be passed to the init() method (shelf_file parameter).
    If the parameter is defined, the address space will be loaded from the
    cache file or the file will be created if it does not exist yet.
    As a result the first startup will be even slower due to the cache file
    generation but all further start ups will be significantly faster.

    :ivar product_uri:
    :vartype product_uri: uri
    :ivar name:
    :vartype name: string
    :ivar default_timeout: timeout in milliseconds for sessions and secure channel
    :vartype default_timeout: int
    :ivar iserver: internal server object
    :vartype iserver: InternalServer
    :ivar bserver: binary protocol server
    :vartype bserver: BinaryServer
    :ivar nodes: shortcuts to common nodes
    :vartype nodes: Shortcuts
68
    """
69
70
    def __init__(self, iserver: InternalServer = None, loop=None):
        """
        Create a server object with default identity, endpoint and security settings.

        :param iserver: internal server to wrap; when omitted a new
            InternalServer bound to the event loop is created
        :param loop: asyncio event loop; when omitted asyncio.get_event_loop() is used
        """
        if not loop:
            loop = asyncio.get_event_loop()
        self.loop = loop
        self.logger = logging.getLogger(__name__)

        # identity of the application as announced to clients
        self.endpoint = urlparse("opc.tcp://0.0.0.0:4840/freeopcua/server/")
        self._application_uri = "urn:freeopcua:python:server"
        self.product_uri = "urn:freeopcua.github.io:python:server"
        self.name = "FreeOpcUa Python Server"
        self.manufacturer_name = "FreeOpcUa"
        self.application_type = ua.ApplicationType.ClientAndServer
        self.default_timeout = 60 * 60 * 1000

        # internal and binary protocol servers
        self.iserver = iserver or InternalServer(self.loop)
        self.bserver: BinaryServer = None
        self.nodes = Shortcuts(self.iserver.isession)

        # discovery registration state
        self._discovery_clients = {}
        self._discovery_period = 60

        # security configuration; enable all endpoints by default
        self._policies = []
        self._security_policy = [
            ua.SecurityPolicyType.NoSecurity,
            ua.SecurityPolicyType.Basic256Sha256_SignAndEncrypt,
            ua.SecurityPolicyType.Basic256Sha256_Sign,
        ]
        self._policyIDs = ["Anonymous", "Basic256Sha256", "Username"]
        self.certificate = None
93
94
    async def init(self, shelf_file=None):
        """
        Initialize the internal server and write the default server identity
        (application uri, server array, server status and build info) to the
        address space.

        :param shelf_file: passed unchanged to the internal server init()
        """
        await self.iserver.init(shelf_file)
        # setup some expected values
        await self.set_application_uri(self._application_uri)
        server_array = self.get_node(ua.NodeId(ua.ObjectIds.Server_ServerArray))
        await server_array.set_value([self._application_uri])

        status = ua.ServerStatusDataType()
        build_info = status.BuildInfo
        build_info.ProductUri = self.product_uri
        build_info.ManufacturerName = self.manufacturer_name
        build_info.ProductName = self.name
        build_info.SoftwareVersion = "1.0pre"
        build_info.BuildNumber = "0"
        build_info.BuildDate = datetime.now()
        status.SecondsTillShutdown = 0

        status_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_ServerStatus))
        build_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_ServerStatus_BuildInfo))
        await status_node.set_value(status)
        await build_node.set_value(status.BuildInfo)
112
113
    async def __aenter__(self):
114
        await self.start()
115
116
    async def __aexit__(self, exc_type, exc_value, traceback):
117
        await self.stop()
118
119
    async def load_certificate(self, path: str):
        """
        Load the server certificate from a file (pem or der) and keep it
        in ``self.certificate``.

        :param path: path of the certificate file
        """
        loaded = await uacrypto.load_certificate(path)
        self.certificate = loaded
124
125
    async def load_private_key(self, path):
        """
        Load the server private key from a file and store it on the
        internal server as ``iserver.private_key``.

        :param path: path of the private key file
        """
        key = await uacrypto.load_private_key(path)
        self.iserver.private_key = key
127
128
    def disable_clock(self, val: bool = True):
129
        """
130
        for debugging you may want to disable clock that write every second
131
        to address space
132
        """
133
        self.iserver.disabled_clock = val
134
135
    def get_application_uri(self):
136
        return self._application_uri
137
138
    async def set_application_uri(self, uri: str):
        """
        Set application/server URI.

        This uri is supposed to be unique. If you intend to register
        your server to a discovery server, it really should be unique in
        your system!
        default is : "urn:freeopcua:python:server"

        The uri is also written to the namespace array of the server.

        :param uri: new application uri
        """
        self._application_uri = uri
        ns_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_NamespaceArray))
        namespaces = await ns_node.get_value()
        if len(namespaces) <= 1:
            namespaces.append(uri)
        else:
            # application uri is always namespace 1
            namespaces[1] = uri
        await ns_node.set_value(namespaces)
154
155
    async def find_servers(self, uris=None):
        """
        find_servers. mainly implemented for symmetry with client

        :param uris: server uris put in the request; an empty list when omitted
        :return: whatever the internal server find_servers() returns
        """
        params = ua.FindServersParameters()
        params.EndpointUrl = self.endpoint.geturl()
        params.ServerUris = [] if uris is None else uris
        return self.iserver.find_servers(params)
165
166
    async def register_to_discovery(self, url: str = "opc.tcp://localhost:4840", period: int = 60):
        """
        Register to an OPC-UA Discovery server. Registering must be renewed at
        least every 10 minutes, so this method schedules a re-registration on
        the event loop every period seconds.
        If period is 0 registration is not automatically renewed.

        A client already connected to the same url is disconnected first.

        :param url: url of the discovery server
        :param period: renewal period in seconds, 0 to disable renewal
        """
        # FIXME: have a period per discovery
        if url in self._discovery_clients:
            await self._discovery_clients[url].disconnect()
        discovery = Client(url)
        self._discovery_clients[url] = discovery
        await discovery.connect()
        await discovery.register_server(self)
        self._discovery_period = period
        if not period:
            return
        self.loop.call_soon(self._schedule_renew_registration)
182
183
    def unregister_to_discovery(self, url: str = "opc.tcp://localhost:4840"):
184
        """
185
        stop registration thread
186
        """
187
        # FIXME: is there really no way to deregister?
188
        self._discovery_clients[url].disconnect()
189
190
    def _schedule_renew_registration(self):
191
        self.loop.create_task(self._renew_registration())
192
        self.loop.call_later(self._discovery_period, self._schedule_renew_registration)
193
194
    async def _renew_registration(self):
195
        for client in self._discovery_clients.values():
196
            await client.register_server(self)
197
198
    def allow_remote_admin(self, allow):
199
        """
200
        Enable or disable the builtin Admin user from network clients
201
        """
202
        self.iserver.allow_remote_admin = allow
203
204
    def set_endpoint(self, url):
205
        self.endpoint = urlparse(url)
206
207
    def get_endpoints(self) -> Coroutine:
208
        return self.iserver.get_endpoints()
209
210
    def set_security_policy(self, security_policy):
211
        """
212
        Method setting up the security policies for connections
213
        to the server, where security_policy is a list of integers.
214
        During server initialization, all endpoints are enabled:
215
216
                security_policy = [
217
                            ua.SecurityPolicyType.NoSecurity,
218
                            ua.SecurityPolicyType.Basic256Sha256_SignAndEncrypt,
219
                            ua.SecurityPolicyType.Basic256Sha256_Sign
220
                                ]
221
222
        E.g. to limit the number of endpoints and disable no encryption:
223
224
                set_security_policy([
225
                            ua.SecurityPolicyType.Basic256Sha256_SignAndEncrypt])
226
227
        """
228
        self._security_policy = security_policy
229
230
    def set_security_IDs(self, policy_ids):
231
        """
232
            Method setting up the security endpoints for identification
233
            of clients. During server object initialization, all possible
234
            endpoints are enabled:
235
236
            self._policyIDs = ["Anonymous", "Basic256Sha256", "Username"]
237
238
            E.g. to limit the number of IDs and disable anonymous clients:
239
240
                set_security_policy(["Basic256Sha256"])
241
242
        (Implementation for ID check is currently not finalized...)
243
        """
244
        self._policyIDs = policy_ids
245
246
    async def _setup_server_nodes(self):
        """
        Create the endpoints and security policy factories matching
        ``self._security_policy``.

        To be called just before starting server since it needs all
        parameters to be setup. Encrypted endpoints are only created when both
        a certificate and a private key are loaded; otherwise a warning is
        logged and only the open endpoint (if requested) is available.
        """
        open_requested = ua.SecurityPolicyType.NoSecurity in self._security_policy

        # always rebuild the list so that calling this again (server restart)
        # does not accumulate duplicated policies
        # NOTE(review): endpoints added to iserver are not cleared here - confirm
        # whether the internal server needs the same treatment on restart
        self._policies = []
        if open_requested:
            self._set_endpoints()
            self._policies.append(ua.SecurityPolicyFactory())

        if self._security_policy == [ua.SecurityPolicyType.NoSecurity]:
            return

        if not (self.certificate and self.iserver.private_key):
            self.logger.warning("Endpoints other than open requested but private key and certificate are not set.")
            return

        if open_requested:
            self.logger.warning(
                "Creating an open endpoint to the server, although encrypted endpoints are enabled.")

        # supported encrypted endpoints, in the order they are announced
        secured_modes = (
            (ua.SecurityPolicyType.Basic256Sha256_SignAndEncrypt, ua.MessageSecurityMode.SignAndEncrypt),
            (ua.SecurityPolicyType.Basic256Sha256_Sign, ua.MessageSecurityMode.Sign),
        )
        for policy_type, mode in secured_modes:
            if policy_type not in self._security_policy:
                continue
            self._set_endpoints(security_policies.SecurityPolicyBasic256Sha256, mode)
            self._policies.append(
                ua.SecurityPolicyFactory(security_policies.SecurityPolicyBasic256Sha256,
                                         mode, self.certificate, self.iserver.private_key))
273
274
    def _set_endpoints(self, policy=ua.SecurityPolicy, mode=ua.MessageSecurityMode.None_):
        """
        Build one endpoint description and add it to the internal server.

        :param policy: security policy class; its URI is announced on the endpoint
        :param mode: message security mode of the endpoint
        """
        # (name in self._policyIDs, announced PolicyId, token type)
        token_specs = (
            ("Anonymous", "anonymous", ua.UserTokenType.Anonymous),
            ("Basic256Sha256", 'certificate_basic256sha256', ua.UserTokenType.Certificate),
            ("Username", "username", ua.UserTokenType.UserName),
        )
        idtokens = []
        for enabled_name, policy_id, token_type in token_specs:
            if enabled_name not in self._policyIDs:
                continue
            idtoken = ua.UserTokenPolicy()
            idtoken.PolicyId = policy_id
            idtoken.TokenType = token_type
            idtokens.append(idtoken)

        endpoint_url = self.endpoint.geturl()

        appdesc = ua.ApplicationDescription()
        appdesc.ApplicationName = ua.LocalizedText(self.name)
        appdesc.ApplicationUri = self._application_uri
        appdesc.ApplicationType = self.application_type
        appdesc.ProductUri = self.product_uri
        appdesc.DiscoveryUrls.append(endpoint_url)

        edp = ua.EndpointDescription()
        edp.EndpointUrl = endpoint_url
        edp.Server = appdesc
        if self.certificate:
            edp.ServerCertificate = uacrypto.der_from_x509(self.certificate)
        edp.SecurityMode = mode
        edp.SecurityPolicyUri = policy.URI
        edp.UserIdentityTokens = idtokens
        edp.TransportProfileUri = "http://opcfoundation.org/UA-Profile/Transport/uatcp-uasc-uabinary"
        edp.SecurityLevel = 0
        self.iserver.add_endpoint(edp)
312
313
    def set_server_name(self, name):
314
        self.name = name
315
316
    async def start(self):
        """
        Start to listen on network

        Sets up the endpoints, starts the internal server and then the binary
        server on the host and port of the configured endpoint. If the binary
        server cannot be created or started, the internal server is stopped
        again and the exception is re-raised.
        """
        await self._setup_server_nodes()
        await self.iserver.start()
        try:
            self.bserver = BinaryServer(self.iserver, self.endpoint.hostname, self.endpoint.port)
            self.bserver.set_policies(self._policies)
            await self.bserver.start()
        except Exception:
            await self.iserver.stop()
            # bare raise keeps the original traceback untouched
            raise
329
330
    async def stop(self):
331
        """
332
        Stop server
333
        """
334
        if self._discovery_clients:
335
            await asyncio.wait([client.disconnect() for client in self._discovery_clients.values()])
336
        await self.bserver.stop()
337
        await self.iserver.stop()
338
339
    def get_root_node(self):
        """
        Return a Node object for the Root folder of the server.
        """
        root_id = ua.TwoByteNodeId(ua.ObjectIds.RootFolder)
        return self.get_node(root_id)
344
345
    def get_objects_node(self):
        """
        Return a Node object for the Objects folder of the server.
        """
        objects_id = ua.TwoByteNodeId(ua.ObjectIds.ObjectsFolder)
        return self.get_node(objects_id)
350
351
    def get_server_node(self):
        """
        Return a Node object for the Server object of the server.
        """
        server_id = ua.TwoByteNodeId(ua.ObjectIds.Server)
        return self.get_node(server_id)
356
357
    def get_node(self, nodeid):
        """
        Get a specific node using NodeId object or a string representing a NodeId

        :return: a Node bound to the internal session of the server
        """
        session = self.iserver.isession
        return Node(session, nodeid)
362
363
    async def create_subscription(self, period, handler):
        """
        Create and initialize a subscription on the internal session.

        The returned Subscription object allows to subscribe to events or
        data changes on the server.

        :param period: publishing interval in milliseconds
        :param handler: a python object with following methods:
            def datachange_notification(self, node, val, data):
            def event_notification(self, event):
            def status_change_notification(self, status):
        :return: the initialized Subscription
        """
        params = ua.CreateSubscriptionParameters()
        params.RequestedPublishingInterval = period
        params.PublishingEnabled = True
        params.Priority = 0
        params.MaxNotificationsPerPublish = 0
        params.RequestedLifetimeCount = 3000
        params.RequestedMaxKeepAliveCount = 10000
        sub = Subscription(self.iserver.isession, params, handler)
        await sub.init()
        return sub
384
385
    def get_namespace_array(self) -> Coroutine:
        """
        Get all namespaces defined in server.

        :return: the result of get_value() on the namespace array node
        """
        array_id = ua.NodeId(ua.ObjectIds.Server_NamespaceArray)
        return self.get_node(array_id).get_value()
391
392 View Code Duplication
    async def register_namespace(self, uri) -> int:
        """
        Register a new namespace. Nodes should be in a custom namespace, not 0.

        :param uri: uri of the namespace
        :return: index of the namespace; the existing index when the uri is
            already registered
        """
        ns_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_NamespaceArray))
        known = await ns_node.get_value()
        try:
            return known.index(uri)
        except ValueError:
            # not registered yet, add it at the end
            pass
        known.append(uri)
        await ns_node.set_value(known)
        return len(known) - 1
403
404
    async def get_namespace_index(self, uri):
405
        """
406
        get index of a namespace using its uri
407
        """
408
        uries = await self.get_namespace_array()
409
        return uries.index(uri)
410
411
    async def get_event_generator(self, etype=None, emitting_node=ua.ObjectIds.Server):
        """
        Return an initialized event generator for an event type from the
        address space. Use this object to fire events.

        :param etype: event type; a BaseEvent() is used when omitted
        :param emitting_node: node emitting the events, the Server object by default
        """
        event_type = etype if etype else BaseEvent()
        generator = EventGenerator(self.iserver.isession)
        await generator.init(event_type, emitting_node=emitting_node)
        return generator
421
422
    def create_custom_data_type(self, idx, name, basetype=ua.ObjectIds.BaseDataType, properties=None) -> Coroutine:
        """
        Create a custom type below basetype (BaseDataType by default) with
        the given properties, without variables or methods.

        :return: the coroutine returned by _create_custom_type
        """
        props = [] if properties is None else properties
        return self._create_custom_type(idx, name, basetype, props, [], [])
426
427
    def create_custom_event_type(self, idx, name, basetype=ua.ObjectIds.BaseEventType, properties=None) -> Coroutine:
        """
        Create a custom type below basetype (BaseEventType by default) with
        the given properties, without variables or methods.

        :return: the coroutine returned by _create_custom_type
        """
        props = [] if properties is None else properties
        return self._create_custom_type(idx, name, basetype, props, [], [])
431
432
    def create_custom_object_type(self,
                                  idx,
                                  name,
                                  basetype=ua.ObjectIds.BaseObjectType,
                                  properties=None,
                                  variables=None,
                                  methods=None) -> Coroutine:
        """
        Create a custom type below basetype (BaseObjectType by default) with
        the given properties, variables and methods. Omitted lists are
        treated as empty.

        :return: the coroutine returned by _create_custom_type
        """
        props = [] if properties is None else properties
        varis = [] if variables is None else variables
        meths = [] if methods is None else methods
        return self._create_custom_type(idx, name, basetype, props, varis, meths)
446
447
    # def create_custom_reference_type(self, idx, name, basetype=ua.ObjectIds.BaseReferenceType, properties=[]):
448
    # return self._create_custom_type(idx, name, basetype, properties)
449
450
    def create_custom_variable_type(self,
                                    idx,
                                    name,
                                    basetype=ua.ObjectIds.BaseVariableType,
                                    properties=None,
                                    variables=None,
                                    methods=None) -> Coroutine:
        """
        Create a custom type below basetype (BaseVariableType by default)
        with the given properties, variables and methods. Omitted lists are
        treated as empty.

        :return: the coroutine returned by _create_custom_type
        """
        props = [] if properties is None else properties
        varis = [] if variables is None else variables
        meths = [] if methods is None else methods
        return self._create_custom_type(idx, name, basetype, props, varis, meths)
464
465
    async def _create_custom_type(self, idx, name, basetype, properties, variables, methods):
        """
        Add a new type node below basetype and populate it.

        :param idx: namespace index of the new nodes
        :param name: name of the new type
        :param basetype: parent type, as Node, ua.NodeId or a value accepted by ua.NodeId()
        :param properties: sequences (name, variant type[, data type])
        :param variables: sequences (name, variant type[, data type])
        :param methods: sequences of the four arguments passed to add_method after idx
        :return: the node of the new type
        """
        if isinstance(basetype, Node):
            base_t = basetype
        else:
            base_id = basetype if isinstance(basetype, ua.NodeId) else ua.NodeId(basetype)
            base_t = Node(self.iserver.isession, base_id)
        custom_t = await base_t.add_object_type(idx, name)

        for prop in properties:
            # the data type is optional, third item when present
            datatype = prop[2] if len(prop) > 2 else None
            default = ua.get_default_value(prop[1])
            await custom_t.add_property(idx, prop[0], default, varianttype=prop[1], datatype=datatype)

        for variable in variables:
            datatype = variable[2] if len(variable) > 2 else None
            default = ua.get_default_value(variable[1])
            await custom_t.add_variable(idx, variable[0], default, varianttype=variable[1], datatype=datatype)

        for method in methods:
            await custom_t.add_method(idx, method[0], method[1], method[2], method[3])
        return custom_t
488
489
    def import_xml(self, path=None, xmlstring=None) -> Coroutine:
        """
        Import nodes defined in xml

        :param path: passed to the importer as first argument
        :param xmlstring: passed to the importer as second argument
        :return: the result of XmlImporter.import_xml
        """
        return XmlImporter(self).import_xml(path, xmlstring)
495
496
    async def export_xml(self, nodes, path):
        """
        Export defined nodes to xml

        :param nodes: nodes to export
        :param path: path of the xml file to write
        """
        exporter = XmlExporter(self)
        await exporter.build_etree(nodes)
        await exporter.write_xml(path)
503
504
    async def export_xml_by_ns(self, path: str, namespaces: list = None):
        """
        Export nodes of one or more namespaces to an XML file.
        Namespaces used by nodes are always exported for consistency.

        :param path: name of the xml file to write
        :param namespaces: list of string uris or int indexes of the namespaces to export;
            if not provided, all namespaces are used except 0
        """
        wanted = [] if namespaces is None else namespaces
        selected = await get_nodes_of_namespace(self, wanted)
        await self.export_xml(selected, path)
515
516
    def delete_nodes(self, nodes, recursive=False) -> Coroutine:
        """
        Delete nodes through the internal session of the server.

        :param nodes: nodes to delete
        :param recursive: passed to the delete_nodes helper
        :return: the result of the module-level delete_nodes helper
        """
        session = self.iserver.isession
        return delete_nodes(session, nodes, recursive)
518
519
    async def historize_node_data_change(self, node, period=timedelta(days=7), count=0):
520
        """
521
        Start historizing supplied nodes; see history module
522
        :param node: node or list of nodes that can be historized (variables/properties)
523
        :param period: time delta to store the history; older data will be deleted from the storage
524
        :param count: number of changes to store in the history
525
        """
526
        nodes = node if isinstance(node, (list, tuple)) else [node]
527
        for n in nodes:
528
            await self.iserver.enable_history_data_change(n, period, count)
529
530
    async def dehistorize_node_data_change(self, node):
531
        """
532
        Stop historizing supplied nodes; see history module
533
        :param node: node or list of nodes that can be historized (UA variables/properties)
534
        """
535
        nodes = node if isinstance(node, (list, tuple)) else [node]
536
        for n in nodes:
537
            await self.iserver.disable_history_data_change(n)
538
539
    async def historize_node_event(self, node, period=timedelta(days=7), count: int = 0):
540
        """
541
        Start historizing events from node (typically a UA object); see history module
542
        :param node: node or list of nodes that can be historized (UA objects)
543
        :param period: time delta to store the history; older data will be deleted from the storage
544
        :param count: number of events to store in the history
545
        """
546
        nodes = node if isinstance(node, (list, tuple)) else [node]
547
        for n in nodes:
548
            await self.iserver.enable_history_event(n, period, count)
549
550
    async def dehistorize_node_event(self, node):
551
        """
552
        Stop historizing events from node (typically a UA object); see history module
553
        :param node: node or list of nodes that can be historized (UA objects)
554
        """
555
        nodes = node if isinstance(node, (list, tuple)) else [node]
556
        for n in nodes:
557
            await self.iserver.disable_history_event(n)
558
559
    def subscribe_server_callback(self, event, handle):
560
        self.iserver.subscribe_server_callback(event, handle)
561
562
    def unsubscribe_server_callback(self, event, handle):
563
        self.iserver.unsubscribe_server_callback(event, handle)
564
565
    def link_method(self, node, callback):
566
        """
567
        Link a python function to a UA method in the address space; required when a UA method has been imported
568
        to the address space via XML; the python executable must be linked manually
569
        :param node: UA method node
570
        :param callback: python function that the UA method will call
571
        """
572
        self.iserver.isession.add_method_callback(node.nodeid, callback)
573
574
    def load_type_definitions(self, nodes=None) -> Coroutine:
        """
        Load custom structures from our server.

        Server side this can be used to create python objects from custom
        structures imported through xml into server.

        :param nodes: passed to the load_type_definitions helper
        :return: the result of the module-level load_type_definitions helper
        """
        definitions = load_type_definitions(self, nodes)
        return definitions
581
582
    def load_enums(self) -> Coroutine:
        """
        Load UA structures and generate python Enums in ua module for custom enums in server

        :return: the result of the module-level load_enums helper
        """
        enums = load_enums(self)
        return enums
587
588
    def set_attribute_value(self, nodeid, datavalue, attr=ua.AttributeIds.Value):
        """
        Directly write datavalue to the attribute, bypassing some checks and structure creation
        so it is a little faster

        :param nodeid: id of the node to write to
        :param datavalue: value to write
        :param attr: attribute to write, the Value attribute by default
        :return: the result of iserver.set_attribute_value
        """
        internal = self.iserver
        return internal.set_attribute_value(nodeid, datavalue, attr)
594