Completed
Pull Request — master (#70)
by Olivier
03:30
created

opcua.ServerDesc   A

Complexity

Total Complexity 1

Size/Duplication

Total Lines 4
Duplicated Lines 0 %

Test Coverage

Coverage 100%
Metric Value
wmc 1
dl 0
loc 4
ccs 4
cts 4
cp 1
rs 10

1 Method

Rating   Name   Duplication   Size   Complexity  
A __init__() 0 3 1
1
"""
2
Internal server implementing the OPC-UA interface. Can be used on the server side or to implement binary/https OPC-UA servers.
3
"""
4
5 1
from datetime import datetime
6 1
from copy import copy
7 1
import logging
8 1
from threading import Lock
9 1
from enum import Enum
10 1
try:
11 1
    from urllib.parse import urlparse
12
except ImportError:
13
    from urlparse import urlparse
14
15
16 1
from opcua import ua
17 1
from opcua import utils
18 1
from opcua import Node
19 1
from opcua.address_space import AddressSpace
20 1
from opcua.address_space import AttributeService
21 1
from opcua.address_space import ViewService
22 1
from opcua.address_space import NodeManagementService
23 1
from opcua.address_space import MethodService
24 1
from opcua.subscription_service import SubscriptionService
25 1
from opcua import standard_address_space
26 1
from opcua.users import User
27 1
from opcua import xmlimporter
28
29
30 1
class SessionState(Enum):
    """Lifecycle states tracked on a session object."""

    # Initial state assigned when a session object is constructed.
    Created = 0
    # Set once the session has been activated.
    Activated = 1
    # Set when the session is closed.
    Closed = 2
34
35
36 1
class ServerDesc(object):
    """
    Plain record pairing a server description with optional capabilities.

    Attributes:
        Server: the value passed as ``serv``, stored unchanged.
        Capabilities: the value passed as ``cap`` (``None`` when omitted).
    """

    def __init__(self, serv, cap=None):
        self.Capabilities = cap
        self.Server = serv
40
41
42 1
class InternalServer(object):
    """
    Core server object owning the address space and the services built on it.

    On construction it creates the address space, the attribute / view /
    method / node-management services, a thread loop, the subscription
    service and one internal admin session (``isession``) used for
    server-side node access.
    """

    def __init__(self):
        self.logger = logging.getLogger(__name__)
        self.endpoints = []
        self._channel_id_counter = 5
        self.allow_remote_admin = True
        self._known_servers = {}  # used if we are a discovery server

        self.aspace = AddressSpace()
        self.attribute_service = AttributeService(self.aspace)
        self.view_service = ViewService(self.aspace)
        self.method_service = MethodService(self.aspace)
        self.node_mgt_service = NodeManagementService(self.aspace)
        # import address space from code generated from xml
        standard_address_space.fill_address_space(self.node_mgt_service)
        # import address space from a db saved to disk
        #standard_address_space.fill_address_space_from_disk(self.aspace)

        # import address space directly from xml, this has a performance impact so it is disabled
        #importer = xmlimporter.XmlImporter(self.node_mgt_service)
        #importer.import_xml("/home/olivier/python-opcua/schemas/Opc.Ua.NodeSet2.xml")

        self.loop = utils.ThreadLoop()
        self.subscription_service = SubscriptionService(self.loop, self.aspace)

        # create a session to use on server side
        self.isession = InternalSession(self, self.aspace, self.subscription_service, "Internal", user=User.Admin)
        self.current_time_node = Node(self.isession, ua.NodeId(ua.ObjectIds.Server_ServerStatus_CurrentTime))
        # seed the namespace array node with the base OPC Foundation namespace uri
        uries = ["http://opcfoundation.org/UA/"]
        ns_node = Node(self.isession, ua.NodeId(ua.ObjectIds.Server_NamespaceArray))
        ns_node.set_value(uries)

    def load_address_space(self, path):
        """Delegate to ``self.aspace.load(path)``."""
        self.aspace.load(path)

    def dump_address_space(self, path):
        """Delegate to ``self.aspace.dump(path)``."""
        self.aspace.dump(path)

    def start(self):
        """
        Start the thread loop and initialise the server status nodes.

        Writes 0 to the ServerStatus State node (presumably the "running"
        state -- confirm against the OPC-UA ServerState enumeration), writes
        the current local time to the StartTime node, then starts the
        periodic current-time update.
        """
        self.logger.info("starting internal server")
        self.loop.start()
        Node(self.isession, ua.NodeId(ua.ObjectIds.Server_ServerStatus_State)).set_value(0)
        Node(self.isession, ua.NodeId(ua.ObjectIds.Server_ServerStatus_StartTime)).set_value(datetime.now())
        self._set_current_time()

    def stop(self):
        """Stop the thread loop."""
        self.logger.info("stopping internal server")
        self.loop.stop()

    def _set_current_time(self):
        """Write the current time to the CurrentTime node and reschedule itself on the loop."""
        self.current_time_node.set_value(datetime.now())
        # re-arm: the loop calls this method again after a delay of 1 (presumably seconds)
        self.loop.call_later(1, self._set_current_time)

    def get_new_channel_id(self):
        """Increment the channel id counter and return the new value."""
        self._channel_id_counter += 1
        return self._channel_id_counter

    def add_endpoint(self, endpoint):
        """Append ``endpoint`` to the list of endpoints."""
        self.endpoints.append(endpoint)

    def get_endpoints(self, params=None, sockname=None):
        """
        Return the registered endpoints.

        Args:
            params: accepted for interface compatibility; not used here.
            sockname: optional sequence whose first two items are used as
                host and port. When given, every endpoint is shallow-copied
                and the network location of its ``EndpointUrl`` is replaced
                by ``host:port`` so the client gets an address it can reach.

        Returns:
            A new list: rewritten copies of all endpoints when ``sockname``
            is truthy, otherwise a shallow copy of ``self.endpoints``.
        """
        self.logger.info("get endpoint")
        if not sockname:
            return self.endpoints[:]
        # return to client the ip address it has access to
        edps = []
        for edp in self.endpoints:
            # work on a copy so the registered endpoint keeps its original url
            edp1 = copy(edp)
            url = urlparse(edp1.EndpointUrl)
            url = url._replace(netloc=sockname[0] + ":" + str(sockname[1]))
            edp1.EndpointUrl = url.geturl()
            edps.append(edp1)
        # return only after ALL endpoints were rewritten (previously the
        # return sat inside the loop and only the first endpoint was sent)
        return edps

    def find_servers(self, params):
        """
        Return the ``Server`` of every endpoint followed by every known server.

        ``params`` is currently ignored.
        """
        # FIXME: implement filtering from params.uri
        servers = [edp.Server for edp in self.endpoints]
        return servers + [desc.Server for desc in self._known_servers.values()]

    def register_server(self, server, conf=None):
        """
        Record ``server`` in the known-servers table, keyed by its ``ServerUri``.

        An ``ua.ApplicationDescription`` is built from the fields of
        ``server`` and stored together with ``conf`` in a ``ServerDesc``.
        Registering the same ``ServerUri`` again overwrites the earlier entry.
        """
        appdesc = ua.ApplicationDescription()
        appdesc.ApplicationUri = server.ServerUri
        appdesc.ProductUri = server.ProductUri
        appdesc.ApplicationName = server.ServerNames[0]  # FIXME: select name from client locale
        appdesc.ApplicationType = server.ServerType
        appdesc.GatewayServerUri = server.GatewayServerUri
        appdesc.DiscoveryUrls = server.DiscoveryUrls  # FIXME: select discovery uri using reachability from client network
        self._known_servers[server.ServerUri] = ServerDesc(appdesc, conf)

    def register_server2(self, params):
        """Call ``register_server`` with ``params.Server`` and ``params.DiscoveryConfiguration``."""
        return self.register_server(params.Server, params.DiscoveryConfiguration)

    def create_session(self, name, user=User.Anonymous):
        """Create and return a new ``InternalSession`` bound to this server."""
        return InternalSession(self, self.aspace, self.subscription_service, name, user=user)
139
140
141 1
class InternalSession(object):
    """
    One session on an ``InternalServer``.

    Most service calls are forwarded to the services of the owning server or
    to the subscription service. The session keeps the list of subscription
    ids it created so they can be deleted when the session is closed.
    """

    # class-wide counters used to give each session a distinct session id
    # and authentication token
    _counter = 10
    _auth_counter = 1000

    def __init__(self, internal_server, aspace, submgr, name, user=User.Anonymous):
        self.logger = logging.getLogger(__name__)
        self.iserver = internal_server
        self.aspace = aspace
        self.subscription_service = submgr
        self.name = name
        self.user = user
        self.state = SessionState.Created
        self.session_id = ua.NodeId(self._counter)
        InternalSession._counter += 1
        self.authentication_token = ua.NodeId(self._auth_counter)
        InternalSession._auth_counter += 1
        self.nonce = utils.create_nonce()
        self.subscriptions = []
        # report through the logger instead of printing to stdout
        self.logger.debug("Created internal session %s for user %s", self.name, self.user)
        self._lock = Lock()  # guards self.subscriptions

    def __str__(self):
        return "InternalSession(name:{}, user:{}, id:{}, auth_token:{})".format(self.name, self.user, self.session_id, self.authentication_token)

    def get_endpoints(self, params=None, sockname=None):
        """Forward to the owning server's ``get_endpoints``."""
        return self.iserver.get_endpoints(params, sockname)

    def create_session(self, params, sockname=None):
        """
        Build the ``ua.CreateSessionResult`` describing this session.

        The requested session timeout is echoed back unchanged and the
        endpoints are obtained through ``get_endpoints(sockname=sockname)``.
        """
        self.logger.info("Create session request")

        result = ua.CreateSessionResult()
        result.SessionId = self.session_id
        result.AuthenticationToken = self.authentication_token
        result.RevisedSessionTimeout = params.RequestedSessionTimeout
        result.MaxRequestMessageSize = 65536
        result.ServerNonce = self.nonce
        result.ServerEndpoints = self.get_endpoints(sockname=sockname)

        return result

    def close_session(self, delete_subs):
        """
        Mark the session closed and delete all of its subscriptions.

        NOTE(review): ``delete_subs`` is not consulted; subscriptions are
        always deleted. Confirm whether that is intended.
        """
        self.logger.info("close session %s with subscriptions %s", self, self.subscriptions)
        self.state = SessionState.Closed
        # pass a copy: delete_subscriptions mutates self.subscriptions
        self.delete_subscriptions(self.subscriptions[:])

    def activate_session(self, params):
        """
        Activate the session and return an ``ua.ActivateSessionResult``.

        Raises:
            utils.ServiceError: with ``BadSessionIdInvalid`` when the session
                is not in the ``Created`` state.
        """
        self.logger.info("activate session")
        result = ua.ActivateSessionResult()
        if self.state != SessionState.Created:
            raise utils.ServiceError(ua.StatusCodes.BadSessionIdInvalid)
        result.ServerNonce = self.nonce
        # one default status code per client software certificate
        for _ in params.ClientSoftwareCertificates:
            result.Results.append(ua.StatusCode())
        self.state = SessionState.Activated
        id_token = params.UserIdentityToken
        if isinstance(id_token, ua.UserNameIdentityToken):
            # NOTE(review): admin rights are granted on the user name alone,
            # no password is checked here -- only gated by allow_remote_admin.
            if self.iserver.allow_remote_admin and id_token.UserName in ("admin", "Admin"):
                self.user = User.Admin
        return result

    def read(self, params):
        """Forward to the server's attribute service."""
        return self.iserver.attribute_service.read(params)

    def write(self, params):
        """Forward to the server's attribute service, passing this session's user."""
        return self.iserver.attribute_service.write(params, self.user)

    def browse(self, params):
        """Forward to the server's view service."""
        return self.iserver.view_service.browse(params)

    def translate_browsepaths_to_nodeids(self, params):
        """Forward to the server's view service."""
        return self.iserver.view_service.translate_browsepaths_to_nodeids(params)

    def add_nodes(self, params):
        """Forward to the server's node management service, passing this session's user."""
        return self.iserver.node_mgt_service.add_nodes(params, self.user)

    def add_references(self, params):
        """Forward to the server's node management service, passing this session's user."""
        return self.iserver.node_mgt_service.add_references(params, self.user)

    def add_method_callback(self, methodid, callback):
        """Forward to the address space's ``add_method_callback``."""
        return self.aspace.add_method_callback(methodid, callback)

    def call(self, params):
        """Forward to the server's method service."""
        return self.iserver.method_service.call(params)

    def create_subscription(self, params, callback):
        """Create a subscription and remember its id on this session."""
        result = self.subscription_service.create_subscription(params, callback)
        with self._lock:
            self.subscriptions.append(result.SubscriptionId)
        return result

    def create_monitored_items(self, params):
        """Forward to the subscription service."""
        return self.subscription_service.create_monitored_items(params)

    def modify_monitored_items(self, params):
        """Forward to the subscription service."""
        return self.subscription_service.modify_monitored_items(params)

    def republish(self, params):
        """Forward to the subscription service."""
        return self.subscription_service.republish(params)

    def delete_subscriptions(self, ids):
        """
        Forget the given subscription ids on this session, then delete them
        through the subscription service and return its result.

        Ids not owned by this session are still passed to the service.
        """
        with self._lock:
            for i in ids:
                if i in self.subscriptions:
                    self.subscriptions.remove(i)
        return self.subscription_service.delete_subscriptions(ids)

    def delete_monitored_items(self, params):
        """Forward to the subscription service."""
        return self.subscription_service.delete_monitored_items(params)

    def publish(self, acks=None):
        """Forward to the subscription service; ``None`` is replaced by an empty list."""
        if acks is None:
            acks = []
        return self.subscription_service.publish(acks)
255