Passed
Push — master ( e09c16...3cc1cc )
by Olivier
02:23
created

asyncua.server.server.Server.__str__()   A

Complexity

Conditions 1

Size

Total Lines 2
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 2
nop 1
dl 0
loc 2
rs 10
c 0
b 0
f 0
1
"""
2
High level interface to pure python OPC-UA server
3
"""
4
5
import asyncio
6
import logging
7
from datetime import timedelta, datetime
8
from urllib.parse import urlparse
9
from typing import Coroutine, Optional
10
11
from asyncua import ua
12
from .binary_server_asyncio import BinaryServer
13
from .internal_server import InternalServer
14
from .event_generator import EventGenerator
15
from ..client import Client
16
from ..common.node import Node
17
from ..common.subscription import Subscription
18
from ..common.xmlimporter import XmlImporter
19
from ..common.xmlexporter import XmlExporter
20
from ..common.manage_nodes import delete_nodes
21
from ..common.event_objects import BaseEvent
22
from ..common.shortcuts import Shortcuts
23
from ..common.structures import load_type_definitions, load_enums
24
from ..common.ua_utils import get_nodes_of_namespace
25
26
from ..crypto import security_policies, uacrypto
27
28
_logger = logging.getLogger(__name__)
29
30
31
def _get_node(isession, whatever):
    """
    Coerce *whatever* into a `Node` bound to *isession*.

    A `Node` instance is returned untouched, a `ua.NodeId` is wrapped as is,
    and any other value is first handed to the `ua.NodeId` constructor.
    """
    if isinstance(whatever, Node):
        return whatever
    nodeid = whatever if isinstance(whatever, ua.NodeId) else ua.NodeId(whatever)
    return Node(isession, nodeid)
37
38
39
class Server:
40
    """
41
    High level Server class
42
43
    This class creates an asyncua server with default values
44
45
    Create your own namespace and then populate your server address space
46
    using get_root_node() or get_objects_node() to get Node objects
47
    and get_event_generator() to fire events.
48
    Then start server. See example_server.py
49
    All methods are threadsafe
50
51
    If you need more flexibility you call directly the Ua Service methods
52
    on the iserver  or iserver.isession object members.
53
54
    During startup the standard address space will be constructed, which may be
55
    time-consuming when running a server on a less powerful device (e.g. a
56
    Raspberry Pi). In order to improve startup performance, an optional path to a
57
    cache file can be passed to the server's init() method (``shelf_file``).
58
    If the parameter is defined, the address space will be loaded from the
59
    cache file or the file will be created if it does not exist yet.
60
    As a result the first startup will be even slower due to the cache file
61
    generation but all further start ups will be significantly faster.
62
    ┌────────┐
63
    │ Server │ ── BinaryServer ── OPCUAProtocol ── UaProcessor
64
    │        │ ── InternalServer ── InternalSession
65
    └────────┘                   ── SubscriptionService
66
67
    :ivar product_uri:
68
    :ivar name:
69
    :ivar default_timeout: timeout in milliseconds for sessions and secure channel
70
    :ivar iserver: `InternalServer` instance
71
    :ivar bserver: binary protocol server `BinaryServer`
72
    :ivar nodes: shortcuts to common nodes - `Shortcuts` instance
73
    """
74
75
    def __init__(self, iserver: InternalServer = None, loop: asyncio.AbstractEventLoop = None):
        """
        Set up default values; nothing is started or listening yet.

        :param iserver: existing `InternalServer` to use; a new one bound to
            the event loop is created when omitted
        :param loop: event loop to use; `asyncio.get_event_loop()` when omitted
        """
        self.loop: asyncio.AbstractEventLoop = loop or asyncio.get_event_loop()
        self.logger = logging.getLogger(__name__)

        # identity advertised by the server
        self.endpoint = urlparse("opc.tcp://0.0.0.0:4840/freeopcua/server/")
        self._application_uri = "urn:freeopcua:python:server"
        self.product_uri = "urn:freeopcua.github.io:python:server"
        self.name: str = "FreeOpcUa Python Server"
        self.manufacturer_name = "FreeOpcUa"
        self.application_type = ua.ApplicationType.ClientAndServer
        self.default_timeout: int = 60 * 60 * 1000

        # internal server, and binary server (created by start())
        self.iserver = iserver or InternalServer(self.loop)
        self.bserver: Optional[BinaryServer] = None

        # discovery registration state: url -> Client, and renewal period in seconds
        self._discovery_clients = {}
        self._discovery_period = 60

        self._policies = []
        self.nodes = Shortcuts(self.iserver.isession)

        # enable all endpoints by default
        self._security_policy = [
            ua.SecurityPolicyType.NoSecurity,
            ua.SecurityPolicyType.Basic256Sha256_SignAndEncrypt,
            ua.SecurityPolicyType.Basic256Sha256_Sign,
        ]
        self._policyIDs = ["Anonymous", "Basic256Sha256", "Username"]
        self.certificate = None
98
99
    async def init(self, shelf_file=None):
        """
        Initialise the internal server and publish basic server information.

        Writes the application uri into the namespace and server arrays and
        fills the ServerStatus / BuildInfo nodes.

        :param shelf_file: forwarded unchanged to `InternalServer.init`
        """
        await self.iserver.init(shelf_file)
        # setup some expected values
        await self.set_application_uri(self._application_uri)
        server_array = self.get_node(ua.NodeId(ua.ObjectIds.Server_ServerArray))
        await server_array.set_value([self._application_uri])

        status = ua.ServerStatusDataType()
        build_info = status.BuildInfo
        build_info.ProductUri = self.product_uri
        build_info.ManufacturerName = self.manufacturer_name
        build_info.ProductName = self.name
        build_info.SoftwareVersion = "1.0pre"
        build_info.BuildNumber = "0"
        build_info.BuildDate = datetime.now()
        status.SecondsTillShutdown = 0

        status_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_ServerStatus))
        await status_node.set_value(status)
        build_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_ServerStatus_BuildInfo))
        await build_node.set_value(build_info)
117
118
    async def __aenter__(self):
        """
        Start the server when entering an ``async with`` block.

        :return: this server, so that ``async with Server() as server`` binds
            the running instance instead of ``None``
        """
        await self.start()
        return self
120
121
    async def __aexit__(self, exc_type, exc_value, traceback):
        """
        Stop the server when leaving an ``async with`` block.

        The exception details are ignored and nothing is returned, so an
        exception raised inside the block keeps propagating.
        """
        await self.stop()
123
124
    def __str__(self):
        """
        Return a short description containing the endpoint url.
        """
        url = self.endpoint.geturl()
        return f"OPC UA Server({url})"

    # repr() gives the same text as str()
    __repr__ = __str__
127
128
    async def load_certificate(self, path: str):
        """
        Load the server certificate from a file (pem or der) and keep it
        in `self.certificate`.

        :param path: path of the certificate file
        """
        certificate = await uacrypto.load_certificate(path)
        self.certificate = certificate
133
134
    async def load_private_key(self, path):
        """
        Load the server private key from a file and store it on the
        internal server (`self.iserver.private_key`).

        :param path: path of the private key file
        """
        private_key = await uacrypto.load_private_key(path)
        self.iserver.private_key = private_key
136
137
    def disable_clock(self, val: bool = True):
        """
        Debugging helper: disable the clock that writes to the address
        space every second.

        :param val: value stored in `iserver.disabled_clock`
            (True disables the clock)
        """
        internal_server = self.iserver
        internal_server.disabled_clock = val
143
144
    def get_application_uri(self):
        """
        Return the application/server URI currently configured.
        """
        uri = self._application_uri
        return uri
146
147
    async def set_application_uri(self, uri: str):
        """
        Set application/server URI.

        This uri is supposed to be unique. If you intend to register
        your server to a discovery server, it really should be unique in
        your system!
        default is : "urn:freeopcua:python:server"

        :param uri: new application uri, also written at index 1 of the
            server namespace array
        """
        self._application_uri = uri
        ns_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_NamespaceArray))
        namespaces = await ns_node.get_value()
        if len(namespaces) <= 1:
            namespaces.append(uri)
        else:
            # application uri is always namespace 1
            namespaces[1] = uri
        await ns_node.set_value(namespaces)
163
164
    async def find_servers(self, uris=None):
        """
        find_servers. mainly implemented for symmetry with client

        :param uris: server uris placed in the request; an empty list when omitted
        :return: whatever `iserver.find_servers` returns for the request
        """
        params = ua.FindServersParameters()
        params.EndpointUrl = self.endpoint.geturl()
        params.ServerUris = [] if uris is None else uris
        return self.iserver.find_servers(params)
174
175
    async def register_to_discovery(self, url: str = "opc.tcp://localhost:4840", period: int = 60):
        """
        Register to an OPC-UA Discovery server. Registering must be renewed at
        least every 10 minutes, so the event loop is used to re-register
        every `period` seconds.
        If period is 0 registration is not automatically renewed.

        :param url: url of the discovery server
        :param period: renewal period in seconds (shared by all discovery servers)
        """
        # FIXME: have a period per discovery
        previous = self._discovery_clients.get(url)
        if previous is not None:
            # drop the connection of an earlier registration to the same url
            await previous.disconnect()
        client = Client(url)
        self._discovery_clients[url] = client
        await client.connect()
        await client.register_server(self)
        self._discovery_period = period
        if not period:
            return
        self.loop.call_soon(self._schedule_renew_registration)
191
192
    def unregister_to_discovery(self, url: str = "opc.tcp://localhost:4840"):
        """
        Stop the registration to the discovery server at `url`.

        The client is removed from the set of discovery clients, so it is no
        longer renewed, and its disconnection is scheduled on the event loop.
        The method stays synchronous; the coroutine returned by
        `disconnect()` is wrapped in a task so that it actually runs.

        :param url: url of the discovery server given to `register_to_discovery`
        :return: the task running the disconnection; may be awaited
        :raises KeyError: if no registration exists for `url`
        """
        # FIXME: is there really no way to deregister?
        client = self._discovery_clients.pop(url)
        return self.loop.create_task(client.disconnect())
198
199
    def _schedule_renew_registration(self):
        """
        Renew the discovery registrations now and re-arm the timer.

        The renewal chain ends as soon as the discovery period is 0 (or
        otherwise falsy): re-arming with `call_later(0, ...)` would run this
        callback again on every loop iteration.
        """
        period = self._discovery_period
        if not period:
            return
        self.loop.create_task(self._renew_registration())
        self.loop.call_later(period, self._schedule_renew_registration)
202
203
    async def _renew_registration(self):
        """
        Register this server again on every known discovery server.
        """
        # Iterate over a snapshot: the dict can be modified by other
        # coroutines while we are suspended in the await below, which would
        # raise "dictionary changed size during iteration".
        for client in list(self._discovery_clients.values()):
            await client.register_server(self)
206
207
    def allow_remote_admin(self, allow):
        """
        Enable or disable the builtin Admin user from network clients

        :param allow: value stored in `iserver.allow_remote_admin`
        """
        internal_server = self.iserver
        internal_server.allow_remote_admin = allow
212
213
    def set_endpoint(self, url):
        """
        Set the endpoint url of the server.

        :param url: endpoint url; stored parsed (`urlparse`) in `self.endpoint`
        """
        parsed = urlparse(url)
        self.endpoint = parsed
215
216
    def get_endpoints(self) -> Coroutine:
        """
        Return the result of `iserver.get_endpoints()` unchanged.
        """
        endpoints = self.iserver.get_endpoints()
        return endpoints
218
219
    def set_security_policy(self, security_policy):
        """
        Choose which security policies are offered for connections to the
        server. `security_policy` is a list of `ua.SecurityPolicyType` values.

        By default all endpoints are enabled:

                security_policy = [
                            ua.SecurityPolicyType.NoSecurity,
                            ua.SecurityPolicyType.Basic256Sha256_SignAndEncrypt,
                            ua.SecurityPolicyType.Basic256Sha256_Sign
                                ]

        E.g. to limit the number of endpoints and disable no encryption:

                set_security_policy([
                            ua.SecurityPolicyType.Basic256Sha256_SignAndEncrypt])

        The list is only stored here; endpoints are created when the server starts.
        """
        policies = security_policy
        self._security_policy = policies
238
239
    def set_security_IDs(self, policy_ids):
        """
        Choose which user identity token kinds are offered to clients.
        By default all possible kinds are enabled:

            self._policyIDs = ["Anonymous", "Basic256Sha256", "Username"]

        E.g. to limit the number of IDs and disable anonymous clients:

            set_security_IDs(["Basic256Sha256"])

        (Implementation for ID check is currently not finalized...)

        :param policy_ids: list of the identifier strings shown above
        """
        identifiers = policy_ids
        self._policyIDs = identifiers
254
255
    async def _setup_server_nodes(self):
        """
        Create the endpoints and security policy factories matching the
        configured security policies.

        To be called just before starting the server since it needs all
        parameters to be set up.
        """
        requested = self._security_policy
        open_endpoint = ua.SecurityPolicyType.NoSecurity in requested
        if open_endpoint:
            self._set_endpoints()
            self._policies = [ua.SecurityPolicyFactory()]

        if requested == [ua.SecurityPolicyType.NoSecurity]:
            # only the open endpoint was requested, nothing else to do
            return

        if not self.certificate or not self.iserver.private_key:
            self.logger.warning("Endpoints other than open requested but private key and certificate are not set.")
            return

        if open_endpoint:
            self.logger.warning(
                "Creating an open endpoint to the server, although encrypted endpoints are enabled.")

        # (requested policy type, message security mode), in creation order
        secured_modes = (
            (ua.SecurityPolicyType.Basic256Sha256_SignAndEncrypt, ua.MessageSecurityMode.SignAndEncrypt),
            (ua.SecurityPolicyType.Basic256Sha256_Sign, ua.MessageSecurityMode.Sign),
        )
        for policy_type, mode in secured_modes:
            if policy_type not in requested:
                continue
            self._set_endpoints(security_policies.SecurityPolicyBasic256Sha256, mode)
            self._policies.append(
                ua.SecurityPolicyFactory(security_policies.SecurityPolicyBasic256Sha256,
                                         mode, self.certificate, self.iserver.private_key))
282
283
    def _set_endpoints(self, policy=ua.SecurityPolicy, mode=ua.MessageSecurityMode.None_):
        """
        Build one endpoint description and add it to the internal server.

        :param policy: security policy class; its `URI` is advertised
        :param mode: message security mode of the endpoint
        """
        # (identifier in self._policyIDs, advertised PolicyId, token type)
        token_specs = (
            ("Anonymous", "anonymous", ua.UserTokenType.Anonymous),
            ("Basic256Sha256", 'certificate_basic256sha256', ua.UserTokenType.Certificate),
            ("Username", "username", ua.UserTokenType.UserName),
        )
        idtokens = []
        for identifier, policy_id, token_type in token_specs:
            if identifier not in self._policyIDs:
                continue
            idtoken = ua.UserTokenPolicy()
            idtoken.PolicyId = policy_id
            idtoken.TokenType = token_type
            idtokens.append(idtoken)

        url = self.endpoint.geturl()

        appdesc = ua.ApplicationDescription()
        appdesc.ApplicationName = ua.LocalizedText(self.name)
        appdesc.ApplicationUri = self._application_uri
        appdesc.ApplicationType = self.application_type
        appdesc.ProductUri = self.product_uri
        appdesc.DiscoveryUrls.append(url)

        edp = ua.EndpointDescription()
        edp.EndpointUrl = url
        edp.Server = appdesc
        if self.certificate:
            edp.ServerCertificate = uacrypto.der_from_x509(self.certificate)
        edp.SecurityMode = mode
        edp.SecurityPolicyUri = policy.URI
        edp.UserIdentityTokens = idtokens
        edp.TransportProfileUri = "http://opcfoundation.org/UA-Profile/Transport/uatcp-uasc-uabinary"
        edp.SecurityLevel = 0
        self.iserver.add_endpoint(edp)
321
322
    def set_server_name(self, name):
        """
        Set the name of the server (stored in `self.name`).
        """
        server_name = name
        self.name = server_name
324
325
    async def start(self):
        """
        Start to listen on network

        Endpoints are set up and the internal server is started first; if the
        binary server then fails to start, the internal server is stopped
        again and the error is re-raised.
        """
        await self._setup_server_nodes()
        await self.iserver.start()
        try:
            binary_server = BinaryServer(self.iserver, self.endpoint.hostname, self.endpoint.port)
            self.bserver = binary_server
            binary_server.set_policies(self._policies)
            await binary_server.start()
        except Exception as err:
            await self.iserver.stop()
            raise err
338
339
    async def stop(self):
        """
        Stop server

        Disconnects every discovery client, then stops the binary server (if
        one was created by `start()`) and finally the internal server.
        """
        clients = list(self._discovery_clients.values())
        if clients:
            # asyncio.wait() does not accept bare coroutines any more
            # (deprecated in Python 3.8, rejected since 3.11). gather() with
            # return_exceptions=True waits for all of them in the same way,
            # without raising when a single disconnection fails.
            await asyncio.gather(*(client.disconnect() for client in clients), return_exceptions=True)
        if self.bserver is not None:
            # bserver is only set by start(); stop() may be called without it
            await self.bserver.stop()
        await self.iserver.stop()
347
348
    def get_root_node(self):
        """
        Return the Root folder of the server as a Node object.
        """
        nodeid = ua.TwoByteNodeId(ua.ObjectIds.RootFolder)
        return self.get_node(nodeid)
353
354
    def get_objects_node(self):
        """
        Return the Objects folder of the server as a Node object.
        """
        nodeid = ua.TwoByteNodeId(ua.ObjectIds.ObjectsFolder)
        return self.get_node(nodeid)
359
360
    def get_server_node(self):
        """
        Return the Server node of the server as a Node object.
        """
        nodeid = ua.TwoByteNodeId(ua.ObjectIds.Server)
        return self.get_node(nodeid)
365
366
    def get_node(self, nodeid):
        """
        Get a specific node using NodeId object or a string representing a NodeId

        :param nodeid: passed unchanged to the `Node` constructor
        :return: `Node` bound to the internal session of this server
        """
        session = self.iserver.isession
        return Node(session, nodeid)
371
372
    async def create_subscription(self, period, handler):
        """
        Create a subscription on the internal session.

        The returned Subscription object allows to subscribe to events or
        data changes on the server.

        :param period: Period in milliseconds
        :param handler: A class instance - see `SubHandler` base class for details
        :return: the initialised `Subscription`
        """
        params = ua.CreateSubscriptionParameters()
        params.RequestedPublishingInterval = period
        params.PublishingEnabled = True
        params.Priority = 0
        # fixed defaults used for server side subscriptions
        params.RequestedLifetimeCount = 3000
        params.RequestedMaxKeepAliveCount = 10000
        params.MaxNotificationsPerPublish = 0

        sub = Subscription(self.iserver.isession, params, handler)
        await sub.init()
        return sub
389
390
    def get_namespace_array(self) -> Coroutine:
        """
        Get all namespaces defined in the server.

        Returns the result of `get_value()` on the NamespaceArray node;
        it has to be awaited by the caller.
        """
        nodeid = ua.NodeId(ua.ObjectIds.Server_NamespaceArray)
        return self.get_node(nodeid).get_value()
396
397 View Code Duplication
    async def register_namespace(self, uri) -> int:
        """
        Register a new namespace. Nodes should be in a custom namespace, not 0.

        :param uri: uri of the namespace
        :return: index of the namespace; the existing index when the uri is
            already registered
        """
        ns_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_NamespaceArray))
        known = await ns_node.get_value()
        if uri not in known:
            known.append(uri)
            await ns_node.set_value(known)
            return len(known) - 1
        return known.index(uri)
408
409
    async def get_namespace_index(self, uri):
        """
        Get the index of a namespace using its uri.

        The lookup uses `index()` on the namespace array, which raises
        ValueError on a list when the uri is not present.
        """
        namespaces = await self.get_namespace_array()
        position = namespaces.index(uri)
        return position
415
416
    async def get_event_generator(self, etype=None, emitting_node=ua.ObjectIds.Server):
        """
        Returns an event generator using an event type from address space.
        Use this object to fire events

        :param etype: event type; a new `BaseEvent` is used when omitted (or falsy)
        :param emitting_node: node emitting the events, the Server object by default
        """
        event_type = etype or BaseEvent()
        generator = EventGenerator(self.iserver.isession)
        await generator.init(event_type, emitting_node=emitting_node)
        return generator
426
427
    async def create_custom_data_type(self, idx, name, basetype=ua.ObjectIds.BaseDataType, properties=None, description=None) -> Node:
        """
        Create a custom data type below `basetype` and add its properties.

        :param idx: namespace index used for the new type and its properties
        :param name: name of the new data type
        :param basetype: parent type: a `Node`, a `ua.NodeId` or a value
            accepted by the `ua.NodeId` constructor
        :param properties: sequence of ``(name, variant type[, data type])`` entries
        :param description: passed to `add_data_type`
        :return: the object returned by `add_data_type` for the new type
            (the original ``-> Coroutine`` annotation described the call, not
            the awaited result)
        """
        if properties is None:
            properties = []
        base_t = _get_node(self.iserver.isession, basetype)

        custom_t = await base_t.add_data_type(idx, name, description)
        for prop in properties:
            # the third element (explicit data type) is optional
            datatype = prop[2] if len(prop) > 2 else None
            await custom_t.add_property(
                idx, prop[0], ua.get_default_value(prop[1]), varianttype=prop[1], datatype=datatype)
        return custom_t
439
440
    async def create_custom_event_type(self, idx, name, basetype=ua.ObjectIds.BaseEventType, properties=None) -> Node:
        """
        Create a custom event type below `basetype`.

        :param idx: namespace index used for the new type and its properties
        :param name: name of the new event type
        :param basetype: parent event type
        :param properties: sequence of ``(name, variant type[, data type])`` entries
        :return: the newly created type (the original ``-> Coroutine``
            annotation described the call, not the awaited result)
        """
        if properties is None:
            properties = []
        # event types get neither variables nor methods
        return await self._create_custom_type(idx, name, basetype, properties, [], [])
444
445
    async def create_custom_object_type(self,
                                        idx,
                                        name,
                                        basetype=ua.ObjectIds.BaseObjectType,
                                        properties=None,
                                        variables=None,
                                        methods=None) -> Node:
        """
        Create a custom object type below `basetype`.

        :param idx: namespace index used for the new type and its children
        :param name: name of the new object type
        :param basetype: parent object type
        :param properties: sequence of ``(name, variant type[, data type])`` entries
        :param variables: sequence of ``(name, variant type[, data type])`` entries
        :param methods: sequence of 4-item entries passed positionally to `add_method`
        :return: the newly created type (the original ``-> Coroutine``
            annotation described the call, not the awaited result)
        """
        if properties is None:
            properties = []
        if variables is None:
            variables = []
        if methods is None:
            methods = []
        return await self._create_custom_type(idx, name, basetype, properties, variables, methods)
459
460
    # def create_custom_reference_type(self, idx, name, basetype=ua.ObjectIds.BaseReferenceType, properties=[]):
461
    # return self._create_custom_type(idx, name, basetype, properties)
462
463
    async def create_custom_variable_type(self,
                                          idx,
                                          name,
                                          basetype=ua.ObjectIds.BaseVariableType,
                                          properties=None,
                                          variables=None,
                                          methods=None) -> Node:
        """
        Create a custom variable type below `basetype`.

        :param idx: namespace index used for the new type and its children
        :param name: name of the new variable type
        :param basetype: parent variable type
        :param properties: sequence of ``(name, variant type[, data type])`` entries
        :param variables: sequence of ``(name, variant type[, data type])`` entries
        :param methods: sequence of 4-item entries passed positionally to `add_method`
        :return: the newly created type (the original ``-> Coroutine``
            annotation described the call, not the awaited result)
        """
        if properties is None:
            properties = []
        if variables is None:
            variables = []
        if methods is None:
            methods = []
        return await self._create_custom_type(idx, name, basetype, properties, variables, methods)
477
478
    async def _create_custom_type(self, idx, name, basetype, properties, variables, methods):
        """
        Add an object type named `name` below `basetype` and populate it.

        :param properties: entries ``(name, variant type[, data type])`` added as properties
        :param variables: entries ``(name, variant type[, data type])`` added as variables
        :param methods: entries whose first four items are passed to `add_method`
        :return: the object returned by `add_object_type` for the new type
        """
        parent = _get_node(self.iserver.isession, basetype)
        new_type = await parent.add_object_type(idx, name)

        for entry in properties:
            # the third element (explicit data type) is optional
            dtype = entry[2] if len(entry) > 2 else None
            await new_type.add_property(
                idx, entry[0], ua.get_default_value(entry[1]), varianttype=entry[1], datatype=dtype)

        for entry in variables:
            dtype = entry[2] if len(entry) > 2 else None
            await new_type.add_variable(
                idx, entry[0], ua.get_default_value(entry[1]), varianttype=entry[1], datatype=dtype)

        for entry in methods:
            await new_type.add_method(idx, entry[0], entry[1], entry[2], entry[3])

        return new_type
496
497
    def import_xml(self, path=None, xmlstring=None) -> Coroutine:
        """
        Import nodes defined in xml

        :param path: passed to `XmlImporter.import_xml`
        :param xmlstring: passed to `XmlImporter.import_xml`
        :return: result of `XmlImporter.import_xml`, to be awaited by the caller
        """
        return XmlImporter(self).import_xml(path, xmlstring)
503
504
    async def export_xml(self, nodes, path):
        """
        Export defined nodes to xml

        :param nodes: nodes handed to `XmlExporter.build_etree`
        :param path: destination handed to `XmlExporter.write_xml`
        """
        exporter = XmlExporter(self)
        await exporter.build_etree(nodes)
        await exporter.write_xml(path)
511
512
    async def export_xml_by_ns(self, path: str, namespaces: list = None):
        """
        Export nodes of one or more namespaces to an XML file.
        Namespaces used by nodes are always exported for consistency.

        :param path: name of the xml file to write
        :param namespaces: list of string uris or int indexes of the namespaces to export,
            if not provided all ns are used except 0
        """
        selected = [] if namespaces is None else namespaces
        nodes = await get_nodes_of_namespace(self, selected)
        await self.export_xml(nodes, path)
523
524
    def delete_nodes(self, nodes, recursive=False) -> Coroutine:
        """
        Delete nodes through the internal session.

        Thin wrapper around the `delete_nodes` helper imported from
        `common.manage_nodes`; its result has to be awaited by the caller.
        """
        session = self.iserver.isession
        return delete_nodes(session, nodes, recursive)
526
527
    async def historize_node_data_change(self, node, period=timedelta(days=7), count=0):
        """
        Start historizing supplied nodes; see history module

        :param node: node or list of nodes that can be historized (variables/properties)
        :param period: time delta to store the history; older data will be deleted from the storage
        :param count: number of changes to store in the history
        """
        if not isinstance(node, (list, tuple)):
            node = [node]
        for target in node:
            await self.iserver.enable_history_data_change(target, period, count)
537
538
    async def dehistorize_node_data_change(self, node):
        """
        Stop historizing supplied nodes; see history module

        :param node: node or list of nodes that can be historized (UA variables/properties)
        """
        if not isinstance(node, (list, tuple)):
            node = [node]
        for target in node:
            await self.iserver.disable_history_data_change(target)
546
547
    async def historize_node_event(self, node, period=timedelta(days=7), count: int = 0):
        """
        Start historizing events from node (typically a UA object); see history module

        :param node: node or list of nodes that can be historized (UA objects)
        :param period: time delta to store the history; older data will be deleted from the storage
        :param count: number of events to store in the history
        """
        if not isinstance(node, (list, tuple)):
            node = [node]
        for source in node:
            await self.iserver.enable_history_event(source, period, count)
557
558
    async def dehistorize_node_event(self, node):
        """
        Stop historizing events from node (typically a UA object); see history module

        :param node: node or list of nodes that can be historized (UA objects)
        """
        if not isinstance(node, (list, tuple)):
            node = [node]
        for source in node:
            await self.iserver.disable_history_event(source)
566
567
    def subscribe_server_callback(self, event, handle):
        """
        Forward to `iserver.subscribe_server_callback(event, handle)`.
        """
        internal_server = self.iserver
        internal_server.subscribe_server_callback(event, handle)
569
570
    def unsubscribe_server_callback(self, event, handle):
        """
        Forward to `iserver.unsubscribe_server_callback(event, handle)`.
        """
        internal_server = self.iserver
        internal_server.unsubscribe_server_callback(event, handle)
572
573
    def link_method(self, node, callback):
        """
        Link a python function to a UA method in the address space.

        Required when a UA method has been imported to the address space
        via XML: the python callable must then be linked manually.

        :param node: UA method node
        :param callback: python function that the UA method will call
        """
        session = self.iserver.isession
        session.add_method_callback(node.nodeid, callback)
581
582
    def load_type_definitions(self, nodes=None) -> Coroutine:
        """
        Load custom structures from our server.

        Server side this can be used to create python objects from custom
        structures imported through xml into the server.

        :param nodes: passed unchanged to the `load_type_definitions` helper
        :return: result of the helper, to be awaited by the caller
        """
        loading = load_type_definitions(self, nodes)
        return loading
589
590
    def load_enums(self) -> Coroutine:
        """
        Load UA structures and generate python Enums in the ua module for
        custom enums in the server.

        :return: result of the `load_enums` helper, to be awaited by the caller
        """
        loading = load_enums(self)
        return loading
595
596
    def set_attribute_value(self, nodeid, datavalue, attr=ua.AttributeIds.Value):
        """
        Directly write datavalue to the attribute, bypassing some checks and
        structure creation, so it is a little faster.

        :param nodeid: id of the node to write to
        :param datavalue: value to write
        :param attr: attribute to write, the Value attribute by default
        :return: whatever `iserver.set_attribute_value` returns
        """
        internal_server = self.iserver
        return internal_server.set_attribute_value(nodeid, datavalue, attr)
602