Passed
Push — master ( 31db04...b65a1f )
by Olivier
02:29
created

asyncua.server.server._get_node()   A

Complexity

Conditions 3

Size

Total Lines 6
Code Lines 6

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 3
eloc 6
nop 2
dl 0
loc 6
rs 10
c 0
b 0
f 0
1
"""
2
High level interface to pure python OPC-UA server
3
"""
4
5
import asyncio
6
import logging
7
from datetime import timedelta, datetime
8
from urllib.parse import urlparse
9
from typing import Coroutine, Optional
10
11
from asyncua import ua
12
from .binary_server_asyncio import BinaryServer
13
from .internal_server import InternalServer
14
from .event_generator import EventGenerator
15
from ..client import Client
16
from ..common.node import Node
17
from ..common.subscription import Subscription
18
from ..common.xmlimporter import XmlImporter
19
from ..common.xmlexporter import XmlExporter
20
from ..common.manage_nodes import delete_nodes
21
from ..common.event_objects import BaseEvent
22
from ..common.shortcuts import Shortcuts
23
from ..common.structures import load_type_definitions, load_enums
24
from ..common.ua_utils import get_nodes_of_namespace
25
26
from ..crypto import security_policies, uacrypto
27
28
_logger = logging.getLogger(__name__)
29
30
31
def _get_node(isession, whatever):
    """
    Coerce ``whatever`` into a `Node` bound to ``isession``.

    :param isession: session object handed to the `Node` constructor
    :param whatever: a `Node` (returned unchanged), a `ua.NodeId`, or any
        value accepted by the `ua.NodeId` constructor
    :return: a `Node` instance
    """
    if isinstance(whatever, Node):
        # already a node: hand it back untouched, it is not re-bound to isession
        return whatever
    nodeid = whatever if isinstance(whatever, ua.NodeId) else ua.NodeId(whatever)
    return Node(isession, nodeid)
37
38
39
class Server:
    """
    High level Server class

    This class creates an asyncua server with default values

    Create your own namespace and then populate your server address space
    using get_root_node() or get_objects_node() to get Node objects,
    and get_event_generator() to fire events.
    Then start server. See example_server.py

    NOTE(review): an earlier version of this docstring stated that all methods
    are threadsafe. Nothing in this class takes a lock; it schedules its work
    on ``self.loop``, so confirm before calling it from other threads.

    If you need more flexibility you call directly the Ua Service methods
    on the iserver  or iserver.isession object members.

    During startup the standard address space will be constructed, which may be
    time-consuming when running a server on a less powerful device (e.g. a
    Raspberry Pi). In order to improve startup performance, an optional path to a
    cache file can be passed to `init()` (``shelf_file``).
    If the parameter is defined, the address space will be loaded from the
    cache file or the file will be created if it does not exist yet.
    As a result the first startup will be even slower due to the cache file
    generation but all further start ups will be significantly faster.
    ┌────────┐
    │ Server │ ── BinaryServer ── OPCUAProtocol ── UaProcessor
    │        │ ── InternalServer ── InternalSession
    └────────┘                   ── SubscriptionService

    :ivar product_uri:
    :ivar name:
    :ivar default_timeout: timeout in milliseconds for sessions and secure channel
    :ivar iserver: `InternalServer` instance
    :ivar bserver: binary protocol server `BinaryServer`
    :ivar nodes: shortcuts to common nodes - `Shortcuts` instance
    """

    def __init__(self, iserver: InternalServer = None, loop: asyncio.AbstractEventLoop = None):
        """
        :param iserver: existing `InternalServer` to wrap; a new one is created when omitted
        :param loop: event loop to use; defaults to `asyncio.get_event_loop()`
        """
        self.loop: asyncio.AbstractEventLoop = loop or asyncio.get_event_loop()
        self.logger = logging.getLogger(__name__)
        self.endpoint = urlparse("opc.tcp://0.0.0.0:4840/freeopcua/server/")
        self._application_uri = "urn:freeopcua:python:server"
        self.product_uri = "urn:freeopcua.github.io:python:server"
        self.name: str = "FreeOpcUa Python Server"
        self.manufacturer_name = "FreeOpcUa"
        self.application_type = ua.ApplicationType.ClientAndServer
        self.default_timeout: int = 60 * 60 * 1000
        self.iserver = iserver if iserver else InternalServer(self.loop)
        self.bserver: Optional[BinaryServer] = None
        # discovery url -> Client connected to that discovery server
        self._discovery_clients = {}
        self._discovery_period = 60
        # handle of the pending renewal timer, so that it can be cancelled
        self._renew_handle: Optional[asyncio.Handle] = None
        self._policies = []
        self.nodes = Shortcuts(self.iserver.isession)
        # enable all endpoints by default
        self._security_policy = [
            ua.SecurityPolicyType.NoSecurity, ua.SecurityPolicyType.Basic256Sha256_SignAndEncrypt,
            ua.SecurityPolicyType.Basic256Sha256_Sign
        ]
        self._policyIDs = ["Anonymous", "Basic256Sha256", "Username"]
        self.certificate = None

    async def init(self, shelf_file=None):
        """
        Initialize the internal server and write the default application uri,
        server array and build info into the address space.

        :param shelf_file: forwarded unchanged to `InternalServer.init`
        """
        await self.iserver.init(shelf_file)
        # setup some expected values
        await self.set_application_uri(self._application_uri)
        sa_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_ServerArray))
        await sa_node.set_value([self._application_uri])
        status_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_ServerStatus))
        build_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_ServerStatus_BuildInfo))
        status = ua.ServerStatusDataType()
        status.BuildInfo.ProductUri = self.product_uri
        status.BuildInfo.ManufacturerName = self.manufacturer_name
        status.BuildInfo.ProductName = self.name
        status.BuildInfo.SoftwareVersion = "1.0pre"
        status.BuildInfo.BuildNumber = "0"
        status.BuildInfo.BuildDate = datetime.now()
        status.SecondsTillShutdown = 0
        await status_node.set_value(status)
        await build_node.set_value(status.BuildInfo)

    async def __aenter__(self):
        """
        Start the server when entering an ``async with`` block.

        :return: the server itself, so ``async with Server() as server`` binds it
        """
        await self.start()
        return self

    async def __aexit__(self, exc_type, exc_value, traceback):
        """
        Stop the server when leaving an ``async with`` block.
        """
        await self.stop()

    async def load_certificate(self, path: str):
        """
        load server certificate from file, either pem or der
        """
        self.certificate = await uacrypto.load_certificate(path)

    async def load_private_key(self, path):
        """
        load the private key from file and store it on the internal server
        """
        self.iserver.private_key = await uacrypto.load_private_key(path)

    def disable_clock(self, val: bool = True):
        """
        for debugging you may want to disable clock that write every second
        to address space
        """
        self.iserver.disabled_clock = val

    def get_application_uri(self):
        """
        Return the application uri currently configured on this server.
        """
        return self._application_uri

    async def set_application_uri(self, uri: str):
        """
        Set application/server URI.
        This uri is supposed to be unique. If you intend to register
        your server to a discovery server, it really should be unique in
        your system!
        default is : "urn:freeopcua:python:server"
        """
        self._application_uri = uri
        ns_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_NamespaceArray))
        uries = await ns_node.get_value()
        if len(uries) > 1:
            uries[1] = uri  # application uri is always namespace 1
        else:
            uries.append(uri)
        await ns_node.set_value(uries)

    async def find_servers(self, uris=None):
        """
        find_servers. mainly implemented for symmetry with client

        :param uris: server uris used as filter; defaults to an empty list
        """
        if uris is None:
            uris = []
        params = ua.FindServersParameters()
        params.EndpointUrl = self.endpoint.geturl()
        params.ServerUris = uris
        return self.iserver.find_servers(params)

    async def register_to_discovery(self, url: str = "opc.tcp://localhost:4840", period: int = 60):
        """
        Register to an OPC-UA Discovery server. Registering must be renewed at
        least every 10 minutes, so this method will use our asyncio loop to
        re-register every period seconds
        if period is 0 registration is not automatically renewed

        The period is shared by all discovery servers: the value passed to the
        most recent call applies to every registered client, and passing 0
        stops the automatic renewal altogether.

        :param url: url of the discovery server
        :param period: renewal period in seconds, 0 to disable renewal
        """
        # FIXME: have a period per discovery
        if url in self._discovery_clients:
            await self._discovery_clients[url].disconnect()
        client = Client(url)
        self._discovery_clients[url] = client
        await client.connect()
        await client.register_server(self)
        self._discovery_period = period
        # Only one renewal timer may be pending at any time. Otherwise every
        # call stacks another timer chain, and a later call with period 0
        # would make an older chain re-schedule itself without any delay.
        self._cancel_renew_registration()
        if period:
            self._renew_handle = self.loop.call_soon(self._schedule_renew_registration)

    def unregister_to_discovery(self, url: str = "opc.tcp://localhost:4840"):
        """
        Stop registration to the discovery server at ``url``.

        The client is removed from the renewal list and its disconnection is
        scheduled on the server loop (this method is not a coroutine, so the
        disconnect coroutine cannot be awaited here).

        :param url: url previously passed to `register_to_discovery`
        :return: the task running the disconnection; may be awaited or ignored
        :raises KeyError: if no registration exists for ``url``
        """
        # FIXME: is there really no way to deregister?
        client = self._discovery_clients.pop(url)
        if not self._discovery_clients:
            # nothing left to renew
            self._cancel_renew_registration()
        return self.loop.create_task(client.disconnect())

    def _cancel_renew_registration(self):
        """
        Cancel the pending renewal timer, if there is one.
        """
        if self._renew_handle is not None:
            self._renew_handle.cancel()
            self._renew_handle = None

    def _schedule_renew_registration(self):
        """
        Timer callback: renew all registrations and re-arm the timer.
        """
        self._renew_handle = None
        if not self._discovery_period or not self._discovery_clients:
            # renewal disabled or nothing to renew: let the timer chain end
            return
        self.loop.create_task(self._renew_registration())
        self._renew_handle = self.loop.call_later(self._discovery_period, self._schedule_renew_registration)

    async def _renew_registration(self):
        """
        Re-register this server on every known discovery server.
        """
        # iterate over a snapshot: the dict may be modified while we await
        for client in list(self._discovery_clients.values()):
            await client.register_server(self)

    def allow_remote_admin(self, allow):
        """
        Enable or disable the builtin Admin user from network clients
        """
        self.iserver.allow_remote_admin = allow

    def set_endpoint(self, url):
        """
        Set the endpoint url the server listens on; stored parsed by `urlparse`.
        """
        self.endpoint = urlparse(url)

    def get_endpoints(self) -> Coroutine:
        """
        Return the result of `InternalServer.get_endpoints`.
        """
        return self.iserver.get_endpoints()

    def set_security_policy(self, security_policy):
        """
        Method setting up the security policies for connections
        to the server, where security_policy is a list of integers.
        During server initialization, all endpoints are enabled:

                security_policy = [
                            ua.SecurityPolicyType.NoSecurity,
                            ua.SecurityPolicyType.Basic256Sha256_SignAndEncrypt,
                            ua.SecurityPolicyType.Basic256Sha256_Sign
                                ]

        E.g. to limit the number of endpoints and disable no encryption:

                set_security_policy([
                            ua.SecurityPolicyType.Basic256Sha256_SignAndEncrypt])

        """
        self._security_policy = security_policy

    def set_security_IDs(self, policy_ids):
        """
            Method setting up the security endpoints for identification
            of clients. During server object initialization, all possible
            endpoints are enabled:

            self._policyIDs = ["Anonymous", "Basic256Sha256", "Username"]

            E.g. to limit the number of IDs and disable anonymous clients:

                set_security_IDs(["Basic256Sha256"])

        (Implementation for ID check is currently not finalized...)
        """
        self._policyIDs = policy_ids

    async def _setup_server_nodes(self):
        """
        Create the endpoints and security policy factories matching
        ``self._security_policy``.
        """
        # to be called just before starting server since it needs all parameters to be setup
        if ua.SecurityPolicyType.NoSecurity in self._security_policy:
            self._set_endpoints()
            self._policies = [ua.SecurityPolicyFactory()]

        if self._security_policy != [ua.SecurityPolicyType.NoSecurity]:
            if not (self.certificate and self.iserver.private_key):
                self.logger.warning("Endpoints other than open requested but private key and certificate are not set.")
                return

            if ua.SecurityPolicyType.NoSecurity in self._security_policy:
                self.logger.warning(
                    "Creating an open endpoint to the server, although encrypted endpoints are enabled.")

            if ua.SecurityPolicyType.Basic256Sha256_SignAndEncrypt in self._security_policy:
                self._set_endpoints(security_policies.SecurityPolicyBasic256Sha256,
                                    ua.MessageSecurityMode.SignAndEncrypt)
                self._policies.append(
                    ua.SecurityPolicyFactory(security_policies.SecurityPolicyBasic256Sha256,
                                             ua.MessageSecurityMode.SignAndEncrypt, self.certificate,
                                             self.iserver.private_key))
            if ua.SecurityPolicyType.Basic256Sha256_Sign in self._security_policy:
                self._set_endpoints(security_policies.SecurityPolicyBasic256Sha256, ua.MessageSecurityMode.Sign)
                self._policies.append(
                    ua.SecurityPolicyFactory(security_policies.SecurityPolicyBasic256Sha256,
                                             ua.MessageSecurityMode.Sign, self.certificate, self.iserver.private_key))

    def _set_endpoints(self, policy=ua.SecurityPolicy, mode=ua.MessageSecurityMode.None_):
        """
        Build one endpoint description for the given security policy and mode
        and add it to the internal server.

        :param policy: security policy class; its ``URI`` is advertised
        :param mode: message security mode of the endpoint
        """
        # (name used in self._policyIDs, advertised PolicyId, token type), in advertised order
        token_specs = (
            ("Anonymous", "anonymous", ua.UserTokenType.Anonymous),
            ("Basic256Sha256", 'certificate_basic256sha256', ua.UserTokenType.Certificate),
            ("Username", "username", ua.UserTokenType.UserName),
        )
        idtokens = []
        for enabled_name, policy_id, token_type in token_specs:
            if enabled_name in self._policyIDs:
                idtoken = ua.UserTokenPolicy()
                idtoken.PolicyId = policy_id
                idtoken.TokenType = token_type
                idtokens.append(idtoken)

        appdesc = ua.ApplicationDescription()
        appdesc.ApplicationName = ua.LocalizedText(self.name)
        appdesc.ApplicationUri = self._application_uri
        appdesc.ApplicationType = self.application_type
        appdesc.ProductUri = self.product_uri
        appdesc.DiscoveryUrls.append(self.endpoint.geturl())

        edp = ua.EndpointDescription()
        edp.EndpointUrl = self.endpoint.geturl()
        edp.Server = appdesc
        if self.certificate:
            edp.ServerCertificate = uacrypto.der_from_x509(self.certificate)
        edp.SecurityMode = mode
        edp.SecurityPolicyUri = policy.URI
        edp.UserIdentityTokens = idtokens
        edp.TransportProfileUri = "http://opcfoundation.org/UA-Profile/Transport/uatcp-uasc-uabinary"
        edp.SecurityLevel = 0
        self.iserver.add_endpoint(edp)

    def set_server_name(self, name):
        """
        Set the server name used as application and product name.
        """
        self.name = name

    async def start(self):
        """
        Start to listen on network

        If the binary server cannot be started the internal server is stopped
        again and the original exception is re-raised.
        """
        await self._setup_server_nodes()
        await self.iserver.start()
        try:
            self.bserver = BinaryServer(self.iserver, self.endpoint.hostname, self.endpoint.port)
            self.bserver.set_policies(self._policies)
            await self.bserver.start()
        except Exception:
            await self.iserver.stop()
            raise

    async def stop(self):
        """
        Stop server

        Cancels the discovery renewal timer, disconnects all discovery clients
        (errors are logged, not raised), then stops the binary server if one
        was created and finally the internal server.
        """
        self._cancel_renew_registration()
        if self._discovery_clients:
            # gather() accepts coroutines directly; passing bare coroutines to
            # asyncio.wait() is rejected by recent Python versions.
            results = await asyncio.gather(
                *(client.disconnect() for client in self._discovery_clients.values()),
                return_exceptions=True)
            for result in results:
                if isinstance(result, BaseException):
                    self.logger.warning("Error while disconnecting from discovery server: %s", result)
        if self.bserver is not None:
            # bserver stays None when start() was never called
            await self.bserver.stop()
        await self.iserver.stop()

    def get_root_node(self):
        """
        Get Root node of server. Returns a Node object.
        """
        return self.get_node(ua.TwoByteNodeId(ua.ObjectIds.RootFolder))

    def get_objects_node(self):
        """
        Get Objects node of server. Returns a Node object.
        """
        return self.get_node(ua.TwoByteNodeId(ua.ObjectIds.ObjectsFolder))

    def get_server_node(self):
        """
        Get Server node of server. Returns a Node object.
        """
        return self.get_node(ua.TwoByteNodeId(ua.ObjectIds.Server))

    def get_node(self, nodeid):
        """
        Get a specific node using NodeId object or a string representing a NodeId
        """
        return Node(self.iserver.isession, nodeid)

    async def create_subscription(self, period, handler):
        """
        Create a subscription.
        Returns a Subscription object which allow to subscribe to events or data changes on server
        :param period: Period in milliseconds
        :param handler: A class instance - see `SubHandler` base class for details
        """
        params = ua.CreateSubscriptionParameters()
        params.RequestedPublishingInterval = period
        params.RequestedLifetimeCount = 3000
        params.RequestedMaxKeepAliveCount = 10000
        params.MaxNotificationsPerPublish = 0
        params.PublishingEnabled = True
        params.Priority = 0
        subscription = Subscription(self.iserver.isession, params, handler)
        await subscription.init()
        return subscription

    def get_namespace_array(self) -> Coroutine:
        """
        get all namespace defined in server
        """
        ns_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_NamespaceArray))
        return ns_node.get_value()

    async def register_namespace(self, uri) -> int:
        """
        Register a new namespace. Nodes should be in a custom namespace, not 0.

        :param uri: namespace uri
        :return: index of the namespace; the existing index if already registered
        """
        ns_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_NamespaceArray))
        uries = await ns_node.get_value()
        if uri in uries:
            return uries.index(uri)
        uries.append(uri)
        await ns_node.set_value(uries)
        return len(uries) - 1

    async def get_namespace_index(self, uri):
        """
        get index of a namespace using its uri

        :raises ValueError: if the uri is not in the namespace array
        """
        uries = await self.get_namespace_array()
        return uries.index(uri)

    async def get_event_generator(self, etype=None, emitting_node=ua.ObjectIds.Server):
        """
        Returns an event object using an event type from address space.
        Use this object to fire events

        :param etype: event type; a `BaseEvent` is used when omitted
        :param emitting_node: node emitting the event, the Server object by default
        """
        if not etype:
            etype = BaseEvent()
        ev_gen = EventGenerator(self.iserver.isession)
        await ev_gen.init(etype, emitting_node=emitting_node)
        return ev_gen

    async def create_custom_data_type(self, idx, name, basetype=ua.ObjectIds.BaseDataType, properties=None, description=None) -> Node:
        """
        Create a data type below ``basetype`` and add the given properties to it.

        :param idx: namespace index used for the new nodes
        :param name: name of the new data type
        :param basetype: Node, NodeId or node id value of the parent type
        :param properties: sequence of ``(name, variant type[, data type])``
        :param description: forwarded to ``add_data_type``
        :return: the node of the new data type
        """
        if properties is None:
            properties = []
        base_t = _get_node(self.iserver.isession, basetype)

        custom_t = await base_t.add_data_type(idx, name, description)
        for prop in properties:
            # the third element (data type) is optional
            datatype = prop[2] if len(prop) > 2 else None
            await custom_t.add_property(idx, prop[0], ua.get_default_value(prop[1]), varianttype=prop[1], datatype=datatype)
        return custom_t

    async def create_custom_event_type(self, idx, name, basetype=ua.ObjectIds.BaseEventType, properties=None) -> Node:
        """
        Create an event type below ``basetype``; see `_create_custom_type`.
        """
        if properties is None:
            properties = []
        return await self._create_custom_type(idx, name, basetype, properties, [], [])

    async def create_custom_object_type(self,
                                        idx,
                                        name,
                                        basetype=ua.ObjectIds.BaseObjectType,
                                        properties=None,
                                        variables=None,
                                        methods=None) -> Node:
        """
        Create an object type below ``basetype``; see `_create_custom_type`.
        """
        if properties is None:
            properties = []
        if variables is None:
            variables = []
        if methods is None:
            methods = []
        return await self._create_custom_type(idx, name, basetype, properties, variables, methods)

    # def create_custom_reference_type(self, idx, name, basetype=ua.ObjectIds.BaseReferenceType, properties=[]):
    # return self._create_custom_type(idx, name, basetype, properties)

    async def create_custom_variable_type(self,
                                          idx,
                                          name,
                                          basetype=ua.ObjectIds.BaseVariableType,
                                          properties=None,
                                          variables=None,
                                          methods=None) -> Node:
        """
        Create a type below ``basetype``; see `_create_custom_type`.
        """
        if properties is None:
            properties = []
        if variables is None:
            variables = []
        if methods is None:
            methods = []
        return await self._create_custom_type(idx, name, basetype, properties, variables, methods)

    async def _create_custom_type(self, idx, name, basetype, properties, variables, methods):
        """
        Add an object type named ``name`` below ``basetype`` and populate it.

        :param idx: namespace index used for the new nodes
        :param name: name of the new type
        :param basetype: Node, NodeId or node id value of the parent type
        :param properties: sequence of ``(name, variant type[, data type])``
        :param variables: sequence of ``(name, variant type[, data type])``
        :param methods: sequence of 4-item sequences forwarded to ``add_method``
        :return: the node of the new type
        """
        base_t = _get_node(self.iserver.isession, basetype)
        custom_t = await base_t.add_object_type(idx, name)
        for prop in properties:
            datatype = prop[2] if len(prop) > 2 else None
            await custom_t.add_property(
                idx, prop[0], ua.get_default_value(prop[1]), varianttype=prop[1], datatype=datatype)
        for variable in variables:
            datatype = variable[2] if len(variable) > 2 else None
            await custom_t.add_variable(
                idx, variable[0], ua.get_default_value(variable[1]), varianttype=variable[1], datatype=datatype)
        for method in methods:
            await custom_t.add_method(idx, method[0], method[1], method[2], method[3])
        return custom_t

    def import_xml(self, path=None, xmlstring=None) -> Coroutine:
        """
        Import nodes defined in xml
        """
        importer = XmlImporter(self)
        return importer.import_xml(path, xmlstring)

    async def export_xml(self, nodes, path):
        """
        Export defined nodes to xml
        """
        exp = XmlExporter(self)
        await exp.build_etree(nodes)
        await exp.write_xml(path)

    async def export_xml_by_ns(self, path: str, namespaces: list = None):
        """
        Export nodes of one or more namespaces to an XML file.
        Namespaces used by nodes are always exported for consistency.
        :param path: name of the xml file to write
        :param namespaces: list of string uris or int indexes of the namespace to export, if not provided all ns are used except 0
        """
        if namespaces is None:
            namespaces = []
        nodes = await get_nodes_of_namespace(self, namespaces)
        await self.export_xml(nodes, path)

    def delete_nodes(self, nodes, recursive=False) -> Coroutine:
        """
        Delete the given nodes; forwarded to `manage_nodes.delete_nodes`.
        """
        return delete_nodes(self.iserver.isession, nodes, recursive)

    @staticmethod
    def _as_node_list(node):
        """
        Return lists and tuples unchanged, wrap anything else in a list.
        """
        return node if isinstance(node, (list, tuple)) else [node]

    async def historize_node_data_change(self, node, period=timedelta(days=7), count=0):
        """
        Start historizing supplied nodes; see history module
        :param node: node or list of nodes that can be historized (variables/properties)
        :param period: time delta to store the history; older data will be deleted from the storage
        :param count: number of changes to store in the history
        """
        for n in self._as_node_list(node):
            await self.iserver.enable_history_data_change(n, period, count)

    async def dehistorize_node_data_change(self, node):
        """
        Stop historizing supplied nodes; see history module
        :param node: node or list of nodes that can be historized (UA variables/properties)
        """
        for n in self._as_node_list(node):
            await self.iserver.disable_history_data_change(n)

    async def historize_node_event(self, node, period=timedelta(days=7), count: int = 0):
        """
        Start historizing events from node (typically a UA object); see history module
        :param node: node or list of nodes that can be historized (UA objects)
        :param period: time delta to store the history; older data will be deleted from the storage
        :param count: number of events to store in the history
        """
        for n in self._as_node_list(node):
            await self.iserver.enable_history_event(n, period, count)

    async def dehistorize_node_event(self, node):
        """
        Stop historizing events from node (typically a UA object); see history module
        :param node: node or list of nodes that can be historized (UA objects)
        """
        for n in self._as_node_list(node):
            await self.iserver.disable_history_event(n)

    def subscribe_server_callback(self, event, handle):
        """
        Forwarded to `InternalServer.subscribe_server_callback`.
        """
        self.iserver.subscribe_server_callback(event, handle)

    def unsubscribe_server_callback(self, event, handle):
        """
        Forwarded to `InternalServer.unsubscribe_server_callback`.
        """
        self.iserver.unsubscribe_server_callback(event, handle)

    def link_method(self, node, callback):
        """
        Link a python function to a UA method in the address space; required when a UA method has been imported
        to the address space via XML; the python executable must be linked manually
        :param node: UA method node
        :param callback: python function that the UA method will call
        """
        self.iserver.isession.add_method_callback(node.nodeid, callback)

    def load_type_definitions(self, nodes=None) -> Coroutine:
        """
        load custom structures from our server.
        Server side this can be used to create python objects from custom structures
        imported through xml into server
        """
        return load_type_definitions(self, nodes)

    def load_enums(self) -> Coroutine:
        """
        load UA structures and generate python Enums in ua module for custom enums in server
        """
        return load_enums(self)

    def set_attribute_value(self, nodeid, datavalue, attr=ua.AttributeIds.Value):
        """
        directly write datavalue to the Attribute, bypassing some checks and structure creation
        so it is a little faster
        """
        return self.iserver.set_attribute_value(nodeid, datavalue, attr)
598