Completed
Pull Request — master (#104)
by
unknown
66:47 queued 61:24
created

opcua.server.Server.get_objects_node()   A

Complexity

Conditions 1

Size

Total Lines 5

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 2
Metric Value
cc 1
dl 0
loc 5
ccs 0
cts 1
cp 0
crap 2
rs 9.4286
1
"""
2
High level interface to pure python OPC-UA server
3
"""
4
5 1
import logging
6 1
try:
7 1
    from urllib.parse import urlparse
8
except ImportError:
9
    from urlparse import urlparse
10
11
12 1
from opcua import ua
13
#from opcua.binary_server import BinaryServer
14 1
from opcua.server.binary_server_asyncio import BinaryServer
15 1
from opcua.server.internal_server import InternalServer
16 1
from opcua.common.node import Node
17 1
from opcua.common.event import Event
18 1
from opcua.common.subscription import Subscription
19 1
from opcua.common import xmlimporter
20 1
from opcua.client.client import Client
21
from opcua.crypto import security_policies
22
use_crypto = True
23 1
try:
24
    from opcua.crypto import uacrypto
25
except ImportError:
26
    print("cryptography is not installed, use of crypto disabled")
27
    use_crypto = False
28
29
30
class Server(object):
31
32
    """
33
    High level Server class
34
    Create an opcua server with default values
35
    The class is very short. Users are adviced to read the code.
36
    Create your own namespace and then populate your server address space
37
    using use the get_root() or get_objects() to get Node objects.
38
    and get_event_object() to fire events.
39
    Then start server. See example_server.py
40
    All methods are threadsafe
41
42
43
    :ivar application_uri:
44
    :vartype application_uri: uri
45
    :ivar product_uri:
46
    :vartype product_uri: uri
47
    :ivar name:
48
    :vartype name: string
49
    :ivar default_timeout: timout in milliseconds for sessions and secure channel
50
    :vartype default_timeout: int
51 1
    :ivar iserver: internal server object
52 1
    :vartype default_timeout: InternalServer
53 1
    :ivar bserver: binary protocol server
54 1
    :vartype bserver: BinaryServer
55 1
56 1
    """
57 1
58 1
    def __init__(self):
59 1
        self.logger = logging.getLogger(__name__)
60 1
        self.endpoint = urlparse("opc.tcp://0.0.0.0:4841/freeopcua/server/")
61 1
        self.application_uri = "urn:freeopcua:python:server"
62 1
        self.product_uri = "urn:freeopcua.github.no:python:server"
63
        self.name = "FreeOpcUa Python Server"
64
        self.application_type = ua.ApplicationType.ClientAndServer
65 1
        self.default_timeout = 3600000
66 1
        self.iserver = InternalServer()
67 1
        self.bserver = None
68
        self._discovery_clients = {}
69 1
        self._discovery_period = 60
70
        self.certificate = None
71
        self.private_key = None
72
        self._policies = []
73
74
        # setup some expected values
75
        self.register_namespace(self.application_uri)
76 1
        sa_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_ServerArray))
77
        sa_node.set_value([self.application_uri])
78
79
    def load_certificate(self, path):
80
        """
81
        load server certificate from file, either pem or der
82
        """
83
        self.certificate = uacrypto.load_certificate(path)
84 1
85
    def load_private_key(self, path):
86 1
        self.private_key = uacrypto.load_private_key(path)
87
88
    def disable_clock(self, val=True):
89
        """
90 1
        for debugging you may want to disable clock that write every second
91 1
        to address space
92 1
        """
93 1
        self.iserver.disabled_clock = val
94 1
95 1
    def set_application_uri(self, uri):
96
        """
97 1
        Set application/server URI.
98
        This uri is supposed to be unique. If you intent to register
99
        your server to a discovery server, it really should be unique in
100
        your system!
101
        default is : "urn:freeopcua:python:server"
102
        """
103 1
        self.application_uri = uri
104 1
105 1
    def find_servers(self, uris=None):
106 1
        """
107 1
        find_servers. mainly implemented for simmetry with client
108 1
        """
109 1
        if uris is None:
110
            uris = []
111 1
        params = ua.FindServersParameters()
112 1
        params.EndpointUrl = self.endpoint.geturl()
113 1
        params.ServerUris = uris
114 1
        return self.iserver.find_servers(params)
115
116 1
    def register_to_discovery(self, url="opc.tcp://localhost:4840", period=60):
117
        """
118
        Register to an OPC-UA Discovery server. Registering must be renewed at
119
        least every 10 minutes, so this method will use our asyncio thread to
120
        re-register every period seconds
121
        if period is 0 registration is not automatically renewed
122
        """
123
        # FIXME: habe a period per discovery 
124 1
        if url in self._discovery_clients:
125
            self._discovery_clients[url].disconnect()
126
        self._discovery_clients[url] = Client(url)
127
        self._discovery_clients[url].connect()
128
        self._discovery_clients[url].register_server(self)
129
        self._discovery_period = period
130 1
        if period:
131 1
            self.iserver.loop.call_soon(self._renew_registration)
132
133 1
    def unregister_to_discovery(self, url="opc.tcp://localhost:4840"):
134 1
        """
135
        stop registration thread
136 1
        """
137
        # FIXME: is there really no way to deregister?
138 1
        self._discovery_clients[url].disconnect()
139
140 1
    def _renew_registration(self):
141 1
        for client in self._discovery_clients.values():
142 1
            client.register_server(self)
143 1
            self.iserver.loop.call_later(self._discovery_period, self._renew_registration)
144
145 1
    def get_client_to_discovery(self, url="opc.tcp://localhost:4840"):
146 1
        """
147 1
        Create a client to discovery server and return it
148
        """
149 1
        client = Client(url)
150 1
        client.connect()
151 1
        return client
152
153 1
    def allow_remote_admin(self, allow):
154 1
        """
155 1
        Enable or disable the builtin Admin user from network clients
156
        """
157 1
        self.iserver.allow_remote_admin = allow
158 1
159 1
    def set_endpoint(self, url):
160 1
        self.endpoint = urlparse(url)
161 1
162 1
    def get_endpoints(self):
163
        return self.iserver.get_endpoints()
164 1
165 1
    def _setup_server_nodes(self):
166 1
        # to be called just before starting server since it needs all parameters to be setup
167 1
        self._set_endpoints()
168 1
        self._policies = [ua.SecurityPolicyFactory()]
169 1
        if self.certificate and self.private_key:
170 1
            self._set_endpoints(security_policies.SecurityPolicyBasic128Rsa15,
171 1
                                ua.MessageSecurityMode.SignAndEncrypt)
172 1
            self._policies.append(ua.SecurityPolicyFactory(
173
                                security_policies.SecurityPolicyBasic128Rsa15,
174 1
                                ua.MessageSecurityMode.SignAndEncrypt,
175
                                self.certificate, self.private_key))
176
            self._set_endpoints(security_policies.SecurityPolicyBasic128Rsa15,
177 1
                                ua.MessageSecurityMode.Sign)
178
            self._policies.append(ua.SecurityPolicyFactory(
179
                                security_policies.SecurityPolicyBasic128Rsa15,
180
                                ua.MessageSecurityMode.Sign,
181 1
                                self.certificate, self.private_key))
182 1
            self._set_endpoints(security_policies.SecurityPolicyBasic256,
183 1
                                ua.MessageSecurityMode.SignAndEncrypt)
184 1
            self._policies.append(ua.SecurityPolicyFactory(
185
                                security_policies.SecurityPolicyBasic256,
186 1
                                ua.MessageSecurityMode.SignAndEncrypt,
187
                                self.certificate, self.private_key))
188
            self._set_endpoints(security_policies.SecurityPolicyBasic256,
189
                                ua.MessageSecurityMode.Sign)
190 1
            self._policies.append(ua.SecurityPolicyFactory(
191 1
                                security_policies.SecurityPolicyBasic256,
192 1
                                ua.MessageSecurityMode.Sign,
193 1
                                self.certificate, self.private_key))
194
195 1
    def _set_endpoints(self, policy=ua.SecurityPolicy, mode=ua.MessageSecurityMode.None_):
196
        idtoken = ua.UserTokenPolicy()
197
        idtoken.PolicyId = 'anonymous'
198
        idtoken.TokenType = ua.UserTokenType.Anonymous
199 1
200
        idtoken2 = ua.UserTokenPolicy()
201 1
        idtoken2.PolicyId = 'certificate_basic256'
202
        idtoken2.TokenType = ua.UserTokenType.Certificate
203
204
        idtoken3 = ua.UserTokenPolicy()
205 1
        idtoken3.PolicyId = 'certificate_basic128'
206
        idtoken3.TokenType = ua.UserTokenType.Certificate
207 1
208
        idtoken4 = ua.UserTokenPolicy()
209
        idtoken4.PolicyId = 'username'
210
        idtoken4.TokenType = ua.UserTokenType.UserName
211 1
212
        appdesc = ua.ApplicationDescription()
213 1
        appdesc.ApplicationName = ua.LocalizedText(self.name)
214
        appdesc.ApplicationUri = self.application_uri
215
        appdesc.ApplicationType = self.application_type
216
        appdesc.ProductUri = self.product_uri
217 1
        appdesc.DiscoveryUrls.append(self.endpoint.geturl())
218
219 1
        edp = ua.EndpointDescription()
220
        edp.EndpointUrl = self.endpoint.geturl()
221
        edp.Server = appdesc
222
        if self.certificate:
223
            edp.ServerCertificate = uacrypto.der_from_x509(self.certificate)
224
        edp.SecurityMode = mode
225 1
        edp.SecurityPolicyUri = policy.URI
226 1
        edp.UserIdentityTokens = [idtoken, idtoken2, idtoken3]
227 1
        edp.TransportProfileUri = 'http://opcfoundation.org/UA-Profile/Transport/uatcp-uasc-uabinary'
228 1
        edp.SecurityLevel = 0
229 1
        self.iserver.add_endpoint(edp)
230 1
231 1
    def set_server_name(self, name):
232 1
        self.name = name
233
234 1
    def start(self):
235
        """
236
        Start to listen on network
237
        """
238 1
        self._setup_server_nodes()
239 1
        self.iserver.start()
240
        self.bserver = BinaryServer(self.iserver, self.endpoint.hostname, self.endpoint.port)
241 1
        self.bserver.set_policies(self._policies)
242
        self.bserver.start()
243
244
    def stop(self):
245 1
        """
246 1
        Stop server
247 1
        """
248 1
        for client in self._discovery_clients.values():
249 1
            client.disconnect()
250
        self.bserver.stop()
251 1
        self.iserver.stop()
252
253
    def get_root_node(self):
254
        """
255 1
        Get Root node of server. Returns a Node object.
256 1
        """
257
        return self.get_node(ua.TwoByteNodeId(ua.ObjectIds.RootFolder))
258 1
259
    def get_objects_node(self):
260
        """
261
        Get Objects node of server. Returns a Node object.
262
        """
263
        return self.get_node(ua.TwoByteNodeId(ua.ObjectIds.ObjectsFolder))
264
265 1
    def get_server_node(self):
266
        """
267
        Get Server node of server. Returns a Node object.
268
        """
269 1
        return self.get_node(ua.TwoByteNodeId(ua.ObjectIds.Server))
270 1
271
    def get_node(self, nodeid):
272
        """
273
        Get a specific node using NodeId object or a string representing a NodeId
274
        """
275
        return Node(self.iserver.isession, nodeid)
276
277
    def create_subscription(self, period, handler):
278
        """
279
        Create a subscription.
280
        returns a Subscription object which allow
281
        to subscribe to events or data on server
282
        """
283
        params = ua.CreateSubscriptionParameters()
284
        params.RequestedPublishingInterval = period
285
        params.RequestedLifetimeCount = 3000
286
        params.RequestedMaxKeepAliveCount = 10000
287
        params.MaxNotificationsPerPublish = 0
288
        params.PublishingEnabled = True
289
        params.Priority = 0
290
        return Subscription(self.iserver.isession, params, handler)
291
292
    def get_namespace_array(self):
293
        """
294
        get all namespace defined in server
295
        """
296
        ns_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_NamespaceArray))
297
        return ns_node.get_value()
298
299
    def register_namespace(self, uri):
300
        """
301
        Register a new namespace. Nodes should in custom namespace, not 0.
302
        """
303
        ns_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_NamespaceArray))
304
        uries = ns_node.get_value()
305
        uries.append(uri)
306
        ns_node.set_value(uries)
307
        return (len(uries) - 1)
308
309
    def get_namespace_index(self, uri):
310
        """
311
        get index of a namespace using its uri
312
        """
313
        uries = self.get_namespace_array()
314
        return uries.index(uri)
315
316
    def get_event_object(self, etype=ua.ObjectIds.BaseEventType, source=ua.ObjectIds.Server):
317
        """
318
        Returns an event object using an event type from address space.
319
        Use this object to fire events
320
        """
321
        return Event(self.iserver.isession, etype, source)
322
323
    def import_xml(self, path):
324
        """
325
        import nodes defined in xml
326
        """
327
        importer = xmlimporter.XmlImporter(self.iserver.node_mgt_service)
328
        importer.import_xml(path)
329