Passed
Push — dev ( 3fa5c0...bfa3a0 )
by Olivier
02:27
created

opcua.server.Server.delete_nodes()   A

Complexity

Conditions 2

Size

Total Lines 8

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 8
CRAP Score 2
Metric Value
cc 2
dl 0
loc 8
ccs 8
cts 8
cp 1
crap 2
rs 9.4285
1
"""
2
High level interface to pure python OPC-UA server
3
"""
4
5 1
import logging
6 1
try:
7 1
    from urllib.parse import urlparse
8
except ImportError:
9
    from urlparse import urlparse
10
11
12 1
from opcua import ua
13
#from opcua.binary_server import BinaryServer
14 1
from opcua.server.binary_server_asyncio import BinaryServer
15 1
from opcua.server.internal_server import InternalServer
16 1
from opcua.common.node import Node
17 1
from opcua.common.event import Event
18 1
from opcua.common.subscription import Subscription
19 1
from opcua.common import xmlimporter
20 1
from opcua.client.client import Client
21 1
from opcua.crypto import security_policies
22 1
use_crypto = True
23 1
try:
24 1
    from opcua.crypto import uacrypto
25 1
except ImportError:
26 1
    print("cryptography is not installed, use of crypto disabled")
27 1
    use_crypto = False
28
29
30 1
class Server(object):
31
32
    """
33
    High level Server class
34
    Create an opcua server with default values
35
    The class is very short. Users are advised to read the code.
36
    Create your own namespace and then populate your server address space
37
    using get_root_node() or get_objects_node() to get Node objects,
38
    and get_event_object() to fire events.
39
    Then start server. See example_server.py
40
    All methods are threadsafe
41
42
43
    :ivar application_uri:
44
    :vartype application_uri: uri
45
    :ivar product_uri:
46
    :vartype product_uri: uri
47
    :ivar name:
48
    :vartype name: string
49
    :ivar default_timeout: timeout in milliseconds for sessions and secure channel
50
    :vartype default_timeout: int
51
    :ivar iserver: internal server object
52
    :vartype iserver: InternalServer
53
    :ivar bserver: binary protocol server
54
    :vartype bserver: BinaryServer
55
56
    """
57
58 1
    def __init__(self):
        """
        Build a server object holding default values.

        Nothing listens on the network yet: the binary server is only
        created by start().
        """
        self.logger = logging.getLogger(__name__)

        # where the server listens and how it describes itself
        self.endpoint = urlparse("opc.tcp://0.0.0.0:4840/freeopcua/server/")
        self.application_uri = "urn:freeopcua:python:server"
        self.product_uri = "urn:freeopcua.github.no:python:server"
        self.name = "FreeOpcUa Python Server"
        self.application_type = ua.ApplicationType.ClientAndServer
        self.default_timeout = 3600000

        # internal server now, binary protocol server when start() is called
        self.iserver = InternalServer()
        self.bserver = None

        # one client per discovery server url, see register_to_discovery()
        self._discovery_clients = {}
        self._discovery_period = 60

        # security material, filled by load_certificate()/load_private_key()
        self.certificate = None
        self.private_key = None
        self._policies = []

        # setup some expected values
        self.register_namespace(self.application_uri)
        server_array = self.get_node(ua.NodeId(ua.ObjectIds.Server_ServerArray))
        server_array.set_value([self.application_uri])
78
79 1
    def load_certificate(self, path):
        """
        Read the server certificate (pem or der file) found at path
        and keep it in self.certificate
        """
        certificate = uacrypto.load_certificate(path)
        self.certificate = certificate
84
85 1
    def load_private_key(self, path):
        """
        Read the server private key from the file found at path
        and keep it in self.private_key
        """
        private_key = uacrypto.load_private_key(path)
        self.private_key = private_key
87
88 1
    def disable_clock(self, val=True):
        """
        Debugging helper: the clock writes to the address space every
        second, which you may want to switch off.
        val is stored as the disabled_clock flag of the internal server.
        """
        internal_server = self.iserver
        internal_server.disabled_clock = val
94
95 1
    def set_application_uri(self, uri):
        """
        Change the application/server URI.

        The URI is expected to be unique. This really matters when the
        server is going to be registered to a discovery server: the URI
        should then be unique in your whole system!
        When never set, the URI is "urn:freeopcua:python:server"
        """
        new_uri = uri
        self.application_uri = new_uri
104
105 1
    def find_servers(self, uris=None):
        """
        Query the internal server with a FindServers request built from
        our endpoint url and the given server uris (none by default).
        Mainly implemented for symmetry with the client.
        """
        request = ua.FindServersParameters()
        request.EndpointUrl = self.endpoint.geturl()
        request.ServerUris = [] if uris is None else uris
        return self.iserver.find_servers(request)
115
116 1
    def register_to_discovery(self, url="opc.tcp://localhost:4840", period=60):
        """
        Register this server to the OPC-UA Discovery server found at url.

        A registration must be renewed at least every 10 minutes, so the
        loop of the internal server is used to register again every
        period seconds.
        With a period of 0 the registration is not renewed automatically.
        """
        # FIXME: have a period per discovery
        clients = self._discovery_clients
        if url in clients:
            # a client already exists for this discovery server: drop its connection
            clients[url].disconnect()
        client = Client(url)
        clients[url] = client
        client.connect()
        client.register_server(self)
        self._discovery_period = period
        if period:
            self.iserver.loop.call_soon(self._renew_registration)
132
133 1
    def unregister_to_discovery(self, url="opc.tcp://localhost:4840"):
        """
        Stop registering to the discovery server found at url.

        The client connected to that discovery server is disconnected and
        removed from the known discovery clients. Leaving it in place would
        make the periodic renewal call register_server() on a disconnected
        client, and stop() disconnect it a second time.

        :param url: the url that was given to register_to_discovery()
        :raises KeyError: when no client is known for url
        """
        # FIXME: is there really no way to deregister?
        client = self._discovery_clients.pop(url)
        client.disconnect()
139
140 1
    def _renew_registration(self):
        """
        Register this server again on every known discovery server, then
        schedule the next renewal on the loop of the internal server.

        At most one follow-up call is scheduled per run, whatever the
        number of discovery clients, so the number of pending renewals
        does not grow over time. Nothing is scheduled when there is no
        discovery client or when the renewal period is 0.
        """
        # snapshot, so that a registration added or removed while we are
        # busy renewing cannot break the iteration
        clients = list(self._discovery_clients.values())
        for client in clients:
            client.register_server(self)
        if clients and self._discovery_period:
            self.iserver.loop.call_later(self._discovery_period, self._renew_registration)
144
145 1
    def get_client_to_discovery(self, url="opc.tcp://localhost:4840"):
        """
        Return a new client, already connected to the discovery server
        found at url
        """
        discovery = Client(url)
        discovery.connect()
        return discovery
152
153 1
    def allow_remote_admin(self, allow):
        """
        Allow or forbid network clients to use the builtin Admin user.
        allow is stored as the allow_remote_admin flag of the internal server.
        """
        internal_server = self.iserver
        internal_server.allow_remote_admin = allow
158
159 1
    def set_endpoint(self, url):
        """
        Parse url and keep the result as the endpoint of the server
        """
        parsed = urlparse(url)
        self.endpoint = parsed
161
162 1
    def get_endpoints(self):
        """
        Return the endpoints reported by the internal server
        """
        endpoints = self.iserver.get_endpoints()
        return endpoints
164
165 1
    def _setup_server_nodes(self):
        """
        Declare the endpoints and the matching security policies.

        An endpoint and a policy without arguments are always declared.
        When both a certificate and a private key are loaded, one endpoint
        and one policy are added for each combination of
        Basic128Rsa15 / Basic256 with SignAndEncrypt / Sign.
        """
        # to be called just before starting server since it needs all parameters to be setup
        self._set_endpoints()
        self._policies = [ua.SecurityPolicyFactory()]
        if not (self.certificate and self.private_key):
            return

        # declaration order is kept: policy by policy, SignAndEncrypt before Sign
        secured = (
            (security_policies.SecurityPolicyBasic128Rsa15, ua.MessageSecurityMode.SignAndEncrypt),
            (security_policies.SecurityPolicyBasic128Rsa15, ua.MessageSecurityMode.Sign),
            (security_policies.SecurityPolicyBasic256, ua.MessageSecurityMode.SignAndEncrypt),
            (security_policies.SecurityPolicyBasic256, ua.MessageSecurityMode.Sign),
        )
        for policy, mode in secured:
            self._set_endpoints(policy, mode)
            factory = ua.SecurityPolicyFactory(policy, mode, self.certificate, self.private_key)
            self._policies.append(factory)
194
195 1
    def _set_endpoints(self, policy=ua.SecurityPolicy, mode=ua.MessageSecurityMode.None_):
        """
        Describe one endpoint of this server for the given security policy
        and message security mode, and add it to the internal server.
        """
        def make_token(policy_id, token_type):
            # one user token policy entry
            token = ua.UserTokenPolicy()
            token.PolicyId = policy_id
            token.TokenType = token_type
            return token

        anonymous = make_token('anonymous', ua.UserTokenType.Anonymous)
        cert_basic256 = make_token('certificate_basic256', ua.UserTokenType.Certificate)
        cert_basic128 = make_token('certificate_basic128', ua.UserTokenType.Certificate)
        # NOTE(review): the 'username' policy is built but is not part of the
        # advertised tokens below - confirm whether it should be listed
        username = make_token('username', ua.UserTokenType.UserName)

        url = self.endpoint.geturl()

        # description of the application behind the endpoint
        application = ua.ApplicationDescription()
        application.ApplicationName = ua.LocalizedText(self.name)
        application.ApplicationUri = self.application_uri
        application.ApplicationType = self.application_type
        application.ProductUri = self.product_uri
        application.DiscoveryUrls.append(url)

        # description of the endpoint itself
        endpoint = ua.EndpointDescription()
        endpoint.EndpointUrl = url
        endpoint.Server = application
        if self.certificate:
            endpoint.ServerCertificate = uacrypto.der_from_x509(self.certificate)
        endpoint.SecurityMode = mode
        endpoint.SecurityPolicyUri = policy.URI
        endpoint.UserIdentityTokens = [anonymous, cert_basic256, cert_basic128]
        endpoint.TransportProfileUri = 'http://opcfoundation.org/UA-Profile/Transport/uatcp-uasc-uabinary'
        endpoint.SecurityLevel = 0
        self.iserver.add_endpoint(endpoint)
230
231 1
    def set_server_name(self, name):
        """
        Change the name of the server (self.name)
        """
        new_name = name
        self.name = new_name
233
234 1
    def start(self):
        """
        Set up endpoints and policies, start the internal server, then
        start a binary server on the host and port of our endpoint
        """
        self._setup_server_nodes()
        self.iserver.start()
        host, port = self.endpoint.hostname, self.endpoint.port
        binary_server = BinaryServer(self.iserver, host, port)
        self.bserver = binary_server
        binary_server.set_policies(self._policies)
        binary_server.start()
243
244 1
    def stop(self):
        """
        Disconnect every discovery client, then stop the binary server
        and finally the internal server
        """
        for discovery in self._discovery_clients.values():
            discovery.disconnect()
        # the binary server goes first, the internal server last
        for service in (self.bserver, self.iserver):
            service.stop()
252
253 1
    def get_root_node(self):
        """
        Return the Root node of the server, as a Node object.
        """
        root_id = ua.TwoByteNodeId(ua.ObjectIds.RootFolder)
        return self.get_node(root_id)
258
259 1
    def get_objects_node(self):
        """
        Return the Objects node of the server, as a Node object.
        """
        objects_id = ua.TwoByteNodeId(ua.ObjectIds.ObjectsFolder)
        return self.get_node(objects_id)
264
265 1
    def get_server_node(self):
        """
        Return the Server node of the server, as a Node object.
        """
        server_id = ua.TwoByteNodeId(ua.ObjectIds.Server)
        return self.get_node(server_id)
270
271 1
    def get_node(self, nodeid):
        """
        Return the Node designated by nodeid, which is either a NodeId
        object or a string representing a NodeId.
        The node is bound to the session of the internal server.
        """
        session = self.iserver.isession
        return Node(session, nodeid)
276
277 1
    def create_subscription(self, period, handler):
        """
        Create a subscription using period as requested publishing interval.
        The returned Subscription object allows to subscribe to events
        or data on the server; handler is handed over to it.
        """
        params = ua.CreateSubscriptionParameters()
        settings = (
            ("RequestedPublishingInterval", period),
            ("RequestedLifetimeCount", 3000),
            ("RequestedMaxKeepAliveCount", 10000),
            ("MaxNotificationsPerPublish", 0),
            ("PublishingEnabled", True),
            ("Priority", 0),
        )
        for field, value in settings:
            setattr(params, field, value)
        return Subscription(self.iserver.isession, params, handler)
291
292 1
    def get_namespace_array(self):
        """
        Return all the namespaces defined in the server, that is the
        value of the Server_NamespaceArray node
        """
        array_id = ua.NodeId(ua.ObjectIds.Server_NamespaceArray)
        return self.get_node(array_id).get_value()
298
299 1
    def register_namespace(self, uri):
        """
        Register a new namespace. Nodes should be in a custom namespace, not 0.

        When uri is already part of the namespace array of the server,
        nothing is added and the index of the existing entry is returned.
        A second entry for the same uri would otherwise be appended, with
        a different index.

        :param uri: uri of the namespace
        :returns: index of uri in the namespace array
        """
        ns_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_NamespaceArray))
        uries = ns_node.get_value()
        if uri in uries:
            # already registered: do not add a duplicate entry
            return uries.index(uri)
        uries.append(uri)
        ns_node.set_value(uries)
        return len(uries) - 1
308
309 1
    def get_namespace_index(self, uri):
        """
        Return the position of uri in the namespace array of the server
        """
        namespaces = self.get_namespace_array()
        position = namespaces.index(uri)
        return position
315
316 1
    def get_event_object(self, etype=ua.ObjectIds.BaseEventType, source=ua.ObjectIds.Server):
        """
        Return an event object built from an event type of the address
        space and an event source.
        This object is what you use to fire events
        """
        session = self.iserver.isession
        return Event(session, etype, source)
322
323 1
    def import_xml(self, path):
        """
        Import the nodes defined in the xml file found at path, through
        the node management service of the internal server
        """
        node_service = self.iserver.node_mgt_service
        xmlimporter.XmlImporter(node_service).import_xml(path)
329
330 1
    def delete_nodes(self, nodes):
        """
        Ask the session of the internal server to delete the given nodes.
        One DeleteNodesItem is built per node, with DeleteTargetReferences
        set to True. The result of the session call is returned as is.
        """
        def as_item(node):
            # deletion request for a single node
            item = ua.DeleteNodesItem()
            item.NodeId = node.nodeid
            item.DeleteTargetReferences = True
            return item

        items = [as_item(node) for node in nodes]
        return self.iserver.isession.delete_nodes(items)
338
 
339