Completed
Pull Request — master (#133)
by Denis
02:36
created

opcua.server.Server.disable_clock()   A

Complexity

Conditions 1

Size

Total Lines 6

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 1
CRAP Score 1.125
Metric Value
cc 1
dl 0
loc 6
ccs 1
cts 2
cp 0.5
crap 1.125
rs 9.4285
1
"""
2
High level interface to pure python OPC-UA server
3
"""
4
5 1
import logging
6 1
try:
7 1
    from urllib.parse import urlparse
8
except ImportError:
9
    from urlparse import urlparse
10
11
12 1
from opcua import ua
13
#from opcua.binary_server import BinaryServer
14 1
from opcua.server.binary_server_asyncio import BinaryServer
15 1
from opcua.server.internal_server import InternalServer
16 1
from opcua.common.node import Node
17 1
from opcua.common.event import EventGenerator
18 1
from opcua.common.subscription import Subscription
19 1
from opcua.common import xmlimporter
20 1
from opcua.common.manage_nodes import delete_nodes
21 1
from opcua.client.client import Client
22 1
from opcua.crypto import security_policies
23 1
use_crypto = True
24 1
try:
25 1
    from opcua.crypto import uacrypto
26 1
except ImportError:
27 1
    print("cryptography is not installed, use of crypto disabled")
28 1
    use_crypto = False
29
30
31 1
class Server(object):
32
33
    """
34
    High level Server class
35
36
    This class creates an opcua server with default values
37
38
    Create your own namespace and then populate your server address space
39
    using get_root_node() or get_objects_node() to get Node objects,
40
    and get_event_generator() to fire events.
41
    Then start server. See example_server.py
42
    All methods are threadsafe
43
44
    If you need more flexibility you can directly call the Ua Service methods
45
    on the iserver or iserver.isession object members.
46
47
48
    :ivar application_uri:
49
    :vartype application_uri: uri
50
    :ivar product_uri:
51
    :vartype product_uri: uri
52
    :ivar name:
53
    :vartype name: string
54
    :ivar default_timeout: timeout in milliseconds for sessions and secure channel
55
    :vartype default_timeout: int
56
    :ivar iserver: internal server object
57
    :vartype iserver: InternalServer
58
    :ivar bserver: binary protocol server
59
    :vartype bserver: BinaryServer
60
61
    """
62
63 1
    def __init__(self):
        """
        Create a server with default settings.

        The internal server is instantiated here; the binary (network)
        server is only created by start(). The application URI is registered
        as a namespace and written to the ServerArray node.
        """
        self.logger = logging.getLogger(__name__)

        # identity of this application
        self.endpoint = urlparse("opc.tcp://0.0.0.0:4840/freeopcua/server/")
        self.application_uri = "urn:freeopcua:python:server"
        self.product_uri = "urn:freeopcua.github.no:python:server"
        self.name = "FreeOpcUa Python Server"
        self.application_type = ua.ApplicationType.ClientAndServer
        self.default_timeout = 3600000

        # protocol layers: bserver stays None until start() is called
        self.iserver = InternalServer()
        self.bserver = None

        # discovery bookkeeping: one client per discovery server url
        self._discovery_clients = {}
        self._discovery_period = 60

        # security material, filled in by load_certificate / load_private_key
        self.certificate = None
        self.private_key = None
        self._policies = []

        # setup some expected values
        self.register_namespace(self.application_uri)
        server_array = self.get_node(ua.NodeId(ua.ObjectIds.Server_ServerArray))
        server_array.set_value([self.application_uri])
83
84 1
    def load_certificate(self, path):
        """
        Load the server certificate from the file at path (pem or der)
        and keep it in self.certificate.
        """
        loaded = uacrypto.load_certificate(path)
        self.certificate = loaded
89
90 1
    def load_private_key(self, path):
        """
        Load the server private key from the file at path and keep it
        in self.private_key.
        """
        loaded = uacrypto.load_private_key(path)
        self.private_key = loaded
92
93 1
    def disable_clock(self, val=True):
        """
        Set the disabled_clock flag of the internal server.

        Useful when debugging: the clock otherwise writes to the address
        space every second.
        """
        internal = self.iserver
        internal.disabled_clock = val
99
100 1
    def set_application_uri(self, uri):
        """
        Store uri as the application/server URI.

        The URI is expected to be unique; this matters in particular if
        you intend to register the server to a discovery server.
        The default value is "urn:freeopcua:python:server".
        """
        new_uri = uri
        self.application_uri = new_uri
109
110 1
    def find_servers(self, uris=None):
        """
        Query the internal server for known servers.

        Mainly implemented for symmetry with the client. uris is passed
        as the ServerUris of the request; an empty list is used when it
        is None.
        """
        request = ua.FindServersParameters()
        request.EndpointUrl = self.endpoint.geturl()
        request.ServerUris = [] if uris is None else uris
        return self.iserver.find_servers(request)
120
121 1
    def register_to_discovery(self, url="opc.tcp://localhost:4840", period=60):
        """
        Register this server to the OPC-UA discovery server at url.

        A registration must be renewed at least every 10 minutes, so the
        renewal is scheduled on the internal server loop and repeated every
        period seconds. With a period of 0 the registration is not renewed
        automatically.
        """
        # FIXME: have a period per discovery
        clients = self._discovery_clients
        if url in clients:
            # drop the connection of a previous registration to the same url
            clients[url].disconnect()
        clients[url] = Client(url)
        discovery = clients[url]
        discovery.connect()
        discovery.register_server(self)
        self._discovery_period = period
        if not period:
            return
        self.iserver.loop.call_soon(self._renew_registration)
137
138 1
    def unregister_to_discovery(self, url="opc.tcp://localhost:4840"):
        """
        Disconnect the client used for the discovery server at url.

        The client stays in the internal table; only its connection
        is closed.
        """
        # FIXME: is there really no way to deregister?
        discovery = self._discovery_clients[url]
        discovery.disconnect()
144
145 1
    def _renew_registration(self):
146
        for client in self._discovery_clients.values():
147
            client.register_server(self)
148
            self.iserver.loop.call_later(self._discovery_period, self._renew_registration)
149
150 1
    def get_client_to_discovery(self, url="opc.tcp://localhost:4840"):
        """
        Return a new, already connected client for the discovery
        server at url.
        """
        discovery = Client(url)
        discovery.connect()
        return discovery
157
158 1
    def allow_remote_admin(self, allow):
        """
        Allow or forbid network clients to use the builtin Admin user
        by setting the matching flag on the internal server.
        """
        internal = self.iserver
        internal.allow_remote_admin = allow
163
164 1
    def set_endpoint(self, url):
        """
        Parse url and use the result as the endpoint of this server.
        """
        parsed = urlparse(url)
        self.endpoint = parsed
166
167 1
    def get_endpoints(self):
        """
        Return the endpoints reported by the internal server.
        """
        endpoints = self.iserver.get_endpoints()
        return endpoints
169
170 1
    def _setup_server_nodes(self):
        """
        Declare the endpoints and security policies of the server.

        An endpoint without security is always declared. When both a
        certificate and a private key are loaded, one endpoint and one
        policy factory are added for each secured policy/mode pair.
        """
        # to be called just before starting server since it needs all parameters to be setup
        self._set_endpoints()
        self._policies = [ua.SecurityPolicyFactory()]
        if not (self.certificate and self.private_key):
            return

        # order matters: endpoints and policies are declared in this sequence
        secured = (
            (security_policies.SecurityPolicyBasic128Rsa15, ua.MessageSecurityMode.SignAndEncrypt),
            (security_policies.SecurityPolicyBasic128Rsa15, ua.MessageSecurityMode.Sign),
            (security_policies.SecurityPolicyBasic256, ua.MessageSecurityMode.SignAndEncrypt),
            (security_policies.SecurityPolicyBasic256, ua.MessageSecurityMode.Sign),
        )
        for policy, mode in secured:
            self._set_endpoints(policy, mode)
            factory = ua.SecurityPolicyFactory(policy,
                                               mode,
                                               self.certificate,
                                               self.private_key)
            self._policies.append(factory)
203
204 1
    def _set_endpoints(self, policy=ua.SecurityPolicy, mode=ua.MessageSecurityMode.None_):
        """
        Build an endpoint description for the given security policy and
        message security mode and add it to the internal server.

        The endpoint advertises four user token policies: anonymous, two
        certificate based ones and username.
        """
        def make_token(policy_id, token_type):
            # small factory for the user identity tokens of the endpoint
            token = ua.UserTokenPolicy()
            token.PolicyId = policy_id
            token.TokenType = token_type
            return token

        tokens = [
            make_token('anonymous', ua.UserTokenType.Anonymous),
            make_token('certificate_basic256', ua.UserTokenType.Certificate),
            make_token('certificate_basic128', ua.UserTokenType.Certificate),
            make_token('username', ua.UserTokenType.UserName),
        ]

        url = self.endpoint.geturl()

        description = ua.ApplicationDescription()
        description.ApplicationName = ua.LocalizedText(self.name)
        description.ApplicationUri = self.application_uri
        description.ApplicationType = self.application_type
        description.ProductUri = self.product_uri
        description.DiscoveryUrls.append(url)

        endpoint = ua.EndpointDescription()
        endpoint.EndpointUrl = url
        endpoint.Server = description
        if self.certificate:
            # the certificate is published in DER form
            endpoint.ServerCertificate = uacrypto.der_from_x509(self.certificate)
        endpoint.SecurityMode = mode
        endpoint.SecurityPolicyUri = policy.URI
        endpoint.UserIdentityTokens = tokens
        endpoint.TransportProfileUri = 'http://opcfoundation.org/UA-Profile/Transport/uatcp-uasc-uabinary'
        endpoint.SecurityLevel = 0
        self.iserver.add_endpoint(endpoint)
239
240 1
    def set_server_name(self, name):
        """
        Store name as the name of this server.
        """
        new_name = name
        self.name = new_name
242
243 1
    def start(self):
        """
        Start the server and listen on the network.

        Endpoints and policies are set up first, then the internal server
        is started and finally the binary server is created on the host
        and port of the configured endpoint.
        """
        self._setup_server_nodes()
        self.iserver.start()
        host = self.endpoint.hostname
        port = self.endpoint.port
        self.bserver = BinaryServer(self.iserver, host, port)
        binary = self.bserver
        binary.set_policies(self._policies)
        binary.start()
252
253 1
    def stop(self):
        """
        Stop the server.

        Discovery clients are disconnected first, then the binary server
        and the internal server are stopped, in that order.
        """
        for discovery in self._discovery_clients.values():
            discovery.disconnect()
        for layer in (self.bserver, self.iserver):
            layer.stop()
261
262 1
    def get_root_node(self):
        """
        Return the Root folder of the server as a Node object.
        """
        root_id = ua.TwoByteNodeId(ua.ObjectIds.RootFolder)
        return self.get_node(root_id)
267
268 1
    def get_objects_node(self):
        """
        Return the Objects folder of the server as a Node object.
        """
        objects_id = ua.TwoByteNodeId(ua.ObjectIds.ObjectsFolder)
        return self.get_node(objects_id)
273
274 1
    def get_server_node(self):
        """
        Return the Server object of the address space as a Node object.
        """
        server_id = ua.TwoByteNodeId(ua.ObjectIds.Server)
        return self.get_node(server_id)
279
280 1
    def get_node(self, nodeid):
        """
        Return a Node bound to the internal session for nodeid, which is
        either a NodeId object or a string representing a NodeId.
        """
        session = self.iserver.isession
        return Node(session, nodeid)
285
286 1
    def create_subscription(self, period, handler):
        """
        Create a subscription on the internal session.

        period is used as requested publishing interval and handler is
        handed over to the Subscription. The returned Subscription object
        allows to subscribe to events or data on the server.
        """
        settings = (
            ("RequestedPublishingInterval", period),
            ("RequestedLifetimeCount", 3000),
            ("RequestedMaxKeepAliveCount", 10000),
            ("MaxNotificationsPerPublish", 0),
            ("PublishingEnabled", True),
            ("Priority", 0),
        )
        params = ua.CreateSubscriptionParameters()
        for field, value in settings:
            setattr(params, field, value)
        return Subscription(self.iserver.isession, params, handler)
300
301 1
    def get_namespace_array(self):
        """
        Return the value of the NamespaceArray node, i.e. all namespaces
        defined in the server.
        """
        array_id = ua.NodeId(ua.ObjectIds.Server_NamespaceArray)
        return self.get_node(array_id).get_value()
307
308 1
    def register_namespace(self, uri):
        """
        Register a new namespace and return its index.

        Nodes should be created in a custom namespace, not in namespace 0.
        If uri is already present in the namespace array it is not added a
        second time; the index of the existing entry is returned instead,
        so that the array never holds duplicate uris.
        """
        ns_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_NamespaceArray))
        uries = ns_node.get_value()
        if uri in uries:
            # already registered: keep the array unchanged
            return uries.index(uri)
        uries.append(uri)
        ns_node.set_value(uries)
        return len(uries) - 1
317
318 1
    def get_namespace_index(self, uri):
        """
        Return the position of uri in the namespace array.

        A uri that is not registered makes the list lookup raise
        ValueError.
        """
        return self.get_namespace_array().index(uri)
324
325 1
    def get_event_generator(self, event=None):
        """
        Returns an event generator using an event type from address space.
        Use this object to fire events.

        When event is omitted a new ua.BaseEvent is created for this call.
        A default built in the signature would be evaluated only once and
        the same event instance would then be handed to every generator.
        """
        if event is None:
            event = ua.BaseEvent()
        return EventGenerator(self.iserver.isession, event)
331
332 1
    def import_xml(self, path):
        """
        Import the nodes defined in the xml file at path, using the node
        management service of the internal server.
        """
        node_service = self.iserver.node_mgt_service
        xmlimporter.XmlImporter(node_service).import_xml(path)
338
339 1
    def delete_nodes(self, nodes, recursive=False):
        """
        Delete nodes through the internal session and return the result
        of the module level delete_nodes helper.
        """
        session = self.iserver.isession
        return delete_nodes(session, nodes, recursive)
341