Completed
Pull Request — master (#133)
by Denis
02:49
created

Server.get_event_object()   A

Complexity

Conditions 3

Size

Total Lines 14

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 7
CRAP Score 3
Metric Value
cc 3
dl 0
loc 14
ccs 7
cts 7
cp 1
crap 3
rs 9.4285
1
"""
2
High level interface to pure python OPC-UA server
3
"""
4
5 1
import logging
6 1
try:
7 1
    from urllib.parse import urlparse
8
except ImportError:
9
    from urlparse import urlparse
10
11
12 1
from opcua import ua
13
#from opcua.binary_server import BinaryServer
14 1
from opcua.server.binary_server_asyncio import BinaryServer
15 1
from opcua.server.internal_server import InternalServer
16 1
from opcua.common.node import Node
17 1
from opcua.common.subscription import Subscription
18 1
from opcua.common import xmlimporter
19 1
from opcua.common.manage_nodes import delete_nodes
20 1
from opcua.client.client import Client
21 1
from opcua.crypto import security_policies
22 1
use_crypto = True
23 1
try:
24 1
    from opcua.crypto import uacrypto
25 1
except ImportError:
26 1
    print("cryptography is not installed, use of crypto disabled")
27 1
    use_crypto = False
28
29
30 1
class Server(object):
31
32
    """
33
    High level Server class
34
35
    This class creates an opcua server with default values
36
37
    Create your own namespace and then populate your server address space
38
    using use the get_root() or get_objects() to get Node objects.
39
    and get_event_object() to fire events.
40
    Then start server. See example_server.py
41
    All methods are threadsafe
42
43
    If you need more flexibility you call directly the Ua Service methods
44
    on the iserver  or iserver.isesssion object members.
45
46
47
    :ivar application_uri:
48
    :vartype application_uri: uri
49
    :ivar product_uri:
50
    :vartype product_uri: uri
51
    :ivar name:
52
    :vartype name: string
53
    :ivar default_timeout: timout in milliseconds for sessions and secure channel
54
    :vartype default_timeout: int
55
    :ivar iserver: internal server object
56
    :vartype default_timeout: InternalServer
57
    :ivar bserver: binary protocol server
58
    :vartype bserver: BinaryServer
59
60
    """
61
62 1
    def __init__(self):
63 1
        self.logger = logging.getLogger(__name__)
64 1
        self.endpoint = urlparse("opc.tcp://0.0.0.0:4840/freeopcua/server/")
65 1
        self.application_uri = "urn:freeopcua:python:server"
66 1
        self.product_uri = "urn:freeopcua.github.no:python:server"
67 1
        self.name = "FreeOpcUa Python Server"
68 1
        self.application_type = ua.ApplicationType.ClientAndServer
69 1
        self.default_timeout = 3600000
70 1
        self.iserver = InternalServer()
71 1
        self.bserver = None
72 1
        self._discovery_clients = {}
73 1
        self._discovery_period = 60
74 1
        self.certificate = None
75 1
        self.private_key = None
76 1
        self._policies = []
77
78
        # setup some expected values
79 1
        self.register_namespace(self.application_uri)
80 1
        sa_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_ServerArray))
81 1
        sa_node.set_value([self.application_uri])
82
83 1
    def load_certificate(self, path):
84
        """
85
        load server certificate from file, either pem or der
86
        """
87
        self.certificate = uacrypto.load_certificate(path)
88
89 1
    def load_private_key(self, path):
90
        self.private_key = uacrypto.load_private_key(path)
91
92 1
    def disable_clock(self, val=True):
93
        """
94
        for debugging you may want to disable clock that write every second
95
        to address space
96
        """
97
        self.iserver.disabled_clock = val
98
99 1
    def set_application_uri(self, uri):
100
        """
101
        Set application/server URI.
102
        This uri is supposed to be unique. If you intent to register
103
        your server to a discovery server, it really should be unique in
104
        your system!
105
        default is : "urn:freeopcua:python:server"
106
        """
107 1
        self.application_uri = uri
108
109 1
    def find_servers(self, uris=None):
110
        """
111
        find_servers. mainly implemented for simmetry with client
112
        """
113 1
        if uris is None:
114 1
            uris = []
115 1
        params = ua.FindServersParameters()
116 1
        params.EndpointUrl = self.endpoint.geturl()
117 1
        params.ServerUris = uris
118 1
        return self.iserver.find_servers(params)
119
120 1
    def register_to_discovery(self, url="opc.tcp://localhost:4840", period=60):
121
        """
122
        Register to an OPC-UA Discovery server. Registering must be renewed at
123
        least every 10 minutes, so this method will use our asyncio thread to
124
        re-register every period seconds
125
        if period is 0 registration is not automatically renewed
126
        """
127
        # FIXME: habe a period per discovery
128 1
        if url in self._discovery_clients:
129 1
            self._discovery_clients[url].disconnect()
130 1
        self._discovery_clients[url] = Client(url)
131 1
        self._discovery_clients[url].connect()
132 1
        self._discovery_clients[url].register_server(self)
133 1
        self._discovery_period = period
134 1
        if period:
135
            self.iserver.loop.call_soon(self._renew_registration)
136
137 1
    def unregister_to_discovery(self, url="opc.tcp://localhost:4840"):
138
        """
139
        stop registration thread
140
        """
141
        # FIXME: is there really no way to deregister?
142
        self._discovery_clients[url].disconnect()
143
144 1
    def _renew_registration(self):
145
        for client in self._discovery_clients.values():
146
            client.register_server(self)
147
            self.iserver.loop.call_later(self._discovery_period, self._renew_registration)
148
149 1
    def get_client_to_discovery(self, url="opc.tcp://localhost:4840"):
150
        """
151
        Create a client to discovery server and return it
152
        """
153
        client = Client(url)
154
        client.connect()
155
        return client
156
157 1
    def allow_remote_admin(self, allow):
158
        """
159
        Enable or disable the builtin Admin user from network clients
160
        """
161
        self.iserver.allow_remote_admin = allow
162
163 1
    def set_endpoint(self, url):
164 1
        self.endpoint = urlparse(url)
165
166 1
    def get_endpoints(self):
167 1
        return self.iserver.get_endpoints()
168
169 1
    def _setup_server_nodes(self):
170
        # to be called just before starting server since it needs all parameters to be setup
171 1
        self._set_endpoints()
172 1
        self._policies = [ua.SecurityPolicyFactory()]
173 1
        if self.certificate and self.private_key:
174
            self._set_endpoints(security_policies.SecurityPolicyBasic128Rsa15,
175
                                ua.MessageSecurityMode.SignAndEncrypt)
176
            self._policies.append(ua.SecurityPolicyFactory(security_policies.SecurityPolicyBasic128Rsa15,
177
                                                           ua.MessageSecurityMode.SignAndEncrypt,
178
                                                           self.certificate,
179
                                                           self.private_key)
180
                                 )
181
            self._set_endpoints(security_policies.SecurityPolicyBasic128Rsa15,
182
                                ua.MessageSecurityMode.Sign)
183
            self._policies.append(ua.SecurityPolicyFactory(security_policies.SecurityPolicyBasic128Rsa15,
184
                                                           ua.MessageSecurityMode.Sign,
185
                                                           self.certificate,
186
                                                           self.private_key)
187
                                 )
188
            self._set_endpoints(security_policies.SecurityPolicyBasic256,
189
                                ua.MessageSecurityMode.SignAndEncrypt)
190
            self._policies.append(ua.SecurityPolicyFactory(security_policies.SecurityPolicyBasic256,
191
                                                           ua.MessageSecurityMode.SignAndEncrypt,
192
                                                           self.certificate,
193
                                                           self.private_key)
194
                                 )
195
            self._set_endpoints(security_policies.SecurityPolicyBasic256,
196
                                ua.MessageSecurityMode.Sign)
197
            self._policies.append(ua.SecurityPolicyFactory(security_policies.SecurityPolicyBasic256,
198
                                                           ua.MessageSecurityMode.Sign,
199
                                                           self.certificate,
200
                                                           self.private_key)
201
                                 )
202
203 1
    def _set_endpoints(self, policy=ua.SecurityPolicy, mode=ua.MessageSecurityMode.None_):
204 1
        idtoken = ua.UserTokenPolicy()
205 1
        idtoken.PolicyId = 'anonymous'
206 1
        idtoken.TokenType = ua.UserTokenType.Anonymous
207
208 1
        idtoken2 = ua.UserTokenPolicy()
209 1
        idtoken2.PolicyId = 'certificate_basic256'
210 1
        idtoken2.TokenType = ua.UserTokenType.Certificate
211
212 1
        idtoken3 = ua.UserTokenPolicy()
213 1
        idtoken3.PolicyId = 'certificate_basic128'
214 1
        idtoken3.TokenType = ua.UserTokenType.Certificate
215
216 1
        idtoken4 = ua.UserTokenPolicy()
217 1
        idtoken4.PolicyId = 'username'
218 1
        idtoken4.TokenType = ua.UserTokenType.UserName
219
220 1
        appdesc = ua.ApplicationDescription()
221 1
        appdesc.ApplicationName = ua.LocalizedText(self.name)
222 1
        appdesc.ApplicationUri = self.application_uri
223 1
        appdesc.ApplicationType = self.application_type
224 1
        appdesc.ProductUri = self.product_uri
225 1
        appdesc.DiscoveryUrls.append(self.endpoint.geturl())
226
227 1
        edp = ua.EndpointDescription()
228 1
        edp.EndpointUrl = self.endpoint.geturl()
229 1
        edp.Server = appdesc
230 1
        if self.certificate:
231
            edp.ServerCertificate = uacrypto.der_from_x509(self.certificate)
232 1
        edp.SecurityMode = mode
233 1
        edp.SecurityPolicyUri = policy.URI
234 1
        edp.UserIdentityTokens = [idtoken, idtoken2, idtoken3, idtoken4]
235 1
        edp.TransportProfileUri = 'http://opcfoundation.org/UA-Profile/Transport/uatcp-uasc-uabinary'
236 1
        edp.SecurityLevel = 0
237 1
        self.iserver.add_endpoint(edp)
238
239 1
    def set_server_name(self, name):
240
        self.name = name
241
242 1
    def start(self):
243
        """
244
        Start to listen on network
245
        """
246 1
        self._setup_server_nodes()
247 1
        self.iserver.start()
248 1
        self.bserver = BinaryServer(self.iserver, self.endpoint.hostname, self.endpoint.port)
249 1
        self.bserver.set_policies(self._policies)
250 1
        self.bserver.start()
251
252 1
    def stop(self):
253
        """
254
        Stop server
255
        """
256 1
        for client in self._discovery_clients.values():
257 1
            client.disconnect()
258 1
        self.bserver.stop()
259 1
        self.iserver.stop()
260
261 1
    def get_root_node(self):
262
        """
263
        Get Root node of server. Returns a Node object.
264
        """
265 1
        return self.get_node(ua.TwoByteNodeId(ua.ObjectIds.RootFolder))
266
267 1
    def get_objects_node(self):
268
        """
269
        Get Objects node of server. Returns a Node object.
270
        """
271 1
        return self.get_node(ua.TwoByteNodeId(ua.ObjectIds.ObjectsFolder))
272
273 1
    def get_server_node(self):
274
        """
275
        Get Server node of server. Returns a Node object.
276
        """
277 1
        return self.get_node(ua.TwoByteNodeId(ua.ObjectIds.Server))
278
279 1
    def get_node(self, nodeid):
280
        """
281
        Get a specific node using NodeId object or a string representing a NodeId
282
        """
283 1
        return Node(self.iserver.isession, nodeid)
284
285 1
    def create_subscription(self, period, handler):
286
        """
287
        Create a subscription.
288
        returns a Subscription object which allow
289
        to subscribe to events or data on server
290
        """
291 1
        params = ua.CreateSubscriptionParameters()
292 1
        params.RequestedPublishingInterval = period
293 1
        params.RequestedLifetimeCount = 3000
294 1
        params.RequestedMaxKeepAliveCount = 10000
295 1
        params.MaxNotificationsPerPublish = 0
296 1
        params.PublishingEnabled = True
297 1
        params.Priority = 0
298 1
        return Subscription(self.iserver.isession, params, handler)
299
300 1
    def get_namespace_array(self):
301
        """
302
        get all namespace defined in server
303
        """
304 1
        ns_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_NamespaceArray))
305 1
        return ns_node.get_value()
306
307 1
    def register_namespace(self, uri):
308
        """
309
        Register a new namespace. Nodes should in custom namespace, not 0.
310
        """
311 1
        ns_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_NamespaceArray))
312 1
        uries = ns_node.get_value()
313 1
        uries.append(uri)
314 1
        ns_node.set_value(uries)
315 1
        return len(uries) - 1
316
317 1
    def get_namespace_index(self, uri):
318
        """
319
        get index of a namespace using its uri
320
        """
321 1
        uries = self.get_namespace_array()
322 1
        return uries.index(uri)
323
324 1
    def get_event_object(self, event=ua.BaseEvent()):
325
        """
326
        Returns an event object using an event type from address space.
327
        Use this object to fire events
328
        """
329
        #TODO: Implement EventTypeID as input paramter
330
331 1
        if isinstance(event, ua.BaseEvent):
332 1
            source = Node(self.iserver.isession, event.SourceNode)
333 1
            if event.SourceNode.Identifier:
334 1
                event.SourceName = source.get_display_name().Text
335 1
                source.set_attribute(ua.AttributeIds.EventNotifier, ua.DataValue(ua.Variant(1, ua.VariantType.Byte)))
336
337 1
        return event
338
339 1
    def trigger_event(self, event):
340 1
        self.iserver.isession.subscription_service.trigger_event(event)
341
342 1
    def import_xml(self, path):
343
        """
344
        import nodes defined in xml
345
        """
346 1
        importer = xmlimporter.XmlImporter(self.iserver.node_mgt_service)
347 1
        importer.import_xml(path)
348
349 1
    def delete_nodes(self, nodes, recursive=False):
350 1
        return delete_nodes(self.iserver.isession, nodes, recursive)
351
352 1
    def historize_node(self, node):
353
        self.iserver.enable_history(node)
354
355 1
    def dehistorize_node(self, node):
356
        self.iserver.disable_history(node)
357