Passed
Push — master ( b0b54c...47920a )
by Olivier
02:38
created

default_user_manager()   A

Complexity

Conditions 3

Size

Total Lines 7
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 3
eloc 4
nop 4
dl 0
loc 7
rs 10
c 0
b 0
f 0
1
"""
2
Internal server implementing opc-ua interface.
3
Can be used on server side or to implement binary/https opc-ua servers
4
"""
5
6
import asyncio
7
from datetime import datetime, timedelta
8
from copy import copy
9
from struct import unpack_from
10
import os
11
import logging
12
from urllib.parse import urlparse
13
from typing import Coroutine
14
15
from asyncua import ua
16
from .user_managers import PermissiveUserManager, UserManager
17
from ..common.callback import CallbackDispatcher
18
from ..common.node import Node
19
from .history import HistoryManager
20
from .address_space import AddressSpace, AttributeService, ViewService, NodeManagementService, MethodService
21
from .subscription_service import SubscriptionService
22
from .standard_address_space import standard_address_space
23
from .users import User, UserRole
24
from .internal_session import InternalSession
25
26
try:
27
    from asyncua.crypto import uacrypto
28
except ImportError:
29
    logging.getLogger(__name__).warning("cryptography is not installed, use of crypto disabled")
30
    uacrypto = False
31
32
# Module-scoped logger: named after this module (instead of the root logger) so
# records emitted here can be filtered and configured independently.
logger = logging.getLogger(__name__)
33
34
35
class ServerDesc:
    """
    Plain holder pairing a server description with optional capabilities.
    """

    def __init__(self, serv, cap=None):
        # Stored verbatim; no validation or copying is performed here.
        self.Server, self.Capabilities = serv, cap
39
40
41
class InternalServer:
    """
    Server-side core owning the address space and the services built on top of it.

    There is one `InternalServer` for every `Server`.
    """

    def __init__(self, loop: asyncio.AbstractEventLoop, user_manager: UserManager = None):
        """
        Create the address space, its services and the internal admin session.

        :param loop: event loop used for executor calls, tasks and timers
        :param user_manager: object called by `check_user_token`; when None a
            `PermissiveUserManager` is installed and a warning is logged
        """
        self.loop: asyncio.AbstractEventLoop = loop
        self.logger = logging.getLogger(__name__)
        self.server_callback_dispatcher = CallbackDispatcher()
        self.endpoints = []
        self._channel_id_counter = 5
        self.allow_remote_admin = True
        self.disabled_clock = False  # for debugging we may want to disable clock that writes too much in log
        self._clock_handle = None  # handle returned by loop.call_later for the current-time updater
        self._known_servers = {}  # used if we are a discovery server
        self.certificate = None
        self.private_key = None
        self.aspace = AddressSpace()
        self.attribute_service = AttributeService(self.aspace)
        self.view_service = ViewService(self.aspace)
        self.method_service = MethodService(self.aspace)
        self.node_mgt_service = NodeManagementService(self.aspace)
        self.asyncio_transports = []
        self.subscription_service: SubscriptionService = SubscriptionService(self.loop, self.aspace)
        self.history_manager = HistoryManager(self)
        if user_manager is None:
            # use the per-module logger so the record can be filtered like the others
            self.logger.warning("No user manager specified. Using default permissive manager instead.")
            user_manager = PermissiveUserManager()
        self.user_manager = user_manager
        # create a session to use on server side
        self.isession = InternalSession(self, self.aspace, self.subscription_service, "Internal",
                                        user=User(role=UserRole.Admin))
        self.current_time_node = Node(self.isession, ua.NodeId(ua.ObjectIds.Server_ServerStatus_CurrentTime))

    async def init(self, shelffile=None):
        """
        Populate the address space, apply fixes, set up nodes and init history.

        :param shelffile: optional path forwarded to `load_standard_address_space`
        """
        await self.load_standard_address_space(shelffile)
        await self._address_space_fixes()
        await self.setup_nodes()
        await self.history_manager.init()

    async def setup_nodes(self):
        """
        Set up some nodes as defined by spec
        """
        uries = ['http://opcfoundation.org/UA/']
        ns_node = Node(self.isession, ua.NodeId(ua.ObjectIds.Server_NamespaceArray))
        await ns_node.write_value(uries)

        # every operation limit below is written with the same UInt32 value
        limit_ids = (
            ua.ObjectIds.Server_ServerCapabilities_OperationLimits_MaxNodesPerRead,
            ua.ObjectIds.Server_ServerCapabilities_OperationLimits_MaxNodesPerHistoryReadData,
            ua.ObjectIds.Server_ServerCapabilities_OperationLimits_MaxNodesPerHistoryReadEvents,
            ua.ObjectIds.Server_ServerCapabilities_OperationLimits_MaxNodesPerWrite,
            ua.ObjectIds.Server_ServerCapabilities_OperationLimits_MaxNodesPerHistoryUpdateData,
            ua.ObjectIds.Server_ServerCapabilities_OperationLimits_MaxNodesPerHistoryUpdateEvents,
            ua.ObjectIds.Server_ServerCapabilities_OperationLimits_MaxNodesPerMethodCall,
            ua.ObjectIds.Server_ServerCapabilities_OperationLimits_MaxNodesPerBrowse,
            ua.ObjectIds.Server_ServerCapabilities_OperationLimits_MaxNodesPerRegisterNodes,
            ua.ObjectIds.Server_ServerCapabilities_OperationLimits_MaxNodesPerTranslateBrowsePathsToNodeIds,
            ua.ObjectIds.Server_ServerCapabilities_OperationLimits_MaxNodesPerNodeManagement,
            ua.ObjectIds.Server_ServerCapabilities_OperationLimits_MaxMonitoredItemsPerCall,
        )
        params = ua.WriteParameters()
        for nodeid in limit_ids:
            attr = ua.WriteValue()
            attr.NodeId = ua.NodeId(nodeid)
            attr.AttributeId = ua.AttributeIds.Value
            attr.Value = ua.DataValue(ua.Variant(10000, ua.VariantType.UInt32), ua.StatusCode(ua.StatusCodes.Good))
            attr.Value.ServerTimestamp = datetime.utcnow()
            params.NodesToWrite.append(attr)
        result = await self.isession.write(params)
        # NOTE(review): only the first write result is checked; failures of the
        # remaining writes go unnoticed - confirm whether that is intended.
        result[0].check()

    async def load_standard_address_space(self, shelf_file=None):
        """
        Fill the address space from a shelf file when available, else from generated code.

        When `shelf_file` is given but neither it nor `<shelf_file>.db` exists, the
        address space is filled from the generated code and then saved to `shelf_file`.

        :param shelf_file: optional path of the shelf to load from / create
        """
        if shelf_file:
            # file system checks are run in the default executor to avoid blocking the loop
            is_file = (await self.loop.run_in_executor(None, os.path.isfile, shelf_file)
                       or await self.loop.run_in_executor(None, os.path.isfile, f'{shelf_file}.db'))
            if is_file:
                # import address space from shelf
                await self.loop.run_in_executor(None, self.aspace.load_aspace_shelf, shelf_file)
                return
        # import address space from code generated from xml
        standard_address_space.fill_address_space(self.node_mgt_service)
        # import address space directly from xml, this has performance impact so disabled
        # importer = xmlimporter.XmlImporter(self.node_mgt_service)
        # importer.import_xml("/path/to/python-asyncua/schemas/Opc.Ua.NodeSet2.xml", self)
        if shelf_file:
            # path was supplied, but file doesn't exist - create one for next start up
            await self.loop.run_in_executor(None, self.aspace.make_aspace_shelf, shelf_file)

    async def _address_space_fixes(self) -> None:
        """
        Looks like the xml definition of address space has some error. This is a good place to fix them
        """
        # inverse Organizes reference: BaseObjectType <- ObjectTypesFolder
        it = ua.AddReferencesItem()
        it.SourceNodeId = ua.NodeId(ua.ObjectIds.BaseObjectType)
        it.ReferenceTypeId = ua.NodeId(ua.ObjectIds.Organizes)
        it.IsForward = False
        it.TargetNodeId = ua.NodeId(ua.ObjectIds.ObjectTypesFolder)
        it.TargetNodeClass = ua.NodeClass.Object

        # inverse Organizes reference: BaseDataType <- DataTypesFolder
        it2 = ua.AddReferencesItem()
        it2.SourceNodeId = ua.NodeId(ua.ObjectIds.BaseDataType)
        it2.ReferenceTypeId = ua.NodeId(ua.ObjectIds.Organizes)
        it2.IsForward = False
        it2.TargetNodeId = ua.NodeId(ua.ObjectIds.DataTypesFolder)
        it2.TargetNodeClass = ua.NodeClass.Object

        results = await self.isession.add_references([it, it2])
        for res in results:
            res.check()

    def load_address_space(self, path):
        """
        Load address space from path
        """
        self.aspace.load(path)

    def dump_address_space(self, path):
        """
        Dump current address space to path
        """
        self.aspace.dump(path)

    async def start(self):
        """
        Register own endpoints as known servers, publish state/start time and start the clock.
        """
        self.logger.info('starting internal server')
        for edp in self.endpoints:
            self._known_servers[edp.Server.ApplicationUri] = ServerDesc(edp.Server)
        state_node = Node(self.isession, ua.NodeId(ua.ObjectIds.Server_ServerStatus_State))
        await state_node.write_value(ua.ServerState.Running, ua.VariantType.Int32)
        start_time_node = Node(self.isession, ua.NodeId(ua.ObjectIds.Server_ServerStatus_StartTime))
        await start_time_node.write_value(datetime.utcnow())
        if not self.disabled_clock:
            self._set_current_time()

    async def stop(self):
        """
        Stop the clock updater, the method service, the internal session and the history manager.
        """
        self.logger.info('stopping internal server')
        # cancel the self re-arming timer first, otherwise it keeps scheduling
        # writes to the current-time node after the server has been stopped
        if self._clock_handle is not None:
            self._clock_handle.cancel()
            self._clock_handle = None
        self.method_service.stop()
        await self.isession.close_session()
        await self.history_manager.stop()

    def _set_current_time(self):
        """
        Schedule a write of the current UTC time and re-arm itself to run again in one second.
        """
        self.loop.create_task(self.current_time_node.write_value(datetime.utcnow()))
        # keep the handle so that stop() can cancel the pending call
        self._clock_handle = self.loop.call_later(1, self._set_current_time)

    def get_new_channel_id(self):
        """
        Increment the channel id counter and return its new value.
        """
        self._channel_id_counter += 1
        return self._channel_id_counter

    def add_endpoint(self, endpoint):
        """
        Append endpoint to the list returned by `get_endpoints`.
        """
        self.endpoints.append(endpoint)

    async def get_endpoints(self, params=None, sockname=None):
        """
        Return the registered endpoints.

        :param params: unused
        :param sockname: optional sequence whose first two items are host and port;
            when given, shallow copies of the endpoints are returned with the network
            location of their EndpointUrl replaced by that host and port
        :return: a new list of endpoints
        """
        self.logger.info('get endpoint')
        if not sockname:
            return self.endpoints[:]
        # return to client the ip address it has access to
        host = sockname[0]
        if ':' in host:
            # IPv6 literal: must be bracketed inside the network location of a URL
            host = f'[{host}]'
        netloc = f'{host}:{sockname[1]}'
        edps = []
        for edp in self.endpoints:
            edp1 = copy(edp)
            edp1.EndpointUrl = urlparse(edp1.EndpointUrl)._replace(netloc=netloc).geturl()
            edps.append(edp1)
        return edps

    def find_servers(self, params):
        """
        Return the known servers, optionally filtered by `params.ServerUris`.

        URIs are compared by their ':'-separated parts: a server matches when the
        parts of one requested URI are a prefix of the parts of its ApplicationUri.
        An empty `params.ServerUris` returns every known server.
        """
        if not params.ServerUris:
            return [desc.Server for desc in self._known_servers.values()]
        wanted = [uri.split(':') for uri in params.ServerUris]
        servers = []
        for desc in self._known_servers.values():
            serv_uri = desc.Server.ApplicationUri.split(':')
            if any(serv_uri[:len(parts)] == parts for parts in wanted):
                servers.append(desc.Server)
        return servers

    def register_server(self, server, conf=None):
        """
        Record server in the known servers, keyed by `server.ServerUri`.

        :param server: object providing ServerUri, ProductUri, ServerNames, ServerType,
            DiscoveryUrls and GatewayServerUri
        :param conf: stored as the capabilities of the resulting `ServerDesc`
        """
        appdesc = ua.ApplicationDescription()
        appdesc.ApplicationUri = server.ServerUri
        appdesc.ProductUri = server.ProductUri
        # FIXME: select name from client locale
        appdesc.ApplicationName = server.ServerNames[0]
        appdesc.ApplicationType = server.ServerType
        appdesc.DiscoveryUrls = server.DiscoveryUrls
        # FIXME: select discovery uri using reachability from client network
        appdesc.GatewayServerUri = server.GatewayServerUri
        self._known_servers[server.ServerUri] = ServerDesc(appdesc, conf)

    def register_server2(self, params):
        """
        Register `params.Server` with `params.DiscoveryConfiguration` via `register_server`.
        """
        return self.register_server(params.Server, params.DiscoveryConfiguration)

    def create_session(self, name, user=None, external=False):
        """
        Create a new `InternalSession` bound to this server.

        :param name: session name
        :param user: user of the session; when None a new anonymous `User` is
            created for this call, so sessions never share one default instance
        :param external: forwarded to `InternalSession`
        """
        if user is None:
            user = User(role=UserRole.Anonymous)
        return InternalSession(self, self.aspace, self.subscription_service, name, user=user, external=external)

    async def enable_history_data_change(self, node, period=timedelta(days=7), count=0):
        """
        Set attribute Historizing of node to True and start storing data for history
        """
        await node.write_attribute(ua.AttributeIds.Historizing, ua.DataValue(True))
        await node.set_attr_bit(ua.AttributeIds.AccessLevel, ua.AccessLevel.HistoryRead)
        await node.set_attr_bit(ua.AttributeIds.UserAccessLevel, ua.AccessLevel.HistoryRead)
        await self.history_manager.historize_data_change(node, period, count)

    async def disable_history_data_change(self, node):
        """
        Set attribute Historizing of node to False and stop storing data for history
        """
        await node.write_attribute(ua.AttributeIds.Historizing, ua.DataValue(False))
        await node.unset_attr_bit(ua.AttributeIds.AccessLevel, ua.AccessLevel.HistoryRead)
        await node.unset_attr_bit(ua.AttributeIds.UserAccessLevel, ua.AccessLevel.HistoryRead)
        await self.history_manager.dehistorize(node)

    async def enable_history_event(self, source, period=timedelta(days=7), count=0):
        """
        Set attribute History Read of object events to True and start storing data for history

        :raises ua.UaError: if the event notifier of source lacks SubscribeToEvents
        """
        event_notifier = await source.read_event_notifier()
        if ua.EventNotifier.SubscribeToEvents not in event_notifier:
            raise ua.UaError('Node does not generate events', event_notifier)
        if ua.EventNotifier.HistoryRead not in event_notifier:
            event_notifier.add(ua.EventNotifier.HistoryRead)
            await source.set_event_notifier(event_notifier)
        await self.history_manager.historize_event(source, period, count)

    async def disable_history_event(self, source):
        """
        Set attribute History Read of node to False and stop storing data for history
        """
        await source.unset_attr_bit(ua.AttributeIds.EventNotifier, ua.EventNotifier.HistoryRead)
        await self.history_manager.dehistorize(source)

    def subscribe_server_callback(self, event, handle):
        """
        Create a subscription from event to handle
        """
        self.server_callback_dispatcher.addListener(event, handle)

    def unsubscribe_server_callback(self, event, handle):
        """
        Remove a subscription from event to handle
        """
        self.server_callback_dispatcher.removeListener(event, handle)

    async def write_attribute_value(self, nodeid, datavalue, attr=ua.AttributeIds.Value):
        """
        directly write datavalue to the Attribute, bypassing some checks and structure creation
        so it is a little faster
        """
        await self.aspace.write_attribute_value(nodeid, attr, datavalue)

    def set_user_manager(self, user_manager):
        """
        Replace the user manager used to authorize users.

        `check_user_token` calls it as
        `user_manager(iserver, isession, username, password)` and returns its result.
        """
        self.user_manager = user_manager

    def check_user_token(self, isession, token):
        """
        unpack the username and password for the benefit of the user defined user manager

        :param isession: session the token belongs to; its `nonce` length is
            subtracted from the decrypted length prefix to find the password length
        :param token: object providing UserName, Password and EncryptionAlgorithm
        :return: False if the password cannot be decrypted, else the result of the
            user manager call
        """
        user_name = token.UserName
        password = token.Password

        # decrypt password if we can
        if str(token.EncryptionAlgorithm) != "None":
            if not uacrypto:
                self.logger.warning("Encrypted password received but crypto support is not available")
                return False
            try:
                if token.EncryptionAlgorithm == "http://www.w3.org/2001/04/xmlenc#rsa-1_5":
                    raw_pw = uacrypto.decrypt_rsa15(self.private_key, password)
                elif token.EncryptionAlgorithm == "http://www.w3.org/2001/04/xmlenc#rsa-oaep":
                    raw_pw = uacrypto.decrypt_rsa_oaep(self.private_key, password)
                else:
                    self.logger.warning("Unknown password encoding %s", token.EncryptionAlgorithm)
                    return False
                # decrypted data: 4 byte little-endian length, then the password followed by the nonce
                length = unpack_from('<I', raw_pw)[0] - len(isession.nonce)
                password = raw_pw[4:4 + length]
                password = password.decode('utf-8')
            except Exception:
                self.logger.exception("Unable to decrypt password")
                return False
        # call user_manager
        return self.user_manager(self, isession, user_name, password)
322