Completed
Pull Request — master (#140)
by Olivier
02:30
created

opcua.server.InternalServer.get_endpoints()   A

Complexity

Conditions 3

Size

Total Lines 13

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 12
CRAP Score 3
Metric Value
cc 3
dl 0
loc 13
ccs 12
cts 12
cp 1
crap 3
rs 9.4285
1
"""
2
Internal server implementing the OPC-UA interface. It can be used on the server side or to implement binary/https OPC-UA servers.
3
"""
4
5 1
from datetime import datetime
6 1
from datetime import timedelta
7 1
from copy import copy
8 1
import logging
9 1
from threading import Lock
10 1
from enum import Enum
11 1
try:
12 1
    from urllib.parse import urlparse
13
except ImportError:
14
    from urlparse import urlparse
15
16
17 1
from opcua import ua
18 1
from opcua.common import utils
19 1
from opcua.common.node import Node
20 1
from opcua.server.history import HistoryManager
21 1
from opcua.server.address_space import AddressSpace
22 1
from opcua.server.address_space import AttributeService
23 1
from opcua.server.address_space import ViewService
24 1
from opcua.server.address_space import NodeManagementService
25 1
from opcua.server.address_space import MethodService
26 1
from opcua.server.subscription_service import SubscriptionService
27 1
from opcua.server.standard_address_space import standard_address_space
28 1
from opcua.server.users import User
29 1
from opcua.common import xmlimporter
30
31
32 1
class SessionState(Enum):
    """
    Life-cycle state of an InternalSession.

    A session starts as Created, becomes Activated in activate_session()
    and Closed in close_session().
    """
    Created = 0
    Activated = 1
    Closed = 2
36
37
38 1
class ServerDesc(object):
    """
    Record kept for every server known to an InternalServer.

    :param serv: application description of the server, stored as ``Server``
    :param cap: optional extra data registered with the server, stored as
        ``Capabilities`` (None when nothing was supplied)
    """

    def __init__(self, serv, cap=None):
        self.Server = serv
        self.Capabilities = cap

    def __repr__(self):
        return "ServerDesc(Server={!r}, Capabilities={!r})".format(self.Server, self.Capabilities)
42
43
44 1
class InternalServer(object):
    """
    Server core that owns the address space and the services working on it.

    On construction it creates the address space, the attribute, view,
    method, node management, subscription and history services, and an
    internal administrator session (``isession``) used for server-side
    access to nodes.
    """

    def __init__(self):
        self.logger = logging.getLogger(__name__)
        self.endpoints = []
        self._channel_id_counter = 5
        self.allow_remote_admin = True
        self.disabled_clock = False  # for debugging we may want to disable clock that writes too much in log
        self._known_servers = {}  # used if we are a discovery server

        self.aspace = AddressSpace()
        self.attribute_service = AttributeService(self.aspace)
        self.view_service = ViewService(self.aspace)
        self.method_service = MethodService(self.aspace)
        self.node_mgt_service = NodeManagementService(self.aspace)
        # import address space from code generated from xml
        standard_address_space.fill_address_space(self.node_mgt_service)
        # import address space from a db saved to disk
        #standard_address_space.fill_address_space_from_disk(self.aspace)

        # import address space directly from xml, this has a performance impact so it is disabled
        #importer = xmlimporter.XmlImporter(self.node_mgt_service)
        #importer.import_xml("/home/olivier/python-opcua/schemas/Opc.Ua.NodeSet2.xml")

        self.loop = utils.ThreadLoop()
        self.subscription_service = SubscriptionService(self.loop, self.aspace)

        self.history_manager = HistoryManager(self)

        # create a session to use on server side
        self.isession = InternalSession(self, self.aspace, self.subscription_service, "Internal", user=User.Admin)
        self.current_time_node = Node(self.isession, ua.NodeId(ua.ObjectIds.Server_ServerStatus_CurrentTime))
        namespace_uris = ["http://opcfoundation.org/UA/"]
        ns_node = Node(self.isession, ua.NodeId(ua.ObjectIds.Server_NamespaceArray))
        ns_node.set_value(namespace_uris)

    def load_address_space(self, path):
        """
        Load the address space from path (delegates to AddressSpace.load)
        """
        self.aspace.load(path)

    def dump_address_space(self, path):
        """
        Dump the address space to path (delegates to AddressSpace.dump)
        """
        self.aspace.dump(path)

    def start(self):
        """
        Register our own endpoints as known servers, start the thread loop,
        publish server state and start time, and start the clock update
        unless disabled_clock is set
        """
        self.logger.info("starting internal server")
        for edp in self.endpoints:
            self._known_servers[edp.Server.ApplicationUri] = ServerDesc(edp.Server)
        self.loop.start()
        # NOTE(review): 0 is presumably the "running" server state - confirm against ua.ServerState
        Node(self.isession, ua.NodeId(ua.ObjectIds.Server_ServerStatus_State)).set_value(0)
        Node(self.isession, ua.NodeId(ua.ObjectIds.Server_ServerStatus_StartTime)).set_value(datetime.utcnow())
        if not self.disabled_clock:
            self._set_current_time()

    def stop(self):
        """
        Stop the thread loop
        """
        self.logger.info("stopping internal server")
        self.loop.stop()

    def _set_current_time(self):
        """
        Write the current UTC time to the CurrentTime node and re-schedule
        itself on the loop one second later
        """
        self.current_time_node.set_value(datetime.utcnow())
        self.loop.call_later(1, self._set_current_time)

    def get_new_channel_id(self):
        """
        Increment the channel id counter and return its new value
        """
        self._channel_id_counter += 1
        return self._channel_id_counter

    def add_endpoint(self, endpoint):
        """
        Append endpoint to the list returned by get_endpoints()
        """
        self.endpoints.append(endpoint)

    def get_endpoints(self, params=None, sockname=None):
        """
        Return the registered endpoints.

        params is accepted but not used here.
        If sockname is given, it is read as a sequence whose first two items
        are host and port; shallow copies of the endpoints are returned with
        the network location of EndpointUrl replaced by that host and port,
        so that the client gets back the address it has access to.
        Without sockname a copy of the endpoint list is returned.
        """
        self.logger.info("get endpoint")
        if not sockname:
            return self.endpoints[:]
        host = sockname[0]
        if ":" in host and not host.startswith("["):
            # an IPv6 literal must be enclosed in brackets inside an url,
            # otherwise its colons are mixed up with the port separator
            host = "[" + host + "]"
        netloc = host + ":" + str(sockname[1])
        edps = []
        for edp in self.endpoints:
            edp1 = copy(edp)
            url = urlparse(edp1.EndpointUrl)
            edp1.EndpointUrl = url._replace(netloc=netloc).geturl()
            edps.append(edp1)
        return edps

    def find_servers(self, params):
        """
        Return the application descriptions of known servers.

        If params.ServerUris is empty all known servers are returned.
        Otherwise a server is returned when one of the requested uris,
        split on ":", is a prefix of its own ":"-split ApplicationUri.
        """
        if not params.ServerUris:
            return [desc.Server for desc in self._known_servers.values()]
        # split the requested uris once, not once per known server
        wanted = [uri.split(":") for uri in params.ServerUris]
        servers = []
        for serv in self._known_servers.values():
            serv_uri = serv.Server.ApplicationUri.split(":")
            if any(serv_uri[:len(uri)] == uri for uri in wanted):
                servers.append(serv.Server)
        return servers

    def register_server(self, server, conf=None):
        """
        Build an ApplicationDescription from server and store it, together
        with conf, in the known servers under server.ServerUri.
        An existing entry with the same uri is replaced.
        """
        appdesc = ua.ApplicationDescription()
        appdesc.ApplicationUri = server.ServerUri
        appdesc.ProductUri = server.ProductUri
        appdesc.ApplicationName = server.ServerNames[0]  # FIXME: select name from client locale
        appdesc.ApplicationType = server.ServerType
        appdesc.GatewayServerUri = server.GatewayServerUri
        appdesc.DiscoveryUrls = server.DiscoveryUrls  # FIXME: select discovery uri using reachability from client network
        self._known_servers[server.ServerUri] = ServerDesc(appdesc, conf)

    def register_server2(self, params):
        """
        Same as register_server, taking server and configuration from
        params.Server and params.DiscoveryConfiguration
        """
        return self.register_server(params.Server, params.DiscoveryConfiguration)

    def create_session(self, name, user=User.Anonymous, external=False):
        """
        Create and return a new InternalSession bound to this server
        """
        return InternalSession(self, self.aspace, self.subscription_service, name, user=user, external=external)

    def enable_history(self, node, period=timedelta(days=7)):
        """
        Set attribute Historizing of node to True and start storing data for history
        """
        node.set_attribute(ua.AttributeIds.Historizing, ua.DataValue(True))
        self.history_manager.historize(node, period)

    def disable_history(self, node):
        """
        Set attribute Historizing of node to False and stop storing data for history
        """
        node.set_attribute(ua.AttributeIds.Historizing, ua.DataValue(False))
        self.history_manager.dehistorize(node)
167
168
169
170 1
class InternalSession(object):
    """
    One session on an InternalServer.

    Most service calls are forwarded to the services of the internal server
    or to the subscription service. The session keeps the ids of the
    subscriptions it created so that they can be deleted when it is closed.
    """
    # next values used for session_id and authentication_token,
    # shared by all sessions and incremented for every new session
    _counter = 10
    _auth_counter = 1000

    def __init__(self, internal_server, aspace, submgr, name, user=User.Anonymous, external=False):
        self.logger = logging.getLogger(__name__)
        self.iserver = internal_server
        self.external = external  # define if session is external, we need to copy some objects if it is internal
        self.aspace = aspace
        self.subscription_service = submgr
        self.name = name
        self.user = user
        self.state = SessionState.Created
        self.session_id = ua.NodeId(self._counter)
        InternalSession._counter += 1
        self.authentication_token = ua.NodeId(self._auth_counter)
        InternalSession._auth_counter += 1
        self.subscriptions = []
        # report through the logger, a library should not write to stdout
        self.logger.info("Created internal session %s for user %s", self.name, self.user)
        self._lock = Lock()  # protects self.subscriptions

    def __str__(self):
        return "InternalSession(name:{}, user:{}, id:{}, auth_token:{})".format(self.name, self.user, self.session_id, self.authentication_token)

    def get_endpoints(self, params=None, sockname=None):
        """
        Forward to InternalServer.get_endpoints
        """
        return self.iserver.get_endpoints(params, sockname)

    def create_session(self, params, sockname=None):
        """
        Build the CreateSessionResult for this session: session id,
        authentication token, the requested timeout unchanged, a new 32 byte
        nonce and the server endpoints as seen from sockname
        """
        self.logger.info("Create session request")

        result = ua.CreateSessionResult()
        result.SessionId = self.session_id
        result.AuthenticationToken = self.authentication_token
        result.RevisedSessionTimeout = params.RequestedSessionTimeout
        result.MaxRequestMessageSize = 65536
        self.nonce = utils.create_nonce(32)
        result.ServerNonce = self.nonce
        result.ServerEndpoints = self.get_endpoints(sockname=sockname)

        return result

    def close_session(self, delete_subs):
        """
        Mark the session as closed and delete all its subscriptions.

        NOTE(review): delete_subs is currently ignored, subscriptions are
        always deleted.
        """
        self.logger.info("close session %s with subscriptions %s", self, self.subscriptions)
        self.state = SessionState.Closed
        # pass a copy since delete_subscriptions modifies self.subscriptions
        self.delete_subscriptions(self.subscriptions[:])

    def activate_session(self, params):
        """
        Activate a session which is in state Created and return the
        ActivateSessionResult, with a new nonce and one status code per
        client software certificate.

        Raises utils.ServiceError(BadSessionIdInvalid) if the session is in
        any other state.
        """
        self.logger.info("activate session")
        result = ua.ActivateSessionResult()
        if self.state != SessionState.Created:
            raise utils.ServiceError(ua.StatusCodes.BadSessionIdInvalid)
        self.nonce = utils.create_nonce(32)
        result.ServerNonce = self.nonce
        for _ in params.ClientSoftwareCertificates:
            result.Results.append(ua.StatusCode())
        self.state = SessionState.Activated
        id_token = params.UserIdentityToken
        if isinstance(id_token, ua.UserNameIdentityToken):
            # NOTE(review): admin rights are granted on the user name alone,
            # no password is checked here; set allow_remote_admin to False
            # on the server to disable this
            if self.iserver.allow_remote_admin and id_token.UserName in ("admin", "Admin"):
                self.user = User.Admin
        return result

    def read(self, params):
        """
        Forward to the attribute service
        """
        return self.iserver.attribute_service.read(params)

    def write(self, params):
        """
        Forward to the attribute service, as the user of this session.
        For internal sessions the values in params are replaced by copies
        before writing.
        """
        if not self.external:
            # If session is internal we need to store a copy of the object, not a reference,
            # otherwise users may change it and we will not generate expected events
            for ntw in params.NodesToWrite:
                ntw.Value.Value.Value = copy(ntw.Value.Value.Value)
        return self.iserver.attribute_service.write(params, self.user)

    def browse(self, params):
        """
        Forward to the view service
        """
        return self.iserver.view_service.browse(params)

    def translate_browsepaths_to_nodeids(self, params):
        """
        Forward to the view service
        """
        return self.iserver.view_service.translate_browsepaths_to_nodeids(params)

    def add_nodes(self, params):
        """
        Forward to the node management service, as the user of this session
        """
        return self.iserver.node_mgt_service.add_nodes(params, self.user)

    def delete_nodes(self, params):
        """
        Forward to the node management service, as the user of this session
        """
        return self.iserver.node_mgt_service.delete_nodes(params, self.user)

    def add_references(self, params):
        """
        Forward to the node management service, as the user of this session
        """
        return self.iserver.node_mgt_service.add_references(params, self.user)

    def delete_references(self, params):
        """
        Forward to the node management service, as the user of this session
        """
        return self.iserver.node_mgt_service.delete_references(params, self.user)

    def add_method_callback(self, methodid, callback):
        """
        Forward to the address space
        """
        return self.aspace.add_method_callback(methodid, callback)

    def call(self, params):
        """
        Forward to the method service
        """
        return self.iserver.method_service.call(params)

    def create_subscription(self, params, callback):
        """
        Create a subscription through the subscription service and remember
        its id in this session
        """
        result = self.subscription_service.create_subscription(params, callback)
        with self._lock:
            self.subscriptions.append(result.SubscriptionId)
        return result

    def create_monitored_items(self, params):
        """
        Forward to the subscription service
        """
        return self.subscription_service.create_monitored_items(params)

    def modify_monitored_items(self, params):
        """
        Forward to the subscription service
        """
        return self.subscription_service.modify_monitored_items(params)

    def republish(self, params):
        """
        Forward to the subscription service
        """
        return self.subscription_service.republish(params)

    def delete_subscriptions(self, ids):
        """
        Forget the given subscription ids in this session, then delete the
        subscriptions through the subscription service. Ids unknown to this
        session are still passed on to the service.
        """
        # take the lock once for the whole batch instead of once per id
        with self._lock:
            for i in ids:
                if i in self.subscriptions:
                    self.subscriptions.remove(i)
        return self.subscription_service.delete_subscriptions(ids)

    def delete_monitored_items(self, params):
        """
        Forward to the subscription service
        """
        return self.subscription_service.delete_monitored_items(params)

    def publish(self, acks=None):
        """
        Forward to the subscription service, with an empty list when no
        acknowledgements are given
        """
        if acks is None:
            acks = []
        return self.subscription_service.publish(acks)
297