Completed
Push — master ( 6083f5...66575e )
by Olivier
03:46
created

Server.dehistorize_node_event()   A

Complexity

Conditions 2

Size

Total Lines 11

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 1
CRAP Score 3.6875

Importance

Changes 0
Metric Value
cc 2
dl 0
loc 11
ccs 1
cts 4
cp 0.25
crap 3.6875
rs 9.4285
c 0
b 0
f 0
1
"""
2
High level interface to pure python OPC-UA server
3
"""
4
5 1
import logging
6 1
from datetime import timedelta
7 1
try:
8 1
    from urllib.parse import urlparse
9
except ImportError:
10
    from urlparse import urlparse
11
12
13 1
from opcua import ua
14
# from opcua.binary_server import BinaryServer
15 1
from opcua.server.binary_server_asyncio import BinaryServer
16 1
from opcua.server.internal_server import InternalServer
17 1
from opcua.server.event_generator import EventGenerator
18 1
from opcua.common.node import Node
19 1
from opcua.common.subscription import Subscription
20 1
from opcua.common import xmlimporter
21 1
from opcua.common.manage_nodes import delete_nodes
22 1
from opcua.client.client import Client
23 1
from opcua.crypto import security_policies
24 1
from opcua.common.event_objects import BaseEvent
25 1
from opcua.common.shortcuts import Shortcuts
26 1
from opcua.common.xmlexporter import XmlExporter
27 1
from opcua.common.ua_utils import get_nodes_of_namespace
28 1
use_crypto = True
29 1
try:
30 1
    from opcua.crypto import uacrypto
31
except ImportError:
32
    print("cryptography is not installed, use of crypto disabled")
33
    use_crypto = False
34
35
36 1
class Server(object):
37
38
    """
39
    High level Server class
40
41
    This class creates an opcua server with default values
42
43
    Create your own namespace and then populate your server address space
44
    using use the get_root() or get_objects() to get Node objects.
45
    and get_event_object() to fire events.
46
    Then start server. See example_server.py
47
    All methods are threadsafe
48
49
    If you need more flexibility you call directly the Ua Service methods
50
    on the iserver  or iserver.isesssion object members.
51
52
    During startup the standard address space will be constructed, which may be
53
    time-consuming when running a server on a less powerful device (e.g. a
54
    Raspberry Pi). In order to improve startup performance, a optional path to a
55
    cache file can be passed to the server constructor.
56
    If the parameter is defined, the address space will be loaded from the
57
    cache file or the file will be created if it does not exist yet.
58
    As a result the first startup will be even slower due to the cache file
59
    generation but all further start ups will be significantly faster.
60
61
    :ivar application_uri:
62
    :vartype application_uri: uri
63
    :ivar product_uri:
64
    :vartype product_uri: uri
65
    :ivar name:
66
    :vartype name: string
67
    :ivar default_timeout: timeout in milliseconds for sessions and secure channel
68
    :vartype default_timeout: int
69
    :ivar iserver: internal server object
70
    :vartype default_timeout: InternalServer
71
    :ivar bserver: binary protocol server
72
    :vartype bserver: BinaryServer
73
    :ivar nodes: shortcuts to common nodes
74
    :vartype nodes: Shortcuts
75
76
    """
77
78 1
    def __init__(self, shelffile=None, iserver=None):
79 1
        self.logger = logging.getLogger(__name__)
80 1
        self.endpoint = urlparse("opc.tcp://0.0.0.0:4840/freeopcua/server/")
81 1
        self.application_uri = "urn:freeopcua:python:server"
82 1
        self.product_uri = "urn:freeopcua.github.no:python:server"
83 1
        self.name = "FreeOpcUa Python Server"
84 1
        self.application_type = ua.ApplicationType.ClientAndServer
85 1
        self.default_timeout = 3600000
86 1
        if iserver is not None:
87
            self.iserver = iserver
88
        else:
89 1
            self.iserver = InternalServer(shelffile)
90 1
        self.bserver = None
91 1
        self._discovery_clients = {}
92 1
        self._discovery_period = 60
93 1
        self.certificate = None
94 1
        self.private_key = None
95 1
        self._policies = []
96 1
        self.nodes = Shortcuts(self.iserver.isession)
97
98
        # setup some expected values
99 1
        self.register_namespace(self.application_uri)
100 1
        sa_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_ServerArray))
101 1
        sa_node.set_value([self.application_uri])
102
103 1
    def __enter__(self):
104 1
        self.start()
105 1
        return self
106
107 1
    def __exit__(self, exc_type, exc_value, traceback):
108 1
        self.stop()
109
110 1
    def load_certificate(self, path):
111
        """
112
        load server certificate from file, either pem or der
113
        """
114 1
        self.certificate = uacrypto.load_certificate(path)
115
116 1
    def load_private_key(self, path):
117 1
        self.private_key = uacrypto.load_private_key(path)
118
119 1
    def disable_clock(self, val=True):
120
        """
121
        for debugging you may want to disable clock that write every second
122
        to address space
123
        """
124
        self.iserver.disabled_clock = val
125
126 1
    def set_application_uri(self, uri):
127
        """
128
        Set application/server URI.
129
        This uri is supposed to be unique. If you intent to register
130
        your server to a discovery server, it really should be unique in
131
        your system!
132
        default is : "urn:freeopcua:python:server"
133
        """
134 1
        self.application_uri = uri
135
136 1
    def find_servers(self, uris=None):
137
        """
138
        find_servers. mainly implemented for symmetry with client
139
        """
140 1
        if uris is None:
141 1
            uris = []
142 1
        params = ua.FindServersParameters()
143 1
        params.EndpointUrl = self.endpoint.geturl()
144 1
        params.ServerUris = uris
145 1
        return self.iserver.find_servers(params)
146
147 1
    def register_to_discovery(self, url="opc.tcp://localhost:4840", period=60):
148
        """
149
        Register to an OPC-UA Discovery server. Registering must be renewed at
150
        least every 10 minutes, so this method will use our asyncio thread to
151
        re-register every period seconds
152
        if period is 0 registration is not automatically renewed
153
        """
154
        # FIXME: have a period per discovery
155 1
        if url in self._discovery_clients:
156 1
            self._discovery_clients[url].disconnect()
157 1
        self._discovery_clients[url] = Client(url)
158 1
        self._discovery_clients[url].connect()
159 1
        self._discovery_clients[url].register_server(self)
160 1
        self._discovery_period = period
161 1
        if period:
162
            self.iserver.loop.call_soon(self._renew_registration)
163
164 1
    def unregister_to_discovery(self, url="opc.tcp://localhost:4840"):
165
        """
166
        stop registration thread
167
        """
168
        # FIXME: is there really no way to deregister?
169
        self._discovery_clients[url].disconnect()
170
171 1
    def _renew_registration(self):
172
        for client in self._discovery_clients.values():
173
            client.register_server(self)
174
            self.iserver.loop.call_later(self._discovery_period, self._renew_registration)
175
176 1
    def get_client_to_discovery(self, url="opc.tcp://localhost:4840"):
177
        """
178
        Create a client to discovery server and return it
179
        """
180
        client = Client(url)
181
        client.connect()
182
        return client
183
184 1
    def allow_remote_admin(self, allow):
185
        """
186
        Enable or disable the builtin Admin user from network clients
187
        """
188
        self.iserver.allow_remote_admin = allow
189
190 1
    def set_endpoint(self, url):
191 1
        self.endpoint = urlparse(url)
192
193 1
    def get_endpoints(self):
194 1
        return self.iserver.get_endpoints()
195
196 1
    def _setup_server_nodes(self):
197
        # to be called just before starting server since it needs all parameters to be setup
198 1
        self._set_endpoints()
199 1
        self._policies = [ua.SecurityPolicyFactory()]
200 1
        if self.certificate and self.private_key:
201 1
            self._set_endpoints(security_policies.SecurityPolicyBasic128Rsa15,
202
                                ua.MessageSecurityMode.SignAndEncrypt)
203 1
            self._policies.append(ua.SecurityPolicyFactory(security_policies.SecurityPolicyBasic128Rsa15,
204
                                                           ua.MessageSecurityMode.SignAndEncrypt,
205
                                                           self.certificate,
206
                                                           self.private_key)
207
                                 )
208 1
            self._set_endpoints(security_policies.SecurityPolicyBasic128Rsa15,
209
                                ua.MessageSecurityMode.Sign)
210 1
            self._policies.append(ua.SecurityPolicyFactory(security_policies.SecurityPolicyBasic128Rsa15,
211
                                                           ua.MessageSecurityMode.Sign,
212
                                                           self.certificate,
213
                                                           self.private_key)
214
                                 )
215 1
            self._set_endpoints(security_policies.SecurityPolicyBasic256,
216
                                ua.MessageSecurityMode.SignAndEncrypt)
217 1
            self._policies.append(ua.SecurityPolicyFactory(security_policies.SecurityPolicyBasic256,
218
                                                           ua.MessageSecurityMode.SignAndEncrypt,
219
                                                           self.certificate,
220
                                                           self.private_key)
221
                                 )
222 1
            self._set_endpoints(security_policies.SecurityPolicyBasic256,
223
                                ua.MessageSecurityMode.Sign)
224 1
            self._policies.append(ua.SecurityPolicyFactory(security_policies.SecurityPolicyBasic256,
225
                                                           ua.MessageSecurityMode.Sign,
226
                                                           self.certificate,
227
                                                           self.private_key)
228
                                 )
229
230 1
    def _set_endpoints(self, policy=ua.SecurityPolicy, mode=ua.MessageSecurityMode.None_):
231 1
        idtoken = ua.UserTokenPolicy()
232 1
        idtoken.PolicyId = 'anonymous'
233 1
        idtoken.TokenType = ua.UserTokenType.Anonymous
234
235 1
        idtoken2 = ua.UserTokenPolicy()
236 1
        idtoken2.PolicyId = 'certificate_basic256'
237 1
        idtoken2.TokenType = ua.UserTokenType.Certificate
238
239 1
        idtoken3 = ua.UserTokenPolicy()
240 1
        idtoken3.PolicyId = 'certificate_basic128'
241 1
        idtoken3.TokenType = ua.UserTokenType.Certificate
242
243 1
        idtoken4 = ua.UserTokenPolicy()
244 1
        idtoken4.PolicyId = 'username'
245 1
        idtoken4.TokenType = ua.UserTokenType.UserName
246
247 1
        appdesc = ua.ApplicationDescription()
248 1
        appdesc.ApplicationName = ua.LocalizedText(self.name)
249 1
        appdesc.ApplicationUri = self.application_uri
250 1
        appdesc.ApplicationType = self.application_type
251 1
        appdesc.ProductUri = self.product_uri
252 1
        appdesc.DiscoveryUrls.append(self.endpoint.geturl())
253
254 1
        edp = ua.EndpointDescription()
255 1
        edp.EndpointUrl = self.endpoint.geturl()
256 1
        edp.Server = appdesc
257 1
        if self.certificate:
258 1
            edp.ServerCertificate = uacrypto.der_from_x509(self.certificate)
259 1
        edp.SecurityMode = mode
260 1
        edp.SecurityPolicyUri = policy.URI
261 1
        edp.UserIdentityTokens = [idtoken, idtoken2, idtoken3, idtoken4]
262 1
        edp.TransportProfileUri = 'http://opcfoundation.org/UA-Profile/Transport/uatcp-uasc-uabinary'
263 1
        edp.SecurityLevel = 0
264 1
        self.iserver.add_endpoint(edp)
265
266 1
    def set_server_name(self, name):
267
        self.name = name
268
269 1
    def start(self):
270
        """
271
        Start to listen on network
272
        """
273 1
        self._setup_server_nodes()
274 1
        self.iserver.start()
275 1
        self.bserver = BinaryServer(self.iserver, self.endpoint.hostname, self.endpoint.port)
276 1
        self.bserver.set_policies(self._policies)
277 1
        self.bserver.start()
278
279 1
    def stop(self):
280
        """
281
        Stop server
282
        """
283 1
        for client in self._discovery_clients.values():
284 1
            client.disconnect()
285 1
        self.bserver.stop()
286 1
        self.iserver.stop()
287
288 1
    def get_root_node(self):
289
        """
290
        Get Root node of server. Returns a Node object.
291
        """
292 1
        return self.get_node(ua.TwoByteNodeId(ua.ObjectIds.RootFolder))
293
294 1
    def get_objects_node(self):
295
        """
296
        Get Objects node of server. Returns a Node object.
297
        """
298 1
        return self.get_node(ua.TwoByteNodeId(ua.ObjectIds.ObjectsFolder))
299
300 1
    def get_server_node(self):
301
        """
302
        Get Server node of server. Returns a Node object.
303
        """
304 1
        return self.get_node(ua.TwoByteNodeId(ua.ObjectIds.Server))
305
306 1
    def get_node(self, nodeid):
307
        """
308
        Get a specific node using NodeId object or a string representing a NodeId
309
        """
310 1
        return Node(self.iserver.isession, nodeid)
311
312 1
    def create_subscription(self, period, handler):
313
        """
314
        Create a subscription.
315
        returns a Subscription object which allow
316
        to subscribe to events or data on server
317
        """
318 1
        params = ua.CreateSubscriptionParameters()
319 1
        params.RequestedPublishingInterval = period
320 1
        params.RequestedLifetimeCount = 3000
321 1
        params.RequestedMaxKeepAliveCount = 10000
322 1
        params.MaxNotificationsPerPublish = 0
323 1
        params.PublishingEnabled = True
324 1
        params.Priority = 0
325 1
        return Subscription(self.iserver.isession, params, handler)
326
327 1
    def get_namespace_array(self):
328
        """
329
        get all namespace defined in server
330
        """
331 1
        ns_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_NamespaceArray))
332 1
        return ns_node.get_value()
333
334 1
    def register_namespace(self, uri):
335
        """
336
        Register a new namespace. Nodes should in custom namespace, not 0.
337
        """
338 1
        ns_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_NamespaceArray))
339 1
        uries = ns_node.get_value()
340 1
        if uri in uries:
341 1
            return uries.index(uri)
342 1
        uries.append(uri)
343 1
        ns_node.set_value(uries)
344 1
        return len(uries) - 1
345
346 1
    def get_namespace_index(self, uri):
347
        """
348
        get index of a namespace using its uri
349
        """
350 1
        uries = self.get_namespace_array()
351 1
        return uries.index(uri)
352
353 1
    def get_event_generator(self, etype=None, source=ua.ObjectIds.Server):
354
        """
355
        Returns an event object using an event type from address space.
356
        Use this object to fire events
357
        """
358 1
        if not etype:
359 1
            etype = BaseEvent()
360 1
        return EventGenerator(self.iserver.isession, etype, source)
361
362 1
    def create_custom_data_type(self, idx, name, basetype=ua.ObjectIds.BaseDataType, properties=[]):
363 1
        return self._create_custom_type(idx, name, basetype, properties, [], [])
364
365 1
    def create_custom_event_type(self, idx, name, basetype=ua.ObjectIds.BaseEventType, properties=[]):
366 1
        return self._create_custom_type(idx, name, basetype, properties, [], [])
367
368 1
    def create_custom_object_type(self, idx, name, basetype=ua.ObjectIds.BaseObjectType, properties=[], variables=[], methods=[]):
369 1
        return self._create_custom_type(idx, name, basetype, properties, variables, methods)
370
371
    # def create_custom_reference_type(self, idx, name, basetype=ua.ObjectIds.BaseReferenceType, properties=[]):
372
        # return self._create_custom_type(idx, name, basetype, properties)
373
374 1
    def create_custom_variable_type(self, idx, name, basetype=ua.ObjectIds.BaseVariableType, properties=[], variables=[], methods=[]):
375 1
        return self._create_custom_type(idx, name, basetype, properties, variables, methods)
376
377 1
    def _create_custom_type(self, idx, name, basetype, properties, variables, methods):
378 1
        if isinstance(basetype, Node):
379 1
            base_t = basetype
380 1
        elif isinstance(basetype, ua.NodeId):
381 1
            base_t = Node(self.iserver.isession, basetype)
382
        else:
383 1
            base_t = Node(self.iserver.isession, ua.NodeId(basetype))
384
385 1
        custom_t = base_t.add_object_type(idx, name)
386 1
        for prop in properties:
387 1
            datatype = None
388 1
            if len(prop) > 2:
389
                datatype = prop[2]
390 1
            custom_t.add_property(idx, prop[0], ua.get_default_value(prop[1]), varianttype=prop[1], datatype=datatype)
391 1
        for variable in variables:
392 1
            datatype = None
393 1
            if len(variable) > 2:
394 1
                datatype = variable[2]
395 1
            custom_t.add_variable(idx, variable[0], ua.get_default_value(variable[1]), varianttype=variable[1], datatype=datatype)
396 1
        for method in methods:
397 1
            custom_t.add_method(idx, method[0], method[1], method[2], method[3])
398
399 1
        return custom_t
400
401 1
    def import_xml(self, path):
402
        """
403
        Import nodes defined in xml
404
        """
405 1
        importer = xmlimporter.XmlImporter(self)
406 1
        return importer.import_xml(path)
407
408 1
    def export_xml(self, nodes, path):
409
        """
410
        Export defined nodes to xml
411
        """
412 1
        exp = XmlExporter(self)
413 1
        exp.build_etree(nodes)
414 1
        return exp.write_xml(path)
415
416 1
    def export_xml_by_ns(self, path, namespaces=[]):
417
        """
418
        Export nodes of one or more namespaces to an XML file.  
419
        Namespaces used by nodes are always exported for consistency.
420
        Args:
421
            server: opc ua server to use
422
            path: name of the xml file to write
423
            namespaces: list of string uris or int indexes of the namespace to export, if not provide all ns are used except 0
424
    
425
        Returns:
426
        """
427
        nodes = get_nodes_of_namespace(self, namespaces)
428
        self.export_xml(nodes, path)
429
430 1
    def delete_nodes(self, nodes, recursive=False):
431 1
        return delete_nodes(self.iserver.isession, nodes, recursive)
432
433 1
    def historize_node_data_change(self, node, period=timedelta(days=7), count=0):
434
        """
435
        Start historizing supplied nodes; see history module
436
        Args:
437
            node: node or list of nodes that can be historized (variables/properties)
438
            period: time delta to store the history; older data will be deleted from the storage
439
            count: number of changes to store in the history
440
441
        Returns:
442
        """
443 1
        nodes = [node]
444 1
        for node in nodes:
445 1
            self.iserver.enable_history_data_change(node, period, count)
446
447 1
    def dehistorize_node_data_change(self, node):
448
        """
449
        Stop historizing supplied nodes; see history module
450
        Args:
451
            node: node or list of nodes that can be historized (UA variables/properties)
452
453
        Returns:
454
        """
455
        nodes = [node]
456
        for node in nodes:
457
            self.iserver.disable_history_data_change(node)
458
459 1
    def historize_node_event(self, node, period=timedelta(days=7), count=0):
460
        """
461
        Start historizing events from node (typically a UA object); see history module
462
        Args:
463
            node: node or list of nodes that can be historized (UA objects)
464
            period: time delta to store the history; older data will be deleted from the storage
465
            count: number of events to store in the history
466
467
        Returns:
468
        """
469 1
        nodes = [node]
470 1
        for node in nodes:
471 1
            self.iserver.enable_history_event(node, period, count)
472
473 1
    def dehistorize_node_event(self, node):
474
        """
475
        Stop historizing events from node (typically a UA object); see history module
476
        Args:
477
           node: node or list of nodes that can be historized (UA objects)
478
479
        Returns:
480
        """
481
        nodes = [node]
482
        for node in nodes:
483
            self.iserver.disable_history_event(node)
484
485 1
    def subscribe_server_callback(self, event, handle):
486
        self.iserver.subscribe_server_callback(event, handle)
487
488 1
    def unsubscribe_server_callback(self, event, handle):
489
        self.iserver.unsubscribe_server_callback(event, handle)
490
491 1
    def link_method(self, node, callback):
492
        """
493
        Link a python function to a UA method in the address space; required when a UA method has been imported
494
        to the address space via XML; the python executable must be linked manually
495
        Args:
496
            node: UA method node
497
            callback: python function that the UA method will call
498
499
        Returns:
500
        """
501
        self.iserver.isession.add_method_callback(node.nodeid, callback)
502