Completed
Pull Request — master (#638)
by
unknown
04:15
created

InternalServer.find_servers()   B

Complexity

Conditions 6

Size

Total Lines 12

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 12
CRAP Score 6

Importance

Changes 0
Metric Value
cc 6
c 0
b 0
f 0
dl 0
loc 12
ccs 12
cts 12
cp 1
crap 6
rs 8.6666
1
"""
2
Internal server implementing opcu-ua interface.
3
Can be used on server side or to implement binary/https opc-ua servers
4
"""
5
6 1
from datetime import datetime, timedelta
7 1
from copy import copy, deepcopy
8 1
import os
9 1
import logging
10 1
from threading import Lock
11 1
from enum import Enum
12 1
try:
13 1
    from urllib.parse import urlparse
14
except ImportError:
15
    from urlparse import urlparse
16
17
18 1
from opcua import ua
19 1
from opcua.common import utils
20 1
from opcua.common.callback import (CallbackType, ServerItemCallback,
21
                                   CallbackDispatcher)
22 1
from opcua.common.node import Node
23 1
from opcua.server.history import HistoryManager
24 1
from opcua.server.address_space import AddressSpace
25 1
from opcua.server.address_space import AttributeService
26 1
from opcua.server.address_space import ViewService
27 1
from opcua.server.address_space import NodeManagementService
28 1
from opcua.server.address_space import MethodService
29 1
from opcua.server.subscription_service import SubscriptionService
30 1
from opcua.server.standard_address_space import standard_address_space
31 1
from opcua.server.users import User
32
#from opcua.common import xmlimporter
33
34
35 1
class SessionState(Enum):
36 1
    Created = 0
37 1
    Activated = 1
38 1
    Closed = 2
39
40
41 1
class ServerDesc(object):
42 1
    def __init__(self, serv, cap=None):
43 1
        self.Server = serv
44 1
        self.Capabilities = cap
45
46
47 1
class InternalServer(object):
48
49 1
    def __init__(self, shelffile=None):
50 1
        self.logger = logging.getLogger(__name__)
51
52 1
        self.server_callback_dispatcher = CallbackDispatcher()
53
54 1
        self.endpoints = []
55 1
        self._channel_id_counter = 5
56 1
        self.allow_remote_admin = True
57 1
        self.disabled_clock = False  # for debugging we may want to disable clock that writes too much in log
58 1
        self._known_servers = {}  # used if we are a discovery server
59
60 1
        self.aspace = AddressSpace()
61 1
        self.attribute_service = AttributeService(self.aspace)
62 1
        self.view_service = ViewService(self.aspace)
63 1
        self.method_service = MethodService(self.aspace)
64 1
        self.node_mgt_service = NodeManagementService(self.aspace)
65
66 1
        self.load_standard_address_space(shelffile)
67
68 1
        self.loop = None
69 1
        self.asyncio_transports = []
70 1
        self.subscription_service = SubscriptionService(self.aspace)
71
72 1
        self.history_manager = HistoryManager(self)
73
74
        # create a session to use on server side
75 1
        self.isession = InternalSession(self, self.aspace, self.subscription_service, "Internal", user=User.Admin)
76
77 1
        self.current_time_node = Node(self.isession, ua.NodeId(ua.ObjectIds.Server_ServerStatus_CurrentTime))
78 1
        self._address_space_fixes()
79 1
        self.setup_nodes()
80
81 1
    def setup_nodes(self):
82
        """
83
        Set up some nodes as defined by spec
84
        """
85 1
        uries = ["http://opcfoundation.org/UA/"]
86 1
        ns_node = Node(self.isession, ua.NodeId(ua.ObjectIds.Server_NamespaceArray))
87 1
        ns_node.set_value(uries)
88
89 1
    def load_standard_address_space(self, shelffile=None):
90 1
        if shelffile is not None and os.path.isfile(shelffile):
91
            # import address space from shelf
92
            self.aspace.load_aspace_shelf(shelffile)
93
        else:
94
            # import address space from code generated from xml
95 1
            standard_address_space.fill_address_space(self.node_mgt_service)
96
            # import address space directly from xml, this has performance impact so disabled
97
            # importer = xmlimporter.XmlImporter(self.node_mgt_service)
98
            # importer.import_xml("/path/to/python-opcua/schemas/Opc.Ua.NodeSet2.xml", self)
99
100
            # if a cache file was supplied a shelve of the standard address space can now be built for next start up
101 1
            if shelffile:
102
                self.aspace.make_aspace_shelf(shelffile)
103
104 1
    def _address_space_fixes(self):
105
        """
106
        Looks like the xml definition of address space has some error. This is a good place to fix them
107
        """
108
109 1
        it = ua.AddReferencesItem()
110 1
        it.SourceNodeId = ua.NodeId(ua.ObjectIds.BaseObjectType)
111 1
        it.ReferenceTypeId = ua.NodeId(ua.ObjectIds.Organizes)
112 1
        it.IsForward = False
113 1
        it.TargetNodeId = ua.NodeId(ua.ObjectIds.ObjectTypesFolder)
114 1
        it.TargetNodeClass = ua.NodeClass.Object
115
116 1
        results = self.isession.add_references([it])
117
 
118 1
    def load_address_space(self, path):
119
        """
120
        Load address space from path
121
        """
122
        self.aspace.load(path)
123
124 1
    def dump_address_space(self, path):
125
        """
126
        Dump current address space to path
127
        """
128
        self.aspace.dump(path)
129
130 1
    def start(self):
131 1
        self.logger.info("starting internal server")
132 1
        for edp in self.endpoints:
133 1
            self._known_servers[edp.Server.ApplicationUri] = ServerDesc(edp.Server)
134 1
        self.loop = utils.ThreadLoop()
135 1
        self.loop.start()
136 1
        self.subscription_service.set_loop(self.loop)
137 1
        Node(self.isession, ua.NodeId(ua.ObjectIds.Server_ServerStatus_State)).set_value(0, ua.VariantType.Int32)
138 1
        Node(self.isession, ua.NodeId(ua.ObjectIds.Server_ServerStatus_StartTime)).set_value(datetime.utcnow())
139 1
        if not self.disabled_clock:
140 1
            self._set_current_time()
141
142 1
    def stop(self):
143 1
        self.logger.info("stopping internal server")
144 1
        self.isession.close_session()
145 1
        if self.loop:
146 1
            self.loop.stop()
147 1
            self.loop = None
148 1
        self.subscription_service.set_loop(None)
149 1
        self.history_manager.stop()
150
151 1
    def _set_current_time(self):
152 1
        self.current_time_node.set_value(datetime.utcnow())
153 1
        self.loop.call_later(1, self._set_current_time)
154
155 1
    def get_new_channel_id(self):
156 1
        self._channel_id_counter += 1
157 1
        return self._channel_id_counter
158
159 1
    def add_endpoint(self, endpoint):
160 1
        self.endpoints.append(endpoint)
161
162 1
    def get_endpoints(self, params=None, sockname=None):
163 1
        self.logger.info("get endpoint")
164 1
        if sockname:
165
            # return to client the ip address it has access to
166 1
            edps = []
167 1
            for edp in self.endpoints:
168 1
                edp1 = copy(edp)
169 1
                url = urlparse(edp1.EndpointUrl)
170 1
                url = url._replace(netloc=sockname[0] + ":" + str(sockname[1]))
171 1
                edp1.EndpointUrl = url.geturl()
172 1
                edps.append(edp1)
173 1
            return edps
174 1
        return self.endpoints[:]
175
176 1
    def find_servers(self, params):
177 1
        if not params.ServerUris:
178 1
            return [desc.Server for desc in self._known_servers.values()]
179 1
        servers = []
180 1
        for serv in self._known_servers.values():
181 1
            serv_uri = serv.Server.ApplicationUri.split(":")
182 1
            for uri in params.ServerUris:
183 1
                uri = uri.split(":")
184 1
                if serv_uri[:len(uri)] == uri:
185 1
                    servers.append(serv.Server)
186 1
                    break
187 1
        return servers
188
189 1
    def register_server(self, server, conf=None):
190 1
        appdesc = ua.ApplicationDescription()
191 1
        appdesc.ApplicationUri = server.ServerUri
192 1
        appdesc.ProductUri = server.ProductUri
193
        # FIXME: select name from client locale
194 1
        appdesc.ApplicationName = server.ServerNames[0]
195 1
        appdesc.ApplicationType = server.ServerType
196 1
        appdesc.DiscoveryUrls = server.DiscoveryUrls
197
        # FIXME: select discovery uri using reachability from client network
198 1
        appdesc.GatewayServerUri = server.GatewayServerUri
199 1
        self._known_servers[server.ServerUri] = ServerDesc(appdesc, conf)
200
201 1
    def register_server2(self, params):
202
        return self.register_server(params.Server, params.DiscoveryConfiguration)
203
204 1
    def create_session(self, name, user=User.Anonymous, external=False):
205 1
        return InternalSession(self, self.aspace, self.subscription_service, name, user=user, external=external)
206
207 1
    def enable_history_data_change(self, node, period=timedelta(days=7), count=0):
208
        """
209
        Set attribute Historizing of node to True and start storing data for history
210
        """
211 1
        node.set_attribute(ua.AttributeIds.Historizing, ua.DataValue(True))
212 1
        node.set_attr_bit(ua.AttributeIds.AccessLevel, ua.AccessLevel.HistoryRead)
213 1
        node.set_attr_bit(ua.AttributeIds.UserAccessLevel, ua.AccessLevel.HistoryRead)
214 1
        self.history_manager.historize_data_change(node, period, count)
215
216 1
    def disable_history_data_change(self, node):
217
        """
218
        Set attribute Historizing of node to False and stop storing data for history
219
        """
220 1
        node.set_attribute(ua.AttributeIds.Historizing, ua.DataValue(False))
221 1
        node.unset_attr_bit(ua.AttributeIds.AccessLevel, ua.AccessLevel.HistoryRead)
222 1
        node.unset_attr_bit(ua.AttributeIds.UserAccessLevel, ua.AccessLevel.HistoryRead)
223 1
        self.history_manager.dehistorize(node)
224
225 1
    def enable_history_event(self, source, period=timedelta(days=7), count=0):
226
        """
227
        Set attribute History Read of object events to True and start storing data for history
228
        """
229 1
        event_notifier = source.get_event_notifier()
230 1
        if ua.EventNotifier.SubscribeToEvents not in event_notifier:
231
            raise ua.UaError("Node does not generate events", event_notifier)
232
233 1
        if ua.EventNotifier.HistoryRead not in event_notifier:
234 1
            event_notifier.add(ua.EventNotifier.HistoryRead)
235 1
            source.set_event_notifier(event_notifier)
236
237 1
        self.history_manager.historize_event(source, period, count)
238
239 1
    def disable_history_event(self, source):
240
        """
241
        Set attribute History Read of node to False and stop storing data for history
242
        """
243 1
        source.unset_attr_bit(ua.AttributeIds.EventNotifier, ua.EventNotifier.HistoryRead)
244 1
        self.history_manager.dehistorize(source)
245
246 1
    def subscribe_server_callback(self, event, handle):
247
        """
248
        Create a subscription from event to handle
249
        """
250
        self.server_callback_dispatcher.addListener(event, handle)
251
252 1
    def unsubscribe_server_callback(self, event, handle):
253
        """
254
        Remove a subscription from event to handle
255
        """
256
        self.server_callback_dispatcher.removeListener(event, handle)
257
258
259 1
class InternalSession(object):
260 1
    _counter = 10
261 1
    _auth_counter = 1000
262
263 1
    def __init__(self, internal_server, aspace, submgr, name, user=User.Anonymous, external=False):
264 1
        self.logger = logging.getLogger(__name__)
265 1
        self.iserver = internal_server
266 1
        self.external = external  # define if session is external, we need to copy some objects if it is internal
267 1
        self.aspace = aspace
268 1
        self.subscription_service = submgr
269 1
        self.name = name
270 1
        self.user = user
271 1
        self.nonce = None
272 1
        self.state = SessionState.Created
273 1
        self.session_id = ua.NodeId(self._counter)
274 1
        InternalSession._counter += 1
275 1
        self.authentication_token = ua.NodeId(self._auth_counter)
276 1
        InternalSession._auth_counter += 1
277 1
        self.subscriptions = []
278 1
        self.logger.info("Created internal session %s", self.name)
279 1
        self._lock = Lock()
280
281 1
    def __str__(self):
282
        return "InternalSession(name:{0}, user:{1}, id:{2}, auth_token:{3})".format(
283
            self.name, self.user, self.session_id, self.authentication_token)
284
285 1
    def get_endpoints(self, params=None, sockname=None):
286 1
        return self.iserver.get_endpoints(params, sockname)
287
288 1
    def create_session(self, params, sockname=None):
289 1
        self.logger.info("Create session request")
290
291 1
        result = ua.CreateSessionResult()
292 1
        result.SessionId = self.session_id
293 1
        result.AuthenticationToken = self.authentication_token
294 1
        result.RevisedSessionTimeout = params.RequestedSessionTimeout
295 1
        result.MaxRequestMessageSize = 65536
296 1
        self.nonce = utils.create_nonce(32)
297 1
        result.ServerNonce = self.nonce
298 1
        result.ServerEndpoints = self.get_endpoints(sockname=sockname)
299
300 1
        return result
301
302 1
    def close_session(self, delete_subs=True):
303 1
        self.logger.info("close session %s with subscriptions %s", self, self.subscriptions)
304 1
        self.state = SessionState.Closed
305 1
        self.delete_subscriptions(self.subscriptions[:])
306
307 1
    def activate_session(self, params):
308 1
        self.logger.info("activate session")
309 1
        result = ua.ActivateSessionResult()
310 1
        if self.state != SessionState.Created:
311
            raise utils.ServiceError(ua.StatusCodes.BadSessionIdInvalid)
312 1
        self.nonce = utils.create_nonce(32)
313 1
        result.ServerNonce = self.nonce
314 1
        for _ in params.ClientSoftwareCertificates:
315
            result.Results.append(ua.StatusCode())
316 1
        self.state = SessionState.Activated
317 1
        id_token = params.UserIdentityToken
318 1
        if isinstance(id_token, ua.UserNameIdentityToken):
319 1
            if self.iserver.allow_remote_admin and id_token.UserName in ("admin", "Admin"):
320 1
                self.user = User.Admin
321 1
        self.logger.info("Activated internal session %s for user %s", self.name, self.user)
322 1
        return result
323
324 1
    def read(self, params):
325 1
        results = self.iserver.attribute_service.read(params)
326 1
        return results
327
328 1
    def history_read(self, params):
329 1
        return self.iserver.history_manager.read_history(params)
330
331 1
    def write(self, params):
332 1
        return self.iserver.attribute_service.write(params, self.user)
333
334 1
    def browse(self, params):
335 1
        return self.iserver.view_service.browse(params)
336
337 1
    def translate_browsepaths_to_nodeids(self, params):
338 1
        return self.iserver.view_service.translate_browsepaths_to_nodeids(params)
339
340 1
    def add_nodes(self, params):
341 1
        return self.iserver.node_mgt_service.add_nodes(params, self.user)
342
343 1
    def delete_nodes(self, params):
344 1
        return self.iserver.node_mgt_service.delete_nodes(params, self.user)
345
346 1
    def add_references(self, params):
347 1
        return self.iserver.node_mgt_service.add_references(params, self.user)
348
349 1
    def delete_references(self, params):
350 1
        return self.iserver.node_mgt_service.delete_references(params, self.user)
351
352 1
    def add_method_callback(self, methodid, callback):
353 1
        return self.aspace.add_method_callback(methodid, callback)
354
355 1
    def call(self, params):
356 1
        return self.iserver.method_service.call(params)
357
358 1
    def create_subscription(self, params, callback):
359 1
        result = self.subscription_service.create_subscription(params, callback)
360 1
        with self._lock:
361 1
            self.subscriptions.append(result.SubscriptionId)
362 1
        return result
363
364 1
    def modify_subscription(self, params, callback):
365
        return self.subscription_service.modify_subscription(params, callback)
366
367 1
    def create_monitored_items(self, params):
368 1
        subscription_result = self.subscription_service.create_monitored_items(params)
369 1
        self.iserver.server_callback_dispatcher.dispatch(
370
            CallbackType.ItemSubscriptionCreated, ServerItemCallback(params, subscription_result))
371 1
        return subscription_result
372
373 1
    def modify_monitored_items(self, params):
374
        subscription_result = self.subscription_service.modify_monitored_items(params)
375
        self.iserver.server_callback_dispatcher.dispatch(
376
            CallbackType.ItemSubscriptionModified, ServerItemCallback(params, subscription_result))
377
        return subscription_result
378
379 1
    def republish(self, params):
380
        return self.subscription_service.republish(params)
381
382 1
    def delete_subscriptions(self, ids):
383 1
        for i in ids:
384 1
            with self._lock:
385 1
                if i in self.subscriptions:
386 1
                    self.subscriptions.remove(i)
387 1
        return self.subscription_service.delete_subscriptions(ids)
388
389 1
    def delete_monitored_items(self, params):
390 1
        subscription_result = self.subscription_service.delete_monitored_items(params)
391 1
        self.iserver.server_callback_dispatcher.dispatch(
392
            CallbackType.ItemSubscriptionDeleted, ServerItemCallback(params, subscription_result))
393 1
        return subscription_result
394
395 1
    def publish(self, acks=None):
396 1
        if acks is None:
397 1
            acks = []
398
        return self.subscription_service.publish(acks)
399