Completed
Pull Request — master (#161)
by Denis
04:02
created

Server.get_event_generator()   A

Complexity

Conditions 2

Size

Total Lines 8

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 4
CRAP Score 2
Metric Value
cc 2
dl 0
loc 8
ccs 4
cts 4
cp 1
crap 2
rs 9.4285
1
"""
2
High level interface to pure python OPC-UA server
3
"""
4
5 1
import logging
6 1
try:
7 1
    from urllib.parse import urlparse
8
except ImportError:
9
    from urlparse import urlparse
10
11
12 1
from opcua import ua
13
#from opcua.binary_server import BinaryServer
14 1
from opcua.server.binary_server_asyncio import BinaryServer
15 1
from opcua.server.internal_server import InternalServer
16 1
from opcua.common.node import Node
17 1
from opcua.common.event import EventGenerator
18 1
from opcua.common.subscription import Subscription
19 1
from opcua.common import xmlimporter
20 1
from opcua.common.manage_nodes import delete_nodes
21 1
from opcua.client.client import Client
22 1
from opcua.crypto import security_policies
23 1
use_crypto = True
24 1
try:
25 1
    from opcua.crypto import uacrypto
26 1
except ImportError:
27 1
    print("cryptography is not installed, use of crypto disabled")
28 1
    use_crypto = False
29
30
31 1
class Server(object):
32
33
    """
34
    High level Server class
35
36
    This class creates an opcua server with default values
37
38
    Create your own namespace and then populate your server address space
39
    using use the get_root() or get_objects() to get Node objects.
40
    and get_event_object() to fire events.
41
    Then start server. See example_server.py
42
    All methods are threadsafe
43
44
    If you need more flexibility you call directly the Ua Service methods
45
    on the iserver  or iserver.isesssion object members.
46
47
48
    :ivar application_uri:
49
    :vartype application_uri: uri
50
    :ivar product_uri:
51
    :vartype product_uri: uri
52
    :ivar name:
53
    :vartype name: string
54
    :ivar default_timeout: timout in milliseconds for sessions and secure channel
55
    :vartype default_timeout: int
56
    :ivar iserver: internal server object
57
    :vartype default_timeout: InternalServer
58
    :ivar bserver: binary protocol server
59
    :vartype bserver: BinaryServer
60
61
    """
62
63 1
    def __init__(self):
64 1
        self.logger = logging.getLogger(__name__)
65 1
        self.endpoint = urlparse("opc.tcp://0.0.0.0:4840/freeopcua/server/")
66 1
        self.application_uri = "urn:freeopcua:python:server"
67 1
        self.product_uri = "urn:freeopcua.github.no:python:server"
68 1
        self.name = "FreeOpcUa Python Server"
69 1
        self.application_type = ua.ApplicationType.ClientAndServer
70 1
        self.default_timeout = 3600000
71 1
        self.iserver = InternalServer()
72 1
        self.bserver = None
73 1
        self._discovery_clients = {}
74 1
        self._discovery_period = 60
75 1
        self.certificate = None
76 1
        self.private_key = None
77 1
        self._policies = []
78
79
        # setup some expected values
80 1
        self.register_namespace(self.application_uri)
81 1
        sa_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_ServerArray))
82 1
        sa_node.set_value([self.application_uri])
83
84 1
    def __enter__(self):
85
        self.start()
86
        return self
87
88 1
    def __exit__(self):
89
        self.stop()
90
91 1
    def load_certificate(self, path):
92
        """
93
        load server certificate from file, either pem or der
94
        """
95
        self.certificate = uacrypto.load_certificate(path)
96
97 1
    def load_private_key(self, path):
98
        self.private_key = uacrypto.load_private_key(path)
99
100 1
    def disable_clock(self, val=True):
101
        """
102
        for debugging you may want to disable clock that write every second
103
        to address space
104
        """
105
        self.iserver.disabled_clock = val
106
107 1
    def set_application_uri(self, uri):
108
        """
109
        Set application/server URI.
110
        This uri is supposed to be unique. If you intent to register
111
        your server to a discovery server, it really should be unique in
112
        your system!
113
        default is : "urn:freeopcua:python:server"
114
        """
115 1
        self.application_uri = uri
116
117 1
    def find_servers(self, uris=None):
118
        """
119
        find_servers. mainly implemented for simmetry with client
120
        """
121 1
        if uris is None:
122 1
            uris = []
123 1
        params = ua.FindServersParameters()
124 1
        params.EndpointUrl = self.endpoint.geturl()
125 1
        params.ServerUris = uris
126 1
        return self.iserver.find_servers(params)
127
128 1
    def register_to_discovery(self, url="opc.tcp://localhost:4840", period=60):
129
        """
130
        Register to an OPC-UA Discovery server. Registering must be renewed at
131
        least every 10 minutes, so this method will use our asyncio thread to
132
        re-register every period seconds
133
        if period is 0 registration is not automatically renewed
134
        """
135
        # FIXME: habe a period per discovery
136 1
        if url in self._discovery_clients:
137 1
            self._discovery_clients[url].disconnect()
138 1
        self._discovery_clients[url] = Client(url)
139 1
        self._discovery_clients[url].connect()
140 1
        self._discovery_clients[url].register_server(self)
141 1
        self._discovery_period = period
142 1
        if period:
143
            self.iserver.loop.call_soon(self._renew_registration)
144
145 1
    def unregister_to_discovery(self, url="opc.tcp://localhost:4840"):
146
        """
147
        stop registration thread
148
        """
149
        # FIXME: is there really no way to deregister?
150
        self._discovery_clients[url].disconnect()
151
152 1
    def _renew_registration(self):
153
        for client in self._discovery_clients.values():
154
            client.register_server(self)
155
            self.iserver.loop.call_later(self._discovery_period, self._renew_registration)
156
157 1
    def get_client_to_discovery(self, url="opc.tcp://localhost:4840"):
158
        """
159
        Create a client to discovery server and return it
160
        """
161
        client = Client(url)
162
        client.connect()
163
        return client
164
165 1
    def allow_remote_admin(self, allow):
166
        """
167
        Enable or disable the builtin Admin user from network clients
168
        """
169
        self.iserver.allow_remote_admin = allow
170
171 1
    def set_endpoint(self, url):
172 1
        self.endpoint = urlparse(url)
173
174 1
    def get_endpoints(self):
175 1
        return self.iserver.get_endpoints()
176
177 1
    def _setup_server_nodes(self):
178
        # to be called just before starting server since it needs all parameters to be setup
179 1
        self._set_endpoints()
180 1
        self._policies = [ua.SecurityPolicyFactory()]
181 1
        if self.certificate and self.private_key:
182
            self._set_endpoints(security_policies.SecurityPolicyBasic128Rsa15,
183
                                ua.MessageSecurityMode.SignAndEncrypt)
184
            self._policies.append(ua.SecurityPolicyFactory(security_policies.SecurityPolicyBasic128Rsa15,
185
                                                           ua.MessageSecurityMode.SignAndEncrypt,
186
                                                           self.certificate,
187
                                                           self.private_key)
188
                                 )
189
            self._set_endpoints(security_policies.SecurityPolicyBasic128Rsa15,
190
                                ua.MessageSecurityMode.Sign)
191
            self._policies.append(ua.SecurityPolicyFactory(security_policies.SecurityPolicyBasic128Rsa15,
192
                                                           ua.MessageSecurityMode.Sign,
193
                                                           self.certificate,
194
                                                           self.private_key)
195
                                 )
196
            self._set_endpoints(security_policies.SecurityPolicyBasic256,
197
                                ua.MessageSecurityMode.SignAndEncrypt)
198
            self._policies.append(ua.SecurityPolicyFactory(security_policies.SecurityPolicyBasic256,
199
                                                           ua.MessageSecurityMode.SignAndEncrypt,
200
                                                           self.certificate,
201
                                                           self.private_key)
202
                                 )
203
            self._set_endpoints(security_policies.SecurityPolicyBasic256,
204
                                ua.MessageSecurityMode.Sign)
205
            self._policies.append(ua.SecurityPolicyFactory(security_policies.SecurityPolicyBasic256,
206
                                                           ua.MessageSecurityMode.Sign,
207
                                                           self.certificate,
208
                                                           self.private_key)
209
                                 )
210
211 1
    def _set_endpoints(self, policy=ua.SecurityPolicy, mode=ua.MessageSecurityMode.None_):
212 1
        idtoken = ua.UserTokenPolicy()
213 1
        idtoken.PolicyId = 'anonymous'
214 1
        idtoken.TokenType = ua.UserTokenType.Anonymous
215
216 1
        idtoken2 = ua.UserTokenPolicy()
217 1
        idtoken2.PolicyId = 'certificate_basic256'
218 1
        idtoken2.TokenType = ua.UserTokenType.Certificate
219
220 1
        idtoken3 = ua.UserTokenPolicy()
221 1
        idtoken3.PolicyId = 'certificate_basic128'
222 1
        idtoken3.TokenType = ua.UserTokenType.Certificate
223
224 1
        idtoken4 = ua.UserTokenPolicy()
225 1
        idtoken4.PolicyId = 'username'
226 1
        idtoken4.TokenType = ua.UserTokenType.UserName
227
228 1
        appdesc = ua.ApplicationDescription()
229 1
        appdesc.ApplicationName = ua.LocalizedText(self.name)
230 1
        appdesc.ApplicationUri = self.application_uri
231 1
        appdesc.ApplicationType = self.application_type
232 1
        appdesc.ProductUri = self.product_uri
233 1
        appdesc.DiscoveryUrls.append(self.endpoint.geturl())
234
235 1
        edp = ua.EndpointDescription()
236 1
        edp.EndpointUrl = self.endpoint.geturl()
237 1
        edp.Server = appdesc
238 1
        if self.certificate:
239
            edp.ServerCertificate = uacrypto.der_from_x509(self.certificate)
240 1
        edp.SecurityMode = mode
241 1
        edp.SecurityPolicyUri = policy.URI
242 1
        edp.UserIdentityTokens = [idtoken, idtoken2, idtoken3, idtoken4]
243 1
        edp.TransportProfileUri = 'http://opcfoundation.org/UA-Profile/Transport/uatcp-uasc-uabinary'
244 1
        edp.SecurityLevel = 0
245 1
        self.iserver.add_endpoint(edp)
246
247 1
    def set_server_name(self, name):
248
        self.name = name
249
250 1
    def start(self):
251
        """
252
        Start to listen on network
253
        """
254 1
        self._setup_server_nodes()
255 1
        self.iserver.start()
256 1
        self.bserver = BinaryServer(self.iserver, self.endpoint.hostname, self.endpoint.port)
257 1
        self.bserver.set_policies(self._policies)
258 1
        self.bserver.start()
259
260 1
    def stop(self):
261
        """
262
        Stop server
263
        """
264 1
        for client in self._discovery_clients.values():
265 1
            client.disconnect()
266 1
        self.bserver.stop()
267 1
        self.iserver.stop()
268
269 1
    def get_root_node(self):
270
        """
271
        Get Root node of server. Returns a Node object.
272
        """
273 1
        return self.get_node(ua.TwoByteNodeId(ua.ObjectIds.RootFolder))
274
275 1
    def get_objects_node(self):
276
        """
277
        Get Objects node of server. Returns a Node object.
278
        """
279 1
        return self.get_node(ua.TwoByteNodeId(ua.ObjectIds.ObjectsFolder))
280
281 1
    def get_server_node(self):
282
        """
283
        Get Server node of server. Returns a Node object.
284
        """
285 1
        return self.get_node(ua.TwoByteNodeId(ua.ObjectIds.Server))
286
287 1
    def get_node(self, nodeid):
288
        """
289
        Get a specific node using NodeId object or a string representing a NodeId
290
        """
291 1
        return Node(self.iserver.isession, nodeid)
292
293 1
    def create_subscription(self, period, handler):
294
        """
295
        Create a subscription.
296
        returns a Subscription object which allow
297
        to subscribe to events or data on server
298
        """
299 1
        params = ua.CreateSubscriptionParameters()
300 1
        params.RequestedPublishingInterval = period
301 1
        params.RequestedLifetimeCount = 3000
302 1
        params.RequestedMaxKeepAliveCount = 10000
303 1
        params.MaxNotificationsPerPublish = 0
304 1
        params.PublishingEnabled = True
305 1
        params.Priority = 0
306 1
        return Subscription(self.iserver.isession, params, handler)
307
308 1
    def get_namespace_array(self):
309
        """
310
        get all namespace defined in server
311
        """
312 1
        ns_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_NamespaceArray))
313 1
        return ns_node.get_value()
314
315 1
    def register_namespace(self, uri):
316
        """
317
        Register a new namespace. Nodes should in custom namespace, not 0.
318
        """
319 1
        ns_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_NamespaceArray))
320 1
        uries = ns_node.get_value()
321 1
        uries.append(uri)
322 1
        ns_node.set_value(uries)
323 1
        return len(uries) - 1
324
325 1
    def get_namespace_index(self, uri):
326
        """
327
        get index of a namespace using its uri
328
        """
329 1
        uries = self.get_namespace_array()
330 1
        return uries.index(uri)
331
332 1
    def get_event_generator(self, etype=None, source=ua.ObjectIds.Server):
333
        """
334
        Returns an event object using an event type from address space.
335
        Use this object to fire events
336
        """
337 1
        if not etype:
338 1
            etype = ua.BaseEvent()
339 1
        return EventGenerator(self.iserver.isession, etype, source)
340
341 1
    def create_custom_event_type(self, idx, name, baseetype=ua.ObjectIds.BaseEventType, properties=[]):
342
343 1
        if isinstance(baseetype, Node):
344 1
            base_event = baseetype
345 1
        elif isinstance(baseetype, ua.NodeId):
346 1
            base_event = Node(self.iserver.isession, baseetype)
347
        else:
348 1
            base_event = Node(self.iserver.isession, ua.NodeId(baseetype))
349
350 1
        custom_event = base_event.add_subtype(idx, name)
351 1
        for property in properties:
352 1
            custom_event.add_property(idx, property[0], ua.Variant(None, property[1]))
353
354 1
        return custom_event
355
356 1
    def import_xml(self, path):
357
        """
358
        import nodes defined in xml
359
        """
360 1
        importer = xmlimporter.XmlImporter(self.iserver.node_mgt_service)
361 1
        importer.import_xml(path)
362
363 1
    def delete_nodes(self, nodes, recursive=False):
364 1
        return delete_nodes(self.iserver.isession, nodes, recursive)
365
366 1
    def historize_node(self, node):
367
        self.iserver.enable_history(node)
368
369 1
    def dehistorize_node(self, node):
370
        self.iserver.disable_history(node)
371