Passed
Push — master ( 3fb232...b005e0 )
by Olivier
02:21
created

asyncua.server.internal_server.InternalSession.__init__()   A

Complexity

Conditions 1

Size

Total Lines 15
Code Lines 15

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 15
nop 7
dl 0
loc 15
rs 9.65
c 0
b 0
f 0
1
"""
2
Internal server implementing opcu-ua interface.
3
Can be used on server side or to implement binary/https opc-ua servers
4
"""
5
6
import asyncio
7
from datetime import datetime, timedelta
8
from copy import copy
9
from struct import unpack_from
10
import os
11
import logging
12
from urllib.parse import urlparse
13
from typing import Coroutine
14
15
from asyncua import ua
16
from ..common.callback import CallbackDispatcher
17
from ..common.node import Node
18
from .history import HistoryManager
19
from .address_space import AddressSpace, AttributeService, ViewService, NodeManagementService, MethodService
20
from .subscription_service import SubscriptionService
21
from .standard_address_space import standard_address_space
22
from .users import User
23
from .internal_session import InternalSession
24
25
try:
26
    from asyncua.crypto import uacrypto
27
except ImportError:
28
    logging.getLogger(__name__).warning("cryptography is not installed, use of crypto disabled")
29
    uacrypto = False
30
31
32
class ServerDesc:
33
    def __init__(self, serv, cap=None):
34
        self.Server = serv
35
        self.Capabilities = cap
36
37
38
def default_user_manager(iserver, isession, username, password):
39
    """
40
    Default user_manager, does nothing much but check for admin
41
    """
42
    if iserver.allow_remote_admin and username in ("admin", "Admin"):
43
        isession.user = User.Admin
44
    return True
45
46
47
class InternalServer:
48
    """
49
    There is one `InternalServer` for every `Server`.
50
    """
51
    def __init__(self, loop: asyncio.AbstractEventLoop):
52
        self.loop: asyncio.AbstractEventLoop = loop
53
        self.logger = logging.getLogger(__name__)
54
        self.server_callback_dispatcher = CallbackDispatcher()
55
        self.endpoints = []
56
        self._channel_id_counter = 5
57
        self.allow_remote_admin = True
58
        self.disabled_clock = False  # for debugging we may want to disable clock that writes too much in log
59
        self._known_servers = {}  # used if we are a discovery server
60
        self.certificate = None
61
        self.private_key = None
62
        self.aspace = AddressSpace()
63
        self.attribute_service = AttributeService(self.aspace)
64
        self.view_service = ViewService(self.aspace)
65
        self.method_service = MethodService(self.aspace)
66
        self.node_mgt_service = NodeManagementService(self.aspace)
67
        self.asyncio_transports = []
68
        self.subscription_service: SubscriptionService = SubscriptionService(self.loop, self.aspace)
69
        self.history_manager = HistoryManager(self)
70
        self.user_manager = default_user_manager  # defined at the end of this file
71
        # create a session to use on server side
72
        self.isession = InternalSession(self, self.aspace, self.subscription_service, "Internal", user=User.Admin)
73
        self.current_time_node = Node(self.isession, ua.NodeId(ua.ObjectIds.Server_ServerStatus_CurrentTime))
74
75
    async def init(self, shelffile=None):
76
        await self.load_standard_address_space(shelffile)
77
        await self._address_space_fixes()
78
        await self.setup_nodes()
79
        await self.history_manager.init()
80
81
    async def setup_nodes(self):
82
        """
83
        Set up some nodes as defined by spec
84
        """
85
        uries = ['http://opcfoundation.org/UA/']
86
        ns_node = Node(self.isession, ua.NodeId(ua.ObjectIds.Server_NamespaceArray))
87
        await ns_node.set_value(uries)
88
89
    async def load_standard_address_space(self, shelf_file=None):
90
        if shelf_file:
91
            is_file = (await self.loop.run_in_executor(None, os.path.isfile, shelf_file)
92
                       or await self.loop.run_in_executor(None, os.path.isfile, f'{shelf_file}.db'))
93
            if is_file:
94
                # import address space from shelf
95
                await self.loop.run_in_executor(None, self.aspace.load_aspace_shelf, shelf_file)
96
                return
97
        # import address space from code generated from xml
98
        standard_address_space.fill_address_space(self.node_mgt_service)
99
        # import address space directly from xml, this has performance impact so disabled
100
        # importer = xmlimporter.XmlImporter(self.node_mgt_service)
101
        # importer.import_xml("/path/to/python-asyncua/schemas/Opc.Ua.NodeSet2.xml", self)
102
        if shelf_file:
103
            # path was supplied, but file doesn't exist - create one for next start up
104
            await self.loop.run_in_executor(None, self.aspace.make_aspace_shelf, shelf_file)
105
106
    def _address_space_fixes(self) -> Coroutine:
107
        """
108
        Looks like the xml definition of address space has some error. This is a good place to fix them
109
        """
110
        it = ua.AddReferencesItem()
111
        it.SourceNodeId = ua.NodeId(ua.ObjectIds.BaseObjectType)
112
        it.ReferenceTypeId = ua.NodeId(ua.ObjectIds.Organizes)
113
        it.IsForward = False
114
        it.TargetNodeId = ua.NodeId(ua.ObjectIds.ObjectTypesFolder)
115
        it.TargetNodeClass = ua.NodeClass.Object
116
        it2 = ua.AddReferencesItem()
117
        it2.SourceNodeId = ua.NodeId(ua.ObjectIds.BaseDataType)
118
        it2.ReferenceTypeId = ua.NodeId(ua.ObjectIds.Organizes)
119
        it2.IsForward = False
120
        it2.TargetNodeId = ua.NodeId(ua.ObjectIds.DataTypesFolder)
121
        it2.TargetNodeClass = ua.NodeClass.Object
122
        return self.isession.add_references([it, it2])
123
124
    def load_address_space(self, path):
125
        """
126
        Load address space from path
127
        """
128
        self.aspace.load(path)
129
130
    def dump_address_space(self, path):
131
        """
132
        Dump current address space to path
133
        """
134
        self.aspace.dump(path)
135
136
    async def start(self):
137
        self.logger.info('starting internal server')
138
        for edp in self.endpoints:
139
            self._known_servers[edp.Server.ApplicationUri] = ServerDesc(edp.Server)
140
        await Node(self.isession, ua.NodeId(ua.ObjectIds.Server_ServerStatus_State)).set_value(0, ua.VariantType.Int32)
141
        await Node(self.isession, ua.NodeId(ua.ObjectIds.Server_ServerStatus_StartTime)).set_value(datetime.utcnow())
142
        if not self.disabled_clock:
143
            self._set_current_time()
144
145
    async def stop(self):
146
        self.logger.info('stopping internal server')
147
        await self.isession.close_session()
148
        await self.history_manager.stop()
149
150
    def _set_current_time(self):
151
        self.loop.create_task(self.current_time_node.set_value(datetime.utcnow()))
152
        self.loop.call_later(1, self._set_current_time)
153
154
    def get_new_channel_id(self):
155
        self._channel_id_counter += 1
156
        return self._channel_id_counter
157
158
    def add_endpoint(self, endpoint):
159
        self.endpoints.append(endpoint)
160
161
    async def get_endpoints(self, params=None, sockname=None):
162
        self.logger.info('get endpoint')
163
        if sockname:
164
            # return to client the ip address it has access to
165
            edps = []
166
            for edp in self.endpoints:
167
                edp1 = copy(edp)
168
                url = urlparse(edp1.EndpointUrl)
169
                url = url._replace(netloc=sockname[0] + ':' + str(sockname[1]))
170
                edp1.EndpointUrl = url.geturl()
171
                edps.append(edp1)
172
            return edps
173
        return self.endpoints[:]
174
175
    def find_servers(self, params):
176
        if not params.ServerUris:
177
            return [desc.Server for desc in self._known_servers.values()]
178
        servers = []
179
        for serv in self._known_servers.values():
180
            serv_uri = serv.Server.ApplicationUri.split(':')
181
            for uri in params.ServerUris:
182
                uri = uri.split(':')
183
                if serv_uri[:len(uri)] == uri:
184
                    servers.append(serv.Server)
185
                    break
186
        return servers
187
188
    def register_server(self, server, conf=None):
189
        appdesc = ua.ApplicationDescription()
190
        appdesc.ApplicationUri = server.ServerUri
191
        appdesc.ProductUri = server.ProductUri
192
        # FIXME: select name from client locale
193
        appdesc.ApplicationName = server.ServerNames[0]
194
        appdesc.ApplicationType = server.ServerType
195
        appdesc.DiscoveryUrls = server.DiscoveryUrls
196
        # FIXME: select discovery uri using reachability from client network
197
        appdesc.GatewayServerUri = server.GatewayServerUri
198
        self._known_servers[server.ServerUri] = ServerDesc(appdesc, conf)
199
200
    def register_server2(self, params):
201
        return self.register_server(params.Server, params.DiscoveryConfiguration)
202
203
    def create_session(self, name, user=User.Anonymous, external=False):
204
        return InternalSession(self, self.aspace, self.subscription_service, name, user=user, external=external)
205
206
    async def enable_history_data_change(self, node, period=timedelta(days=7), count=0):
207
        """
208
        Set attribute Historizing of node to True and start storing data for history
209
        """
210
        await node.set_attribute(ua.AttributeIds.Historizing, ua.DataValue(True))
211
        await node.set_attr_bit(ua.AttributeIds.AccessLevel, ua.AccessLevel.HistoryRead)
212
        await node.set_attr_bit(ua.AttributeIds.UserAccessLevel, ua.AccessLevel.HistoryRead)
213
        await self.history_manager.historize_data_change(node, period, count)
214
215
    async def disable_history_data_change(self, node):
216
        """
217
        Set attribute Historizing of node to False and stop storing data for history
218
        """
219
        await node.set_attribute(ua.AttributeIds.Historizing, ua.DataValue(False))
220
        await node.unset_attr_bit(ua.AttributeIds.AccessLevel, ua.AccessLevel.HistoryRead)
221
        await node.unset_attr_bit(ua.AttributeIds.UserAccessLevel, ua.AccessLevel.HistoryRead)
222
        await self.history_manager.dehistorize(node)
223
224
    async def enable_history_event(self, source, period=timedelta(days=7), count=0):
225
        """
226
        Set attribute History Read of object events to True and start storing data for history
227
        """
228
        event_notifier = await source.get_event_notifier()
229
        if ua.EventNotifier.SubscribeToEvents not in event_notifier:
230
            raise ua.UaError('Node does not generate events', event_notifier)
231
        if ua.EventNotifier.HistoryRead not in event_notifier:
232
            event_notifier.add(ua.EventNotifier.HistoryRead)
233
            await source.set_event_notifier(event_notifier)
234
        await self.history_manager.historize_event(source, period, count)
235
236
    async def disable_history_event(self, source):
237
        """
238
        Set attribute History Read of node to False and stop storing data for history
239
        """
240
        await source.unset_attr_bit(ua.AttributeIds.EventNotifier, ua.EventNotifier.HistoryRead)
241
        await self.history_manager.dehistorize(source)
242
243
    def subscribe_server_callback(self, event, handle):
244
        """
245
        Create a subscription from event to handle
246
        """
247
        self.server_callback_dispatcher.addListener(event, handle)
248
249
    def unsubscribe_server_callback(self, event, handle):
250
        """
251
        Remove a subscription from event to handle
252
        """
253
        self.server_callback_dispatcher.removeListener(event, handle)
254
255
    def set_attribute_value(self, nodeid, datavalue, attr=ua.AttributeIds.Value):
256
        """
257
        directly write datavalue to the Attribute, bypassing some checks and structure creation
258
        so it is a little faster
259
        """
260
        self.aspace.set_attribute_value(nodeid, attr, datavalue)
261
262
    def set_user_manager(self, user_manager):
263
        """
264
        set up a function which that will check for authorize users. Input function takes username
265
        and password as parameters and returns True of user is allowed access, False otherwise.
266
        """
267
        self.user_manager = user_manager
268
269
    def check_user_token(self, isession, token):
270
        """
271
        unpack the username and password for the benefit of the user defined user manager
272
        """
273
        user_name = token.UserName
274
        password = token.Password
275
        # decrypt password if we can
276
        if str(token.EncryptionAlgorithm) != "None":
277
            if not uacrypto:
278
                return False
279
            try:
280
                if token.EncryptionAlgorithm == "http://www.w3.org/2001/04/xmlenc#rsa-1_5":
281
                    raw_pw = uacrypto.decrypt_rsa15(self.private_key, password)
282
                elif token.EncryptionAlgorithm == "http://www.w3.org/2001/04/xmlenc#rsa-oaep":
283
                    raw_pw = uacrypto.decrypt_rsa_oaep(self.private_key, password)
284
                else:
285
                    self.logger.warning("Unknown password encoding %s", token.EncryptionAlgorithm)
286
                    return False
287
                length = unpack_from('<I', raw_pw)[0] - len(isession.nonce)
288
                password = raw_pw[4:4 + length]
289
                password = password.decode('utf-8')
290
            except Exception:
291
                self.logger.exception("Unable to decrypt password")
292
                return False
293
        # call user_manager
294
        return self.user_manager(self, isession, user_name, password)
295