Completed
Pull Request — master (#353)
by
unknown
03:45
created

InternalServer   B

Complexity

Total Complexity 37

Size/Duplication

Total Lines 195
Duplicated Lines 0 %

Test Coverage

Coverage 97.16%

Importance

Changes 4
Bugs 2 Features 0
Metric Value
dl 0
loc 195
rs 8.6
c 4
b 2
f 0
ccs 137
cts 141
cp 0.9716
wmc 37

21 Methods

Rating   Name   Duplication   Size   Complexity  
A setup_nodes() 0 7 1
A register_server() 0 11 1
A register_server2() 0 2 1
A get_endpoints() 0 13 3
A disable_history_event() 0 6 1
B find_servers() 0 12 6
A subscribe_server_callback() 0 5 1
A add_endpoint() 0 2 1
A load_address_space() 0 5 1
A unsubscribe_server_callback() 0 5 1
B __init__() 0 30 1
A dump_address_space() 0 5 1
A start() 0 9 3
A _set_current_time() 0 3 1
A disable_history_data_change() 0 8 1
A stop() 0 5 1
A get_new_channel_id() 0 3 1
A enable_history_data_change() 0 8 1
A enable_history_event() 0 13 3
B load_standard_address_space() 0 19 6
A create_session() 0 2 1
1
"""
2
Internal server implementing opc-ua interface.
3
Can be used on server side or to implement binary/https opc-ua servers
4
"""
5 1
6 1
from datetime import datetime
7 1
from copy import copy, deepcopy
8 1
from datetime import timedelta
9 1
from os import path
10 1
import logging
11 1
from threading import Lock
12 1
from enum import Enum
13
try:
14
    from urllib.parse import urlparse
15
except ImportError:
16
    from urlparse import urlparse
17 1
18 1
19 1
from opcua import ua
20 1
from opcua.common import utils
21 1
from opcua.common.callback import (CallbackType, ServerItemCallback,
22 1
                                   CallbackDispatcher)
23 1
from opcua.common.node import Node
24 1
from opcua.server.history import HistoryManager
25 1
from opcua.server.address_space import AddressSpace
26 1
from opcua.server.address_space import AttributeService
27 1
from opcua.server.address_space import ViewService
28 1
from opcua.server.address_space import NodeManagementService
29 1
from opcua.server.address_space import MethodService
30
from opcua.server.subscription_service import SubscriptionService
31
from opcua.server.standard_address_space import standard_address_space
32 1
from opcua.server.users import User
33 1
from opcua.common import xmlimporter
34 1
35 1
36
class SessionState(Enum):
    """
    Life-cycle state of an InternalSession.

    A session starts as Created, becomes Activated in activate_session()
    and Closed in close_session().
    """
    Created = 0
    Activated = 1
    Closed = 2
40 1
41 1
42
class ServerDesc(object):
    """
    Entry of the known-servers table kept by InternalServer.

    :param serv: description of the server; stored as ``Server``
    :param cap: optional extra data registered with the server (the
        discovery configuration when coming from register_server2);
        stored as ``Capabilities``
    """

    def __init__(self, serv, cap=None):
        self.Server = serv
        self.Capabilities = cap
46 1
47 1
48 1
class InternalServer(object):
    """
    Core of the server, independent of any network transport.

    Owns the address space and the services built on top of it (attribute,
    view, method, node management, subscription), the history manager, the
    list of endpoints, the table of known servers used for discovery, and an
    internal admin session (``isession``) for server-side access.
    """

    def __init__(self, cachefile=None):
        """
        Create the address space, the services and the internal session.

        :param cachefile: optional path of an address space cache, forwarded
            to load_standard_address_space()
        """
        self.logger = logging.getLogger(__name__)

        self.server_callback_dispatcher = CallbackDispatcher()

        self.endpoints = []
        self._channel_id_counter = 5
        self.allow_remote_admin = True
        self.disabled_clock = False  # for debugging we may want to disable clock that writes too much in log
        self._known_servers = {}  # used if we are a discovery server

        self.aspace = AddressSpace()
        self.attribute_service = AttributeService(self.aspace)
        self.view_service = ViewService(self.aspace)
        self.method_service = MethodService(self.aspace)
        self.node_mgt_service = NodeManagementService(self.aspace)

        self.load_standard_address_space(cachefile)

        self.loop = utils.ThreadLoop()
        self.asyncio_transports = []
        self.subscription_service = SubscriptionService(self.loop, self.aspace)

        self.history_manager = HistoryManager(self)

        # create a session to use on server side
        self.isession = InternalSession(self, self.aspace, self.subscription_service, "Internal", user=User.Admin)

        self.current_time_node = Node(self.isession, ua.NodeId(ua.ObjectIds.Server_ServerStatus_CurrentTime))
        self.setup_nodes()

    def setup_nodes(self):
        """
        Set up some nodes as defined by spec
        """
        uries = ["http://opcfoundation.org/UA/"]
        ns_node = Node(self.isession, ua.NodeId(ua.ObjectIds.Server_NamespaceArray))
        ns_node.set_value(uries)

    def load_standard_address_space(self, cachefile=None):
        """
        Fill the address space with the standard nodes.

        If cachefile is given and exists on disk (with or without a ".dat"
        extension) the address space is loaded from that cache. Otherwise it
        is generated from code, and a cache is written to cachefile when one
        was supplied.
        """
        # check for a cache file, in windows the file extension is also needed for the check
        cachefile_win = cachefile
        if cachefile_win:
            cachefile_win += ".dat"

        if cachefile and (path.isfile(cachefile) or path.isfile(cachefile_win)):
            # import address space from shelve
            self.aspace.load_cache(cachefile)
        else:
            # import address space from code generated from xml
            standard_address_space.fill_address_space(self.node_mgt_service)
            # import address space directly from xml, this has performance impact so disabled
            # importer = xmlimporter.XmlImporter(self.node_mgt_service)
            # importer.import_xml("/path/to/python-opcua/schemas/Opc.Ua.NodeSet2.xml", self)

            # if a cache file was supplied a shelve of the standard address space can now be built for next start up
            if cachefile:
                self.aspace.make_cache(cachefile)

    def load_address_space(self, path):
        """
        Load address space from path
        """
        self.aspace.load(path)

    def dump_address_space(self, path):
        """
        Dump current address space to path
        """
        self.aspace.dump(path)

    def start(self):
        """
        Start the internal loop and initialise the server status nodes.

        Every endpoint's server is recorded in the known-servers table, the
        State and StartTime status nodes are written, and the periodic update
        of the CurrentTime node is started unless disabled_clock is set.
        """
        self.logger.info("starting internal server")
        for edp in self.endpoints:
            self._known_servers[edp.Server.ApplicationUri] = ServerDesc(edp.Server)
        self.loop.start()
        Node(self.isession, ua.NodeId(ua.ObjectIds.Server_ServerStatus_State)).set_value(0, ua.VariantType.Int32)
        Node(self.isession, ua.NodeId(ua.ObjectIds.Server_ServerStatus_StartTime)).set_value(datetime.utcnow())
        if not self.disabled_clock:
            self._set_current_time()

    def stop(self):
        """
        Close the internal session, then stop the loop and the history manager
        """
        self.logger.info("stopping internal server")
        self.isession.close_session()
        self.loop.stop()
        self.history_manager.stop()

    def _set_current_time(self):
        """
        Write the current UTC time to the CurrentTime node and re-schedule
        itself on the loop one second later
        """
        self.current_time_node.set_value(datetime.utcnow())
        self.loop.call_later(1, self._set_current_time)

    def get_new_channel_id(self):
        """
        Increment the channel id counter and return the new value
        """
        self._channel_id_counter += 1
        return self._channel_id_counter

    def add_endpoint(self, endpoint):
        """
        Append endpoint to the list of endpoints
        """
        self.endpoints.append(endpoint)

    @staticmethod
    def _sockname_to_netloc(sockname):
        """
        Build the "host:port" part of an URL from a socket name.

        An IPv6 literal contains ':' and must be enclosed in brackets inside
        an URL, otherwise host and port cannot be told apart.
        """
        host = sockname[0]
        if ":" in host and not host.startswith("["):
            host = "[" + host + "]"
        return host + ":" + str(sockname[1])

    def get_endpoints(self, params=None, sockname=None):
        """
        Return the list of endpoints.

        :param params: not used here
        :param sockname: optional (host, port, ...) socket name; when given,
            shallow copies of the endpoints are returned with the host and
            port of their EndpointUrl replaced by those of sockname
        :return: a new list; the endpoints themselves are only copied when
            sockname is given
        """
        self.logger.info("get endpoint")
        if sockname:
            # return to client the ip address it has access to
            edps = []
            for edp in self.endpoints:
                edp1 = copy(edp)
                url = urlparse(edp1.EndpointUrl)
                url = url._replace(netloc=self._sockname_to_netloc(sockname))
                edp1.EndpointUrl = url.geturl()
                edps.append(edp1)
            return edps
        return self.endpoints[:]

    def find_servers(self, params):
        """
        Return the descriptions of the known servers.

        Without params.ServerUris every known server is returned. Otherwise
        a server is returned when one of the requested uris, split on ':',
        is a prefix of its ApplicationUri split the same way.
        """
        if not params.ServerUris:
            return [desc.Server for desc in self._known_servers.values()]
        servers = []
        for serv in self._known_servers.values():
            serv_uri = serv.Server.ApplicationUri.split(":")
            for uri in params.ServerUris:
                uri = uri.split(":")
                if serv_uri[:len(uri)] == uri:
                    servers.append(serv.Server)
                    break
        return servers

    def register_server(self, server, conf=None):
        """
        Add or replace a server in the known-servers table.

        An ApplicationDescription is built from the fields of server and
        stored, together with conf, under server.ServerUri.
        """
        appdesc = ua.ApplicationDescription()
        appdesc.ApplicationUri = server.ServerUri
        appdesc.ProductUri = server.ProductUri
        # FIXME: select name from client locale
        appdesc.ApplicationName = server.ServerNames[0]
        appdesc.ApplicationType = server.ServerType
        appdesc.DiscoveryUrls = server.DiscoveryUrls
        # FIXME: select discovery uri using reachability from client network
        appdesc.GatewayServerUri = server.GatewayServerUri
        self._known_servers[server.ServerUri] = ServerDesc(appdesc, conf)

    def register_server2(self, params):
        """
        Register params.Server together with params.DiscoveryConfiguration
        """
        return self.register_server(params.Server, params.DiscoveryConfiguration)

    def create_session(self, name, user=User.Anonymous, external=False):
        """
        Create a new InternalSession bound to this server
        """
        return InternalSession(self, self.aspace, self.subscription_service, name, user=user, external=external)

    def enable_history_data_change(self, node, period=timedelta(days=7), count=0):
        """
        Set attribute Historizing of node to True and start storing data for history
        """
        node.set_attribute(ua.AttributeIds.Historizing, ua.DataValue(True))
        node.set_attr_bit(ua.AttributeIds.AccessLevel, ua.AccessLevel.HistoryRead)
        node.set_attr_bit(ua.AttributeIds.UserAccessLevel, ua.AccessLevel.HistoryRead)
        self.history_manager.historize_data_change(node, period, count)

    def disable_history_data_change(self, node):
        """
        Set attribute Historizing of node to False and stop storing data for history
        """
        node.set_attribute(ua.AttributeIds.Historizing, ua.DataValue(False))
        node.unset_attr_bit(ua.AttributeIds.AccessLevel, ua.AccessLevel.HistoryRead)
        node.unset_attr_bit(ua.AttributeIds.UserAccessLevel, ua.AccessLevel.HistoryRead)
        self.history_manager.dehistorize(node)

    def enable_history_event(self, source, period=timedelta(days=7), count=0):
        """
        Set attribute History Read of object events to True and start storing data for history

        Raises ua.UaError if source does not have SubscribeToEvents in its
        event notifier.
        """
        event_notifier = source.get_event_notifier()
        if ua.EventNotifier.SubscribeToEvents not in event_notifier:
            raise ua.UaError("Node does not generate events", event_notifier)

        if ua.EventNotifier.HistoryRead not in event_notifier:
            event_notifier.append(ua.EventNotifier.HistoryRead)
            source.set_event_notifier(event_notifier)

        self.history_manager.historize_event(source, period, count)

    def disable_history_event(self, source):
        """
        Set attribute History Read of node to False and stop storing data for history
        """
        source.unset_attr_bit(ua.AttributeIds.EventNotifier, ua.EventNotifier.HistoryRead)
        self.history_manager.dehistorize(source)

    def subscribe_server_callback(self, event, handle):
        """
        Create a subscription from event to handle
        """
        self.server_callback_dispatcher.addListener(event, handle)

    def unsubscribe_server_callback(self, event, handle):
        """
        Remove a subscription from event to handle
        """
        self.server_callback_dispatcher.removeListener(event, handle)
243 1
244 1
245
class InternalSession(object):
    """
    One session on an InternalServer.

    Most methods forward the request to the matching service of the server.
    Session ids and authentication tokens are taken from two class-level
    counters shared by all sessions.
    """
    _counter = 10
    _auth_counter = 1000

    def __init__(self, internal_server, aspace, submgr, name, user=User.Anonymous, external=False):
        """
        :param internal_server: the InternalServer owning this session
        :param aspace: address space used by add_method_callback()
        :param submgr: subscription service used for all subscription calls
        :param name: name of the session, used in logs
        :param user: user the session runs as
        :param external: True when the session serves an external client
        """
        self.logger = logging.getLogger(__name__)
        self.iserver = internal_server
        self.external = external  # define if session is external, we need to copy some objects if it is internal
        self.aspace = aspace
        self.subscription_service = submgr
        self.name = name
        self.user = user
        self.nonce = None
        self.state = SessionState.Created
        self.session_id = ua.NodeId(self._counter)
        InternalSession._counter += 1
        self.authentication_token = ua.NodeId(self._auth_counter)
        InternalSession._auth_counter += 1
        self.subscriptions = []
        self.logger.info("Created internal session %s", self.name)
        self._lock = Lock()  # protects self.subscriptions

    def __str__(self):
        return "InternalSession(name:{}, user:{}, id:{}, auth_token:{})".format(
            self.name, self.user, self.session_id, self.authentication_token)

    def get_endpoints(self, params=None, sockname=None):
        """
        Return the endpoints of the server, see InternalServer.get_endpoints()
        """
        return self.iserver.get_endpoints(params, sockname)

    def create_session(self, params, sockname=None):
        """
        Build the CreateSessionResult for this session.

        A new 32 byte nonce is generated and stored, and the requested
        session timeout is accepted as is.
        """
        self.logger.info("Create session request")

        result = ua.CreateSessionResult()
        result.SessionId = self.session_id
        result.AuthenticationToken = self.authentication_token
        result.RevisedSessionTimeout = params.RequestedSessionTimeout
        result.MaxRequestMessageSize = 65536
        self.nonce = utils.create_nonce(32)
        result.ServerNonce = self.nonce
        result.ServerEndpoints = self.get_endpoints(sockname=sockname)

        return result

    def close_session(self, delete_subs=True):
        """
        Mark the session as closed and delete its subscriptions.

        NOTE(review): delete_subs is currently ignored, the subscriptions
        are always deleted.
        """
        self.logger.info("close session %s with subscriptions %s", self, self.subscriptions)
        self.state = SessionState.Closed
        # pass a copy since delete_subscriptions() modifies self.subscriptions
        self.delete_subscriptions(self.subscriptions[:])

    def activate_session(self, params):
        """
        Activate the session and return the ActivateSessionResult.

        Raises utils.ServiceError(BadSessionIdInvalid) if the session is not
        in state Created.

        NOTE(review): when the server has allow_remote_admin set, a user name
        token with user name "admin" or "Admin" gives admin rights; no
        password check is done here.
        """
        self.logger.info("activate session")
        result = ua.ActivateSessionResult()
        if self.state != SessionState.Created:
            raise utils.ServiceError(ua.StatusCodes.BadSessionIdInvalid)
        self.nonce = utils.create_nonce(32)
        result.ServerNonce = self.nonce
        # one status code per client software certificate
        for _ in params.ClientSoftwareCertificates:
            result.Results.append(ua.StatusCode())
        self.state = SessionState.Activated
        id_token = params.UserIdentityToken
        if isinstance(id_token, ua.UserNameIdentityToken):
            if self.iserver.allow_remote_admin and id_token.UserName in ("admin", "Admin"):
                self.user = User.Admin
        self.logger.info("Activated internal session %s for user %s", self.name, self.user)
        return result

    def read(self, params):
        """
        Read attributes; internal sessions get deep copies of the results
        """
        results = self.iserver.attribute_service.read(params)
        if self.external:
            return results
        return [deepcopy(dv) for dv in results]

    def history_read(self, params):
        return self.iserver.history_manager.read_history(params)

    def write(self, params):
        """
        Write attributes as the user of this session
        """
        if not self.external:
            # If session is internal we need to store a copy of object, not a reference,
            # otherwise users may change it and we will not generate expected events
            params.NodesToWrite = [deepcopy(ntw) for ntw in params.NodesToWrite]
        return self.iserver.attribute_service.write(params, self.user)

    def browse(self, params):
        return self.iserver.view_service.browse(params)

    def translate_browsepaths_to_nodeids(self, params):
        return self.iserver.view_service.translate_browsepaths_to_nodeids(params)

    def add_nodes(self, params):
        return self.iserver.node_mgt_service.add_nodes(params, self.user)

    def delete_nodes(self, params):
        return self.iserver.node_mgt_service.delete_nodes(params, self.user)

    def add_references(self, params):
        return self.iserver.node_mgt_service.add_references(params, self.user)

    def delete_references(self, params):
        return self.iserver.node_mgt_service.delete_references(params, self.user)

    def add_method_callback(self, methodid, callback):
        return self.aspace.add_method_callback(methodid, callback)

    def call(self, params):
        return self.iserver.method_service.call(params)

    def create_subscription(self, params, callback):
        """
        Create a subscription and remember its id in self.subscriptions
        """
        result = self.subscription_service.create_subscription(params, callback)
        with self._lock:
            self.subscriptions.append(result.SubscriptionId)
        return result

    def create_monitored_items(self, params):
        """
        Create monitored items and dispatch an ItemSubscriptionCreated callback
        """
        subscription_result = self.subscription_service.create_monitored_items(params)
        self.iserver.server_callback_dispatcher.dispatch(
            CallbackType.ItemSubscriptionCreated, ServerItemCallback(params, subscription_result))
        return subscription_result

    def modify_monitored_items(self, params):
        """
        Modify monitored items and dispatch an ItemSubscriptionModified callback
        """
        subscription_result = self.subscription_service.modify_monitored_items(params)
        self.iserver.server_callback_dispatcher.dispatch(
            CallbackType.ItemSubscriptionModified, ServerItemCallback(params, subscription_result))
        return subscription_result

    def republish(self, params):
        return self.subscription_service.republish(params)

    def delete_subscriptions(self, ids):
        """
        Forget the given subscription ids and delete them in the subscription service.

        Ids not owned by this session are still forwarded to the service.
        """
        # take the lock once for the whole update instead of once per id
        with self._lock:
            for i in ids:
                if i in self.subscriptions:
                    self.subscriptions.remove(i)
        return self.subscription_service.delete_subscriptions(ids)

    def delete_monitored_items(self, params):
        """
        Delete monitored items and dispatch an ItemSubscriptionDeleted callback
        """
        subscription_result = self.subscription_service.delete_monitored_items(params)
        self.iserver.server_callback_dispatcher.dispatch(
            CallbackType.ItemSubscriptionDeleted, ServerItemCallback(params, subscription_result))
        return subscription_result

    def publish(self, acks=None):
        """
        Forward a publish request; acks defaults to an empty list
        """
        if acks is None:
            acks = []
        return self.subscription_service.publish(acks)
388