Completed
Push — master ( 00ad73...c8d9dd )
by Olivier
18:33 queued 13:54
created

opcua.server.Server.get_event_object()   A

Complexity

Conditions 1

Size

Total Lines 6

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 1
CRAP Score 1.125
Metric Value
cc 1
dl 0
loc 6
ccs 1
cts 2
cp 0.5
crap 1.125
rs 9.4285
1
"""
2
High level interface to pure python OPC-UA server
3
"""
4
5 1
import logging
6 1
try:
7 1
    from urllib.parse import urlparse
8
except ImportError:
9
    from urlparse import urlparse
10
11
12 1
from opcua import ua
13
#from opcua.binary_server import BinaryServer
14 1
from opcua.server.binary_server_asyncio import BinaryServer
15 1
from opcua.server.internal_server import InternalServer
16 1
from opcua.common.node import Node
17 1
from opcua.common.event import Event
18 1
from opcua.common.subscription import Subscription
19 1
from opcua.common import xmlimporter
20 1
from opcua.common.manage_nodes import delete_nodes
21 1
from opcua.client.client import Client
22 1
from opcua.crypto import security_policies
23 1
use_crypto = True
24 1
try:
25 1
    from opcua.crypto import uacrypto
26 1
except ImportError:
27 1
    print("cryptography is not installed, use of crypto disabled")
28 1
    use_crypto = False
29
30
31 1
class Server(object):
32
33
    """
34
    High level Server class
35
36
    This class creates an opcua server with default values
37
38
    Create your own namespace and then populate your server address space
39
    using the get_root_node() or get_objects_node() methods to get Node objects,
40
    and get_event_object() to fire events.
41
    Then start server. See example_server.py
42
    All methods are threadsafe
43
44
    If you need more flexibility you call directly the Ua Service methods
45
    on the iserver or iserver.isession object members.
46
47
48
    :ivar application_uri:
49
    :vartype application_uri: uri
50
    :ivar product_uri:
51
    :vartype product_uri: uri
52
    :ivar name:
53
    :vartype name: string
54
    :ivar default_timeout: timeout in milliseconds for sessions and secure channel
55
    :vartype default_timeout: int
56
    :ivar iserver: internal server object
57
    :vartype iserver: InternalServer
58
    :ivar bserver: binary protocol server
59
    :vartype bserver: BinaryServer
60
61
    """
62
63 1
    def __init__(self):
        """
        Initialise the server with its default values.

        The binary protocol server is not created here (bserver stays None
        until start() is called); only the internal server is instantiated.
        """
        self.logger = logging.getLogger(__name__)

        # identity of this application
        self.name = "FreeOpcUa Python Server"
        self.application_uri = "urn:freeopcua:python:server"
        self.product_uri = "urn:freeopcua.github.no:python:server"
        self.application_type = ua.ApplicationType.ClientAndServer

        # network settings
        self.endpoint = urlparse("opc.tcp://0.0.0.0:4840/freeopcua/server/")
        self.default_timeout = 3600000

        # security material, filled by load_certificate() / load_private_key()
        self.certificate = None
        self.private_key = None
        self._policies = []

        # discovery registration state
        self._discovery_clients = {}
        self._discovery_period = 60

        # protocol layers
        self.iserver = InternalServer()
        self.bserver = None

        # setup some expected values: our own namespace and the server array
        self.register_namespace(self.application_uri)
        server_array = self.get_node(ua.NodeId(ua.ObjectIds.Server_ServerArray))
        server_array.set_value([self.application_uri])
83
84 1
    def load_certificate(self, path):
        """
        Read the server certificate (pem or der file) found at path
        and keep it in self.certificate
        """
        loaded = uacrypto.load_certificate(path)
        self.certificate = loaded
89
90 1
    def load_private_key(self, path):
        """
        Read the server private key from the file found at path
        and keep it in self.private_key
        """
        loaded = uacrypto.load_private_key(path)
        self.private_key = loaded
92
93 1
    def disable_clock(self, val=True):
94
        """
95
        for debugging you may want to disable clock that write every second
96
        to address space
97
        """
98
        self.iserver.disabled_clock = val
99
100 1
    def set_application_uri(self, uri):
101
        """
102
        Set application/server URI.
103
        This uri is supposed to be unique. If you intent to register
104
        your server to a discovery server, it really should be unique in
105
        your system!
106
        default is : "urn:freeopcua:python:server"
107
        """
108 1
        self.application_uri = uri
109
110 1
    def find_servers(self, uris=None):
        """
        Ask the internal server for known servers, optionally restricted
        to the given list of server uris (no restriction list when None).
        Mainly implemented for symmetry with the client
        """
        request = ua.FindServersParameters()
        request.EndpointUrl = self.endpoint.geturl()
        request.ServerUris = [] if uris is None else uris
        return self.iserver.find_servers(request)
120
121 1
    def register_to_discovery(self, url="opc.tcp://localhost:4840", period=60):
        """
        Register this server to the OPC-UA Discovery server found at url.

        Registering must be renewed at least every 10 minutes, so the
        renewal is handed to the internal server loop, which re-registers
        every period seconds.
        If period is 0 registration is not automatically renewed
        """
        # FIXME: have a period per discovery server
        previous = self._discovery_clients.get(url)
        if previous is not None:
            # drop the former connection to the same discovery server
            previous.disconnect()
        discovery = Client(url)
        # stored before connecting, so the client is tracked even if connect fails
        self._discovery_clients[url] = discovery
        discovery.connect()
        discovery.register_server(self)
        self._discovery_period = period
        if not period:
            return
        self.iserver.loop.call_soon(self._renew_registration)
137
138 1
    def unregister_to_discovery(self, url="opc.tcp://localhost:4840"):
139
        """
140
        stop registration thread
141
        """
142
        # FIXME: is there really no way to deregister?
143
        self._discovery_clients[url].disconnect()
144
145 1
    def _renew_registration(self):
146
        for client in self._discovery_clients.values():
147
            client.register_server(self)
148
            self.iserver.loop.call_later(self._discovery_period, self._renew_registration)
149
150 1
    def get_client_to_discovery(self, url="opc.tcp://localhost:4840"):
        """
        Build a new client for the discovery server found at url,
        connect it and hand it back to the caller
        """
        discovery = Client(url)
        discovery.connect()
        return discovery
157
158 1
    def allow_remote_admin(self, allow):
159
        """
160
        Enable or disable the builtin Admin user from network clients
161
        """
162
        self.iserver.allow_remote_admin = allow
163
164 1
    def set_endpoint(self, url):
165 1
        self.endpoint = urlparse(url)
166
167 1
    def get_endpoints(self):
168 1
        return self.iserver.get_endpoints()
169
170 1
    def _setup_server_nodes(self):
        """
        Declare the endpoints and security policies of the server.

        To be called just before starting the server since it needs all
        parameters to be setup. The unsecured endpoint is always declared;
        the secured ones only when both a certificate and a private key
        are available.
        """
        self._set_endpoints()
        self._policies = [ua.SecurityPolicyFactory()]
        if not (self.certificate and self.private_key):
            return

        # every secured combination gets an endpoint and a matching policy
        # factory, in this order
        secured = (
            (security_policies.SecurityPolicyBasic128Rsa15, ua.MessageSecurityMode.SignAndEncrypt),
            (security_policies.SecurityPolicyBasic128Rsa15, ua.MessageSecurityMode.Sign),
            (security_policies.SecurityPolicyBasic256, ua.MessageSecurityMode.SignAndEncrypt),
            (security_policies.SecurityPolicyBasic256, ua.MessageSecurityMode.Sign),
        )
        for policy_class, security_mode in secured:
            self._set_endpoints(policy_class, security_mode)
            factory = ua.SecurityPolicyFactory(policy_class, security_mode,
                                               self.certificate, self.private_key)
            self._policies.append(factory)
199
200 1
    def _set_endpoints(self, policy=ua.SecurityPolicy, mode=ua.MessageSecurityMode.None_):
        """
        Build one endpoint description for the given security policy and
        message security mode, and add it to the internal server
        """
        def make_token(policy_id, token_type):
            # build one user identity token policy
            token = ua.UserTokenPolicy()
            token.PolicyId = policy_id
            token.TokenType = token_type
            return token

        anonymous = make_token('anonymous', ua.UserTokenType.Anonymous)
        cert_basic256 = make_token('certificate_basic256', ua.UserTokenType.Certificate)
        cert_basic128 = make_token('certificate_basic128', ua.UserTokenType.Certificate)
        # NOTE(review): the username policy is built but is not part of the
        # tokens advertised below -- confirm whether this is intended
        username = make_token('username', ua.UserTokenType.UserName)

        url = self.endpoint.geturl()

        # description of this application
        application = ua.ApplicationDescription()
        application.ApplicationName = ua.LocalizedText(self.name)
        application.ApplicationUri = self.application_uri
        application.ApplicationType = self.application_type
        application.ProductUri = self.product_uri
        application.DiscoveryUrls.append(url)

        # description of the endpoint itself
        endpoint = ua.EndpointDescription()
        endpoint.EndpointUrl = url
        endpoint.Server = application
        if self.certificate:
            endpoint.ServerCertificate = uacrypto.der_from_x509(self.certificate)
        endpoint.SecurityMode = mode
        endpoint.SecurityPolicyUri = policy.URI
        endpoint.UserIdentityTokens = [anonymous, cert_basic256, cert_basic128]
        endpoint.TransportProfileUri = 'http://opcfoundation.org/UA-Profile/Transport/uatcp-uasc-uabinary'
        endpoint.SecurityLevel = 0
        self.iserver.add_endpoint(endpoint)
235
236 1
    def set_server_name(self, name):
237
        self.name = name
238
239 1
    def start(self):
        """
        Start to listen on network.

        Endpoints and policies are declared first, then the internal server
        is started, and finally the binary server is created on the host and
        port of the configured endpoint and started.
        """
        self._setup_server_nodes()
        self.iserver.start()
        host, port = self.endpoint.hostname, self.endpoint.port
        self.bserver = BinaryServer(self.iserver, host, port)
        self.bserver.set_policies(self._policies)
        self.bserver.start()
248
249 1
    def stop(self):
250
        """
251
        Stop server
252
        """
253 1
        for client in self._discovery_clients.values():
254 1
            client.disconnect()
255 1
        self.bserver.stop()
256 1
        self.iserver.stop()
257
258 1
    def get_root_node(self):
        """
        Return the Node object of the Root folder of the server.
        """
        root_id = ua.TwoByteNodeId(ua.ObjectIds.RootFolder)
        return self.get_node(root_id)
263
264 1
    def get_objects_node(self):
        """
        Return the Node object of the Objects folder of the server.
        """
        objects_id = ua.TwoByteNodeId(ua.ObjectIds.ObjectsFolder)
        return self.get_node(objects_id)
269
270 1
    def get_server_node(self):
        """
        Return the Node object of the Server object of the server.
        """
        server_id = ua.TwoByteNodeId(ua.ObjectIds.Server)
        return self.get_node(server_id)
275
276 1
    def get_node(self, nodeid):
        """
        Return a Node object bound to the internal session for the given
        nodeid, which is a NodeId object or a string representing a NodeId
        """
        session = self.iserver.isession
        return Node(session, nodeid)
281
282 1
    def create_subscription(self, period, handler):
        """
        Create a subscription on the internal session.

        period is used as the requested publishing interval and handler is
        handed to the Subscription object, which is returned and allows
        to subscribe to events or data on server
        """
        sub_params = ua.CreateSubscriptionParameters()
        # fixed settings
        sub_params.PublishingEnabled = True
        sub_params.Priority = 0
        sub_params.MaxNotificationsPerPublish = 0
        sub_params.RequestedLifetimeCount = 3000
        sub_params.RequestedMaxKeepAliveCount = 10000
        # caller supplied setting
        sub_params.RequestedPublishingInterval = period
        session = self.iserver.isession
        return Subscription(session, sub_params, handler)
296
297 1
    def get_namespace_array(self):
        """
        Return the value of the namespace array node of the server,
        i.e. all namespaces defined in server
        """
        array_id = ua.NodeId(ua.ObjectIds.Server_NamespaceArray)
        return self.get_node(array_id).get_value()
303
304 1
    def register_namespace(self, uri):
        """
        Register a new namespace. Nodes should be in a custom namespace, not 0.

        If uri is already present in the namespace array of the server it
        is not added a second time; the index it already has is returned.
        Returns the index of the namespace.
        """
        ns_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_NamespaceArray))
        uries = ns_node.get_value()
        # a duplicated entry would give the same uri two different indexes
        if uri in uries:
            return uries.index(uri)
        uries.append(uri)
        ns_node.set_value(uries)
        return len(uries) - 1
313
314 1
    def get_namespace_index(self, uri):
315
        """
316
        get index of a namespace using its uri
317
        """
318 1
        uries = self.get_namespace_array()
319 1
        return uries.index(uri)
320
321 1
    def get_event_object(self, etype=ua.ObjectIds.BaseEventType, source=ua.ObjectIds.Server):
        """
        Return an Event object built on the internal session from an event
        type of the address space and a source.
        Use this object to fire events
        """
        session = self.iserver.isession
        return Event(session, etype, source)
327
328 1
    def import_xml(self, path):
        """
        Import the nodes defined in the xml file found at path, using the
        node management service of the internal server
        """
        xmlimporter.XmlImporter(self.iserver.node_mgt_service).import_xml(path)
334
335 1
    def delete_nodes(self, nodes, recursive=False):
        """
        Delete the given nodes through the internal session, forwarding the
        recursive flag, and return the result of the module level
        delete_nodes helper
        """
        session = self.iserver.isession
        return delete_nodes(session, nodes, recursive)
337
 
338