Completed
Pull Request — master (#232)
by
unknown
03:55
created

InternalServer.create_session()   A

Complexity

Conditions 1

Size

Total Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 1
CRAP Score 1

Importance

Changes 0
Metric Value
cc 1
c 0
b 0
f 0
dl 0
loc 2
ccs 1
cts 1
cp 1
crap 1
rs 10
1
"""
2
Internal server implementing opcu-ua interface. can be used on server side or to implement binary/https opc-ua servers
3
"""
4
5 1
from datetime import datetime
6 1
from copy import copy, deepcopy
7 1
from datetime import timedelta
8 1
from os import path
9 1
import logging
10 1
from threading import Lock
11 1
from enum import Enum
12 1
try:
13
    from urllib.parse import urlparse
14
except ImportError:
15
    from urlparse import urlparse
16
17 1
18 1
from opcua import ua
19 1
from opcua.common import utils
20 1
from opcua.common.node import Node
21 1
from opcua.server.history import HistoryManager
22 1
from opcua.server.address_space import AddressSpace
23 1
from opcua.server.address_space import AttributeService
24 1
from opcua.server.address_space import ViewService
25 1
from opcua.server.address_space import NodeManagementService
26 1
from opcua.server.address_space import MethodService
27 1
from opcua.server.subscription_service import SubscriptionService
28 1
from opcua.server.standard_address_space import standard_address_space
29 1
from opcua.server.users import User
30
from opcua.common import xmlimporter
31
32 1
33 1
class SessionState(Enum):
34 1
    Created = 0
35 1
    Activated = 1
36
    Closed = 2
37
38 1
39 1
class ServerDesc(object):
40 1
    def __init__(self, serv, cap=None):
41 1
        self.Server = serv
42
        self.Capabilities = cap
43
44 1
45
class InternalServer(object):
46 1
47 1
    def __init__(self, cacheFile = None):
48 1
        self.logger = logging.getLogger(__name__)
49 1
        
50 1
        self.server_event_dispatcher = utils.EventDispatcher()
51 1
        
52 1
        self.endpoints = []
53
        self._channel_id_counter = 5
54 1
        self.allow_remote_admin = True
55 1
        self.disabled_clock = False  # for debugging we may want to disable clock that writes too much in log
56 1
        self._known_servers = {}  # used if we are a discovery server
57 1
58 1
        self.aspace = AddressSpace()
59
        self.attribute_service = AttributeService(self.aspace)
60 1
        self.view_service = ViewService(self.aspace)
61
        self.method_service = MethodService(self.aspace)
62
        self.node_mgt_service = NodeManagementService(self.aspace)
63
64
        if cacheFile and path.isfile(cacheFile):
65
            # import address space from shelve
66
            self.aspace.load(cacheFile)
67
        else:
68 1
            # import address space from code generated from xml
69 1
            standard_address_space.fill_address_space(self.node_mgt_service)
70 1
            # import address space directly from xml, this has preformance impact so disabled
71
            #importer = xmlimporter.XmlImporter(self.node_mgt_service)
72 1
            #importer.import_xml("/path/to/python-opcua/schemas/Opc.Ua.NodeSet2.xml")
73
74
            if cacheFile:
75 1
                self.aspace.dump(cacheFile)
76 1
77 1
        self.loop = utils.ThreadLoop()
78 1
        self.asyncio_transports = []
79 1
        self.subscription_service = SubscriptionService(self.loop, self.aspace)
80
81 1
        self.history_manager = HistoryManager(self)
82
83
        # create a session to use on server side
84 1
        self.isession = InternalSession(self, self.aspace, self.subscription_service, "Internal", user=User.Admin)
85
        self.current_time_node = Node(self.isession, ua.NodeId(ua.ObjectIds.Server_ServerStatus_CurrentTime))
86
        uries = ["http://opcfoundation.org/UA/"]
87 1
        ns_node = Node(self.isession, ua.NodeId(ua.ObjectIds.Server_NamespaceArray))
88 1
        ns_node.set_value(uries)
89 1
        
90 1
        
91 1
92 1
    def load_address_space(self, path):
93 1
        self.aspace.load(path)
94 1
95 1
    def dump_address_space(self, path):
96
        self.aspace.dump(path)
97 1
98 1
    def start(self):
99 1
        self.logger.info("starting internal server")
100 1
        for edp in self.endpoints:
101
            self._known_servers[edp.Server.ApplicationUri] = ServerDesc(edp.Server)
102 1
        self.loop.start()
103 1
        Node(self.isession, ua.NodeId(ua.ObjectIds.Server_ServerStatus_State)).set_value(0)
104 1
        Node(self.isession, ua.NodeId(ua.ObjectIds.Server_ServerStatus_StartTime)).set_value(datetime.utcnow())
105
        if not self.disabled_clock:
106 1
            self._set_current_time()
107 1
108 1
    def stop(self):
109
        self.logger.info("stopping internal server")
110 1
        self.loop.stop()
111 1
        self.history_manager.stop()
112
113 1
    def _set_current_time(self):
114 1
        self.current_time_node.set_value(datetime.utcnow())
115 1
        self.loop.call_later(1, self._set_current_time)
116
117 1
    def get_new_channel_id(self):
118 1
        self._channel_id_counter += 1
119 1
        return self._channel_id_counter
120 1
121 1
    def add_endpoint(self, endpoint):
122 1
        self.endpoints.append(endpoint)
123 1
124 1
    def get_endpoints(self, params=None, sockname=None):
125 1
        self.logger.info("get endpoint")
126
        if sockname:
127 1
            #return to client the ip address it has access to
128 1
            edps = []
129 1
            for edp in self.endpoints:
130 1
                edp1 = copy(edp)
131 1
                url = urlparse(edp1.EndpointUrl)
132 1
                url = url._replace(netloc=sockname[0] + ":" + str(sockname[1]))
133 1
                edp1.EndpointUrl = url.geturl()
134 1
                edps.append(edp1)
135 1
            return edps
136 1
        return self.endpoints[:]
137 1
138 1
    def find_servers(self, params):
139
        if not params.ServerUris:
140 1
            return [desc.Server for desc in self._known_servers.values()]
141 1
        servers = []
142 1
        for serv in self._known_servers.values():
143 1
            serv_uri = serv.Server.ApplicationUri.split(":")
144 1
            for uri in params.ServerUris:
145 1
                uri = uri.split(":")
146 1
                if serv_uri[:len(uri)] == uri:
147 1
                    servers.append(serv.Server)
148 1
                    break
149
        return servers
150 1
151
    def register_server(self, server, conf=None):
152
        appdesc = ua.ApplicationDescription()
153 1
        appdesc.ApplicationUri = server.ServerUri
154 1
        appdesc.ProductUri = server.ProductUri
155
        appdesc.ApplicationName = server.ServerNames[0]  # FIXME: select name from client locale
156 1
        appdesc.ApplicationType = server.ServerType
157
        appdesc.GatewayServerUri = server.GatewayServerUri
158
        appdesc.DiscoveryUrls = server.DiscoveryUrls  # FIXME: select discovery uri using reachability from client network
159
        self._known_servers[server.ServerUri] = ServerDesc(appdesc, conf)
160 1
161 1
    def register_server2(self, params):
162 1
        return self.register_server(params.Server, params.DiscoveryConfiguration)
163 1
164
    def create_session(self, name, user=User.Anonymous, external=False):
165 1
        return InternalSession(self, self.aspace, self.subscription_service, name, user=user, external=external)
166
167
    def enable_history_data_change(self, node, period=timedelta(days=7), count=0):
168
        """
169 1
        Set attribute Historizing of node to True and start storing data for history
170 1
        """
171 1
        node.set_attribute(ua.AttributeIds.Historizing, ua.DataValue(True))
172 1
        node.set_attr_bit(ua.AttributeIds.AccessLevel, ua.AccessLevel.HistoryRead)
173
        node.set_attr_bit(ua.AttributeIds.UserAccessLevel, ua.AccessLevel.HistoryRead)
174 1
        self.history_manager.historize_data_change(node, period, count)
175
176
    def disable_history_data_change(self, node):
177
        """
178
        Set attribute Historizing of node to False and stop storing data for history
179 1
        """
180 1
        node.set_attribute(ua.AttributeIds.Historizing, ua.DataValue(False))
181
        node.unset_attr_bit(ua.AttributeIds.AccessLevel, ua.AccessLevel.HistoryRead)
182 1
        node.unset_attr_bit(ua.AttributeIds.UserAccessLevel, ua.AccessLevel.HistoryRead)
183
        self.history_manager.dehistorize(node)
184 1
185
    def enable_history_event(self, source, period=timedelta(days=7), count=0):
186 1
        """
187
        Set attribute History Read of object events to True and start storing data for history
188
        """
189
        event_notifier = source.get_event_notifier()
190 1
        if ua.EventNotifier.SubscribeToEvents not in event_notifier:
191 1
            raise ua.UaError("Node does not generate events", event_notifier)
192
        if ua.EventNotifier.SubscribeToEvents not in event_notifier:
193
            event_notifier.append(ua.EventNotifier.HistoryRead)
194 1
            source.set_event_notifier(event_notifier)
195 1
        self.history_manager.historize_event(source, period, count)
196 1
197
    def disable_history_event(self, source):
198 1
        """
199 1
        Set attribute History Read of node to False and stop storing data for history
200 1
        """
201 1
        source.unset_attr_bit(ua.AttributeIds.EventNotifier, ua.EventNotifier.HistoryRead)
202 1
        self.history_manager.dehistorize(source)
203 1
        
204 1
    def subscribe_server_event(self, event, handle):
205 1
        """
206 1
        Create a subscription from event to handle
207 1
        """
208 1
        self.server_event_dispatcher.addListener(event, handle)
209 1
        
210 1
    def unsubscribe_server_event(self, event, handle):
211 1
        """
212 1
        Remove a subscription from event to handle
213 1
        """
214 1
        self.server_event_dispatcher.removeListener(event, handle)
215
216 1
217
class InternalSession(object):
218
    _counter = 10
219 1
    _auth_counter = 1000
220 1
221
    def __init__(self, internal_server, aspace, submgr, name, user=User.Anonymous, external=False):
222 1
        self.logger = logging.getLogger(__name__)
223 1
        self.iserver = internal_server
224
        self.external = external  # define if session is external, we need to copy some objects if it is internal
225 1
        self.aspace = aspace
226 1
        self.subscription_service = submgr
227 1
        self.name = name
228 1
        self.user = user
229 1
        self.nonce = None
230 1
        self.state = SessionState.Created
231 1
        self.session_id = ua.NodeId(self._counter)
232 1
        InternalSession._counter += 1
233
        self.authentication_token = ua.NodeId(self._auth_counter)
234 1
        InternalSession._auth_counter += 1
235
        self.subscriptions = []
236 1
        self.logger.info("Created internal session %s", self.name)
237 1
        self._lock = Lock()
238 1
239 1
    def __str__(self):
240
        return "InternalSession(name:{}, user:{}, id:{}, auth_token:{})".format(self.name, self.user, self.session_id, self.authentication_token)
241 1
242 1
    def get_endpoints(self, params=None, sockname=None):
243 1
        return self.iserver.get_endpoints(params, sockname)
244 1
245
    def create_session(self, params, sockname=None):
246 1
        self.logger.info("Create session request")
247 1
248 1
        result = ua.CreateSessionResult()
249
        result.SessionId = self.session_id
250 1
        result.AuthenticationToken = self.authentication_token
251 1
        result.RevisedSessionTimeout = params.RequestedSessionTimeout
252 1
        result.MaxRequestMessageSize = 65536
253 1
        self.nonce = utils.create_nonce(32)
254 1
        result.ServerNonce = self.nonce
255 1
        result.ServerEndpoints = self.get_endpoints(sockname=sockname)
256 1
257
        return result
258 1
259 1
    def close_session(self, delete_subs):
260 1
        self.logger.info("close session %s with subscriptions %s", self, self.subscriptions)
261 1
        self.state = SessionState.Closed
262 1
        self.delete_subscriptions(self.subscriptions[:])
263
264 1
    def activate_session(self, params):
265 1
        self.logger.info("activate session")
266
        result = ua.ActivateSessionResult()
267 1
        if self.state != SessionState.Created:
268 1
            raise utils.ServiceError(ua.StatusCodes.BadSessionIdInvalid)
269
        self.nonce = utils.create_nonce(32)
270
        result.ServerNonce = self.nonce
271 1
        for _ in params.ClientSoftwareCertificates:
272 1
            result.Results.append(ua.StatusCode())
273
        self.state = SessionState.Activated
274 1
        id_token = params.UserIdentityToken
275 1
        if isinstance(id_token, ua.UserNameIdentityToken):
276
            if self.iserver.allow_remote_admin and id_token.UserName in ("admin", "Admin"):
277 1
                self.user = User.Admin
278 1
        self.logger.info("Activated internal session %s for user %s", self.name, self.user)
279
        return result
280 1
281 1
    def read(self, params):
282
        results = self.iserver.attribute_service.read(params)
283 1
        if self.external:
284 1
            return results
285
        return [deepcopy(dv) for dv in results]
286 1
287 1
    def history_read(self, params):
288
        return self.iserver.history_manager.read_history(params)
289 1
290
    def write(self, params):
291
        if not self.external:
292 1
            # If session is internal we need to store a copy og object, not a reference,
293 1
            #otherwise users may change it and we will not generate expected events
294
            params.NodesToWrite = [deepcopy(ntw) for ntw in params.NodesToWrite]
295 1
        return self.iserver.attribute_service.write(params, self.user)
296 1
297
    def browse(self, params):
298 1
        return self.iserver.view_service.browse(params)
299 1
300 1
    def translate_browsepaths_to_nodeids(self, params):
301 1
        return self.iserver.view_service.translate_browsepaths_to_nodeids(params)
302 1
303
    def add_nodes(self, params):
304 1
        return self.iserver.node_mgt_service.add_nodes(params, self.user)
305 1
306
    def delete_nodes(self, params):
307 1
        return self.iserver.node_mgt_service.delete_nodes(params, self.user)
308
309
    def add_references(self, params):
310 1
        return self.iserver.node_mgt_service.add_references(params, self.user)
311
312
    def delete_references(self, params):
313 1
        return self.iserver.node_mgt_service.delete_references(params, self.user)
314 1
315 1
    def add_method_callback(self, methodid, callback):
316 1
        return self.aspace.add_method_callback(methodid, callback)
317 1
318 1
    def call(self, params):
319
        return self.iserver.method_service.call(params)
320 1
321 1
    def create_subscription(self, params, callback):
322
        result = self.subscription_service.create_subscription(params, callback)
323 1
        with self._lock:
324 1
            self.subscriptions.append(result.SubscriptionId)
325 1
        return result
326 1
327
    def create_monitored_items(self, params):
328
        subscription_result = self.subscription_service.create_monitored_items(params)
329
        self.iserver.server_event_dispatcher.dispatch('create_monitored_items', utils.ServerItemEvent(params, subscription_result))
330
        return subscription_result
331
332
    def modify_monitored_items(self, params):
333
        subscription_result = self.subscription_service.modify_monitored_items(params)
334
        self.iserver.server_event_dispatcher.dispatch('modify_monitored_items', utils.ServerItemEvent(params, subscription_result))
335
        return subscription_result
336
    
337
    def republish(self, params):
338
        return self.subscription_service.republish(params)
339
340
    def delete_subscriptions(self, ids):
341
        for i in ids:
342
            with self._lock:
343
                if i in self.subscriptions:
344
                    self.subscriptions.remove(i)
345
        return self.subscription_service.delete_subscriptions(ids)
346
347
    def delete_monitored_items(self, params):
348
        subscription_result = self.subscription_service.delete_monitored_items(params)
349
        self.iserver.server_event_dispatcher.dispatch('delete_monitored_items', utils.ServerItemEvent(params, subscription_result))
350
        return subscription_result
351
352
    def publish(self, acks=None):
353
        if acks is None:
354
            acks = []
355
        return self.subscription_service.publish(acks)
356