Passed
Push — master ( 1da59b...33bf38 )
by Olivier
03:50
created

Server._set_endpoints()   B

Complexity

Conditions 6

Size

Total Lines 44

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 39
CRAP Score 6

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 6
c 1
b 0
f 0
dl 0
loc 44
ccs 39
cts 39
cp 1
crap 6
rs 7.5384
1
"""
2
High level interface to pure python OPC-UA server
3
"""
4
5 1
import logging
6 1
from datetime import timedelta, datetime
7 1
try:
8 1
    from urllib.parse import urlparse
9
except ImportError:
10
    from urlparse import urlparse
11
12
13 1
from opcua import ua
14
# from opcua.binary_server import BinaryServer
15 1
from opcua.server.binary_server_asyncio import BinaryServer
16 1
from opcua.server.internal_server import InternalServer
17 1
from opcua.server.event_generator import EventGenerator
18 1
from opcua.common.node import Node
19 1
from opcua.common.subscription import Subscription
20 1
from opcua.common.manage_nodes import delete_nodes
21 1
from opcua.client.client import Client
22 1
from opcua.crypto import security_policies
23 1
from opcua.common.event_objects import BaseEvent
24 1
from opcua.common.shortcuts import Shortcuts
25 1
from opcua.common.structures import load_type_definitions
26 1
from opcua.common.xmlexporter import XmlExporter
27 1
from opcua.common.xmlimporter import XmlImporter
28 1
from opcua.common.ua_utils import get_nodes_of_namespace
29 1
use_crypto = True
30 1
try:
31 1
    from opcua.crypto import uacrypto
32
except ImportError:
33
    logging.getLogger(__name__).warning("cryptography is not installed, use of crypto disabled")
34
    use_crypto = False
35
36
37 1
class Server(object):
38
39
    """
40
    High level Server class
41
42
    This class creates an opcua server with default values
43
44
    Create your own namespace and then populate your server address space
45
    using the get_root_node() or get_objects_node() methods to get Node objects.
46
    and get_event_generator() to fire events.
47
    Then start server. See example_server.py
48
    All methods are threadsafe
49
50
    If you need more flexibility you call directly the Ua Service methods
51
    on the iserver  or iserver.isession object members.
52
53
    During startup the standard address space will be constructed, which may be
54
    time-consuming when running a server on a less powerful device (e.g. a
55
    Raspberry Pi). In order to improve startup performance, an optional path to a
56
    cache file can be passed to the server constructor.
57
    If the parameter is defined, the address space will be loaded from the
58
    cache file or the file will be created if it does not exist yet.
59
    As a result the first startup will be even slower due to the cache file
60
    generation but all further start ups will be significantly faster.
61
62
    :ivar product_uri:
63
    :vartype product_uri: uri
64
    :ivar name:
65
    :vartype name: string
66
    :ivar default_timeout: timeout in milliseconds for sessions and secure channel
67
    :vartype default_timeout: int
68
    :ivar iserver: internal server object
69
    :vartype iserver: InternalServer
70
    :ivar bserver: binary protocol server
71
    :vartype bserver: BinaryServer
72
    :ivar nodes: shortcuts to common nodes
73
    :vartype nodes: Shortcuts
74
75
    """
76
77 1
    def __init__(self, shelffile=None, iserver=None):
        """
        Build a server object populated with default values.

        :param shelffile: handed to InternalServer when no ``iserver`` is given
        :param iserver: already constructed internal server to use instead of
            creating a new InternalServer
        """
        self.logger = logging.getLogger(__name__)
        self.endpoint = urlparse("opc.tcp://0.0.0.0:4840/freeopcua/server/")
        self._application_uri = "urn:freeopcua:python:server"
        self.product_uri = "urn:freeopcua.github.io:python:server"
        self.name = "FreeOpcUa Python Server"
        self.manufacturer_name = "FreeOpcUa"
        self.application_type = ua.ApplicationType.ClientAndServer
        self.default_timeout = 3600000
        self.iserver = InternalServer(shelffile) if iserver is None else iserver
        self.bserver = None
        self._discovery_clients = {}
        self._discovery_period = 60
        self.certificate = None
        self.private_key = None
        self._policies = []
        self.nodes = Shortcuts(self.iserver.isession)

        # publish the application uri in the namespace array and server array
        self.set_application_uri(self._application_uri)
        server_array = self.get_node(ua.NodeId(ua.ObjectIds.Server_ServerArray))
        server_array.set_value([self._application_uri])

        # fill in the server status / build info nodes
        status_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_ServerStatus))
        build_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_ServerStatus_BuildInfo))
        status = ua.ServerStatusDataType()
        build_info = status.BuildInfo
        build_info.ProductUri = self.product_uri
        build_info.ManufacturerName = self.manufacturer_name
        build_info.ProductName = self.name
        build_info.SoftwareVersion = "1.0pre"
        build_info.BuildNumber = "0"
        build_info.BuildDate = datetime.now()
        status.SecondsTillShutdown = 0
        status_node.set_value(status)
        build_node.set_value(status.BuildInfo)

        # every security policy and every identity token policy is enabled
        # until set_security_policy() / set_security_IDs() says otherwise
        self._security_policy = [
            ua.SecurityPolicyType.NoSecurity,
            ua.SecurityPolicyType.Basic128Rsa15_SignAndEncrypt,
            ua.SecurityPolicyType.Basic128Rsa15_Sign,
            ua.SecurityPolicyType.Basic256_SignAndEncrypt,
            ua.SecurityPolicyType.Basic256_Sign,
        ]
        self._policyIDs = ["Anonymous", "Basic256", "Basic128", "Username"]
125
126 1
    def __enter__(self):
127 1
        self.start()
128 1
        return self
129
130 1
    def __exit__(self, exc_type, exc_value, traceback):
131 1
        self.stop()
132
133 1
    def load_certificate(self, path):
        """
        Load the server certificate from the file at ``path`` (pem or der)
        and keep it in ``self.certificate``.
        """
        certificate = uacrypto.load_certificate(path)
        self.certificate = certificate
138
139 1
    def load_private_key(self, path):
        """
        Load the server private key from the file at ``path`` and keep it
        in ``self.private_key``.
        """
        private_key = uacrypto.load_private_key(path)
        self.private_key = private_key
141
142 1
    def disable_clock(self, val=True):
143
        """
144
        for debugging you may want to disable clock that write every second
145
        to address space
146
        """
147
        self.iserver.disabled_clock = val
148
149 1
    def set_application_uri(self, uri):
        """
        Set the application/server URI and store it in the namespace array.

        The uri is supposed to be unique; if you intend to register the
        server to a discovery server it really should be unique in your
        system.
        Default is "urn:freeopcua:python:server".
        """
        self._application_uri = uri
        namespace_array = self.get_node(ua.NodeId(ua.ObjectIds.Server_NamespaceArray))
        namespaces = namespace_array.get_value()
        if len(namespaces) <= 1:
            namespaces.append(uri)
        else:
            # the application uri always lives at namespace index 1
            namespaces[1] = uri
        namespace_array.set_value(namespaces)
165
166 1
    def find_servers(self, uris=None):
        """
        Query the internal server for known servers; exists mainly for
        symmetry with the client API.

        ``uris`` is used as the ServerUris filter; None means an empty list.
        """
        params = ua.FindServersParameters()
        params.EndpointUrl = self.endpoint.geturl()
        params.ServerUris = [] if uris is None else uris
        return self.iserver.find_servers(params)
176
177 1
    def register_to_discovery(self, url="opc.tcp://localhost:4840", period=60):
        """
        Register this server to the OPC-UA discovery server at ``url``.

        A registration has to be renewed at least every 10 minutes, so the
        renewal is scheduled on the internal server loop and repeated every
        ``period`` seconds. With a period of 0 nothing is scheduled.
        """
        # FIXME: have a period per discovery
        clients = self._discovery_clients
        if url in clients:
            # drop the connection of an earlier registration to the same url
            clients[url].disconnect()
        client = Client(url)
        clients[url] = client
        client.connect()
        client.register_server(self)
        self._discovery_period = period
        if period:
            self.iserver.loop.call_soon(self._renew_registration)
193
194 1
    def unregister_to_discovery(self, url="opc.tcp://localhost:4840"):
195
        """
196
        stop registration thread
197
        """
198
        # FIXME: is there really no way to deregister?
199
        self._discovery_clients[url].disconnect()
200
201 1
    def _renew_registration(self):
202
        for client in self._discovery_clients.values():
203
            client.register_server(self)
204
            self.iserver.loop.call_later(self._discovery_period, self._renew_registration)
205
206 1
    def get_client_to_discovery(self, url="opc.tcp://localhost:4840"):
        """
        Build a Client for the discovery server at ``url``, connect it and
        return it.
        """
        discovery_client = Client(url)
        discovery_client.connect()
        return discovery_client
213
214 1
    def allow_remote_admin(self, allow):
215
        """
216
        Enable or disable the builtin Admin user from network clients
217
        """
218
        self.iserver.allow_remote_admin = allow
219
220 1
    def set_endpoint(self, url):
221 1
        self.endpoint = urlparse(url)
222
223 1
    def get_endpoints(self):
224 1
        return self.iserver.get_endpoints()
225
226 1
    def set_security_policy(self, security_policy):
227
        """
228
            Method setting up the security policies for connections
229
            to the server, where security_policy is a list of integers.
230
            During server initialization, all endpoints are enabled:
231
232
                security_policy = [
233
                            ua.SecurityPolicyType.NoSecurity,
234
                            ua.SecurityPolicyType.Basic128Rsa15_SignAndEncrypt,
235
                            ua.SecurityPolicyType.Basic128Rsa15_Sign,
236
                            ua.SecurityPolicyType.Basic256_SignAndEncrypt,
237
                            ua.SecurityPolicyType.Basic256_Sign
238
                                ]
239
240
            E.g. to limit the number of endpoints and disable no encryption:
241
242
                set_security_policy([
243
                            ua.SecurityPolicyType.Basic128Rsa15_SignAndEncrypt
244
                            ua.SecurityPolicyType.Basic256_SignAndEncrypt])
245
246
        """
247
        self._security_policy = security_policy
248
249 1
    def set_security_IDs(self, policyIDs):
250
        """
251
            Method setting up the security endpoints for identification
252
            of clients. During server object initialization, all possible 
253
            endpoints are enabled:
254
255
            self._policyIDs = ["Anonymous", "Basic256", "Basic128", "Username"]
256
257
            E.g. to limit the number of IDs and disable anonymous clients:
258
259
                set_security_policy(["Basic256"])
260
261
            (Implementation for ID check is currently not finalized...)
262
263
        """
264
        self._policyIDs = policyIDs
265
266 1
    def _setup_server_nodes(self):
        """
        Create the endpoints and security policy factories selected in
        ``self._security_policy``.

        To be called just before starting the server since it needs all
        parameters (endpoint, certificate, private key, policies) to be set.
        Secured endpoints are skipped with a warning when the certificate or
        the private key is missing.
        """
        # Start from an empty list on every call: the previous code only
        # reset it in the NoSecurity branch, so a stop()/start() cycle with
        # secured-only policies piled up duplicate factories.
        self._policies = []

        open_endpoint = ua.SecurityPolicyType.NoSecurity in self._security_policy
        if open_endpoint:
            self._set_endpoints()
            self._policies.append(ua.SecurityPolicyFactory())

        if self._security_policy == [ua.SecurityPolicyType.NoSecurity]:
            return

        if not (self.certificate and self.private_key):
            self.logger.warning("Endpoints other than open requested but private key and certificate are not set.")
            return

        if open_endpoint:
            self.logger.warning("Creating an open endpoint to the server, although encrypted endpoints are enabled.")

        # (requested policy type, policy class, message security mode);
        # the order defines the order in which endpoints are added
        secured = (
            (ua.SecurityPolicyType.Basic128Rsa15_SignAndEncrypt,
             security_policies.SecurityPolicyBasic128Rsa15,
             ua.MessageSecurityMode.SignAndEncrypt),
            (ua.SecurityPolicyType.Basic128Rsa15_Sign,
             security_policies.SecurityPolicyBasic128Rsa15,
             ua.MessageSecurityMode.Sign),
            (ua.SecurityPolicyType.Basic256_SignAndEncrypt,
             security_policies.SecurityPolicyBasic256,
             ua.MessageSecurityMode.SignAndEncrypt),
            (ua.SecurityPolicyType.Basic256_Sign,
             security_policies.SecurityPolicyBasic256,
             ua.MessageSecurityMode.Sign),
        )
        for policy_type, policy, mode in secured:
            if policy_type not in self._security_policy:
                continue
            self._set_endpoints(policy, mode)
            self._policies.append(ua.SecurityPolicyFactory(policy,
                                                           mode,
                                                           self.certificate,
                                                           self.private_key))
312
313 1
    def _set_endpoints(self, policy=ua.SecurityPolicy, mode=ua.MessageSecurityMode.None_):
        """
        Build one endpoint description for the given security policy and
        message security mode and add it to the internal server.

        The user identity tokens offered on the endpoint are those whose
        name is listed in ``self._policyIDs``.
        """
        # (name in self._policyIDs, PolicyId, TokenType), in advertised order
        token_specs = (
            ("Anonymous", 'anonymous', ua.UserTokenType.Anonymous),
            ("Basic256", 'certificate_basic256', ua.UserTokenType.Certificate),
            ("Basic128", 'certificate_basic128', ua.UserTokenType.Certificate),
            ("Username", 'username', ua.UserTokenType.UserName),
        )
        idtokens = []
        for enabled_name, policy_id, token_type in token_specs:
            if enabled_name not in self._policyIDs:
                continue
            token = ua.UserTokenPolicy()
            token.PolicyId = policy_id
            token.TokenType = token_type
            idtokens.append(token)

        url = self.endpoint.geturl()

        application = ua.ApplicationDescription()
        application.ApplicationName = ua.LocalizedText(self.name)
        application.ApplicationUri = self._application_uri
        application.ApplicationType = self.application_type
        application.ProductUri = self.product_uri
        application.DiscoveryUrls.append(url)

        endpoint = ua.EndpointDescription()
        endpoint.EndpointUrl = url
        endpoint.Server = application
        if self.certificate:
            endpoint.ServerCertificate = uacrypto.der_from_x509(self.certificate)
        endpoint.SecurityMode = mode
        endpoint.SecurityPolicyUri = policy.URI
        endpoint.UserIdentityTokens = idtokens
        endpoint.TransportProfileUri = 'http://opcfoundation.org/UA-Profile/Transport/uatcp-uasc-uabinary'
        endpoint.SecurityLevel = 0
        self.iserver.add_endpoint(endpoint)
357
358 1
    def set_server_name(self, name):
359
        self.name = name
360
361 1
    def start(self):
        """
        Start to listen on network

        Sets up the endpoints, starts the internal server and then the
        binary server on the host/port of ``self.endpoint``. If the binary
        server cannot be created or started, the internal server is stopped
        again and the exception is propagated.
        """
        self._setup_server_nodes()
        self.iserver.start()
        try:
            self.bserver = BinaryServer(self.iserver, self.endpoint.hostname, self.endpoint.port)
            self.bserver.set_policies(self._policies)
            self.bserver.start()
        except Exception:
            self.iserver.stop()
            # bare raise keeps the original traceback (``raise exp`` drops
            # it on Python 2)
            raise
374
375 1
    def stop(self):
376
        """
377
        Stop server
378
        """
379 1
        for client in self._discovery_clients.values():
380 1
            client.disconnect()
381 1
        self.bserver.stop()
382 1
        self.iserver.stop()
383
384 1
    def get_root_node(self):
        """
        Return the Root folder of the server as a Node object.
        """
        root_id = ua.TwoByteNodeId(ua.ObjectIds.RootFolder)
        return self.get_node(root_id)
389
390 1
    def get_objects_node(self):
        """
        Return the Objects folder of the server as a Node object.
        """
        objects_id = ua.TwoByteNodeId(ua.ObjectIds.ObjectsFolder)
        return self.get_node(objects_id)
395
396 1
    def get_server_node(self):
        """
        Return the Server object of the server as a Node object.
        """
        server_id = ua.TwoByteNodeId(ua.ObjectIds.Server)
        return self.get_node(server_id)
401
402 1
    def get_node(self, nodeid):
        """
        Return a Node bound to the internal session for ``nodeid``, given
        as a NodeId object or as a string representing a NodeId.
        """
        session = self.iserver.isession
        return Node(session, nodeid)
407
408 1
    def create_subscription(self, period, handler):
        """
        Create a subscription on the internal session and return the
        Subscription object, which allows subscribing to events or data
        on the server.

        period is in milliseconds
        handler is a python object with following methods:
            def datachange_notification(self, node, val, data):
            def event_notification(self, event):
            def status_change_notification(self, status):
        """
        request = ua.CreateSubscriptionParameters()
        request.RequestedPublishingInterval = period
        request.RequestedLifetimeCount = 3000
        request.RequestedMaxKeepAliveCount = 10000
        request.MaxNotificationsPerPublish = 0
        request.PublishingEnabled = True
        request.Priority = 0
        session = self.iserver.isession
        return Subscription(session, request, handler)
427
428 1
    def get_namespace_array(self):
        """
        Return the value of the server NamespaceArray node, i.e. all
        namespaces defined in the server.
        """
        array_id = ua.NodeId(ua.ObjectIds.Server_NamespaceArray)
        return self.get_node(array_id).get_value()
434
435 1
    def register_namespace(self, uri):
        """
        Register a new namespace and return its index. Nodes should be in
        a custom namespace, not 0.

        If ``uri`` is already registered its existing index is returned and
        the namespace array is left untouched.
        """
        array_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_NamespaceArray))
        known = array_node.get_value()
        try:
            return known.index(uri)
        except ValueError:
            # not registered yet: append below
            pass
        known.append(uri)
        array_node.set_value(known)
        return len(known) - 1
446
447 1
    def get_namespace_index(self, uri):
448
        """
449
        get index of a namespace using its uri
450
        """
451 1
        uries = self.get_namespace_array()
452 1
        return uries.index(uri)
453
454 1
    def get_event_generator(self, etype=None, source=ua.ObjectIds.Server):
        """
        Return an EventGenerator for the given event type and source.
        Use the returned object to fire events.

        A BaseEvent is used when no (or a falsy) ``etype`` is given.
        """
        event_type = etype or BaseEvent()
        return EventGenerator(self.iserver.isession, event_type, source)
462
463 1
    def create_custom_data_type(self, idx, name, basetype=ua.ObjectIds.BaseDataType, properties=None):
        """
        Create a custom type below ``basetype`` with the given properties
        (no variables, no methods) and return its node.
        """
        props = [] if properties is None else properties
        return self._create_custom_type(idx, name, basetype, props, [], [])
467
468 1
    def create_custom_event_type(self, idx, name, basetype=ua.ObjectIds.BaseEventType, properties=None):
        """
        Create a custom type below ``basetype`` with the given properties
        (no variables, no methods) and return its node.
        """
        props = [] if properties is None else properties
        return self._create_custom_type(idx, name, basetype, props, [], [])
472
473 1
    def create_custom_object_type(self, idx, name, basetype=ua.ObjectIds.BaseObjectType, properties=None, variables=None, methods=None):
        """
        Create a custom type below ``basetype`` with the given properties,
        variables and methods and return its node. Omitted lists are
        treated as empty.
        """
        props = [] if properties is None else properties
        variables_ = [] if variables is None else variables
        methods_ = [] if methods is None else methods
        return self._create_custom_type(idx, name, basetype, props, variables_, methods_)
481
482
    # def create_custom_reference_type(self, idx, name, basetype=ua.ObjectIds.BaseReferenceType, properties=[]):
483
        # return self._create_custom_type(idx, name, basetype, properties)
484
485 1
    def create_custom_variable_type(self, idx, name, basetype=ua.ObjectIds.BaseVariableType, properties=None, variables=None, methods=None):
        """
        Create a custom type below ``basetype`` with the given properties,
        variables and methods and return its node. Omitted lists are
        treated as empty.
        """
        props = [] if properties is None else properties
        variables_ = [] if variables is None else variables
        methods_ = [] if methods is None else methods
        return self._create_custom_type(idx, name, basetype, props, variables_, methods_)
493
494 1
    def _create_custom_type(self, idx, name, basetype, properties, variables, methods):
        """
        Add a new type named ``name`` below ``basetype`` and populate it.

        ``basetype`` may be a Node, a ua.NodeId or anything ua.NodeId()
        accepts. Each property/variable entry is indexed as
        (name, variant type[, datatype]); each method entry supplies the
        four arguments passed on to add_method().
        Returns the node of the new type.
        """
        if isinstance(basetype, Node):
            parent = basetype
        else:
            nodeid = basetype if isinstance(basetype, ua.NodeId) else ua.NodeId(basetype)
            parent = Node(self.iserver.isession, nodeid)

        new_type = parent.add_object_type(idx, name)

        for prop in properties:
            # the third element (datatype) is optional
            prop_datatype = prop[2] if len(prop) > 2 else None
            new_type.add_property(idx, prop[0], ua.get_default_value(prop[1]), varianttype=prop[1], datatype=prop_datatype)

        for variable in variables:
            var_datatype = variable[2] if len(variable) > 2 else None
            new_type.add_variable(idx, variable[0], ua.get_default_value(variable[1]), varianttype=variable[1], datatype=var_datatype)

        for method in methods:
            new_type.add_method(idx, method[0], method[1], method[2], method[3])

        return new_type
517
518 1
    def import_xml(self, path=None, xmlstring=None):
        """
        Import nodes defined in xml, given as a file ``path`` or as
        ``xmlstring``; returns whatever XmlImporter.import_xml returns.
        """
        xml_importer = XmlImporter(self)
        return xml_importer.import_xml(path, xmlstring)
524
525 1
    def export_xml(self, nodes, path):
        """
        Export the given nodes to the xml file at ``path``.
        """
        exporter = XmlExporter(self)
        exporter.build_etree(nodes)
        return exporter.write_xml(path)
532
533 1
    def export_xml_by_ns(self, path, namespaces=None):
        """
        Export nodes of one or more namespaces to an XML file.
        Namespaces used by nodes are always exported for consistency.
        Args:
            path: name of the xml file to write
            namespaces: list of string uris or int indexes of the namespace to export, if not provide all ns are used except 0

        Returns:
            None
        """
        selected = [] if namespaces is None else namespaces
        self.export_xml(get_nodes_of_namespace(self, selected), path)
548
549 1
    def delete_nodes(self, nodes, recursive=False):
        """
        Delete ``nodes`` through the internal session by delegating to the
        module level delete_nodes() helper, and return its result.
        """
        session = self.iserver.isession
        return delete_nodes(session, nodes, recursive)
551
552 1
    def historize_node_data_change(self, node, period=timedelta(days=7), count=0):
553
        """
554
        Start historizing supplied nodes; see history module
555
        Args:
556
            node: node or list of nodes that can be historized (variables/properties)
557
            period: time delta to store the history; older data will be deleted from the storage
558
            count: number of changes to store in the history
559
560
        Returns:
561
        """
562 1
        nodes = node if isinstance(node, (list, tuple)) else [node]
563 1
        for node in nodes:
564 1
            self.iserver.enable_history_data_change(node, period, count)
565
566 1
    def dehistorize_node_data_change(self, node):
567
        """
568
        Stop historizing supplied nodes; see history module
569
        Args:
570
            node: node or list of nodes that can be historized (UA variables/properties)
571
572
        Returns:
573
        """
574
        nodes = node if isinstance(node, (list, tuple)) else [node]
575
        for node in nodes:
576
            self.iserver.disable_history_data_change(node)
577
578 1
    def historize_node_event(self, node, period=timedelta(days=7), count=0):
579
        """
580
        Start historizing events from node (typically a UA object); see history module
581
        Args:
582
            node: node or list of nodes that can be historized (UA objects)
583
            period: time delta to store the history; older data will be deleted from the storage
584
            count: number of events to store in the history
585
586
        Returns:
587
        """
588 1
        nodes = node if isinstance(node, (list, tuple)) else [node]
589 1
        for node in nodes:
590 1
            self.iserver.enable_history_event(node, period, count)
591
592 1
    def dehistorize_node_event(self, node):
593
        """
594
        Stop historizing events from node (typically a UA object); see history module
595
        Args:
596
           node: node or list of nodes that can be historized (UA objects)
597
598
        Returns:
599
        """
600
        nodes = node if isinstance(node, (list, tuple)) else [node]
601
        for node in nodes:
602
            self.iserver.disable_history_event(node)
603
604 1
    def subscribe_server_callback(self, event, handle):
605
        self.iserver.subscribe_server_callback(event, handle)
606
607 1
    def unsubscribe_server_callback(self, event, handle):
608
        self.iserver.unsubscribe_server_callback(event, handle)
609
610 1
    def link_method(self, node, callback):
611
        """
612
        Link a python function to a UA method in the address space; required when a UA method has been imported
613
        to the address space via XML; the python executable must be linked manually
614
        Args:
615
            node: UA method node
616
            callback: python function that the UA method will call
617
618
        Returns:
619
        """
620
        self.iserver.isession.add_method_callback(node.nodeid, callback)
621
622 1
    def load_type_definitions(self, nodes=None):
        """
        Delegate to the module level load_type_definitions() helper with
        this server and ``nodes``, and return its result.
        """
        definitions = load_type_definitions(self, nodes)
        return definitions
624