Completed
Pull Request — master (#312)
by
unknown
03:36
created

Server.link_method()   A

Complexity

Conditions 1

Size

Total Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 2

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 1
c 1
b 0
f 0
dl 0
loc 2
ccs 0
cts 0
cp 0
crap 2
rs 10
1
"""
2
High level interface to pure python OPC-UA server
3
"""
4
5 1
import logging
6 1
try:
7 1
    from urllib.parse import urlparse
8
except ImportError:
9
    from urlparse import urlparse
10
11
12 1
from opcua import ua
13
#from opcua.binary_server import BinaryServer
14 1
from opcua.server.binary_server_asyncio import BinaryServer
15 1
from opcua.server.internal_server import InternalServer
16 1
from opcua.server.event_generator import EventGenerator
17 1
from opcua.common.node import Node
18 1
from opcua.common.subscription import Subscription
19 1
from opcua.common import xmlimporter
20 1
from opcua.common.manage_nodes import delete_nodes
21 1
from opcua.client.client import Client
22 1
from opcua.crypto import security_policies
23 1
from opcua.common.event_objects import BaseEvent
24 1
from opcua.common.shortcuts import Shortcuts
25 1
use_crypto = True
26
try:
27
    from opcua.crypto import uacrypto
28
except ImportError:
29
    print("cryptography is not installed, use of crypto disabled")
30
    use_crypto = False
31 1
32
33
class Server(object):
34
35
    """
36
    High level Server class
37
38
    This class creates an opcua server with default values
39
40
    Create your own namespace and then populate your server address space
41
    using use the get_root() or get_objects() to get Node objects.
42
    and get_event_object() to fire events.
43
    Then start server. See example_server.py
44
    All methods are threadsafe
45
46
    If you need more flexibility you call directly the Ua Service methods
47
    on the iserver  or iserver.isesssion object members.
48
49
    During startup the standard address space will be constructed, which may be
50
    time-consuming when running a server on a less powerful device (e.g. a
51
    Raspberry Pi). In order to improve startup performance, a optional path to a
52
    cache file can be passed to the server constructor.
53
    If the parameter is defined, the address space will be loaded from the
54
    cache file or the file will be created if it does not exist yet.
55
    As a result the first startup will be even slower due to the cache file
56
    generation but all further startups will be significantly faster.
57
58
    :ivar application_uri:
59
    :vartype application_uri: uri
60
    :ivar product_uri:
61
    :vartype product_uri: uri
62
    :ivar name:
63 1
    :vartype name: string
64 1
    :ivar default_timeout: timout in milliseconds for sessions and secure channel
65 1
    :vartype default_timeout: int
66 1
    :ivar iserver: internal server object
67 1
    :vartype default_timeout: InternalServer
68 1
    :ivar bserver: binary protocol server
69 1
    :vartype bserver: BinaryServer
70 1
    :ivar nodes: shortcuts to common nodes
71 1
    :vartype nodes: Shortcuts
72 1
73 1
    """
74 1
75 1
    def __init__(self, cacheFile=None, iserver=None):
76 1
        self.logger = logging.getLogger(__name__)
77 1
        self.endpoint = urlparse("opc.tcp://0.0.0.0:4840/freeopcua/server/")
78
        self.application_uri = "urn:freeopcua:python:server"
79
        self.product_uri = "urn:freeopcua.github.no:python:server"
80 1
        self.name = "FreeOpcUa Python Server"
81 1
        self.application_type = ua.ApplicationType.ClientAndServer
82 1
        self.default_timeout = 3600000
83
        if iserver is not None:
84 1
            self.iserver = iserver
85
        else:
86
            self.iserver = InternalServer(cacheFile)
87
        self.bserver = None
88 1
        self._discovery_clients = {}
89
        self._discovery_period = 60
90
        self.certificate = None
91 1
        self.private_key = None
92
        self._policies = []
93
        self.nodes = Shortcuts(self.iserver.isession)
94
95 1
        # setup some expected values
96
        self.register_namespace(self.application_uri)
97 1
        sa_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_ServerArray))
98 1
        sa_node.set_value([self.application_uri])
99
100 1
    def __enter__(self):
101
        self.start()
102
        return self
103
104
    def __exit__(self, exc_type, exc_value, traceback):
105
        self.stop()
106
107 1
    def load_certificate(self, path):
108
        """
109
        load server certificate from file, either pem or der
110
        """
111
        self.certificate = uacrypto.load_certificate(path)
112
113
    def load_private_key(self, path):
114
        self.private_key = uacrypto.load_private_key(path)
115 1
116
    def disable_clock(self, val=True):
117 1
        """
118
        for debugging you may want to disable clock that write every second
119
        to address space
120
        """
121 1
        self.iserver.disabled_clock = val
122 1
123 1
    def set_application_uri(self, uri):
124 1
        """
125 1
        Set application/server URI.
126 1
        This uri is supposed to be unique. If you intent to register
127
        your server to a discovery server, it really should be unique in
128 1
        your system!
129
        default is : "urn:freeopcua:python:server"
130
        """
131
        self.application_uri = uri
132
133
    def find_servers(self, uris=None):
134
        """
135
        find_servers. mainly implemented for simmetry with client
136 1
        """
137 1
        if uris is None:
138 1
            uris = []
139 1
        params = ua.FindServersParameters()
140 1
        params.EndpointUrl = self.endpoint.geturl()
141 1
        params.ServerUris = uris
142 1
        return self.iserver.find_servers(params)
143
144
    def register_to_discovery(self, url="opc.tcp://localhost:4840", period=60):
145 1
        """
146
        Register to an OPC-UA Discovery server. Registering must be renewed at
147
        least every 10 minutes, so this method will use our asyncio thread to
148
        re-register every period seconds
149
        if period is 0 registration is not automatically renewed
150
        """
151
        # FIXME: habe a period per discovery
152 1
        if url in self._discovery_clients:
153
            self._discovery_clients[url].disconnect()
154
        self._discovery_clients[url] = Client(url)
155
        self._discovery_clients[url].connect()
156
        self._discovery_clients[url].register_server(self)
157 1
        self._discovery_period = period
158
        if period:
159
            self.iserver.loop.call_soon(self._renew_registration)
160
161
    def unregister_to_discovery(self, url="opc.tcp://localhost:4840"):
162
        """
163
        stop registration thread
164
        """
165 1
        # FIXME: is there really no way to deregister?
166
        self._discovery_clients[url].disconnect()
167
168
    def _renew_registration(self):
169
        for client in self._discovery_clients.values():
170
            client.register_server(self)
171 1
            self.iserver.loop.call_later(self._discovery_period, self._renew_registration)
172 1
173
    def get_client_to_discovery(self, url="opc.tcp://localhost:4840"):
174 1
        """
175 1
        Create a client to discovery server and return it
176
        """
177 1
        client = Client(url)
178
        client.connect()
179 1
        return client
180 1
181 1
    def allow_remote_admin(self, allow):
182 1
        """
183
        Enable or disable the builtin Admin user from network clients
184 1
        """
185
        self.iserver.allow_remote_admin = allow
186
187
    def set_endpoint(self, url):
188
        self.endpoint = urlparse(url)
189 1
190
    def get_endpoints(self):
191 1
        return self.iserver.get_endpoints()
192
193
    def _setup_server_nodes(self):
194
        # to be called just before starting server since it needs all parameters to be setup
195
        self._set_endpoints()
196 1
        self._policies = [ua.SecurityPolicyFactory()]
197
        if self.certificate and self.private_key:
198 1
            self._set_endpoints(security_policies.SecurityPolicyBasic128Rsa15,
199
                                ua.MessageSecurityMode.SignAndEncrypt)
200
            self._policies.append(ua.SecurityPolicyFactory(security_policies.SecurityPolicyBasic128Rsa15,
201
                                                           ua.MessageSecurityMode.SignAndEncrypt,
202
                                                           self.certificate,
203 1
                                                           self.private_key)
204
                                 )
205 1
            self._set_endpoints(security_policies.SecurityPolicyBasic128Rsa15,
206
                                ua.MessageSecurityMode.Sign)
207
            self._policies.append(ua.SecurityPolicyFactory(security_policies.SecurityPolicyBasic128Rsa15,
208
                                                           ua.MessageSecurityMode.Sign,
209
                                                           self.certificate,
210
                                                           self.private_key)
211 1
                                 )
212 1
            self._set_endpoints(security_policies.SecurityPolicyBasic256,
213 1
                                ua.MessageSecurityMode.SignAndEncrypt)
214 1
            self._policies.append(ua.SecurityPolicyFactory(security_policies.SecurityPolicyBasic256,
215
                                                           ua.MessageSecurityMode.SignAndEncrypt,
216 1
                                                           self.certificate,
217 1
                                                           self.private_key)
218 1
                                 )
219
            self._set_endpoints(security_policies.SecurityPolicyBasic256,
220 1
                                ua.MessageSecurityMode.Sign)
221 1
            self._policies.append(ua.SecurityPolicyFactory(security_policies.SecurityPolicyBasic256,
222 1
                                                           ua.MessageSecurityMode.Sign,
223
                                                           self.certificate,
224 1
                                                           self.private_key)
225 1
                                 )
226 1
227
    def _set_endpoints(self, policy=ua.SecurityPolicy, mode=ua.MessageSecurityMode.None_):
228 1
        idtoken = ua.UserTokenPolicy()
229 1
        idtoken.PolicyId = 'anonymous'
230 1
        idtoken.TokenType = ua.UserTokenType.Anonymous
231 1
232 1
        idtoken2 = ua.UserTokenPolicy()
233 1
        idtoken2.PolicyId = 'certificate_basic256'
234
        idtoken2.TokenType = ua.UserTokenType.Certificate
235 1
236 1
        idtoken3 = ua.UserTokenPolicy()
237 1
        idtoken3.PolicyId = 'certificate_basic128'
238 1
        idtoken3.TokenType = ua.UserTokenType.Certificate
239 1
240 1
        idtoken4 = ua.UserTokenPolicy()
241 1
        idtoken4.PolicyId = 'username'
242 1
        idtoken4.TokenType = ua.UserTokenType.UserName
243 1
244 1
        appdesc = ua.ApplicationDescription()
245 1
        appdesc.ApplicationName = ua.LocalizedText(self.name)
246
        appdesc.ApplicationUri = self.application_uri
247 1
        appdesc.ApplicationType = self.application_type
248
        appdesc.ProductUri = self.product_uri
249
        appdesc.DiscoveryUrls.append(self.endpoint.geturl())
250 1
251
        edp = ua.EndpointDescription()
252
        edp.EndpointUrl = self.endpoint.geturl()
253
        edp.Server = appdesc
254 1
        if self.certificate:
255 1
            edp.ServerCertificate = uacrypto.der_from_x509(self.certificate)
256 1
        edp.SecurityMode = mode
257 1
        edp.SecurityPolicyUri = policy.URI
258 1
        edp.UserIdentityTokens = [idtoken, idtoken2, idtoken3, idtoken4]
259
        edp.TransportProfileUri = 'http://opcfoundation.org/UA-Profile/Transport/uatcp-uasc-uabinary'
260 1
        edp.SecurityLevel = 0
261
        self.iserver.add_endpoint(edp)
262
263
    def set_server_name(self, name):
264 1
        self.name = name
265 1
266 1
    def start(self):
267 1
        """
268
        Start to listen on network
269 1
        """
270
        self._setup_server_nodes()
271
        self.iserver.start()
272
        self.bserver = BinaryServer(self.iserver, self.endpoint.hostname, self.endpoint.port)
273 1
        self.bserver.set_policies(self._policies)
274
        self.bserver.start()
275 1
276
    def stop(self):
277
        """
278
        Stop server
279 1
        """
280
        for client in self._discovery_clients.values():
281 1
            client.disconnect()
282
        self.bserver.stop()
283
        self.iserver.stop()
284
285 1
    def get_root_node(self):
286
        """
287 1
        Get Root node of server. Returns a Node object.
288
        """
289
        return self.get_node(ua.TwoByteNodeId(ua.ObjectIds.RootFolder))
290
291 1
    def get_objects_node(self):
292
        """
293 1
        Get Objects node of server. Returns a Node object.
294
        """
295
        return self.get_node(ua.TwoByteNodeId(ua.ObjectIds.ObjectsFolder))
296
297
    def get_server_node(self):
298
        """
299 1
        Get Server node of server. Returns a Node object.
300 1
        """
301 1
        return self.get_node(ua.TwoByteNodeId(ua.ObjectIds.Server))
302 1
303 1
    def get_node(self, nodeid):
304 1
        """
305 1
        Get a specific node using NodeId object or a string representing a NodeId
306 1
        """
307
        return Node(self.iserver.isession, nodeid)
308 1
309
    def create_subscription(self, period, handler):
310
        """
311
        Create a subscription.
312 1
        returns a Subscription object which allow
313 1
        to subscribe to events or data on server
314
        """
315 1
        params = ua.CreateSubscriptionParameters()
316
        params.RequestedPublishingInterval = period
317
        params.RequestedLifetimeCount = 3000
318
        params.RequestedMaxKeepAliveCount = 10000
319 1
        params.MaxNotificationsPerPublish = 0
320 1
        params.PublishingEnabled = True
321 1
        params.Priority = 0
322 1
        return Subscription(self.iserver.isession, params, handler)
323 1
324
    def get_namespace_array(self):
325 1
        """
326
        get all namespace defined in server
327
        """
328
        ns_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_NamespaceArray))
329 1
        return ns_node.get_value()
330 1
331
    def register_namespace(self, uri):
332 1
        """
333
        Register a new namespace. Nodes should in custom namespace, not 0.
334
        """
335
        ns_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_NamespaceArray))
336
        uries = ns_node.get_value()
337 1
        if uri in uries:
338 1
            return uries.index(uri)
339 1
        uries.append(uri)
340
        ns_node.set_value(uries)
341 1
        return len(uries) - 1
342
343 1
    def get_namespace_index(self, uri):
344 1
        """
345 1
        get index of a namespace using its uri
346 1
        """
347
        uries = self.get_namespace_array()
348 1
        return uries.index(uri)
349
350 1
    def get_event_generator(self, etype=None, source=ua.ObjectIds.Server):
351 1
        """
352 1
        Returns an event object using an event type from address space.
353
        Use this object to fire events
354 1
        """
355
        if not etype:
356 1
            etype = BaseEvent()
357
        return EventGenerator(self.iserver.isession, etype, source)
358
359
    def create_custom_data_type(self, idx, name, basetype=ua.ObjectIds.BaseDataType, properties=[]):
360 1
        return self._create_custom_type(idx, name, basetype, properties, [], [])
361 1
362
    def create_custom_event_type(self, idx, name, basetype=ua.ObjectIds.BaseEventType, properties=[]):
363 1
        return self._create_custom_type(idx, name, basetype, properties, [], [])
364 1
365
    def create_custom_object_type(self, idx, name, basetype=ua.ObjectIds.BaseObjectType, properties=[], variables=[], methods=[]):
366 1
        return self._create_custom_type(idx, name, basetype, properties, variables, methods)
367
368
    #def create_custom_reference_type(self, idx, name, basetype=ua.ObjectIds.BaseReferenceType, properties=[]):
369 1
        #return self._create_custom_type(idx, name, basetype, properties)
370
371
    def create_custom_variable_type(self, idx, name, basetype=ua.ObjectIds.BaseVariableType, properties=[], variables=[], methods=[]):
372
        return self._create_custom_type(idx, name, basetype, properties, variables, methods)
373
374
    def _create_custom_type(self, idx, name, basetype, properties, variables, methods):
375
        if isinstance(basetype, Node):
376
            base_t = basetype
377
        elif isinstance(basetype, ua.NodeId):
378
            base_t = Node(self.iserver.isession, basetype)
379
        else:
380
            base_t = Node(self.iserver.isession, ua.NodeId(basetype))
381
382
        custom_t = base_t.add_object_type(idx, name)
383
        for property in properties:
384
            datatype = None
385
            if len(property) > 2:
386
                datatype = property[2]
387
            custom_t.add_property(idx, property[0], None, varianttype=property[1], datatype=datatype)
388
        for variable in variables:
389
            datatype = None
390
            if len(variable) > 2:
391
                datatype = variable[2]
392
            custom_t.add_variable(idx, variable[0], None, varianttype=variable[1], datatype=datatype)
393
        for method in methods:
394
            custom_t.add_method(idx, method[0], method[1], method[2], method[3])
395
396
        return custom_t
397
398
    def import_xml(self, path):
399
        """
400
        import nodes defined in xml
401
        """
402
        importer = xmlimporter.XmlImporter(self.iserver.node_mgt_service)
403
        return importer.import_xml(path, self)
404
405
    def delete_nodes(self, nodes, recursive=False):
406
        return delete_nodes(self.iserver.isession, nodes, recursive)
407
408
    def historize_node(self, node):
409
        self.iserver.enable_history(node)
410
411
    def dehistorize_node(self, node):
412
        self.iserver.disable_history(node)
413
414
    def subscribe_server_callback(self, event, handle):
415
        self.iserver.subscribe_server_callback(event, handle)
416
417
    def unsubscribe_server_callback(self, event, handle):
418
        self.iserver.unsubscribe_server_callback(event, handle)
419
420
    def link_method(self, node, callback):
421
        node.server.add_method_callback(node.nodeid, callback)
422