Completed
Pull Request — master (#607)
by
unknown
08:41
created

Server._setup_server_nodes()   F

Complexity

Conditions 10

Size

Total Lines 40

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 19
CRAP Score 10

Importance

Changes 3
Bugs 0 Features 0
Metric Value
cc 10
c 3
b 0
f 0
dl 0
loc 40
ccs 19
cts 19
cp 1
crap 10
rs 3.1304

How to fix   Complexity   

Complexity

Complex classes like Server._setup_server_nodes() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""
2
High level interface to pure python OPC-UA server
3
"""
4
5 1
import logging
6 1
from datetime import timedelta, datetime
7 1
try:
8 1
    from urllib.parse import urlparse
9
except ImportError:
10
    from urlparse import urlparse
11
12
13 1
from opcua import ua
14
# from opcua.binary_server import BinaryServer
15 1
from opcua.server.binary_server_asyncio import BinaryServer
16 1
from opcua.server.internal_server import InternalServer
17 1
from opcua.server.event_generator import EventGenerator
18 1
from opcua.common.node import Node
19 1
from opcua.common.subscription import Subscription
20 1
from opcua.common.manage_nodes import delete_nodes
21 1
from opcua.client.client import Client
22 1
from opcua.crypto import security_policies
23 1
from opcua.common.event_objects import BaseEvent
24 1
from opcua.common.shortcuts import Shortcuts
25 1
from opcua.common.structures import load_type_definitions
26 1
from opcua.common.xmlexporter import XmlExporter
27 1
from opcua.common.xmlimporter import XmlImporter
28 1
from opcua.common.ua_utils import get_nodes_of_namespace
29 1
use_crypto = True
30 1
try:
31 1
    from opcua.crypto import uacrypto
32
except ImportError:
33
    logging.getLogger(__name__).warning("cryptography is not installed, use of crypto disabled")
34
    use_crypto = False
35
36
37 1
class Server(object):
38
39
    """
40
    High level Server class
41
42
    This class creates an opcua server with default values
43
44
    Create your own namespace and then populate your server address space
45
    using use the get_root() or get_objects() to get Node objects.
46
    and get_event_object() to fire events.
47
    Then start server. See example_server.py
48
    All methods are threadsafe
49
50
    If you need more flexibility you call directly the Ua Service methods
51
    on the iserver  or iserver.isession object members.
52
53
    During startup the standard address space will be constructed, which may be
54
    time-consuming when running a server on a less powerful device (e.g. a
55
    Raspberry Pi). In order to improve startup performance, a optional path to a
56
    cache file can be passed to the server constructor.
57
    If the parameter is defined, the address space will be loaded from the
58
    cache file or the file will be created if it does not exist yet.
59
    As a result the first startup will be even slower due to the cache file
60
    generation but all further start ups will be significantly faster.
61
62
    :ivar product_uri:
63
    :vartype product_uri: uri
64
    :ivar name:
65
    :vartype name: string
66
    :ivar default_timeout: timeout in milliseconds for sessions and secure channel
67
    :vartype default_timeout: int
68
    :ivar iserver: internal server object
69
    :vartype default_timeout: InternalServer
70
    :ivar bserver: binary protocol server
71
    :vartype bserver: BinaryServer
72
    :ivar nodes: shortcuts to common nodes
73
    :vartype nodes: Shortcuts
74
75
    """
76
77 1
    def __init__(self, shelffile=None, iserver=None):
78 1
        self.logger = logging.getLogger(__name__)
79 1
        self.endpoint = urlparse("opc.tcp://0.0.0.0:4840/freeopcua/server/")
80 1
        self._application_uri = "urn:freeopcua:python:server"
81 1
        self.product_uri = "urn:freeopcua.github.io:python:server"
82 1
        self.name = "FreeOpcUa Python Server"
83 1
        self.manufacturer_name = "FreeOpcUa"
84 1
        self.application_type = ua.ApplicationType.ClientAndServer
85 1
        self.default_timeout = 3600000
86 1
        if iserver is not None:
87
            self.iserver = iserver
88
        else:
89 1
            self.iserver = InternalServer(shelffile)
90 1
        self.bserver = None
91 1
        self._discovery_clients = {}
92 1
        self._discovery_period = 60
93 1
        self.certificate = None
94 1
        self.private_key = None
95 1
        self._policies = []
96 1
        self.nodes = Shortcuts(self.iserver.isession)
97
98
        # setup some expected values
99 1
        self.set_application_uri(self._application_uri)
100 1
        sa_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_ServerArray))
101 1
        sa_node.set_value([self._application_uri])
102 1
        status_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_ServerStatus))
103 1
        build_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_ServerStatus_BuildInfo))
104 1
        status = ua.ServerStatusDataType()
105 1
        status.BuildInfo.ProductUri = self.product_uri
106 1
        status.BuildInfo.ManufacturerName = self.manufacturer_name
107 1
        status.BuildInfo.ProductName = self.name
108 1
        status.BuildInfo.SoftwareVersion = "1.0pre"
109 1
        status.BuildInfo.BuildNumber = "0"
110 1
        status.BuildInfo.BuildDate = datetime.now()
111 1
        status.SecondsTillShutdown = 0
112 1
        status_node.set_value(status)
113 1
        build_node.set_value(status.BuildInfo)
114
115
116
        # enable all endpoints by default
117 1
        self._security_policy = ["None", "Basic128Rsa15_Sign",
118
                                    "Basic128Rsa15_SignAndEncrypt", 
119
                                    "Basic256_Sign", "Basic256_SignAndEncrypt"]
120 1
        self._policyIDs = ["Anonymous", "Basic256", "Basic128", "Username"]
121
122 1
    def __enter__(self):
123 1
        self.start()
124 1
        return self
125
126 1
    def __exit__(self, exc_type, exc_value, traceback):
127 1
        self.stop()
128
129 1
    def load_certificate(self, path):
130
        """
131
        load server certificate from file, either pem or der
132
        """
133 1
        self.certificate = uacrypto.load_certificate(path)
134
135 1
    def load_private_key(self, path):
136 1
        self.private_key = uacrypto.load_private_key(path)
137
138 1
    def disable_clock(self, val=True):
139
        """
140
        for debugging you may want to disable clock that write every second
141
        to address space
142
        """
143
        self.iserver.disabled_clock = val
144
145 1
    def set_application_uri(self, uri):
146
        """
147
        Set application/server URI.
148
        This uri is supposed to be unique. If you intent to register
149
        your server to a discovery server, it really should be unique in
150
        your system!
151
        default is : "urn:freeopcua:python:server"
152
        """
153 1
        self._application_uri = uri
154 1
        ns_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_NamespaceArray))
155 1
        uries = ns_node.get_value()
156 1
        if len(uries) > 1:
157 1
            uries[1] = uri  # application uri is always namespace 1
158
        else:
159 1
            uries.append(uri)
160 1
        ns_node.set_value(uries)
161
162 1
    def find_servers(self, uris=None):
163
        """
164
        find_servers. mainly implemented for symmetry with client
165
        """
166 1
        if uris is None:
167 1
            uris = []
168 1
        params = ua.FindServersParameters()
169 1
        params.EndpointUrl = self.endpoint.geturl()
170 1
        params.ServerUris = uris
171 1
        return self.iserver.find_servers(params)
172
173 1
    def register_to_discovery(self, url="opc.tcp://localhost:4840", period=60):
174
        """
175
        Register to an OPC-UA Discovery server. Registering must be renewed at
176
        least every 10 minutes, so this method will use our asyncio thread to
177
        re-register every period seconds
178
        if period is 0 registration is not automatically renewed
179
        """
180
        # FIXME: have a period per discovery
181 1
        if url in self._discovery_clients:
182 1
            self._discovery_clients[url].disconnect()
183 1
        self._discovery_clients[url] = Client(url)
184 1
        self._discovery_clients[url].connect()
185 1
        self._discovery_clients[url].register_server(self)
186 1
        self._discovery_period = period
187 1
        if period:
188
            self.iserver.loop.call_soon(self._renew_registration)
189
190 1
    def unregister_to_discovery(self, url="opc.tcp://localhost:4840"):
191
        """
192
        stop registration thread
193
        """
194
        # FIXME: is there really no way to deregister?
195
        self._discovery_clients[url].disconnect()
196
197 1
    def _renew_registration(self):
198
        for client in self._discovery_clients.values():
199
            client.register_server(self)
200
            self.iserver.loop.call_later(self._discovery_period, self._renew_registration)
201
202 1
    def get_client_to_discovery(self, url="opc.tcp://localhost:4840"):
203
        """
204
        Create a client to discovery server and return it
205
        """
206
        client = Client(url)
207
        client.connect()
208
        return client
209
210 1
    def allow_remote_admin(self, allow):
211
        """
212
        Enable or disable the builtin Admin user from network clients
213
        """
214
        self.iserver.allow_remote_admin = allow
215
216 1
    def set_endpoint(self, url):
217 1
        self.endpoint = urlparse(url)
218
219 1
    def get_endpoints(self):
220 1
        return self.iserver.get_endpoints()
221
222 1
    def set_security_policy(self, security_policy):
223
        """
224
            Method setting up the security policies for connections
225
            to the server. During server object initialization, all
226
            possible endpoints are enabled:
227
228
                security_policy = ["None",
229
                                    "Basic128Rsa15_Sign",
230
                                    "Basic128Rsa15_SignAndEncrypt", 
231
                                    "Basic256_Sign",
232
                                    "Basic256_SignAndEncrypt"]
233
234
            where security_policy is a list of strings. "None" enables an 
235
            endpoint without any security.
236
237
            E.g. to limit the number of endpoints and disable no encryption:
238
239
                set_security_policy(["Basic256_Sign", 
240
                                        "Basic256_SignAndEncrypt"])
241
242
        """
243
        self._security_policy = security_policy
244
245
246 1
    def set_security_IDs(self, policyIDs):
247
        """
248
            Method setting up the security endpoints for identification
249
            of clients. During server object initialization, all possible 
250
            endpoints are enabled:
251
252
            self._policyIDs = ["Anonymous", "Basic256", "Basic128", "Username"]
253
254
            E.g. to limit the number of IDs and disable anonymous clients:
255
256
                set_security_policy(["Basic256"])
257
258
            (Implementation for ID check is currently not finalized...)
259
260
        """
261
        self._policyIDs = policyIDs
262
263 1
    def _setup_server_nodes(self):
264
        # to be called just before starting server since it needs all parameters to be setup
265 1
        if "None" in self._security_policy:
266 1
            self._set_endpoints()
267 1
            self._policies = [ua.SecurityPolicyFactory()]
268 1
            if (len(self._security_policy)>1) and self.private_key:
269 1
                self.logger.warning("Creating an open endpoint to the server, although encrypted endpoints are enabled.")
270
271 1
        if self.certificate and self.private_key:
272 1
            if "Basic128Rsa15_Sign" in self._security_policy:
273 1
                self._set_endpoints(security_policies.SecurityPolicyBasic128Rsa15,
274
                                    ua.MessageSecurityMode.SignAndEncrypt)
275 1
                self._policies.append(ua.SecurityPolicyFactory(security_policies.SecurityPolicyBasic128Rsa15,
276
                                                           ua.MessageSecurityMode.SignAndEncrypt,
277
                                                           self.certificate,
278
                                                           self.private_key)
279
                                 )
280 1
            if "Basic128Rsa15_SignAndEncrypt" in self._security_policy:
281 1
                self._set_endpoints(security_policies.SecurityPolicyBasic128Rsa15,
282
                                    ua.MessageSecurityMode.Sign)
283 1
                self._policies.append(ua.SecurityPolicyFactory(security_policies.SecurityPolicyBasic128Rsa15,
284
                                                           ua.MessageSecurityMode.Sign,
285
                                                           self.certificate,
286
                                                           self.private_key)
287
                                 )
288 1
            if "Basic256_Sign" in self._security_policy:
289 1
                self._set_endpoints(security_policies.SecurityPolicyBasic256,
290
                                    ua.MessageSecurityMode.SignAndEncrypt)
291 1
                self._policies.append(ua.SecurityPolicyFactory(security_policies.SecurityPolicyBasic256,
292
                                                           ua.MessageSecurityMode.SignAndEncrypt,
293
                                                           self.certificate,
294
                                                           self.private_key)
295
                                 )
296 1
            if "Basic256_SignAndEncrypt" in self._security_policy:
297 1
                self._set_endpoints(security_policies.SecurityPolicyBasic256,
298
                                    ua.MessageSecurityMode.Sign)
299 1
                self._policies.append(ua.SecurityPolicyFactory(security_policies.SecurityPolicyBasic256,
300
                                                           ua.MessageSecurityMode.Sign,
301
                                                           self.certificate,
302
                                                           self.private_key)
303
                                 )
304
305 1
    def _set_endpoints(self, policy=ua.SecurityPolicy, mode=ua.MessageSecurityMode.None_):
306 1
        idtokens = []
307 1
        if "Anonymous" in self._policyIDs:
308 1
            idtoken = ua.UserTokenPolicy()
309 1
            idtoken.PolicyId = 'anonymous'
310 1
            idtoken.TokenType = ua.UserTokenType.Anonymous
311 1
            idtokens.append(idtoken)
312
313 1
        if "Basic256" in self._policyIDs:
314 1
            idtoken = ua.UserTokenPolicy()
315 1
            idtoken.PolicyId = 'certificate_basic256'
316 1
            idtoken.TokenType = ua.UserTokenType.Certificate
317 1
            idtokens.append(idtoken)
318
319 1
        if "Basic128" in self._policyIDs:
320 1
            idtoken = ua.UserTokenPolicy()
321 1
            idtoken.PolicyId = 'certificate_basic128'
322 1
            idtoken.TokenType = ua.UserTokenType.Certificate
323 1
            idtokens.append(idtoken)
324
325 1
        if "Username" in self._policyIDs:
326 1
            idtoken = ua.UserTokenPolicy()
327 1
            idtoken.PolicyId = 'username'
328 1
            idtoken.TokenType = ua.UserTokenType.UserName
329 1
            idtokens.append(idtoken)
330
331 1
        appdesc = ua.ApplicationDescription()
332 1
        appdesc.ApplicationName = ua.LocalizedText(self.name)
333 1
        appdesc.ApplicationUri = self._application_uri
334 1
        appdesc.ApplicationType = self.application_type
335 1
        appdesc.ProductUri = self.product_uri
336 1
        appdesc.DiscoveryUrls.append(self.endpoint.geturl())
337
338 1
        edp = ua.EndpointDescription()
339 1
        edp.EndpointUrl = self.endpoint.geturl()
340 1
        edp.Server = appdesc
341 1
        if self.certificate:
342 1
            edp.ServerCertificate = uacrypto.der_from_x509(self.certificate)
343 1
        edp.SecurityMode = mode
344 1
        edp.SecurityPolicyUri = policy.URI
345 1
        edp.UserIdentityTokens = idtokens
346 1
        edp.TransportProfileUri = 'http://opcfoundation.org/UA-Profile/Transport/uatcp-uasc-uabinary'
347 1
        edp.SecurityLevel = 0
348 1
        self.iserver.add_endpoint(edp)
349
350 1
    def set_server_name(self, name):
351
        self.name = name
352
353 1
    def start(self):
354
        """
355
        Start to listen on network
356
        """
357 1
        self._setup_server_nodes()
358 1
        self.iserver.start()
359 1
        try:
360 1
            self.bserver = BinaryServer(self.iserver, self.endpoint.hostname, self.endpoint.port)
361 1
            self.bserver.set_policies(self._policies)
362 1
            self.bserver.start()
363 1
        except Exception as exp:
364 1
            self.iserver.stop()
365 1
            raise exp
366
367 1
    def stop(self):
368
        """
369
        Stop server
370
        """
371 1
        for client in self._discovery_clients.values():
372 1
            client.disconnect()
373 1
        self.bserver.stop()
374 1
        self.iserver.stop()
375
376 1
    def get_root_node(self):
377
        """
378
        Get Root node of server. Returns a Node object.
379
        """
380 1
        return self.get_node(ua.TwoByteNodeId(ua.ObjectIds.RootFolder))
381
382 1
    def get_objects_node(self):
383
        """
384
        Get Objects node of server. Returns a Node object.
385
        """
386 1
        return self.get_node(ua.TwoByteNodeId(ua.ObjectIds.ObjectsFolder))
387
388 1
    def get_server_node(self):
389
        """
390
        Get Server node of server. Returns a Node object.
391
        """
392 1
        return self.get_node(ua.TwoByteNodeId(ua.ObjectIds.Server))
393
394 1
    def get_node(self, nodeid):
395
        """
396
        Get a specific node using NodeId object or a string representing a NodeId
397
        """
398 1
        return Node(self.iserver.isession, nodeid)
399
400 1
    def create_subscription(self, period, handler):
401
        """
402
        Create a subscription.
403
        returns a Subscription object which allow
404
        to subscribe to events or data on server
405
        period is in milliseconds
406
        handler is a python object with following methods:
407
            def datachange_notification(self, node, val, data):
408
            def event_notification(self, event):
409
            def status_change_notification(self, status):
410
        """
411 1
        params = ua.CreateSubscriptionParameters()
412 1
        params.RequestedPublishingInterval = period
413 1
        params.RequestedLifetimeCount = 3000
414 1
        params.RequestedMaxKeepAliveCount = 10000
415 1
        params.MaxNotificationsPerPublish = 0
416 1
        params.PublishingEnabled = True
417 1
        params.Priority = 0
418 1
        return Subscription(self.iserver.isession, params, handler)
419
420 1
    def get_namespace_array(self):
421
        """
422
        get all namespace defined in server
423
        """
424 1
        ns_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_NamespaceArray))
425 1
        return ns_node.get_value()
426
427 1
    def register_namespace(self, uri):
428
        """
429
        Register a new namespace. Nodes should in custom namespace, not 0.
430
        """
431 1
        ns_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_NamespaceArray))
432 1
        uries = ns_node.get_value()
433 1
        if uri in uries:
434 1
            return uries.index(uri)
435 1
        uries.append(uri)
436 1
        ns_node.set_value(uries)
437 1
        return len(uries) - 1
438
439 1
    def get_namespace_index(self, uri):
440
        """
441
        get index of a namespace using its uri
442
        """
443 1
        uries = self.get_namespace_array()
444 1
        return uries.index(uri)
445
446 1
    def get_event_generator(self, etype=None, source=ua.ObjectIds.Server):
447
        """
448
        Returns an event object using an event type from address space.
449
        Use this object to fire events
450
        """
451 1
        if not etype:
452 1
            etype = BaseEvent()
453 1
        return EventGenerator(self.iserver.isession, etype, source)
454
455 1
    def create_custom_data_type(self, idx, name, basetype=ua.ObjectIds.BaseDataType, properties=None):
456 1
        if properties is None:
457
            properties = []
458 1
        return self._create_custom_type(idx, name, basetype, properties, [], [])
459
460 1
    def create_custom_event_type(self, idx, name, basetype=ua.ObjectIds.BaseEventType, properties=None):
461 1
        if properties is None:
462
            properties = []
463 1
        return self._create_custom_type(idx, name, basetype, properties, [], [])
464
465 1
    def create_custom_object_type(self, idx, name, basetype=ua.ObjectIds.BaseObjectType, properties=None, variables=None, methods=None):
466 1
        if properties is None:
467
            properties = []
468 1
        if variables is None:
469
            variables = []
470 1
        if methods is None:
471
            methods = []
472 1
        return self._create_custom_type(idx, name, basetype, properties, variables, methods)
473
474
    # def create_custom_reference_type(self, idx, name, basetype=ua.ObjectIds.BaseReferenceType, properties=[]):
475
        # return self._create_custom_type(idx, name, basetype, properties)
476
477 1
    def create_custom_variable_type(self, idx, name, basetype=ua.ObjectIds.BaseVariableType, properties=None, variables=None, methods=None):
478 1
        if properties is None:
479
            properties = []
480 1
        if variables is None:
481 1
            variables = []
482 1
        if methods is None:
483 1
            methods = []
484 1
        return self._create_custom_type(idx, name, basetype, properties, variables, methods)
485
486 1
    def _create_custom_type(self, idx, name, basetype, properties, variables, methods):
487 1
        if isinstance(basetype, Node):
488 1
            base_t = basetype
489 1
        elif isinstance(basetype, ua.NodeId):
490 1
            base_t = Node(self.iserver.isession, basetype)
491
        else:
492 1
            base_t = Node(self.iserver.isession, ua.NodeId(basetype))
493
494 1
        custom_t = base_t.add_object_type(idx, name)
495 1
        for prop in properties:
496 1
            datatype = None
497 1
            if len(prop) > 2:
498
                datatype = prop[2]
499 1
            custom_t.add_property(idx, prop[0], ua.get_default_value(prop[1]), varianttype=prop[1], datatype=datatype)
500 1
        for variable in variables:
501 1
            datatype = None
502 1
            if len(variable) > 2:
503 1
                datatype = variable[2]
504 1
            custom_t.add_variable(idx, variable[0], ua.get_default_value(variable[1]), varianttype=variable[1], datatype=datatype)
505 1
        for method in methods:
506 1
            custom_t.add_method(idx, method[0], method[1], method[2], method[3])
507
508 1
        return custom_t
509
510 1
    def import_xml(self, path=None, xmlstring=None):
511
        """
512
        Import nodes defined in xml
513
        """
514 1
        importer = XmlImporter(self)
515 1
        return importer.import_xml(path, xmlstring)
516
517 1
    def export_xml(self, nodes, path):
518
        """
519
        Export defined nodes to xml
520
        """
521 1
        exp = XmlExporter(self)
522 1
        exp.build_etree(nodes)
523 1
        return exp.write_xml(path)
524
525 1
    def export_xml_by_ns(self, path, namespaces=None):
526
        """
527
        Export nodes of one or more namespaces to an XML file.
528
        Namespaces used by nodes are always exported for consistency.
529
        Args:
530
            server: opc ua server to use
531
            path: name of the xml file to write
532
            namespaces: list of string uris or int indexes of the namespace to export, if not provide all ns are used except 0
533
534
        Returns:
535
        """
536
        if namespaces is None:
537
            namespaces = []
538
        nodes = get_nodes_of_namespace(self, namespaces)
539
        self.export_xml(nodes, path)
540
541 1
    def delete_nodes(self, nodes, recursive=False):
542 1
        return delete_nodes(self.iserver.isession, nodes, recursive)
543
544 1
    def historize_node_data_change(self, node, period=timedelta(days=7), count=0):
545
        """
546
        Start historizing supplied nodes; see history module
547
        Args:
548
            node: node or list of nodes that can be historized (variables/properties)
549
            period: time delta to store the history; older data will be deleted from the storage
550
            count: number of changes to store in the history
551
552
        Returns:
553
        """
554 1
        nodes = node if isinstance(node, (list, tuple)) else [node]
555 1
        for node in nodes:
556 1
            self.iserver.enable_history_data_change(node, period, count)
557
558 1
    def dehistorize_node_data_change(self, node):
559
        """
560
        Stop historizing supplied nodes; see history module
561
        Args:
562
            node: node or list of nodes that can be historized (UA variables/properties)
563
564
        Returns:
565
        """
566
        nodes = node if isinstance(node, (list, tuple)) else [node]
567
        for node in nodes:
568
            self.iserver.disable_history_data_change(node)
569
570 1
    def historize_node_event(self, node, period=timedelta(days=7), count=0):
571
        """
572
        Start historizing events from node (typically a UA object); see history module
573
        Args:
574
            node: node or list of nodes that can be historized (UA objects)
575
            period: time delta to store the history; older data will be deleted from the storage
576
            count: number of events to store in the history
577
578
        Returns:
579
        """
580 1
        nodes = node if isinstance(node, (list, tuple)) else [node]
581 1
        for node in nodes:
582 1
            self.iserver.enable_history_event(node, period, count)
583
584 1
    def dehistorize_node_event(self, node):
585
        """
586
        Stop historizing events from node (typically a UA object); see history module
587
        Args:
588
           node: node or list of nodes that can be historized (UA objects)
589
590
        Returns:
591
        """
592
        nodes = node if isinstance(node, (list, tuple)) else [node]
593
        for node in nodes:
594
            self.iserver.disable_history_event(node)
595
596 1
    def subscribe_server_callback(self, event, handle):
597
        self.iserver.subscribe_server_callback(event, handle)
598
599 1
    def unsubscribe_server_callback(self, event, handle):
600
        self.iserver.unsubscribe_server_callback(event, handle)
601
602 1
    def link_method(self, node, callback):
603
        """
604
        Link a python function to a UA method in the address space; required when a UA method has been imported
605
        to the address space via XML; the python executable must be linked manually
606
        Args:
607
            node: UA method node
608
            callback: python function that the UA method will call
609
610
        Returns:
611
        """
612
        self.iserver.isession.add_method_callback(node.nodeid, callback)
613
614 1
    def load_type_definitions(self, nodes=None):
615
        return load_type_definitions(self, nodes)
616