Test Failed
Pull Request — master (#51)
by Olivier
05:02
created

asyncua.server.server.Server.allow_remote_admin()   A

Complexity

Conditions 1

Size

Total Lines 5
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 2
nop 2
dl 0
loc 5
rs 10
c 0
b 0
f 0
1
"""
2
High level interface to pure python OPC-UA server
3
"""
4
5
import asyncio
6
import logging
7
from datetime import timedelta, datetime
8
from urllib.parse import urlparse
9
from typing import Coroutine, Optional
10
11
from asyncua import ua
12
from .binary_server_asyncio import BinaryServer
13
from .internal_server import InternalServer
14
from .event_generator import EventGenerator
15
from ..client import Client
16
from ..common.node import Node
17
from ..common.subscription import Subscription
18
from ..common.xmlimporter import XmlImporter
19
from ..common.xmlexporter import XmlExporter
20
from ..common.manage_nodes import delete_nodes
21
from ..common.event_objects import BaseEvent
22
from ..common.shortcuts import Shortcuts
23
from ..common.structures import load_type_definitions, load_enums
24
from ..common.ua_utils import get_nodes_of_namespace
25
26
from ..crypto import security_policies, uacrypto
27
28
_logger = logging.getLogger(__name__)
29
30
31
class Server:
32
    """
33
    High level Server class
34
35
    This class creates an asyncua server with default values
36
37
    Create your own namespace and then populate your server address space
38
    using get_root_node() or get_objects_node() to get Node objects.
39
    and get_event_generator() to fire events.
40
    Then start server. See example_server.py
41
    All methods are threadsafe
42
43
    If you need more flexibility you call directly the Ua Service methods
44
    on the iserver  or iserver.isession object members.
45
46
    During startup the standard address space will be constructed, which may be
47
    time-consuming when running a server on a less powerful device (e.g. a
48
    Raspberry Pi). In order to improve startup performance, an optional path to a
49
    cache file can be passed to the server constructor.
50
    If the parameter is defined, the address space will be loaded from the
51
    cache file or the file will be created if it does not exist yet.
52
    As a result the first startup will be even slower due to the cache file
53
    generation but all further start ups will be significantly faster.
54
    ┌────────┐
55
    │ Server │ ── BinaryServer ── OPCUAProtocol ── UaProcessor
56
    │        │ ── InternalServer ── InternalSession
57
    └────────┘                   ── SubscriptionService
58
59
    :ivar product_uri:
60
    :ivar name:
61
    :ivar default_timeout: timeout in milliseconds for sessions and secure channel
62
    :ivar iserver: `InternalServer` instance
63
    :ivar bserver: binary protocol server `BinaryServer`
64
    :ivar nodes: shortcuts to common nodes - `Shortcuts` instance
65
    """
66
67
    def __init__(self, iserver: InternalServer = None, loop: asyncio.AbstractEventLoop = None):
        """
        Set up the default configuration; nothing is opened on the network here.

        :param iserver: `InternalServer` to use; a new one bound to the loop is created when falsy
        :param loop: event loop to use; `asyncio.get_event_loop()` is used when falsy
        """
        self.loop: asyncio.AbstractEventLoop = loop or asyncio.get_event_loop()
        self.logger = logging.getLogger(__name__)

        # application identity
        self.name: str = "FreeOpcUa Python Server"
        self.manufacturer_name = "FreeOpcUa"
        self.product_uri = "urn:freeopcua.github.io:python:server"
        self._application_uri = "urn:freeopcua:python:server"
        self.application_type = ua.ApplicationType.ClientAndServer

        # network settings
        self.endpoint = urlparse("opc.tcp://0.0.0.0:4840/freeopcua/server/")
        self.default_timeout: int = 60 * 60 * 1000
        self.certificate = None

        # server components; the binary server is only created in start()
        self.iserver = iserver or InternalServer(self.loop)
        self.bserver: Optional[BinaryServer] = None
        self.nodes = Shortcuts(self.iserver.isession)

        # discovery registration state
        self._discovery_clients = {}
        self._discovery_period = 60

        # enable all endpoints by default
        self._policies = []
        self._security_policy = [
            ua.SecurityPolicyType.NoSecurity,
            ua.SecurityPolicyType.Basic256Sha256_SignAndEncrypt,
            ua.SecurityPolicyType.Basic256Sha256_Sign,
        ]
        self._policyIDs = ["Anonymous", "Basic256Sha256", "Username"]
90
91
    async def init(self, shelf_file=None):
        """
        Initialise the internal server and write the expected server status values.

        :param shelf_file: forwarded unchanged to `InternalServer.init()`
        """
        await self.iserver.init(shelf_file)

        # setup some expected values
        await self.set_application_uri(self._application_uri)
        server_array = self.get_node(ua.NodeId(ua.ObjectIds.Server_ServerArray))
        await server_array.set_value([self._application_uri])

        status_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_ServerStatus))
        build_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_ServerStatus_BuildInfo))

        status = ua.ServerStatusDataType()
        build_info = status.BuildInfo
        build_info.ProductUri = self.product_uri
        build_info.ManufacturerName = self.manufacturer_name
        build_info.ProductName = self.name
        build_info.SoftwareVersion = "1.0pre"
        build_info.BuildNumber = "0"
        build_info.BuildDate = datetime.now()
        status.SecondsTillShutdown = 0

        # the build info is written both as part of the status and on its own node
        await status_node.set_value(status)
        await build_node.set_value(status.BuildInfo)
109
110
    async def __aenter__(self):
111
        await self.start()
112
113
    async def __aexit__(self, exc_type, exc_value, traceback):
114
        await self.stop()
115
116
    def __str__(self):
117
        return f"OPC UA Server({self.endpoint.geturl()})"
118
    __repr__ = __str__
119
120
    async def load_certificate(self, path: str):
        """
        Load the server certificate from a file (pem or der) and keep it
        in `self.certificate`.

        :param path: path of the certificate file
        """
        certificate = await uacrypto.load_certificate(path)
        self.certificate = certificate
125
126
    async def load_private_key(self, path):
        """
        Load the server private key from a file and store it on the internal server.

        :param path: path of the private key file
        """
        private_key = await uacrypto.load_private_key(path)
        self.iserver.private_key = private_key
128
129
    def disable_clock(self, val: bool = True):
130
        """
131
        for debugging you may want to disable clock that write every second
132
        to address space
133
        """
134
        self.iserver.disabled_clock = val
135
136
    def get_application_uri(self):
137
        return self._application_uri
138
139
    async def set_application_uri(self, uri: str):
        """
        Set application/server URI.

        This uri is supposed to be unique. If you intend to register
        your server to a discovery server, it really should be unique in
        your system!
        default is : "urn:freeopcua:python:server"

        :param uri: new application uri, also written to the namespace array
        """
        self._application_uri = uri
        ns_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_NamespaceArray))
        namespaces = await ns_node.get_value()
        if len(namespaces) <= 1:
            namespaces.append(uri)
        else:
            # application uri is always namespace 1
            namespaces[1] = uri
        await ns_node.set_value(namespaces)
155
156
    async def find_servers(self, uris=None):
        """
        find_servers. mainly implemented for symmetry with client

        :param uris: server uris placed in the request parameters; an empty list when None
        :return: whatever `InternalServer.find_servers()` returns
        """
        params = ua.FindServersParameters()
        params.EndpointUrl = self.endpoint.geturl()
        params.ServerUris = [] if uris is None else uris
        return self.iserver.find_servers(params)
166
167
    async def register_to_discovery(self, url: str = "opc.tcp://localhost:4840", period: int = 60):
        """
        Register to an OPC-UA Discovery server. Registering must be renewed at
        least every 10 minutes, so this method schedules a re-registration
        on the event loop every `period` seconds.
        If period is 0 registration is not automatically renewed.

        :param url: url of the discovery server
        :param period: renewal period in seconds, shared by all discovery servers
        """
        # FIXME: have a period per discovery
        clients = self._discovery_clients
        if url in clients:
            # drop the previous connection to the same discovery server first
            await clients[url].disconnect()
        client = Client(url)
        clients[url] = client
        await client.connect()
        await client.register_server(self)
        self._discovery_period = period
        if period:
            self.loop.call_soon(self._schedule_renew_registration)
183
184
    def unregister_to_discovery(self, url: str = "opc.tcp://localhost:4840"):
185
        """
186
        stop registration thread
187
        """
188
        # FIXME: is there really no way to deregister?
189
        self._discovery_clients[url].disconnect()
190
191
    def _schedule_renew_registration(self):
192
        self.loop.create_task(self._renew_registration())
193
        self.loop.call_later(self._discovery_period, self._schedule_renew_registration)
194
195
    async def _renew_registration(self):
196
        for client in self._discovery_clients.values():
197
            await client.register_server(self)
198
199
    def allow_remote_admin(self, allow):
200
        """
201
        Enable or disable the builtin Admin user from network clients
202
        """
203
        self.iserver.allow_remote_admin = allow
204
205
    def set_endpoint(self, url):
206
        self.endpoint = urlparse(url)
207
208
    def get_endpoints(self) -> Coroutine:
209
        return self.iserver.get_endpoints()
210
211
    def set_security_policy(self, security_policy):
212
        """
213
        Method setting up the security policies for connections
214
        to the server, where security_policy is a list of integers.
215
        During server initialization, all endpoints are enabled:
216
217
                security_policy = [
218
                            ua.SecurityPolicyType.NoSecurity,
219
                            ua.SecurityPolicyType.Basic256Sha256_SignAndEncrypt,
220
                            ua.SecurityPolicyType.Basic256Sha256_Sign
221
                                ]
222
223
        E.g. to limit the number of endpoints and disable no encryption:
224
225
                set_security_policy([
226
                            ua.SecurityPolicyType.Basic256Sha256_SignAndEncrypt])
227
228
        """
229
        self._security_policy = security_policy
230
231
    def set_security_IDs(self, policy_ids):
232
        """
233
            Method setting up the security endpoints for identification
234
            of clients. During server object initialization, all possible
235
            endpoints are enabled:
236
237
            self._policyIDs = ["Anonymous", "Basic256Sha256", "Username"]
238
239
            E.g. to limit the number of IDs and disable anonymous clients:
240
241
                set_security_policy(["Basic256Sha256"])
242
243
        (Implementation for ID check is currently not finalized...)
244
        """
245
        self._policyIDs = policy_ids
246
247
    async def _setup_server_nodes(self):
        """
        Create the endpoints and security policy factories for the requested
        security policies.

        To be called just before starting the server since it needs all
        parameters to be set up. Encrypted endpoints are skipped, with a
        warning, when the certificate or the private key is missing.
        """
        requested = self._security_policy
        no_security = ua.SecurityPolicyType.NoSecurity

        # Always rebuild the list: previously it was only reset when NoSecurity
        # was requested, so restarting a secure-only server appended duplicates.
        self._policies = []

        if no_security in requested:
            self._set_endpoints()
            self._policies.append(ua.SecurityPolicyFactory())

        if requested == [no_security]:
            return

        if not (self.certificate and self.iserver.private_key):
            self.logger.warning("Endpoints other than open requested but private key and certificate are not set.")
            return

        if no_security in requested:
            self.logger.warning(
                "Creating an open endpoint to the server, although encrypted endpoints are enabled.")

        # order matters: SignAndEncrypt is registered before Sign
        secure_modes = (
            (ua.SecurityPolicyType.Basic256Sha256_SignAndEncrypt, ua.MessageSecurityMode.SignAndEncrypt),
            (ua.SecurityPolicyType.Basic256Sha256_Sign, ua.MessageSecurityMode.Sign),
        )
        for policy_type, mode in secure_modes:
            if policy_type not in requested:
                continue
            self._set_endpoints(security_policies.SecurityPolicyBasic256Sha256, mode)
            self._policies.append(
                ua.SecurityPolicyFactory(security_policies.SecurityPolicyBasic256Sha256,
                                         mode, self.certificate, self.iserver.private_key))
274
275
    def _set_endpoints(self, policy=ua.SecurityPolicy, mode=ua.MessageSecurityMode.None_):
        """
        Build one endpoint description and add it to the internal server.

        :param policy: security policy class; its `URI` is advertised on the endpoint
        :param mode: message security mode of the endpoint
        """
        # (name accepted in self._policyIDs, advertised PolicyId, token type)
        token_specs = (
            ("Anonymous", "anonymous", ua.UserTokenType.Anonymous),
            ("Basic256Sha256", 'certificate_basic256sha256', ua.UserTokenType.Certificate),
            ("Username", "username", ua.UserTokenType.UserName),
        )
        idtokens = []
        for enabled_name, policy_id, token_type in token_specs:
            if enabled_name not in self._policyIDs:
                continue
            idtoken = ua.UserTokenPolicy()
            idtoken.PolicyId = policy_id
            idtoken.TokenType = token_type
            idtokens.append(idtoken)

        url = self.endpoint.geturl()

        appdesc = ua.ApplicationDescription()
        appdesc.ApplicationName = ua.LocalizedText(self.name)
        appdesc.ApplicationUri = self._application_uri
        appdesc.ApplicationType = self.application_type
        appdesc.ProductUri = self.product_uri
        appdesc.DiscoveryUrls.append(url)

        edp = ua.EndpointDescription()
        edp.EndpointUrl = url
        edp.Server = appdesc
        if self.certificate:
            edp.ServerCertificate = uacrypto.der_from_x509(self.certificate)
        edp.SecurityMode = mode
        edp.SecurityPolicyUri = policy.URI
        edp.UserIdentityTokens = idtokens
        edp.TransportProfileUri = "http://opcfoundation.org/UA-Profile/Transport/uatcp-uasc-uabinary"
        edp.SecurityLevel = 0
        self.iserver.add_endpoint(edp)
313
314
    def set_server_name(self, name):
315
        self.name = name
316
317
    async def start(self):
        """
        Start to listen on network

        Sets up the endpoints, starts the internal server and then the binary
        server. If the binary server cannot be created or started, the
        internal server is stopped again and the error is re-raised.
        """
        await self._setup_server_nodes()
        await self.iserver.start()
        try:
            self.bserver = BinaryServer(self.iserver, self.endpoint.hostname, self.endpoint.port)
            self.bserver.set_policies(self._policies)
            await self.bserver.start()
        except Exception:
            await self.iserver.stop()
            # bare raise re-raises the active exception with its traceback untouched
            raise
330
331
    async def stop(self):
332
        """
333
        Stop server
334
        """
335
        if self._discovery_clients:
336
            await asyncio.wait([client.disconnect() for client in self._discovery_clients.values()])
337
        await self.bserver.stop()
338
        await self.iserver.stop()
339
340
    def get_root_node(self):
        """
        Get Root node of server. Returns a Node object.
        """
        root_id = ua.TwoByteNodeId(ua.ObjectIds.RootFolder)
        return self.get_node(root_id)
345
346
    def get_objects_node(self):
        """
        Get Objects node of server. Returns a Node object.
        """
        objects_id = ua.TwoByteNodeId(ua.ObjectIds.ObjectsFolder)
        return self.get_node(objects_id)
351
352
    def get_server_node(self):
        """
        Get Server node of server. Returns a Node object.
        """
        server_id = ua.TwoByteNodeId(ua.ObjectIds.Server)
        return self.get_node(server_id)
357
358
    def get_node(self, nodeid):
        """
        Get a specific node using NodeId object or a string representing a NodeId

        :return: a `Node` bound to the internal session of this server
        """
        session = self.iserver.isession
        return Node(session, nodeid)
363
364
    async def create_subscription(self, period, handler):
        """
        Create a subscription.
        Returns a Subscription object which allows subscribing to events or data changes on the server

        :param period: Period in milliseconds
        :param handler: A class instance - see `SubHandler` base class for details
        """
        params = ua.CreateSubscriptionParameters()
        params.RequestedPublishingInterval = period
        params.RequestedLifetimeCount = 3000
        params.RequestedMaxKeepAliveCount = 10000
        params.MaxNotificationsPerPublish = 0
        params.PublishingEnabled = True
        params.Priority = 0

        session = self.iserver.isession
        subscription = Subscription(session, params, handler)
        await subscription.init()
        return subscription
381
382
    def get_namespace_array(self) -> Coroutine:
        """
        Get all namespaces defined in the server.

        :return: awaitable giving the value of the Server_NamespaceArray node
        """
        return self.get_node(ua.NodeId(ua.ObjectIds.Server_NamespaceArray)).get_value()
388
389 View Code Duplication
    async def register_namespace(self, uri) -> int:
        """
        Register a new namespace. Nodes should be in a custom namespace, not 0.

        :param uri: namespace uri
        :return: index of the namespace; the existing index when already registered
        """
        ns_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_NamespaceArray))
        namespaces = await ns_node.get_value()
        if uri not in namespaces:
            namespaces.append(uri)
            await ns_node.set_value(namespaces)
            return len(namespaces) - 1
        return namespaces.index(uri)
400
401
    async def get_namespace_index(self, uri):
402
        """
403
        get index of a namespace using its uri
404
        """
405
        uries = await self.get_namespace_array()
406
        return uries.index(uri)
407
408
    async def get_event_generator(self, etype=None, emitting_node=ua.ObjectIds.Server):
        """
        Returns an event generator using an event type from address space.
        Use this object to fire events

        :param etype: event type; a `BaseEvent` instance is used when falsy
        :param emitting_node: node emitting the events, forwarded to `EventGenerator.init()`
        """
        etype = etype or BaseEvent()
        generator = EventGenerator(self.iserver.isession)
        await generator.init(etype, emitting_node=emitting_node)
        return generator
418
419
    def create_custom_data_type(self, idx, name, basetype=ua.ObjectIds.BaseDataType, properties=None) -> Coroutine:
        """
        Create a custom data type with the given properties.

        :return: awaitable from `_create_custom_type()` giving the new type node
        """
        props = [] if properties is None else properties
        return self._create_custom_type(idx, name, basetype, props, [], [])
423
424
    def create_custom_event_type(self, idx, name, basetype=ua.ObjectIds.BaseEventType, properties=None) -> Coroutine:
        """
        Create a custom event type with the given properties.

        :return: awaitable from `_create_custom_type()` giving the new type node
        """
        props = [] if properties is None else properties
        return self._create_custom_type(idx, name, basetype, props, [], [])
428
429
    def create_custom_object_type(self,
                                  idx,
                                  name,
                                  basetype=ua.ObjectIds.BaseObjectType,
                                  properties=None,
                                  variables=None,
                                  methods=None) -> Coroutine:
        """
        Create a custom object type with the given properties, variables and methods.

        :return: awaitable from `_create_custom_type()` giving the new type node
        """
        props = [] if properties is None else properties
        variabs = [] if variables is None else variables
        meths = [] if methods is None else methods
        return self._create_custom_type(idx, name, basetype, props, variabs, meths)
443
444
    # def create_custom_reference_type(self, idx, name, basetype=ua.ObjectIds.BaseReferenceType, properties=[]):
445
    # return self._create_custom_type(idx, name, basetype, properties)
446
447
    def create_custom_variable_type(self,
                                    idx,
                                    name,
                                    basetype=ua.ObjectIds.BaseVariableType,
                                    properties=None,
                                    variables=None,
                                    methods=None) -> Coroutine:
        """
        Create a custom variable type with the given properties, variables and methods.

        :return: awaitable from `_create_custom_type()` giving the new type node
        """
        props = [] if properties is None else properties
        variabs = [] if variables is None else variables
        meths = [] if methods is None else methods
        return self._create_custom_type(idx, name, basetype, props, variabs, meths)
461
462
    async def _create_custom_type(self, idx, name, basetype, properties, variables, methods):
        """
        Add a new type below `basetype` and populate it.

        :param idx: namespace index used for the type and its children
        :param name: name of the new type
        :param basetype: parent type as `Node`, `ua.NodeId` or a value accepted by `ua.NodeId()`
        :param properties: sequences of (name, variant type[, datatype])
        :param variables: sequences of (name, variant type[, datatype])
        :param methods: sequences whose first four items are passed to `add_method()`
        :return: the node of the created type
        """
        if isinstance(basetype, Node):
            base_t = basetype
        else:
            nodeid = basetype if isinstance(basetype, ua.NodeId) else ua.NodeId(basetype)
            base_t = Node(self.iserver.isession, nodeid)

        custom_t = await base_t.add_object_type(idx, name)

        for prop in properties:
            # the third element (datatype) is optional
            datatype = prop[2] if len(prop) > 2 else None
            await custom_t.add_property(
                idx, prop[0], ua.get_default_value(prop[1]), varianttype=prop[1], datatype=datatype)

        for variable in variables:
            datatype = variable[2] if len(variable) > 2 else None
            await custom_t.add_variable(
                idx, variable[0], ua.get_default_value(variable[1]), varianttype=variable[1], datatype=datatype)

        for method in methods:
            await custom_t.add_method(idx, method[0], method[1], method[2], method[3])

        return custom_t
485
486
    def import_xml(self, path=None, xmlstring=None) -> Coroutine:
        """
        Import nodes defined in xml

        :param path: forwarded to `XmlImporter.import_xml()`
        :param xmlstring: forwarded to `XmlImporter.import_xml()`
        :return: the awaitable returned by the importer
        """
        return XmlImporter(self).import_xml(path, xmlstring)
492
493
    async def export_xml(self, nodes, path):
        """
        Export defined nodes to xml

        :param nodes: nodes to export
        :param path: name of the xml file to write
        """
        exporter = XmlExporter(self)
        await exporter.build_etree(nodes)
        await exporter.write_xml(path)
500
501
    async def export_xml_by_ns(self, path: str, namespaces: list = None):
        """
        Export nodes of one or more namespaces to an XML file.
        Namespaces used by nodes are always exported for consistency.

        :param path: name of the xml file to write
        :param namespaces: list of string uris or int indexes of the namespaces to export;
            if not provided all namespaces are used except 0
        """
        selected = [] if namespaces is None else namespaces
        nodes = await get_nodes_of_namespace(self, selected)
        await self.export_xml(nodes, path)
512
513
    def delete_nodes(self, nodes, recursive=False) -> Coroutine:
        """
        Delete nodes from the address space through the internal session.

        :param nodes: nodes to delete
        :param recursive: forwarded to the module-level `delete_nodes()` helper
        :return: the awaitable returned by that helper
        """
        session = self.iserver.isession
        return delete_nodes(session, nodes, recursive)
515
516
    async def historize_node_data_change(self, node, period=timedelta(days=7), count=0):
517
        """
518
        Start historizing supplied nodes; see history module
519
        :param node: node or list of nodes that can be historized (variables/properties)
520
        :param period: time delta to store the history; older data will be deleted from the storage
521
        :param count: number of changes to store in the history
522
        """
523
        nodes = node if isinstance(node, (list, tuple)) else [node]
524
        for n in nodes:
525
            await self.iserver.enable_history_data_change(n, period, count)
526
527
    async def dehistorize_node_data_change(self, node):
528
        """
529
        Stop historizing supplied nodes; see history module
530
        :param node: node or list of nodes that can be historized (UA variables/properties)
531
        """
532
        nodes = node if isinstance(node, (list, tuple)) else [node]
533
        for n in nodes:
534
            await self.iserver.disable_history_data_change(n)
535
536
    async def historize_node_event(self, node, period=timedelta(days=7), count: int = 0):
537
        """
538
        Start historizing events from node (typically a UA object); see history module
539
        :param node: node or list of nodes that can be historized (UA objects)
540
        :param period: time delta to store the history; older data will be deleted from the storage
541
        :param count: number of events to store in the history
542
        """
543
        nodes = node if isinstance(node, (list, tuple)) else [node]
544
        for n in nodes:
545
            await self.iserver.enable_history_event(n, period, count)
546
547
    async def dehistorize_node_event(self, node):
548
        """
549
        Stop historizing events from node (typically a UA object); see history module
550
        :param node: node or list of nodes that can be historized (UA objects)
551
        """
552
        nodes = node if isinstance(node, (list, tuple)) else [node]
553
        for n in nodes:
554
            await self.iserver.disable_history_event(n)
555
556
    def subscribe_server_callback(self, event, handle):
557
        self.iserver.subscribe_server_callback(event, handle)
558
559
    def unsubscribe_server_callback(self, event, handle):
560
        self.iserver.unsubscribe_server_callback(event, handle)
561
562
    def link_method(self, node, callback):
563
        """
564
        Link a python function to a UA method in the address space; required when a UA method has been imported
565
        to the address space via XML; the python executable must be linked manually
566
        :param node: UA method node
567
        :param callback: python function that the UA method will call
568
        """
569
        self.iserver.isession.add_method_callback(node.nodeid, callback)
570
571
    def load_type_definitions(self, nodes=None) -> Coroutine:
        """
        Load custom structures from our server.
        Server side this can be used to create python objects from custom structures
        imported through xml into server

        :param nodes: forwarded to the module-level `load_type_definitions()` helper
        :return: the awaitable returned by that helper
        """
        return load_type_definitions(self, nodes)
578
579
    def load_enums(self) -> Coroutine:
        """
        Load UA structures and generate python Enums in ua module for custom enums in server

        :return: the awaitable returned by the module-level `load_enums()` helper
        """
        return load_enums(self)
584
585
    def set_attribute_value(self, nodeid, datavalue, attr=ua.AttributeIds.Value):
586
        """
587
        directly write datavalue to the Attribute, bypasing some checks and structure creation
588
        so it is a little faster
589
        """
590
        return self.iserver.set_attribute_value(nodeid, datavalue, attr)
591