Passed
Pull Request — master (#45)
by
unknown
02:29
created

InternalServer.add_endpoint()   A

Complexity

Conditions 1

Size

Total Lines 2
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 2
nop 2
dl 0
loc 2
rs 10
c 0
b 0
f 0
1
"""
2
Internal server implementing opc-ua interface.
3
Can be used on server side or to implement binary/https opc-ua servers
4
"""
5
6
from datetime import datetime, timedelta
7
from copy import copy
8
from struct import unpack_from
9
import os
10
import logging
11
from urllib.parse import urlparse
12
from typing import Coroutine
13
14
from asyncua import ua
15
from ..common.callback import CallbackDispatcher
16
from ..common.node import Node
17
from .history import HistoryManager
18
from .address_space import AddressSpace, AttributeService, ViewService, NodeManagementService, MethodService
19
from .subscription_service import SubscriptionService
20
from .standard_address_space import standard_address_space
21
from .users import User
22
from .internal_session import InternalSession
23
24
try:
25
    from asyncua.crypto import uacrypto
26
except ImportError:
27
    logging.getLogger(__name__).warning("cryptography is not installed, use of crypto disabled")
28
    uacrypto = False
29
30
31
class ServerDesc:
    """
    Entry kept in the known-servers table: a server description together
    with whatever capability/configuration object was registered with it
    (``None`` when nothing was supplied).
    """
    def __init__(self, serv, cap=None):
        self.Server, self.Capabilities = serv, cap
35
36
37
def default_user_manager(iserver, isession, username, password):
    """
    Fallback user manager: every login is accepted, the only decision taken
    is whether the session gets admin rights.

    The session user is set to `User.Admin` when the server allows remote
    administration and the user name is "admin" or "Admin". The password is
    not looked at.
    """
    if iserver.allow_remote_admin:
        if username in ("admin", "Admin"):
            isession.user = User.Admin
    return True
44
45
46
class InternalServer:
47
    """
48
    There is one `InternalServer` for every `Server`.
49
    """
50
    def __init__(self, loop):
        """
        Build the address space, the services working on it and the internal
        admin session. Nothing is loaded into the address space here, see `init`.
        """
        self.loop = loop
        self.logger = logging.getLogger(__name__)

        # plain bookkeeping state
        self.endpoints = []
        self.asyncio_transports = []
        self._known_servers = {}  # used if we are a discovery server
        self._channel_id_counter = 5
        self.certificate = None
        self.private_key = None
        self.allow_remote_admin = True
        # for debugging we may want to disable clock that writes too much in log
        self.disabled_clock = False
        self.user_manager = default_user_manager  # module-level function of this file
        self.server_callback_dispatcher = CallbackDispatcher()

        # the address space and the services operating on it
        aspace = AddressSpace()
        self.aspace = aspace
        self.attribute_service = AttributeService(aspace)
        self.view_service = ViewService(aspace)
        self.method_service = MethodService(aspace)
        self.node_mgt_service = NodeManagementService(aspace)
        self.subscription_service: SubscriptionService = SubscriptionService(loop, aspace)
        self.history_manager = HistoryManager(self)

        # create a session to use on server side
        self.isession = InternalSession(self, aspace, self.subscription_service, "Internal", user=User.Admin)
        current_time_id = ua.NodeId(ua.ObjectIds.Server_ServerStatus_CurrentTime)
        self.current_time_node = Node(self.isession, current_time_id)
75
76
    async def init(self, shelffile=None):
77
        await self.load_standard_address_space(shelffile)
78
        await self._address_space_fixes()
79
        await self.setup_nodes()
80
        await self.history_manager.init()
81
82
    async def setup_nodes(self):
        """
        Set up some nodes as defined by spec.

        Currently this writes the namespace array of the server, containing
        only the OPC Foundation namespace uri.
        """
        namespace_array = Node(self.isession, ua.NodeId(ua.ObjectIds.Server_NamespaceArray))
        await namespace_array.set_value(['http://opcfoundation.org/UA/'])
89
90
    async def load_standard_address_space(self, shelf_file=None):
91
        if shelf_file:
92
            is_file = (await self.loop.run_in_executor(None, os.path.isfile, shelf_file)
93
                       or await self.loop.run_in_executor(None, os.path.isfile, f'{shelf_file}.db'))
94
            if is_file:
95
                # import address space from shelf
96
                await self.loop.run_in_executor(None, self.aspace.load_aspace_shelf, shelf_file)
97
                return
98
        # import address space from code generated from xml
99
        standard_address_space.fill_address_space(self.node_mgt_service)
100
        # import address space directly from xml, this has performance impact so disabled
101
        # importer = xmlimporter.XmlImporter(self.node_mgt_service)
102
        # importer.import_xml("/path/to/python-asyncua/schemas/Opc.Ua.NodeSet2.xml", self)
103
        if shelf_file:
104
            # path was supplied, but file doesn't exist - create one for next start up
105
            await self.loop.run_in_executor(None, self.aspace.make_aspace_shelf, shelf_file)
106
107
    def _address_space_fixes(self) -> Coroutine:
        """
        Looks like the xml definition of address space has some error. This is a good place to fix them.

        Adds an inverse `Organizes` reference from BaseObjectType to the
        ObjectTypes folder and from BaseDataType to the DataTypes folder.
        Returns the awaitable produced by the internal session, it is up to
        the caller to await it.
        """
        object_ids = ua.ObjectIds
        missing_links = (
            (object_ids.BaseObjectType, object_ids.ObjectTypesFolder),
            (object_ids.BaseDataType, object_ids.DataTypesFolder),
        )
        references = []
        for type_id, folder_id in missing_links:
            ref = ua.AddReferencesItem()
            ref.SourceNodeId = ua.NodeId(type_id)
            ref.ReferenceTypeId = ua.NodeId(object_ids.Organizes)
            ref.IsForward = False
            ref.TargetNodeId = ua.NodeId(folder_id)
            ref.TargetNodeClass = ua.NodeClass.Object
            references.append(ref)
        return self.isession.add_references(references)
124
125
    def load_address_space(self, path):
126
        """
127
        Load address space from path
128
        """
129
        self.aspace.load(path)
130
131
    def dump_address_space(self, path):
132
        """
133
        Dump current address space to path
134
        """
135
        self.aspace.dump(path)
136
137
    async def start(self):
        """
        Start the internal server.

        Registers the application of every endpoint as a known server, writes
        the server status state (0) and start time nodes and, unless the clock
        is disabled, starts the periodic update of the current time node.
        """
        self.logger.info('starting internal server')
        for endpoint in self.endpoints:
            application = endpoint.Server
            self._known_servers[application.ApplicationUri] = ServerDesc(application)

        state_node = Node(self.isession, ua.NodeId(ua.ObjectIds.Server_ServerStatus_State))
        await state_node.set_value(0, ua.VariantType.Int32)
        start_time_node = Node(self.isession, ua.NodeId(ua.ObjectIds.Server_ServerStatus_StartTime))
        await start_time_node.set_value(datetime.utcnow())

        if self.disabled_clock:
            return
        self._set_current_time()
145
146
    async def stop(self):
147
        self.logger.info('stopping internal server')
148
        await self.isession.close_session()
149
        await self.history_manager.stop()
150
151
    def _set_current_time(self):
152
        self.loop.create_task(self.current_time_node.set_value(datetime.utcnow()))
153
        self.loop.call_later(1, self._set_current_time)
154
155
    def get_new_channel_id(self):
156
        self._channel_id_counter += 1
157
        return self._channel_id_counter
158
159
    def add_endpoint(self, endpoint):
160
        self.endpoints.append(endpoint)
161
162
    async def get_endpoints(self, params=None, sockname=None):
163
        self.logger.info('get endpoint')
164
        if sockname:
165
            # return to client the ip address it has access to
166
            edps = []
167
            for edp in self.endpoints:
168
                edp1 = copy(edp)
169
                url = urlparse(edp1.EndpointUrl)
170
                url = url._replace(netloc=sockname[0] + ':' + str(sockname[1]))
171
                edp1.EndpointUrl = url.geturl()
172
                edps.append(edp1)
173
            return edps
174
        return self.endpoints[:]
175
176
    def find_servers(self, params):
177
        if not params.ServerUris:
178
            return [desc.Server for desc in self._known_servers.values()]
179
        servers = []
180
        for serv in self._known_servers.values():
181
            serv_uri = serv.Server.ApplicationUri.split(':')
182
            for uri in params.ServerUris:
183
                uri = uri.split(':')
184
                if serv_uri[:len(uri)] == uri:
185
                    servers.append(serv.Server)
186
                    break
187
        return servers
188
189
    def register_server(self, server, conf=None):
        """
        Record `server` as a known server (used if we are a discovery server).

        An application description is built from the registration data and
        stored, together with `conf`, under the uri of the server. An entry
        already present for that uri is replaced.
        """
        server_uri = server.ServerUri
        description = ua.ApplicationDescription()
        description.ApplicationUri = server_uri
        description.ProductUri = server.ProductUri
        # FIXME: select name from client locale
        description.ApplicationName = server.ServerNames[0]
        description.ApplicationType = server.ServerType
        # FIXME: select discovery uri using reachability from client network
        description.DiscoveryUrls = server.DiscoveryUrls
        description.GatewayServerUri = server.GatewayServerUri
        self._known_servers[server_uri] = ServerDesc(description, conf)
200
201
    def register_server2(self, params):
202
        return self.register_server(params.Server, params.DiscoveryConfiguration)
203
204
    def create_session(self, name, user=User.Anonymous, external=False):
        """
        Create a new session called `name` on this server, sharing the
        address space and subscription service of the server.
        """
        session = InternalSession(
            self,
            self.aspace,
            self.subscription_service,
            name,
            user=user,
            external=external,
        )
        return session
206
207
    async def enable_history_data_change(self, node, period=timedelta(days=7), count=0):
        """
        Set attribute Historizing of node to True and start storing data for history.

        The HistoryRead bit is also set in the AccessLevel and UserAccessLevel
        attributes of the node. `period` and `count` are handed to the history manager.
        """
        await node.set_attribute(ua.AttributeIds.Historizing, ua.DataValue(True))
        for level_attr in (ua.AttributeIds.AccessLevel, ua.AttributeIds.UserAccessLevel):
            await node.set_attr_bit(level_attr, ua.AccessLevel.HistoryRead)
        await self.history_manager.historize_data_change(node, period, count)
215
216
    async def disable_history_data_change(self, node):
        """
        Set attribute Historizing of node to False and stop storing data for history.

        The HistoryRead bit is also cleared in the AccessLevel and
        UserAccessLevel attributes of the node.
        """
        await node.set_attribute(ua.AttributeIds.Historizing, ua.DataValue(False))
        for level_attr in (ua.AttributeIds.AccessLevel, ua.AttributeIds.UserAccessLevel):
            await node.unset_attr_bit(level_attr, ua.AccessLevel.HistoryRead)
        await self.history_manager.dehistorize(node)
224
225
    async def enable_history_event(self, source, period=timedelta(days=7), count=0):
        """
        Start storing the events of `source` for history.

        The HistoryRead flag is added to the event notifier of `source` when
        it is missing. `period` and `count` are handed to the history manager.
        Raises `ua.UaError` when `source` does not have SubscribeToEvents set.
        """
        notifier = await source.get_event_notifier()
        if ua.EventNotifier.SubscribeToEvents not in notifier:
            raise ua.UaError('Node does not generate events', notifier)

        history_flag_missing = ua.EventNotifier.HistoryRead not in notifier
        if history_flag_missing:
            notifier.add(ua.EventNotifier.HistoryRead)
            await source.set_event_notifier(notifier)

        await self.history_manager.historize_event(source, period, count)
236
237
    async def disable_history_event(self, source):
        """
        Clear the HistoryRead bit of the EventNotifier attribute of `source`
        and stop storing its events for history.
        """
        notifier_attr = ua.AttributeIds.EventNotifier
        await source.unset_attr_bit(notifier_attr, ua.EventNotifier.HistoryRead)
        await self.history_manager.dehistorize(source)
243
244
    def subscribe_server_callback(self, event, handle):
245
        """
246
        Create a subscription from event to handle
247
        """
248
        self.server_callback_dispatcher.addListener(event, handle)
249
250
    def unsubscribe_server_callback(self, event, handle):
251
        """
252
        Remove a subscription from event to handle
253
        """
254
        self.server_callback_dispatcher.removeListener(event, handle)
255
256
    def set_attribute_value(self, nodeid, datavalue, attr=ua.AttributeIds.Value):
        """
        Write `datavalue` straight into attribute `attr` (the Value attribute
        by default) of node `nodeid` in the address space.

        This bypasses some checks and structure creation, so it is a little faster.
        """
        address_space = self.aspace
        address_space.set_attribute_value(nodeid, attr, datavalue)
262
263
    def set_user_manager(self, user_manager):
264
        """
265
        set up a function which that will check for authorize users. Input function takes username
266
        and password as paramters and returns True of user is allowed access, False otherwise.
267
        """
268
        self.user_manager = user_manager
269
270
    def check_user_token(self, isession, token):
271
        """
272
        unpack the username and password for the benefit of the user defined user manager
273
        """
274
        userName = token.UserName
275
        passwd = token.Password
276
277
        # decrypt password if we can
278
        if str(token.EncryptionAlgorithm) != "None":
279
            if not uacrypto:
280
                return False
281
            try:
282
                if token.EncryptionAlgorithm == "http://www.w3.org/2001/04/xmlenc#rsa-1_5":
283
                    raw_pw = uacrypto.decrypt_rsa15(self.private_key, passwd)
284
                elif token.EncryptionAlgorithm == "http://www.w3.org/2001/04/xmlenc#rsa-oaep":
285
                    raw_pw = uacrypto.decrypt_rsa_oaep(self.private_key, passwd)
286
                else:
287
                    self.logger.warning("Unknown password encoding %s", token.EncryptionAlgorithm)
288
                    return False
289
                length = unpack_from('<I', raw_pw)[0] - len(isession.nonce)
290
                passwd = raw_pw[4:4 + length]
291
                passwd = passwd.decode('utf-8')
292
            except Exception:
293
                self.logger.exception("Unable to decrypt password")
294
                return False
295
296
        # call user_manager
297
        return self.user_manager(self, isession, userName, passwd)
298