Completed
Pull Request — master (#167)
by
unknown
05:31 queued 02:22
created

Server.__enter__()   A

Complexity

Conditions 1

Size

Total Lines 3

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 1
CRAP Score 1

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 1
dl 0
loc 3
ccs 1
cts 1
cp 1
crap 1
rs 10
c 1
b 0
f 0
1
"""
2
High level interface to pure python OPC-UA server
3
"""
4
5 1
import logging
6 1
try:
7 1
    from urllib.parse import urlparse
8
except ImportError:
9
    from urlparse import urlparse
10
11
12 1
from opcua import ua
13
#from opcua.binary_server import BinaryServer
14 1
from opcua.server.binary_server_asyncio import BinaryServer
15 1
from opcua.server.internal_server import InternalServer
16 1
from opcua.common.node import Node
17 1
from opcua.common.event import Event
18 1
from opcua.common.subscription import Subscription
19 1
from opcua.common import xmlimporter
20 1
from opcua.common.manage_nodes import delete_nodes
21 1
from opcua.client.client import Client
22 1
from opcua.crypto import security_policies
23 1
use_crypto = True
24 1
try:
25 1
    from opcua.crypto import uacrypto
26 1
except ImportError:
27 1
    print("cryptography is not installed, use of crypto disabled")
28 1
    use_crypto = False
29
30
31 1
class Server(object):

    """
    High level Server class

    This class creates an opcua server with default values

    Create your own namespace and then populate your server address space
    using get_root_node() or get_objects_node() to get Node objects,
    and get_event_object() to fire events.
    Then start server. See example_server.py
    All methods are threadsafe

    If you need more flexibility you call directly the Ua Service methods
    on the iserver or iserver.isession object members.

    The server can be used as a context manager: it is started on entry
    and stopped on exit.

    :ivar application_uri:
    :vartype application_uri: uri
    :ivar product_uri:
    :vartype product_uri: uri
    :ivar name:
    :vartype name: string
    :ivar default_timeout: timeout in milliseconds for sessions and secure channel
    :vartype default_timeout: int
    :ivar iserver: internal server object
    :vartype iserver: InternalServer
    :ivar bserver: binary protocol server, None until start() is called
    :vartype bserver: BinaryServer

    """

    def __init__(self):
        self.logger = logging.getLogger(__name__)
        self.endpoint = urlparse("opc.tcp://0.0.0.0:4840/freeopcua/server/")
        self.application_uri = "urn:freeopcua:python:server"
        self.product_uri = "urn:freeopcua.github.no:python:server"
        self.name = "FreeOpcUa Python Server"
        self.application_type = ua.ApplicationType.ClientAndServer
        self.default_timeout = 3600000
        self.iserver = InternalServer()
        self.bserver = None
        # discovery url -> Client connected to that discovery server
        self._discovery_clients = {}
        # seconds between two registration renewals (shared by all discovery servers)
        self._discovery_period = 60
        self.certificate = None
        self.private_key = None
        self._policies = []

        # setup some expected values
        self.register_namespace(self.application_uri)
        sa_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_ServerArray))
        sa_node.set_value([self.application_uri])

    def __enter__(self):
        """
        Start the server and return it, so that it can be used in a with statement
        """
        self.start()
        return self

    def __exit__(self, exc_type=None, exc_value=None, traceback=None):
        """
        Stop the server when leaving a with block.

        The three exception arguments are those passed by the with statement;
        they default to None so that a direct call without arguments still works.
        Nothing is returned, so an exception raised in the block is not suppressed.
        """
        self.stop()

    def load_certificate(self, path):
        """
        load server certificate from file, either pem or der
        """
        self.certificate = uacrypto.load_certificate(path)

    def load_private_key(self, path):
        """
        load server private key from file
        """
        self.private_key = uacrypto.load_private_key(path)

    def disable_clock(self, val=True):
        """
        for debugging you may want to disable clock that write every second
        to address space
        """
        self.iserver.disabled_clock = val

    def set_application_uri(self, uri):
        """
        Set application/server URI.
        This uri is supposed to be unique. If you intend to register
        your server to a discovery server, it really should be unique in
        your system!
        default is : "urn:freeopcua:python:server"
        """
        # NOTE(review): only the attribute is changed here; the namespace and
        # ServerArray values written by __init__ still hold the previous uri.
        self.application_uri = uri

    def find_servers(self, uris=None):
        """
        find_servers. mainly implemented for symmetry with client

        uris is an optional list of server uris, an empty list is used when None
        """
        if uris is None:
            uris = []
        params = ua.FindServersParameters()
        params.EndpointUrl = self.endpoint.geturl()
        params.ServerUris = uris
        return self.iserver.find_servers(params)

    def register_to_discovery(self, url="opc.tcp://localhost:4840", period=60):
        """
        Register to an OPC-UA Discovery server. Registering must be renewed at
        least every 10 minutes, so this method will use our asyncio thread to
        re-register every period seconds
        if period is 0 registration is not automatically renewed
        """
        # FIXME: have a period per discovery
        previous = self._discovery_clients.get(url)
        if previous is not None:
            # registering again to the same url replaces the existing connection
            previous.disconnect()
        client = Client(url)
        self._discovery_clients[url] = client
        client.connect()
        client.register_server(self)
        self._discovery_period = period
        if period:
            # NOTE(review): every call with a non-zero period starts one more
            # renewal chain, since _renew_registration reschedules itself.
            self.iserver.loop.call_soon(self._renew_registration)

    def unregister_to_discovery(self, url="opc.tcp://localhost:4840"):
        """
        Disconnect from the discovery server at url and stop renewing
        the registration to it. Raises KeyError if we are not registered to url
        """
        # FIXME: is there really no way to deregister?
        # The client is forgotten so that _renew_registration() and stop()
        # do not use a connection which has already been closed.
        client = self._discovery_clients.pop(url)
        client.disconnect()

    def _renew_registration(self):
        """
        Register again to every known discovery server and schedule the next renewal
        """
        # iterate over a copy, clients may be added or removed while we register
        for client in list(self._discovery_clients.values()):
            client.register_server(self)
        # Schedule one single renewal per cycle, whatever the number of clients.
        # Nothing is scheduled when there is nobody left to register to
        # or when the period has been set to 0.
        if self._discovery_clients and self._discovery_period:
            self.iserver.loop.call_later(self._discovery_period, self._renew_registration)

    def get_client_to_discovery(self, url="opc.tcp://localhost:4840"):
        """
        Create a client to discovery server and return it
        """
        client = Client(url)
        client.connect()
        return client

    def allow_remote_admin(self, allow):
        """
        Enable or disable the builtin Admin user from network clients
        """
        self.iserver.allow_remote_admin = allow

    def set_endpoint(self, url):
        """
        Set the url of the endpoint the server will listen on
        """
        self.endpoint = urlparse(url)

    def get_endpoints(self):
        """
        Return the endpoints known by the internal server
        """
        return self.iserver.get_endpoints()

    def _setup_server_nodes(self):
        """
        Declare endpoints and security policies.
        An endpoint without security is always declared, the encrypted ones
        are only declared when both certificate and private key are loaded.
        """
        # to be called just before starting server since it needs all parameters to be setup
        self._set_endpoints()
        self._policies = [ua.SecurityPolicyFactory()]
        if not (self.certificate and self.private_key):
            return
        # (security policy, message security mode), in the order they are advertised
        secured = (
            (security_policies.SecurityPolicyBasic128Rsa15, ua.MessageSecurityMode.SignAndEncrypt),
            (security_policies.SecurityPolicyBasic128Rsa15, ua.MessageSecurityMode.Sign),
            (security_policies.SecurityPolicyBasic256, ua.MessageSecurityMode.SignAndEncrypt),
            (security_policies.SecurityPolicyBasic256, ua.MessageSecurityMode.Sign),
        )
        for policy, mode in secured:
            self._set_endpoints(policy, mode)
            self._policies.append(ua.SecurityPolicyFactory(policy,
                                                           mode,
                                                           self.certificate,
                                                           self.private_key)
                                  )

    def _set_endpoints(self, policy=ua.SecurityPolicy, mode=ua.MessageSecurityMode.None_):
        """
        Build an endpoint description for given security policy and mode
        and add it to the internal server
        """
        # user identity tokens advertised on every endpoint: (policy id, token type)
        token_specs = (
            ('anonymous', ua.UserTokenType.Anonymous),
            ('certificate_basic256', ua.UserTokenType.Certificate),
            ('certificate_basic128', ua.UserTokenType.Certificate),
            ('username', ua.UserTokenType.UserName),
        )
        idtokens = []
        for policy_id, token_type in token_specs:
            idtoken = ua.UserTokenPolicy()
            idtoken.PolicyId = policy_id
            idtoken.TokenType = token_type
            idtokens.append(idtoken)

        appdesc = ua.ApplicationDescription()
        appdesc.ApplicationName = ua.LocalizedText(self.name)
        appdesc.ApplicationUri = self.application_uri
        appdesc.ApplicationType = self.application_type
        appdesc.ProductUri = self.product_uri
        appdesc.DiscoveryUrls.append(self.endpoint.geturl())

        edp = ua.EndpointDescription()
        edp.EndpointUrl = self.endpoint.geturl()
        edp.Server = appdesc
        if self.certificate:
            edp.ServerCertificate = uacrypto.der_from_x509(self.certificate)
        edp.SecurityMode = mode
        edp.SecurityPolicyUri = policy.URI
        edp.UserIdentityTokens = idtokens
        edp.TransportProfileUri = 'http://opcfoundation.org/UA-Profile/Transport/uatcp-uasc-uabinary'
        edp.SecurityLevel = 0
        self.iserver.add_endpoint(edp)

    def set_server_name(self, name):
        """
        Set the name used as application name in the endpoint descriptions
        """
        self.name = name

    def start(self):
        """
        Start to listen on network
        """
        self._setup_server_nodes()
        self.iserver.start()
        self.bserver = BinaryServer(self.iserver, self.endpoint.hostname, self.endpoint.port)
        self.bserver.set_policies(self._policies)
        self.bserver.start()

    def stop(self):
        """
        Stop server
        Connections to discovery servers are closed first,
        then the binary server and the internal server are stopped
        """
        for client in self._discovery_clients.values():
            client.disconnect()
        self.bserver.stop()
        self.iserver.stop()

    def get_root_node(self):
        """
        Get Root node of server. Returns a Node object.
        """
        return self.get_node(ua.TwoByteNodeId(ua.ObjectIds.RootFolder))

    def get_objects_node(self):
        """
        Get Objects node of server. Returns a Node object.
        """
        return self.get_node(ua.TwoByteNodeId(ua.ObjectIds.ObjectsFolder))

    def get_server_node(self):
        """
        Get Server node of server. Returns a Node object.
        """
        return self.get_node(ua.TwoByteNodeId(ua.ObjectIds.Server))

    def get_node(self, nodeid):
        """
        Get a specific node using NodeId object or a string representing a NodeId
        """
        return Node(self.iserver.isession, nodeid)

    def create_subscription(self, period, handler):
        """
        Create a subscription.
        returns a Subscription object which allow
        to subscribe to events or data on server

        period is used as requested publishing interval,
        handler is handed over to the Subscription object
        """
        params = ua.CreateSubscriptionParameters()
        params.RequestedPublishingInterval = period
        params.RequestedLifetimeCount = 3000
        params.RequestedMaxKeepAliveCount = 10000
        params.MaxNotificationsPerPublish = 0
        params.PublishingEnabled = True
        params.Priority = 0
        return Subscription(self.iserver.isession, params, handler)

    def get_namespace_array(self):
        """
        get all namespace defined in server
        """
        ns_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_NamespaceArray))
        return ns_node.get_value()

    def register_namespace(self, uri):
        """
        Register a new namespace. Nodes should be in a custom namespace, not 0.
        Returns the index of the namespace. If uri is already registered
        its existing index is returned and the namespace array is left untouched.
        """
        ns_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_NamespaceArray))
        uries = ns_node.get_value()
        if uri in uries:
            # never store the same uri twice, one uri must map to one index
            return uries.index(uri)
        uries.append(uri)
        ns_node.set_value(uries)
        return len(uries) - 1

    def get_namespace_index(self, uri):
        """
        get index of a namespace using its uri
        raises ValueError if uri is not in the namespace array
        """
        uries = self.get_namespace_array()
        return uries.index(uri)

    def get_event_object(self, etype=ua.ObjectIds.BaseEventType, source=ua.ObjectIds.Server):
        """
        Returns an event object using an event type from address space.
        Use this object to fire events
        """
        return Event(self.iserver.isession, etype, source)

    def import_xml(self, path):
        """
        import nodes defined in xml
        """
        importer = xmlimporter.XmlImporter(self.iserver.node_mgt_service)
        importer.import_xml(path)

    def delete_nodes(self, nodes, recursive=False):
        """
        Delete given nodes from the address space, see opcua.common.manage_nodes.delete_nodes
        """
        # delete_nodes here is the module level function, not this method
        return delete_nodes(self.iserver.isession, nodes, recursive)

    def historize_node(self, node):
        """
        Enable history on given node
        """
        self.iserver.enable_history(node)

    def dehistorize_node(self, node):
        """
        Disable history on given node
        """
        self.iserver.disable_history(node)
354