Completed
Push — master ( 8941cc...63ac44 )
by Olivier
03:39
created

Server.link_method()   A

Complexity

Conditions 1

Size

Total Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 2

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 1
c 1
b 0
f 0
dl 0
loc 2
ccs 0
cts 0
cp 0
crap 2
rs 10
1
"""
2
High level interface to pure python OPC-UA server
3
"""
4
5 1
import logging
6 1
try:
7 1
    from urllib.parse import urlparse
8
except ImportError:
9
    from urlparse import urlparse
10
11
12 1
from opcua import ua
13
#from opcua.binary_server import BinaryServer
14 1
from opcua.server.binary_server_asyncio import BinaryServer
15 1
from opcua.server.internal_server import InternalServer
16 1
from opcua.server.event_generator import EventGenerator
17 1
from opcua.common.node import Node
18 1
from opcua.common.subscription import Subscription
19 1
from opcua.common import xmlimporter
20 1
from opcua.common.manage_nodes import delete_nodes
21 1
from opcua.client.client import Client
22 1
from opcua.crypto import security_policies
23 1
from opcua.common.event_objects import BaseEvent
24 1
from opcua.common.shortcuts import Shortcuts
25 1
from opcua.common.xmlexporter import XmlExporter
26
use_crypto = True
27
try:
28
    from opcua.crypto import uacrypto
29
except ImportError:
30
    print("cryptography is not installed, use of crypto disabled")
31 1
    use_crypto = False
32
33
34
class Server(object):
35
36
    """
37
    High level Server class
38
39
    This class creates an opcua server with default values
40
41
    Create your own namespace and then populate your server address space
42
    using use the get_root() or get_objects() to get Node objects.
43
    and get_event_object() to fire events.
44
    Then start server. See example_server.py
45
    All methods are threadsafe
46
47
    If you need more flexibility you call directly the Ua Service methods
48
    on the iserver  or iserver.isesssion object members.
49
50
    During startup the standard address space will be constructed, which may be
51
    time-consuming when running a server on a less powerful device (e.g. a
52
    Raspberry Pi). In order to improve startup performance, a optional path to a
53
    cache file can be passed to the server constructor.
54
    If the parameter is defined, the address space will be loaded from the
55
    cache file or the file will be created if it does not exist yet.
56
    As a result the first startup will be even slower due to the cache file
57
    generation but all further startups will be significantly faster.
58
59
    :ivar application_uri:
60
    :vartype application_uri: uri
61
    :ivar product_uri:
62
    :vartype product_uri: uri
63 1
    :ivar name:
64 1
    :vartype name: string
65 1
    :ivar default_timeout: timout in milliseconds for sessions and secure channel
66 1
    :vartype default_timeout: int
67 1
    :ivar iserver: internal server object
68 1
    :vartype default_timeout: InternalServer
69 1
    :ivar bserver: binary protocol server
70 1
    :vartype bserver: BinaryServer
71 1
    :ivar nodes: shortcuts to common nodes
72 1
    :vartype nodes: Shortcuts
73 1
74 1
    """
75 1
76 1
    def __init__(self, cacheFile=None, iserver=None):
77 1
        self.logger = logging.getLogger(__name__)
78
        self.endpoint = urlparse("opc.tcp://0.0.0.0:4840/freeopcua/server/")
79
        self.application_uri = "urn:freeopcua:python:server"
80 1
        self.product_uri = "urn:freeopcua.github.no:python:server"
81 1
        self.name = "FreeOpcUa Python Server"
82 1
        self.application_type = ua.ApplicationType.ClientAndServer
83
        self.default_timeout = 3600000
84 1
        if iserver is not None:
85
            self.iserver = iserver
86
        else:
87
            self.iserver = InternalServer(cacheFile)
88 1
        self.bserver = None
89
        self._discovery_clients = {}
90
        self._discovery_period = 60
91 1
        self.certificate = None
92
        self.private_key = None
93
        self._policies = []
94
        self.nodes = Shortcuts(self.iserver.isession)
95 1
96
        # setup some expected values
97 1
        self.register_namespace(self.application_uri)
98 1
        sa_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_ServerArray))
99
        sa_node.set_value([self.application_uri])
100 1
101
    def __enter__(self):
102
        self.start()
103
        return self
104
105
    def __exit__(self, exc_type, exc_value, traceback):
106
        self.stop()
107 1
108
    def load_certificate(self, path):
109
        """
110
        load server certificate from file, either pem or der
111
        """
112
        self.certificate = uacrypto.load_certificate(path)
113
114
    def load_private_key(self, path):
115 1
        self.private_key = uacrypto.load_private_key(path)
116
117 1
    def disable_clock(self, val=True):
118
        """
119
        for debugging you may want to disable clock that write every second
120
        to address space
121 1
        """
122 1
        self.iserver.disabled_clock = val
123 1
124 1
    def set_application_uri(self, uri):
125 1
        """
126 1
        Set application/server URI.
127
        This uri is supposed to be unique. If you intent to register
128 1
        your server to a discovery server, it really should be unique in
129
        your system!
130
        default is : "urn:freeopcua:python:server"
131
        """
132
        self.application_uri = uri
133
134
    def find_servers(self, uris=None):
135
        """
136 1
        find_servers. mainly implemented for simmetry with client
137 1
        """
138 1
        if uris is None:
139 1
            uris = []
140 1
        params = ua.FindServersParameters()
141 1
        params.EndpointUrl = self.endpoint.geturl()
142 1
        params.ServerUris = uris
143
        return self.iserver.find_servers(params)
144
145 1
    def register_to_discovery(self, url="opc.tcp://localhost:4840", period=60):
146
        """
147
        Register to an OPC-UA Discovery server. Registering must be renewed at
148
        least every 10 minutes, so this method will use our asyncio thread to
149
        re-register every period seconds
150
        if period is 0 registration is not automatically renewed
151
        """
152 1
        # FIXME: habe a period per discovery
153
        if url in self._discovery_clients:
154
            self._discovery_clients[url].disconnect()
155
        self._discovery_clients[url] = Client(url)
156
        self._discovery_clients[url].connect()
157 1
        self._discovery_clients[url].register_server(self)
158
        self._discovery_period = period
159
        if period:
160
            self.iserver.loop.call_soon(self._renew_registration)
161
162
    def unregister_to_discovery(self, url="opc.tcp://localhost:4840"):
163
        """
164
        stop registration thread
165 1
        """
166
        # FIXME: is there really no way to deregister?
167
        self._discovery_clients[url].disconnect()
168
169
    def _renew_registration(self):
170
        for client in self._discovery_clients.values():
171 1
            client.register_server(self)
172 1
            self.iserver.loop.call_later(self._discovery_period, self._renew_registration)
173
174 1
    def get_client_to_discovery(self, url="opc.tcp://localhost:4840"):
175 1
        """
176
        Create a client to discovery server and return it
177 1
        """
178
        client = Client(url)
179 1
        client.connect()
180 1
        return client
181 1
182 1
    def allow_remote_admin(self, allow):
183
        """
184 1
        Enable or disable the builtin Admin user from network clients
185
        """
186
        self.iserver.allow_remote_admin = allow
187
188
    def set_endpoint(self, url):
189 1
        self.endpoint = urlparse(url)
190
191 1
    def get_endpoints(self):
192
        return self.iserver.get_endpoints()
193
194
    def _setup_server_nodes(self):
195
        # to be called just before starting server since it needs all parameters to be setup
196 1
        self._set_endpoints()
197
        self._policies = [ua.SecurityPolicyFactory()]
198 1
        if self.certificate and self.private_key:
199
            self._set_endpoints(security_policies.SecurityPolicyBasic128Rsa15,
200
                                ua.MessageSecurityMode.SignAndEncrypt)
201
            self._policies.append(ua.SecurityPolicyFactory(security_policies.SecurityPolicyBasic128Rsa15,
202
                                                           ua.MessageSecurityMode.SignAndEncrypt,
203 1
                                                           self.certificate,
204
                                                           self.private_key)
205 1
                                 )
206
            self._set_endpoints(security_policies.SecurityPolicyBasic128Rsa15,
207
                                ua.MessageSecurityMode.Sign)
208
            self._policies.append(ua.SecurityPolicyFactory(security_policies.SecurityPolicyBasic128Rsa15,
209
                                                           ua.MessageSecurityMode.Sign,
210
                                                           self.certificate,
211 1
                                                           self.private_key)
212 1
                                 )
213 1
            self._set_endpoints(security_policies.SecurityPolicyBasic256,
214 1
                                ua.MessageSecurityMode.SignAndEncrypt)
215
            self._policies.append(ua.SecurityPolicyFactory(security_policies.SecurityPolicyBasic256,
216 1
                                                           ua.MessageSecurityMode.SignAndEncrypt,
217 1
                                                           self.certificate,
218 1
                                                           self.private_key)
219
                                 )
220 1
            self._set_endpoints(security_policies.SecurityPolicyBasic256,
221 1
                                ua.MessageSecurityMode.Sign)
222 1
            self._policies.append(ua.SecurityPolicyFactory(security_policies.SecurityPolicyBasic256,
223
                                                           ua.MessageSecurityMode.Sign,
224 1
                                                           self.certificate,
225 1
                                                           self.private_key)
226 1
                                 )
227
228 1
    def _set_endpoints(self, policy=ua.SecurityPolicy, mode=ua.MessageSecurityMode.None_):
229 1
        idtoken = ua.UserTokenPolicy()
230 1
        idtoken.PolicyId = 'anonymous'
231 1
        idtoken.TokenType = ua.UserTokenType.Anonymous
232 1
233 1
        idtoken2 = ua.UserTokenPolicy()
234
        idtoken2.PolicyId = 'certificate_basic256'
235 1
        idtoken2.TokenType = ua.UserTokenType.Certificate
236 1
237 1
        idtoken3 = ua.UserTokenPolicy()
238 1
        idtoken3.PolicyId = 'certificate_basic128'
239 1
        idtoken3.TokenType = ua.UserTokenType.Certificate
240 1
241 1
        idtoken4 = ua.UserTokenPolicy()
242 1
        idtoken4.PolicyId = 'username'
243 1
        idtoken4.TokenType = ua.UserTokenType.UserName
244 1
245 1
        appdesc = ua.ApplicationDescription()
246
        appdesc.ApplicationName = ua.LocalizedText(self.name)
247 1
        appdesc.ApplicationUri = self.application_uri
248
        appdesc.ApplicationType = self.application_type
249
        appdesc.ProductUri = self.product_uri
250 1
        appdesc.DiscoveryUrls.append(self.endpoint.geturl())
251
252
        edp = ua.EndpointDescription()
253
        edp.EndpointUrl = self.endpoint.geturl()
254 1
        edp.Server = appdesc
255 1
        if self.certificate:
256 1
            edp.ServerCertificate = uacrypto.der_from_x509(self.certificate)
257 1
        edp.SecurityMode = mode
258 1
        edp.SecurityPolicyUri = policy.URI
259
        edp.UserIdentityTokens = [idtoken, idtoken2, idtoken3, idtoken4]
260 1
        edp.TransportProfileUri = 'http://opcfoundation.org/UA-Profile/Transport/uatcp-uasc-uabinary'
261
        edp.SecurityLevel = 0
262
        self.iserver.add_endpoint(edp)
263
264 1
    def set_server_name(self, name):
265 1
        self.name = name
266 1
267 1
    def start(self):
268
        """
269 1
        Start to listen on network
270
        """
271
        self._setup_server_nodes()
272
        self.iserver.start()
273 1
        self.bserver = BinaryServer(self.iserver, self.endpoint.hostname, self.endpoint.port)
274
        self.bserver.set_policies(self._policies)
275 1
        self.bserver.start()
276
277
    def stop(self):
278
        """
279 1
        Stop server
280
        """
281 1
        for client in self._discovery_clients.values():
282
            client.disconnect()
283
        self.bserver.stop()
284
        self.iserver.stop()
285 1
286
    def get_root_node(self):
287 1
        """
288
        Get Root node of server. Returns a Node object.
289
        """
290
        return self.get_node(ua.TwoByteNodeId(ua.ObjectIds.RootFolder))
291 1
292
    def get_objects_node(self):
293 1
        """
294
        Get Objects node of server. Returns a Node object.
295
        """
296
        return self.get_node(ua.TwoByteNodeId(ua.ObjectIds.ObjectsFolder))
297
298
    def get_server_node(self):
299 1
        """
300 1
        Get Server node of server. Returns a Node object.
301 1
        """
302 1
        return self.get_node(ua.TwoByteNodeId(ua.ObjectIds.Server))
303 1
304 1
    def get_node(self, nodeid):
305 1
        """
306 1
        Get a specific node using NodeId object or a string representing a NodeId
307
        """
308 1
        return Node(self.iserver.isession, nodeid)
309
310
    def create_subscription(self, period, handler):
311
        """
312 1
        Create a subscription.
313 1
        returns a Subscription object which allow
314
        to subscribe to events or data on server
315 1
        """
316
        params = ua.CreateSubscriptionParameters()
317
        params.RequestedPublishingInterval = period
318
        params.RequestedLifetimeCount = 3000
319 1
        params.RequestedMaxKeepAliveCount = 10000
320 1
        params.MaxNotificationsPerPublish = 0
321 1
        params.PublishingEnabled = True
322 1
        params.Priority = 0
323 1
        return Subscription(self.iserver.isession, params, handler)
324
325 1
    def get_namespace_array(self):
326
        """
327
        get all namespace defined in server
328
        """
329 1
        ns_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_NamespaceArray))
330 1
        return ns_node.get_value()
331
332 1
    def register_namespace(self, uri):
333
        """
334
        Register a new namespace. Nodes should in custom namespace, not 0.
335
        """
336
        ns_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_NamespaceArray))
337 1
        uries = ns_node.get_value()
338 1
        if uri in uries:
339 1
            return uries.index(uri)
340
        uries.append(uri)
341 1
        ns_node.set_value(uries)
342
        return len(uries) - 1
343 1
344 1
    def get_namespace_index(self, uri):
345 1
        """
346 1
        get index of a namespace using its uri
347
        """
348 1
        uries = self.get_namespace_array()
349
        return uries.index(uri)
350 1
351 1
    def get_event_generator(self, etype=None, source=ua.ObjectIds.Server):
352 1
        """
353
        Returns an event object using an event type from address space.
354 1
        Use this object to fire events
355
        """
356 1
        if not etype:
357
            etype = BaseEvent()
358
        return EventGenerator(self.iserver.isession, etype, source)
359
360 1
    def create_custom_data_type(self, idx, name, basetype=ua.ObjectIds.BaseDataType, properties=[]):
361 1
        return self._create_custom_type(idx, name, basetype, properties, [], [])
362
363 1
    def create_custom_event_type(self, idx, name, basetype=ua.ObjectIds.BaseEventType, properties=[]):
364 1
        return self._create_custom_type(idx, name, basetype, properties, [], [])
365
366 1
    def create_custom_object_type(self, idx, name, basetype=ua.ObjectIds.BaseObjectType, properties=[], variables=[], methods=[]):
367
        return self._create_custom_type(idx, name, basetype, properties, variables, methods)
368
369 1
    #def create_custom_reference_type(self, idx, name, basetype=ua.ObjectIds.BaseReferenceType, properties=[]):
370
        #return self._create_custom_type(idx, name, basetype, properties)
371
372
    def create_custom_variable_type(self, idx, name, basetype=ua.ObjectIds.BaseVariableType, properties=[], variables=[], methods=[]):
373
        return self._create_custom_type(idx, name, basetype, properties, variables, methods)
374
375
    def _create_custom_type(self, idx, name, basetype, properties, variables, methods):
376
        if isinstance(basetype, Node):
377
            base_t = basetype
378
        elif isinstance(basetype, ua.NodeId):
379
            base_t = Node(self.iserver.isession, basetype)
380
        else:
381
            base_t = Node(self.iserver.isession, ua.NodeId(basetype))
382
383
        custom_t = base_t.add_object_type(idx, name)
384
        for property in properties:
385
            datatype = None
386
            if len(property) > 2:
387
                datatype = property[2]
388
            custom_t.add_property(idx, property[0], None, varianttype=property[1], datatype=datatype)
389
        for variable in variables:
390
            datatype = None
391
            if len(variable) > 2:
392
                datatype = variable[2]
393
            custom_t.add_variable(idx, variable[0], None, varianttype=variable[1], datatype=datatype)
394
        for method in methods:
395
            custom_t.add_method(idx, method[0], method[1], method[2], method[3])
396
397
        return custom_t
398
399
    def import_xml(self, path):
400
        """
401
        Import nodes defined in xml
402
        """
403
        importer = xmlimporter.XmlImporter(self.iserver.node_mgt_service)
404
        return importer.import_xml(path, self)
405
406
    def export_xml(self, nodes, path):
407
        """
408
        Export defined nodes to xml
409
        """
410
        exp = XmlExporter(self)
411
        uris = self.get_namespace_array()[2:]
412
        exp.build_etree(nodes, uris=uris)
413
        return exp.write_xml(path)
414
415
    def delete_nodes(self, nodes, recursive=False):
416
        return delete_nodes(self.iserver.isession, nodes, recursive)
417
418
    def historize_node(self, node):
419
        self.iserver.enable_history(node)
420
421
    def dehistorize_node(self, node):
422
        self.iserver.disable_history(node)
423
424
    def subscribe_server_callback(self, event, handle):
425
        self.iserver.subscribe_server_callback(event, handle)
426
427
    def unsubscribe_server_callback(self, event, handle):
428
        self.iserver.unsubscribe_server_callback(event, handle)
429
430
    def link_method(self, node, callback):
431
        self.iserver.isession.add_method_callback(node.nodeid, callback)
432