Completed
Pull Request — master (#232)
by
unknown
04:10
created

Server.unsubscribe_server_event()   A

Complexity

Conditions 1

Size

Total Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 2

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 1
c 1
b 0
f 0
dl 0
loc 2
ccs 0
cts 0
cp 0
crap 2
rs 10
1
"""
2
High level interface to pure python OPC-UA server
3
"""
4
5 1
import logging
6 1
try:
7 1
    from urllib.parse import urlparse
8
except ImportError:
9
    from urlparse import urlparse
10
11
12 1
from opcua import ua
13
#from opcua.binary_server import BinaryServer
14 1
from opcua.server.binary_server_asyncio import BinaryServer
15 1
from opcua.server.internal_server import InternalServer
16 1
from opcua.server.event_generator import EventGenerator
17 1
from opcua.common.node import Node
18 1
from opcua.common.subscription import Subscription
19 1
from opcua.common import xmlimporter
20 1
from opcua.common.manage_nodes import delete_nodes
21 1
from opcua.client.client import Client
22 1
from opcua.crypto import security_policies
23 1
from opcua.common.event_objects import BaseEvent
24 1
use_crypto = True
25 1
try:
26
    from opcua.crypto import uacrypto
27
except ImportError:
28
    print("cryptography is not installed, use of crypto disabled")
29
    use_crypto = False
30
31 1
32
class Server(object):
33
34
    """
35
    High level Server class
36
37
    This class creates an opcua server with default values
38
39
    Create your own namespace and then populate your server address space
40
    using the get_root_node() or get_objects_node() methods to get Node objects
41
    and get_event_generator() to fire events.
42
    Then start server. See example_server.py
43
    All methods are threadsafe
44
45
    If you need more flexibility you call directly the Ua Service methods
46
    on the iserver or iserver.isession object members.
47
48
    During startup the standard address space will be constructed, which may be
49
    time-consuming when running a server on a less powerful device (e.g. a
50
    Raspberry Pi). In order to improve startup performance, an optional path to a
51
    cache file can be passed to the server constructor.
52
    If the parameter is defined, the address space will be loaded from the
53
    cache file or the file will be created if it does not exist yet.
54
    As a result the first startup will be even slower due to the cache file
55
    generation but all further startups will be significantly faster.
56
57
    :ivar application_uri:
58
    :vartype application_uri: uri
59
    :ivar product_uri:
60
    :vartype product_uri: uri
61
    :ivar name:
62
    :vartype name: string
63 1
    :ivar default_timeout: timeout in milliseconds for sessions and secure channel
64 1
    :vartype default_timeout: int
65 1
    :ivar iserver: internal server object
66 1
    :vartype iserver: InternalServer
67 1
    :ivar bserver: binary protocol server
68 1
    :vartype bserver: BinaryServer
69 1
70 1
    """
71 1
72 1
    def __init__(self, cacheFile = None):
        """
        Create the server with its default identity, endpoint and timeout.

        :param cacheFile: optional path of an address space cache file,
            handed over unchanged to InternalServer
        """
        self.logger = logging.getLogger(__name__)

        # where the server listens and how it presents itself
        self.endpoint = urlparse("opc.tcp://0.0.0.0:4840/freeopcua/server/")
        self.name = "FreeOpcUa Python Server"
        self.application_uri = "urn:freeopcua:python:server"
        self.product_uri = "urn:freeopcua.github.no:python:server"
        self.application_type = ua.ApplicationType.ClientAndServer
        self.default_timeout = 3600000

        # internal server now, binary server only when start() is called
        self.iserver = InternalServer(cacheFile)
        self.bserver = None

        # discovery registration state
        self._discovery_clients = {}
        self._discovery_period = 60

        # security material, filled by load_certificate/load_private_key
        self.certificate = None
        self.private_key = None
        self._policies = []

        # setup some expected values
        self.register_namespace(self.application_uri)
        server_array = self.get_node(ua.NodeId(ua.ObjectIds.Server_ServerArray))
        server_array.set_value([self.application_uri])
92
93
    def __enter__(self):
94
        self.start()
95 1
        return self
96
97 1
    def __exit__(self):
98 1
        self.stop()
99
100 1
    def load_certificate(self, path):
        """
        Read the server certificate stored at path (pem or der file)
        and keep it in self.certificate
        """
        certificate = uacrypto.load_certificate(path)
        self.certificate = certificate
105
106
    def load_private_key(self, path):
        """
        Read the server private key stored at path
        and keep it in self.private_key
        """
        private_key = uacrypto.load_private_key(path)
        self.private_key = private_key
108
109
    def disable_clock(self, val=True):
110
        """
111
        for debugging you may want to disable clock that write every second
112
        to address space
113
        """
114
        self.iserver.disabled_clock = val
115 1
116
    def set_application_uri(self, uri):
117 1
        """
118
        Set application/server URI.
119
        This uri is supposed to be unique. If you intent to register
120
        your server to a discovery server, it really should be unique in
121 1
        your system!
122 1
        default is : "urn:freeopcua:python:server"
123 1
        """
124 1
        self.application_uri = uri
125 1
126 1
    def find_servers(self, uris=None):
        """
        find_servers. mainly implemented for symmetry with client

        :param uris: server uris put in the request, an empty list when None
        :return: whatever the internal server returns for the request
        """
        request = ua.FindServersParameters()
        request.EndpointUrl = self.endpoint.geturl()
        request.ServerUris = [] if uris is None else uris
        return self.iserver.find_servers(request)
136 1
137 1
    def register_to_discovery(self, url="opc.tcp://localhost:4840", period=60):
        """
        Register to an OPC-UA Discovery server. Registering must be renewed at
        least every 10 minutes, so the renewal is handed to the loop of the
        internal server which re-registers every period seconds.
        If period is 0 registration is not automatically renewed.
        """
        # FIXME: have a period per discovery
        clients = self._discovery_clients
        if url in clients:
            # a previous registration to the same url is disconnected first
            clients[url].disconnect()
        discovery = Client(url)
        clients[url] = discovery
        discovery.connect()
        discovery.register_server(self)
        self._discovery_period = period
        if not period:
            return
        self.iserver.loop.call_soon(self._renew_registration)
153
154
    def unregister_to_discovery(self, url="opc.tcp://localhost:4840"):
155
        """
156
        stop registration thread
157 1
        """
158
        # FIXME: is there really no way to deregister?
159
        self._discovery_clients[url].disconnect()
160
161
    def _renew_registration(self):
162
        for client in self._discovery_clients.values():
163
            client.register_server(self)
164
            self.iserver.loop.call_later(self._discovery_period, self._renew_registration)
165 1
166
    def get_client_to_discovery(self, url="opc.tcp://localhost:4840"):
        """
        Build a Client for the discovery server at url, connect it
        and return it
        """
        discovery = Client(url)
        discovery.connect()
        return discovery
173
174 1
    def allow_remote_admin(self, allow):
175 1
        """
176
        Enable or disable the builtin Admin user from network clients
177 1
        """
178
        self.iserver.allow_remote_admin = allow
179 1
180 1
    def set_endpoint(self, url):
181 1
        self.endpoint = urlparse(url)
182 1
183
    def get_endpoints(self):
184 1
        return self.iserver.get_endpoints()
185
186
    def _setup_server_nodes(self):
        """
        Declare the endpoints and the matching security policies.

        An endpoint using the default arguments of _set_endpoints is always
        declared. When both a certificate and a private key are set, one
        endpoint and one policy factory are added per secured
        policy/mode combination.
        """
        # to be called just before starting server since it needs all parameters to be setup
        self._set_endpoints()
        self._policies = [ua.SecurityPolicyFactory()]
        if not (self.certificate and self.private_key):
            return

        # same combinations, in the same order, for endpoints and policies
        secured = (
            (security_policies.SecurityPolicyBasic128Rsa15, ua.MessageSecurityMode.SignAndEncrypt),
            (security_policies.SecurityPolicyBasic128Rsa15, ua.MessageSecurityMode.Sign),
            (security_policies.SecurityPolicyBasic256, ua.MessageSecurityMode.SignAndEncrypt),
            (security_policies.SecurityPolicyBasic256, ua.MessageSecurityMode.Sign),
        )
        for policy, mode in secured:
            self._set_endpoints(policy, mode)
            factory = ua.SecurityPolicyFactory(policy,
                                               mode,
                                               self.certificate,
                                               self.private_key)
            self._policies.append(factory)
219
220 1
    def _set_endpoints(self, policy=ua.SecurityPolicy, mode=ua.MessageSecurityMode.None_):
        """
        Describe one endpoint for the given security policy and message
        security mode and add it to the internal server.

        Every endpoint offers the same four user token policies:
        anonymous, two certificate based ones and username.
        """
        def make_token(policy_id, token_type):
            # one user identity token policy of the endpoint
            token = ua.UserTokenPolicy()
            token.PolicyId = policy_id
            token.TokenType = token_type
            return token

        tokens = [
            make_token('anonymous', ua.UserTokenType.Anonymous),
            make_token('certificate_basic256', ua.UserTokenType.Certificate),
            make_token('certificate_basic128', ua.UserTokenType.Certificate),
            make_token('username', ua.UserTokenType.UserName),
        ]

        # description of this application, embedded in the endpoint
        application = ua.ApplicationDescription()
        application.ApplicationName = ua.LocalizedText(self.name)
        application.ApplicationUri = self.application_uri
        application.ApplicationType = self.application_type
        application.ProductUri = self.product_uri
        application.DiscoveryUrls.append(self.endpoint.geturl())

        endpoint = ua.EndpointDescription()
        endpoint.EndpointUrl = self.endpoint.geturl()
        endpoint.Server = application
        if self.certificate:
            endpoint.ServerCertificate = uacrypto.der_from_x509(self.certificate)
        endpoint.SecurityMode = mode
        endpoint.SecurityPolicyUri = policy.URI
        endpoint.UserIdentityTokens = tokens
        endpoint.TransportProfileUri = 'http://opcfoundation.org/UA-Profile/Transport/uatcp-uasc-uabinary'
        endpoint.SecurityLevel = 0
        self.iserver.add_endpoint(endpoint)
255 1
256 1
    def set_server_name(self, name):
257 1
        self.name = name
258 1
259
    def start(self):
        """
        Start to listen on network

        The endpoints are set up, then the internal server and finally the
        binary server are started. If the binary server cannot be brought
        up, the internal server is stopped again before the exception is
        re-raised, so that a failed start does not leave it running.
        """
        self._setup_server_nodes()
        self.iserver.start()
        try:
            self.bserver = BinaryServer(self.iserver, self.endpoint.hostname, self.endpoint.port)
            self.bserver.set_policies(self._policies)
            self.bserver.start()
        except Exception:
            # undo the part of the start which already succeeded
            self.iserver.stop()
            raise
268
269 1
    def stop(self):
270
        """
271
        Stop server
272
        """
273 1
        for client in self._discovery_clients.values():
274
            client.disconnect()
275 1
        self.bserver.stop()
276
        self.iserver.stop()
277
278
    def get_root_node(self):
        """
        Return the Root node of the server as a Node object.
        """
        root_id = ua.TwoByteNodeId(ua.ObjectIds.RootFolder)
        return self.get_node(root_id)
283
284
    def get_objects_node(self):
        """
        Return the Objects node of the server as a Node object.
        """
        objects_id = ua.TwoByteNodeId(ua.ObjectIds.ObjectsFolder)
        return self.get_node(objects_id)
289
290
    def get_server_node(self):
        """
        Return the Server node of the server as a Node object.
        """
        server_id = ua.TwoByteNodeId(ua.ObjectIds.Server)
        return self.get_node(server_id)
295
296
    def get_node(self, nodeid):
        """
        Return the Node for nodeid, given as a NodeId object or as a string
        representing a NodeId. The node is bound to the internal session.
        """
        session = self.iserver.isession
        return Node(session, nodeid)
301 1
302 1
    def create_subscription(self, period, handler):
        """
        Create a subscription on the internal session.
        The returned Subscription object allows to subscribe
        to events or data on server.

        :param period: requested publishing interval
        :param handler: handler object given to the Subscription
        """
        request = ua.CreateSubscriptionParameters()
        request.PublishingEnabled = True
        request.Priority = 0
        request.RequestedPublishingInterval = period
        request.RequestedLifetimeCount = 3000
        request.RequestedMaxKeepAliveCount = 10000
        request.MaxNotificationsPerPublish = 0
        session = self.iserver.isession
        return Subscription(session, request, handler)
316
317
    def get_namespace_array(self):
        """
        Return the value of the namespace array node,
        that is all namespaces defined in server
        """
        array_id = ua.NodeId(ua.ObjectIds.Server_NamespaceArray)
        return self.get_node(array_id).get_value()
323 1
324
    def register_namespace(self, uri):
        """
        Register a new namespace. Nodes should be in a custom namespace, not 0.

        Registering an uri which is already in the namespace array does not
        add it a second time: the index of the existing entry is returned.

        :param uri: uri of the namespace
        :return: index of the namespace in the namespace array
        """
        ns_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_NamespaceArray))
        uries = ns_node.get_value()
        if uri in uries:
            # a duplicate would be unreachable through get_namespace_index,
            # which always finds the first occurrence
            return uries.index(uri)
        uries.append(uri)
        ns_node.set_value(uries)
        return len(uries) - 1
333
334
    def get_namespace_index(self, uri):
335
        """
336
        get index of a namespace using its uri
337 1
        """
338 1
        uries = self.get_namespace_array()
339 1
        return uries.index(uri)
340
341 1
    def get_event_generator(self, etype=None, source=ua.ObjectIds.Server):
        """
        Return an EventGenerator for an event type from address space.
        Use the returned object to fire events.

        :param etype: event type, a BaseEvent is used when not given
        :param source: source of the events, the Server object by default
        """
        event_type = etype if etype else BaseEvent()
        session = self.iserver.isession
        return EventGenerator(session, event_type, source)
349
350 1
    def create_custom_event_type(self, idx, name, baseetype=ua.ObjectIds.BaseEventType, properties=None):
        """
        Create a new event type as subtype of baseetype and return its node.

        :param idx: namespace index used for the new type and its properties
        :param name: name of the new event type
        :param baseetype: parent event type, given as a Node, as a ua.NodeId
            or as anything accepted by ua.NodeId()
        :param properties: iterable of sequences, the first item being the
            property name and the second one the second argument of
            ua.Variant (presumably the variant type, to be confirmed).
            None means no property.
        :return: node of the new event type
        """
        if properties is None:
            # None instead of a mutable list as default value
            properties = ()

        if isinstance(baseetype, Node):
            base_event = baseetype
        elif isinstance(baseetype, ua.NodeId):
            base_event = Node(self.iserver.isession, baseetype)
        else:
            base_event = Node(self.iserver.isession, ua.NodeId(baseetype))

        custom_event = base_event.add_subtype(idx, name)
        for prop in properties:
            custom_event.add_property(idx, prop[0], ua.Variant(None, prop[1]))

        return custom_event
364 1
365
    def import_xml(self, path):
        """
        Import the nodes defined in the xml file at path, using the
        node management service of the internal server
        """
        node_service = self.iserver.node_mgt_service
        xmlimporter.XmlImporter(node_service).import_xml(path)
371
372
    def delete_nodes(self, nodes, recursive=False):
        """
        Delete nodes through the internal session and return the result
        of the module level delete_nodes function
        """
        session = self.iserver.isession
        return delete_nodes(session, nodes, recursive)
374
375
    def historize_node(self, node):
376
        self.iserver.enable_history(node)
377
378
    def dehistorize_node(self, node):
379
        self.iserver.disable_history(node)
380
        
381
        
382
    def subscribe_server_event(self, event, handle):
383
        self.iserver.subscribe_server_event(event, handle)
384
        
385
    def unsubscribe_server_event(self, event, handle):
386
        self.iserver.unsubscribe_server_event(event, handle)
387