Completed
Pull Request — master (#133)
by Denis
02:28
created

Server.trigger_event()   A

Complexity

Conditions 1

Size

Total Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 2
Metric Value
cc 1
dl 0
loc 2
ccs 0
cts 0
cp 0
crap 2
rs 10
1
"""
2
High level interface to pure python OPC-UA server
3
"""
4
5 1
import logging
6 1
try:
7 1
    from urllib.parse import urlparse
8
except ImportError:
9
    from urlparse import urlparse
10
11
12 1
from opcua import ua
13
#from opcua.binary_server import BinaryServer
14 1
from opcua.server.binary_server_asyncio import BinaryServer
15 1
from opcua.server.internal_server import InternalServer
16 1
from opcua.common.node import Node
17 1
from opcua.common.subscription import Subscription
18 1
from opcua.common import xmlimporter
19 1
from opcua.common.manage_nodes import delete_nodes
20 1
from opcua.client.client import Client
21 1
from opcua.crypto import security_policies
22 1
use_crypto = True
23 1
try:
24 1
    from opcua.crypto import uacrypto
25 1
except ImportError:
26 1
    print("cryptography is not installed, use of crypto disabled")
27 1
    use_crypto = False
28 1
29
30
class Server(object):
31 1
32
    """
33
    High level Server class
34
35
    This class creates an opcua server with default values
36
37
    Create your own namespace and then populate your server address space
38
    using use the get_root() or get_objects() to get Node objects.
39
    and get_event_object() to fire events.
40
    Then start server. See example_server.py
41
    All methods are threadsafe
42
43
    If you need more flexibility you call directly the Ua Service methods
44
    on the iserver  or iserver.isesssion object members.
45
46
47
    :ivar application_uri:
48
    :vartype application_uri: uri
49
    :ivar product_uri:
50
    :vartype product_uri: uri
51
    :ivar name:
52
    :vartype name: string
53
    :ivar default_timeout: timout in milliseconds for sessions and secure channel
54
    :vartype default_timeout: int
55
    :ivar iserver: internal server object
56
    :vartype default_timeout: InternalServer
57
    :ivar bserver: binary protocol server
58
    :vartype bserver: BinaryServer
59
60
    """
61
62
    def __init__(self):
63 1
        self.logger = logging.getLogger(__name__)
64 1
        self.endpoint = urlparse("opc.tcp://0.0.0.0:4840/freeopcua/server/")
65 1
        self.application_uri = "urn:freeopcua:python:server"
66 1
        self.product_uri = "urn:freeopcua.github.no:python:server"
67 1
        self.name = "FreeOpcUa Python Server"
68 1
        self.application_type = ua.ApplicationType.ClientAndServer
69 1
        self.default_timeout = 3600000
70 1
        self.iserver = InternalServer()
71 1
        self.bserver = None
72 1
        self._discovery_clients = {}
73 1
        self._discovery_period = 60
74 1
        self.certificate = None
75 1
        self.private_key = None
76 1
        self._policies = []
77 1
78
        # setup some expected values
79
        self.register_namespace(self.application_uri)
80 1
        sa_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_ServerArray))
81 1
        sa_node.set_value([self.application_uri])
82 1
83
    def load_certificate(self, path):
84 1
        """
85
        load server certificate from file, either pem or der
86
        """
87
        self.certificate = uacrypto.load_certificate(path)
88
89
    def load_private_key(self, path):
90 1
        self.private_key = uacrypto.load_private_key(path)
91
92
    def disable_clock(self, val=True):
93 1
        """
94
        for debugging you may want to disable clock that write every second
95
        to address space
96
        """
97
        self.iserver.disabled_clock = val
98
99
    def set_application_uri(self, uri):
100 1
        """
101
        Set application/server URI.
102
        This uri is supposed to be unique. If you intent to register
103
        your server to a discovery server, it really should be unique in
104
        your system!
105
        default is : "urn:freeopcua:python:server"
106
        """
107
        self.application_uri = uri
108 1
109
    def find_servers(self, uris=None):
110 1
        """
111
        find_servers. mainly implemented for simmetry with client
112
        """
113
        if uris is None:
114 1
            uris = []
115 1
        params = ua.FindServersParameters()
116 1
        params.EndpointUrl = self.endpoint.geturl()
117 1
        params.ServerUris = uris
118 1
        return self.iserver.find_servers(params)
119 1
120
    def register_to_discovery(self, url="opc.tcp://localhost:4840", period=60):
121 1
        """
122
        Register to an OPC-UA Discovery server. Registering must be renewed at
123
        least every 10 minutes, so this method will use our asyncio thread to
124
        re-register every period seconds
125
        if period is 0 registration is not automatically renewed
126
        """
127
        # FIXME: habe a period per discovery
128
        if url in self._discovery_clients:
129 1
            self._discovery_clients[url].disconnect()
130 1
        self._discovery_clients[url] = Client(url)
131 1
        self._discovery_clients[url].connect()
132 1
        self._discovery_clients[url].register_server(self)
133 1
        self._discovery_period = period
134 1
        if period:
135 1
            self.iserver.loop.call_soon(self._renew_registration)
136
137
    def unregister_to_discovery(self, url="opc.tcp://localhost:4840"):
138 1
        """
139
        stop registration thread
140
        """
141
        # FIXME: is there really no way to deregister?
142
        self._discovery_clients[url].disconnect()
143
144
    def _renew_registration(self):
145 1
        for client in self._discovery_clients.values():
146
            client.register_server(self)
147
            self.iserver.loop.call_later(self._discovery_period, self._renew_registration)
148
149
    def get_client_to_discovery(self, url="opc.tcp://localhost:4840"):
150 1
        """
151
        Create a client to discovery server and return it
152
        """
153
        client = Client(url)
154
        client.connect()
155
        return client
156
157
    def allow_remote_admin(self, allow):
158 1
        """
159
        Enable or disable the builtin Admin user from network clients
160
        """
161
        self.iserver.allow_remote_admin = allow
162
163
    def set_endpoint(self, url):
164 1
        self.endpoint = urlparse(url)
165 1
166
    def get_endpoints(self):
167 1
        return self.iserver.get_endpoints()
168 1
169
    def _setup_server_nodes(self):
170 1
        # to be called just before starting server since it needs all parameters to be setup
171
        self._set_endpoints()
172 1
        self._policies = [ua.SecurityPolicyFactory()]
173 1
        if self.certificate and self.private_key:
174 1
            self._set_endpoints(security_policies.SecurityPolicyBasic128Rsa15,
175
                                ua.MessageSecurityMode.SignAndEncrypt)
176
            self._policies.append(ua.SecurityPolicyFactory(security_policies.SecurityPolicyBasic128Rsa15,
177
                                                           ua.MessageSecurityMode.SignAndEncrypt,
178
                                                           self.certificate,
179
                                                           self.private_key)
180
                                 )
181
            self._set_endpoints(security_policies.SecurityPolicyBasic128Rsa15,
182
                                ua.MessageSecurityMode.Sign)
183
            self._policies.append(ua.SecurityPolicyFactory(security_policies.SecurityPolicyBasic128Rsa15,
184
                                                           ua.MessageSecurityMode.Sign,
185
                                                           self.certificate,
186
                                                           self.private_key)
187
                                 )
188
            self._set_endpoints(security_policies.SecurityPolicyBasic256,
189
                                ua.MessageSecurityMode.SignAndEncrypt)
190
            self._policies.append(ua.SecurityPolicyFactory(security_policies.SecurityPolicyBasic256,
191
                                                           ua.MessageSecurityMode.SignAndEncrypt,
192
                                                           self.certificate,
193
                                                           self.private_key)
194
                                 )
195
            self._set_endpoints(security_policies.SecurityPolicyBasic256,
196
                                ua.MessageSecurityMode.Sign)
197
            self._policies.append(ua.SecurityPolicyFactory(security_policies.SecurityPolicyBasic256,
198
                                                           ua.MessageSecurityMode.Sign,
199
                                                           self.certificate,
200
                                                           self.private_key)
201
                                 )
202
203
    def _set_endpoints(self, policy=ua.SecurityPolicy, mode=ua.MessageSecurityMode.None_):
204 1
        idtoken = ua.UserTokenPolicy()
205 1
        idtoken.PolicyId = 'anonymous'
206 1
        idtoken.TokenType = ua.UserTokenType.Anonymous
207 1
208
        idtoken2 = ua.UserTokenPolicy()
209 1
        idtoken2.PolicyId = 'certificate_basic256'
210 1
        idtoken2.TokenType = ua.UserTokenType.Certificate
211 1
212
        idtoken3 = ua.UserTokenPolicy()
213 1
        idtoken3.PolicyId = 'certificate_basic128'
214 1
        idtoken3.TokenType = ua.UserTokenType.Certificate
215 1
216
        idtoken4 = ua.UserTokenPolicy()
217 1
        idtoken4.PolicyId = 'username'
218 1
        idtoken4.TokenType = ua.UserTokenType.UserName
219 1
220
        appdesc = ua.ApplicationDescription()
221 1
        appdesc.ApplicationName = ua.LocalizedText(self.name)
222 1
        appdesc.ApplicationUri = self.application_uri
223 1
        appdesc.ApplicationType = self.application_type
224 1
        appdesc.ProductUri = self.product_uri
225 1
        appdesc.DiscoveryUrls.append(self.endpoint.geturl())
226 1
227
        edp = ua.EndpointDescription()
228 1
        edp.EndpointUrl = self.endpoint.geturl()
229 1
        edp.Server = appdesc
230 1
        if self.certificate:
231 1
            edp.ServerCertificate = uacrypto.der_from_x509(self.certificate)
232
        edp.SecurityMode = mode
233 1
        edp.SecurityPolicyUri = policy.URI
234 1
        edp.UserIdentityTokens = [idtoken, idtoken2, idtoken3, idtoken4]
235 1
        edp.TransportProfileUri = 'http://opcfoundation.org/UA-Profile/Transport/uatcp-uasc-uabinary'
236 1
        edp.SecurityLevel = 0
237 1
        self.iserver.add_endpoint(edp)
238 1
239
    def set_server_name(self, name):
240 1
        self.name = name
241
242
    def start(self):
243 1
        """
244
        Start to listen on network
245
        """
246
        self._setup_server_nodes()
247 1
        self.iserver.start()
248 1
        self.bserver = BinaryServer(self.iserver, self.endpoint.hostname, self.endpoint.port)
249 1
        self.bserver.set_policies(self._policies)
250 1
        self.bserver.start()
251 1
252
    def stop(self):
253 1
        """
254
        Stop server
255
        """
256
        for client in self._discovery_clients.values():
257 1
            client.disconnect()
258 1
        self.bserver.stop()
259 1
        self.iserver.stop()
260 1
261
    def get_root_node(self):
262 1
        """
263
        Get Root node of server. Returns a Node object.
264
        """
265
        return self.get_node(ua.TwoByteNodeId(ua.ObjectIds.RootFolder))
266 1
267
    def get_objects_node(self):
268 1
        """
269
        Get Objects node of server. Returns a Node object.
270
        """
271
        return self.get_node(ua.TwoByteNodeId(ua.ObjectIds.ObjectsFolder))
272 1
273
    def get_server_node(self):
274 1
        """
275
        Get Server node of server. Returns a Node object.
276
        """
277
        return self.get_node(ua.TwoByteNodeId(ua.ObjectIds.Server))
278 1
279
    def get_node(self, nodeid):
280 1
        """
281
        Get a specific node using NodeId object or a string representing a NodeId
282
        """
283
        return Node(self.iserver.isession, nodeid)
284 1
285
    def create_subscription(self, period, handler):
286 1
        """
287
        Create a subscription.
288
        returns a Subscription object which allow
289
        to subscribe to events or data on server
290
        """
291
        params = ua.CreateSubscriptionParameters()
292 1
        params.RequestedPublishingInterval = period
293 1
        params.RequestedLifetimeCount = 3000
294 1
        params.RequestedMaxKeepAliveCount = 10000
295 1
        params.MaxNotificationsPerPublish = 0
296 1
        params.PublishingEnabled = True
297 1
        params.Priority = 0
298 1
        return Subscription(self.iserver.isession, params, handler)
299 1
300
    def get_namespace_array(self):
301 1
        """
302
        get all namespace defined in server
303
        """
304
        ns_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_NamespaceArray))
305 1
        return ns_node.get_value()
306 1
307
    def register_namespace(self, uri):
308 1
        """
309
        Register a new namespace. Nodes should in custom namespace, not 0.
310
        """
311
        ns_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_NamespaceArray))
312 1
        uries = ns_node.get_value()
313 1
        uries.append(uri)
314 1
        ns_node.set_value(uries)
315 1
        return len(uries) - 1
316 1
317
    def get_namespace_index(self, uri):
318 1
        """
319
        get index of a namespace using its uri
320
        """
321
        uries = self.get_namespace_array()
322 1
        return uries.index(uri)
323 1
324
    def get_event_object(self, etype=ua.ObjectIds.BaseEventType):
325 1
        """
326
        Returns an event object using an event type from address space.
327
        Use this object to fire events
328
        """
329
        node = None
330
331
        if isinstance(etype, ua.BaseEvent):
332 1
            event = etype
333
            source = Node(self.iserver.isession, event.SourceNode)
334
            if event.SourceNode.Identifier:
335
                event.SourceName = source.get_display_name().Text
336 1
                source.set_attribute(ua.AttributeIds.EventNotifier, ua.DataValue(ua.Variant(1, ua.VariantType.Byte)))
337 1
        elif isinstance(etype, Node):
338
            node = etype
339 1
        elif isinstance(etype, ua.NodeId):
340 1
            node = Node(self.iserver.isession, etype)
341
        else:
342 1
            node = Node(self.iserver.isession, ua.NodeId(etype))
343
344
        if node:
345 1
            event = ua.Event()
346
            references = node.get_children_descriptions(refs=ua.ObjectIds.HasProperty)
347
            for desc in references:
348
                node = Node(self.iserver.isession, desc.NodeId)
349
                setattr(event, desc.BrowseName.Name, node.get_value())
350
351
        return event
352
353
    def trigger_event(self, event):
354
        self.iserver.isession.subscription_service.trigger_event(event)
355
356
    def import_xml(self, path):
357
        """
358
        import nodes defined in xml
359
        """
360
        importer = xmlimporter.XmlImporter(self.iserver.node_mgt_service)
361
        importer.import_xml(path)
362
363
    def delete_nodes(self, nodes, recursive=False):
364
        return delete_nodes(self.iserver.isession, nodes, recursive)
365
366
    def historize_node(self, node):
367
        self.iserver.enable_history(node)
368
369
    def dehistorize_node(self, node):
370
        self.iserver.disable_history(node)
371