Passed
Pull Request — master (#365)
by
unknown
02:53
created

InternalServer.bind_standard_methods()   A

Complexity

Conditions 3

Size

Total Lines 16
Code Lines 16

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 3
eloc 16
nop 1
dl 0
loc 16
rs 9.6
c 0
b 0
f 0
1
"""
2
Internal server implementing the OPC UA interface.
3
Can be used on server side or to implement binary/https opc-ua servers
4
"""
5
6
import asyncio
7
from datetime import datetime, timedelta
8
from copy import copy
9
from struct import unpack_from
10
import os
11
import logging
12
from urllib.parse import urlparse
13
from typing import Coroutine
14
15
from asyncua import ua
16
from .user_managers import PermissiveUserManager, UserManager
17
from ..common.callback import CallbackService
18
from ..common.node import Node
19
from .history import HistoryManager
20
from .address_space import AddressSpace, AttributeService, ViewService, NodeManagementService, MethodService
21
from .subscription_service import SubscriptionService
22
from .standard_address_space import standard_address_space
23
from .users import User, UserRole
24
from .internal_session import InternalSession
25
from .event_generator import EventGenerator
26
27
try:
    from asyncua.crypto import uacrypto
except ImportError:
    # The crypto helpers could not be imported; keep a falsy placeholder so that
    # callers can test ``if not uacrypto`` before touching encrypted data.
    logging.getLogger(__name__).warning("cryptography is not installed, use of crypto disabled")
    uacrypto = False

# Module-level logger. It is named after this module (like the warning above)
# instead of being the root logger, so records can be filtered per module.
logger = logging.getLogger(__name__)
34
35
36
class ServerDesc:
    """
    Small record pairing a server description with optional capabilities.

    ``Server`` holds the object passed as ``serv`` and ``Capabilities`` the
    object passed as ``cap`` (``None`` when not supplied).
    """

    def __init__(self, serv, cap=None):
        self.Server, self.Capabilities = serv, cap
40
41
42
class InternalServer:
43
    """
44
    There is one `InternalServer` for every `Server`.
45
    """
46
47
    def __init__(self, loop: asyncio.AbstractEventLoop, user_manager: UserManager = None):
        """
        Create the address space, the services working on it and the internal session.

        :param loop: event loop stored on the instance and handed to the subscription service
        :param user_manager: object stored as ``self.user_manager``; when ``None`` a
            ``PermissiveUserManager`` is created and a warning is logged
        """
        self.loop: asyncio.AbstractEventLoop = loop
        self.logger = logging.getLogger(__name__)
        self.callback_service = CallbackService()
        self.endpoints = []
        self._channel_id_counter = 5
        self.allow_remote_admin = True
        # Falsy: condition methods are not bound by init(). A truthy value makes
        # init() call bind_standard_methods().
        self.bind_condition_methods = False
        self.disabled_clock = False  # for debugging we may want to disable clock that writes too much in log
        self._known_servers = {}  # used if we are a discovery server
        self.certificate = None
        self.private_key = None
        self.aspace = AddressSpace()
        self.attribute_service = AttributeService(self.aspace)
        self.view_service = ViewService(self.aspace)
        self.method_service = MethodService(self.aspace)
        self.node_mgt_service = NodeManagementService(self.aspace)
        self.asyncio_transports = []
        self.subscription_service: SubscriptionService = SubscriptionService(self.loop, self.aspace)
        self.history_manager = HistoryManager(self)
        if user_manager is None:
            # Log through the instance logger (named after this module) like every
            # other method of this class, not through a module-level logger.
            self.logger.warning("No user manager specified. Using default permissive manager instead.")
            user_manager = PermissiveUserManager()
        self.user_manager = user_manager
        # create a session to use on server side
        self.isession = InternalSession(self, self.aspace, self.subscription_service, "Internal",
                                        user=User(role=UserRole.Admin))
        self.current_time_node = Node(self.isession, ua.NodeId(ua.ObjectIds.Server_ServerStatus_CurrentTime))
75
76
    async def init(self, shelffile=None):
        """
        Run the asynchronous start-up sequence.

        Loads the standard address space (forwarding ``shelffile``), applies the
        address space fixes, sets up the nodes and initialises the history
        manager. The standard condition methods are bound afterwards only when
        ``self.bind_condition_methods`` is truthy.
        """
        await self.load_standard_address_space(shelffile)
        await self._address_space_fixes()
        await self.setup_nodes()
        await self.history_manager.init()
        if not self.bind_condition_methods:
            return
        await self.bind_standard_methods()
83
84
    async def bind_standard_methods(self):
        """
        Register the refresh event generators and the condition refresh callbacks.

        An ``EventGenerator`` is created for ``RefreshStartEventType`` and for
        ``RefreshEndEventType`` and stored in
        ``self.subscription_service.standard_events``. When
        ``self.bind_condition_methods`` is an integer (but not a bool) it is
        written to the ``Severity`` of both events. The ``ConditionRefresh`` and
        ``ConditionRefresh2`` methods are then linked to the matching
        subscription service callbacks.
        """
        severity = self.bind_condition_methods
        # bool is a subclass of int: a plain ``True`` only enables the binding and
        # must not be written as a severity value.
        has_severity = isinstance(severity, int) and not isinstance(severity, bool)

        for event_type in (ua.ObjectIds.RefreshStartEventType, ua.ObjectIds.RefreshEndEventType):
            generator = EventGenerator(self.isession)
            await generator.init(event_type)
            if has_severity:
                generator.event.Severity = severity
            self.subscription_service.standard_events[event_type] = generator

        callbacks = (
            (ua.ObjectIds.ConditionType_ConditionRefresh, self.subscription_service.condition_refresh),
            (ua.ObjectIds.ConditionType_ConditionRefresh2, self.subscription_service.condition_refresh2),
        )
        for method_id, callback in callbacks:
            method_node = Node(self.isession, ua.NodeId(method_id))
            self.isession.add_method_callback(method_node.nodeid, callback)
100
101
    async def setup_nodes(self):
        """
        Set up some nodes as defined by spec

        Writes the namespace array and sets every listed operation limit of the
        server capabilities to 10000. Every write result is checked, so a failed
        write on any of the nodes is reported instead of only the first one.
        """
        uries = ['http://opcfoundation.org/UA/']
        ns_node = Node(self.isession, ua.NodeId(ua.ObjectIds.Server_NamespaceArray))
        await ns_node.write_value(uries)

        operation_limits = (
            ua.ObjectIds.Server_ServerCapabilities_OperationLimits_MaxNodesPerRead,
            ua.ObjectIds.Server_ServerCapabilities_OperationLimits_MaxNodesPerHistoryReadData,
            ua.ObjectIds.Server_ServerCapabilities_OperationLimits_MaxNodesPerHistoryReadEvents,
            ua.ObjectIds.Server_ServerCapabilities_OperationLimits_MaxNodesPerWrite,
            ua.ObjectIds.Server_ServerCapabilities_OperationLimits_MaxNodesPerHistoryUpdateData,
            ua.ObjectIds.Server_ServerCapabilities_OperationLimits_MaxNodesPerHistoryUpdateEvents,
            ua.ObjectIds.Server_ServerCapabilities_OperationLimits_MaxNodesPerMethodCall,
            ua.ObjectIds.Server_ServerCapabilities_OperationLimits_MaxNodesPerBrowse,
            ua.ObjectIds.Server_ServerCapabilities_OperationLimits_MaxNodesPerRegisterNodes,
            ua.ObjectIds.Server_ServerCapabilities_OperationLimits_MaxNodesPerTranslateBrowsePathsToNodeIds,
            ua.ObjectIds.Server_ServerCapabilities_OperationLimits_MaxNodesPerNodeManagement,
            ua.ObjectIds.Server_ServerCapabilities_OperationLimits_MaxMonitoredItemsPerCall,
        )
        params = ua.WriteParameters()
        for nodeid in operation_limits:
            attr = ua.WriteValue()
            attr.NodeId = ua.NodeId(nodeid)
            attr.AttributeId = ua.AttributeIds.Value
            attr.Value = ua.DataValue(ua.Variant(10000, ua.VariantType.UInt32), ua.StatusCode(ua.StatusCodes.Good))
            attr.Value.ServerTimestamp = datetime.utcnow()
            params.NodesToWrite.append(attr)
        results = await self.isession.write(params)
        # One status is returned per written node; check each of them.
        for status in results:
            status.check()
130
131
    async def load_standard_address_space(self, shelf_file=None):
        """
        Fill the address space, from a shelf when one is available.

        When ``shelf_file`` is given and either that path or ``<shelf_file>.db``
        is an existing file, the address space is loaded from the shelf and
        nothing else is done. Otherwise the address space is filled from the
        generated code and, if ``shelf_file`` was given, a shelf is created at
        that path for the next start up. File checks and shelf operations run in
        the loop's default executor.
        """
        in_executor = self.loop.run_in_executor
        if shelf_file:
            shelf_exists = await in_executor(None, os.path.isfile, shelf_file)
            if not shelf_exists:
                shelf_exists = await in_executor(None, os.path.isfile, f'{shelf_file}.db')
            if shelf_exists:
                # import address space from shelf
                await in_executor(None, self.aspace.load_aspace_shelf, shelf_file)
                return
        # import address space from code generated from xml
        # (importing directly from xml has a performance impact, so it is not done here)
        standard_address_space.fill_address_space(self.node_mgt_service)
        if shelf_file:
            # path was supplied, but file doesn't exist - create one for next start up
            await in_executor(None, self.aspace.make_aspace_shelf, shelf_file)
147
148
    async def _address_space_fixes(self) -> None:
        """
        Looks like the xml definition of address space has some error. This is a good place to fix them

        Adds an inverse ``Organizes`` reference from ``BaseObjectType`` to
        ``ObjectTypesFolder`` and from ``BaseDataType`` to ``DataTypesFolder``,
        then checks the status of every added reference.
        """
        fixes = (
            (ua.ObjectIds.BaseObjectType, ua.ObjectIds.ObjectTypesFolder),
            (ua.ObjectIds.BaseDataType, ua.ObjectIds.DataTypesFolder),
        )
        items = []
        for source_id, target_id in fixes:
            item = ua.AddReferencesItem()
            item.SourceNodeId = ua.NodeId(source_id)
            item.ReferenceTypeId = ua.NodeId(ua.ObjectIds.Organizes)
            item.IsForward = False
            item.TargetNodeId = ua.NodeId(target_id)
            item.TargetNodeClass = ua.NodeClass.Object
            items.append(item)

        results = await self.isession.add_references(items)
        for res in results:
            res.check()
169
170
    def load_address_space(self, path):
        """
        Ask the address space to load itself from ``path``.
        """
        address_space = self.aspace
        address_space.load(path)
175
176
    def dump_address_space(self, path):
        """
        Ask the address space to dump its current content to ``path``.
        """
        address_space = self.aspace
        address_space.dump(path)
181
182
    async def start(self):
        """
        Mark the server as running.

        Every registered endpoint's server description is recorded in the known
        servers under its application URI, the server state and start time nodes
        are written, and the periodic current-time update is started unless
        ``self.disabled_clock`` is set.
        """
        self.logger.info('starting internal server')
        for endpoint in self.endpoints:
            self._known_servers[endpoint.Server.ApplicationUri] = ServerDesc(endpoint.Server)

        state_node = Node(self.isession, ua.NodeId(ua.ObjectIds.Server_ServerStatus_State))
        await state_node.write_value(ua.ServerState.Running, ua.VariantType.Int32)

        start_time_node = Node(self.isession, ua.NodeId(ua.ObjectIds.Server_ServerStatus_StartTime))
        await start_time_node.write_value(datetime.utcnow())

        if self.disabled_clock:
            return
        self._set_current_time()
191
192
    async def stop(self):
        """
        Stop the method service, then close the internal session and stop the
        history manager, in that order.
        """
        self.logger.info('stopping internal server')
        self.method_service.stop()
        session = self.isession
        await session.close_session()
        history = self.history_manager
        await history.stop()
197
198
    def _set_current_time(self):
199
        self.loop.create_task(self.current_time_node.write_value(datetime.utcnow()))
200
        self.loop.call_later(1, self._set_current_time)
201
202
    def get_new_channel_id(self):
        """
        Increment the channel id counter and return its new value.
        """
        next_id = self._channel_id_counter + 1
        self._channel_id_counter = next_id
        return next_id
205
206
    def add_endpoint(self, endpoint):
        """
        Append ``endpoint`` to the list of endpoints of this server.
        """
        registered = self.endpoints
        registered.append(endpoint)
208
209
    async def get_endpoints(self, params=None, sockname=None):
        """
        Return the endpoints of this server.

        :param params: unused by this implementation
        :param sockname: optional socket name; index 0 is used as host and
            index 1 as port
        :return: a copy of the endpoint list. When ``sockname`` is given, each
            endpoint is a shallow copy whose ``EndpointUrl`` network location is
            replaced by the host and port of ``sockname``, so the client gets an
            address it has access to. The stored endpoints are not modified.
        """
        self.logger.info('get endpoint')
        if not sockname:
            return self.endpoints[:]

        host, port = sockname[0], sockname[1]
        if ':' in host and not host.startswith('['):
            # An IPv6 literal must be enclosed in brackets inside a URL, otherwise
            # its colons cannot be told apart from the port separator.
            host = f'[{host}]'
        netloc = f'{host}:{port}'

        # return to client the ip address it has access to
        edps = []
        for edp in self.endpoints:
            edp1 = copy(edp)
            edp1.EndpointUrl = urlparse(edp1.EndpointUrl)._replace(netloc=netloc).geturl()
            edps.append(edp1)
        return edps
222
223
    def find_servers(self, params):
        """
        Return the descriptions of the known servers matching ``params.ServerUris``.

        With no server URIs every known server is returned. Otherwise a server
        matches when the colon-separated parts of one requested URI are a prefix
        of the colon-separated parts of its application URI. Each server is
        returned at most once.
        """
        if not params.ServerUris:
            return [desc.Server for desc in self._known_servers.values()]

        def _is_requested(desc):
            own_parts = desc.Server.ApplicationUri.split(':')
            for requested in params.ServerUris:
                requested_parts = requested.split(':')
                if own_parts[:len(requested_parts)] == requested_parts:
                    return True
            return False

        return [desc.Server for desc in self._known_servers.values() if _is_requested(desc)]
235
236
    def register_server(self, server, conf=None):
        """
        Record ``server`` in the known servers, keyed by its server URI.

        An application description is built from the fields of ``server`` and
        stored together with ``conf`` in a ``ServerDesc``. An existing entry for
        the same URI is replaced.
        """
        description = ua.ApplicationDescription()
        description.ApplicationUri = server.ServerUri
        description.ProductUri = server.ProductUri
        # FIXME: select name from client locale
        description.ApplicationName = server.ServerNames[0]
        description.ApplicationType = server.ServerType
        description.DiscoveryUrls = server.DiscoveryUrls
        # FIXME: select discovery uri using reachability from client network
        description.GatewayServerUri = server.GatewayServerUri

        entry = ServerDesc(description, conf)
        self._known_servers[server.ServerUri] = entry
247
248
    def register_server2(self, params):
        """
        Register ``params.Server`` with ``params.DiscoveryConfiguration`` and
        return the result of ``register_server``.
        """
        server, configuration = params.Server, params.DiscoveryConfiguration
        return self.register_server(server, configuration)
250
251
    def create_session(self, name, user=User(role=UserRole.Anonymous), external=False):
        """
        Create and return a new ``InternalSession`` bound to this server, its
        address space and its subscription service.

        NOTE(review): the default ``user`` object is built once, when the method
        is defined, and is therefore shared by every call that omits ``user``.
        """
        session = InternalSession(self, self.aspace, self.subscription_service, name,
                                  user=user, external=external)
        return session
253
254
    async def enable_history_data_change(self, node, period=timedelta(days=7), count=0):
        """
        Set attribute Historizing of node to True and start storing data for history

        The HistoryRead bit is also set on the AccessLevel and UserAccessLevel
        attributes before ``period`` and ``count`` are handed to the history manager.
        """
        await node.write_attribute(ua.AttributeIds.Historizing, ua.DataValue(True))
        for level_attribute in (ua.AttributeIds.AccessLevel, ua.AttributeIds.UserAccessLevel):
            await node.set_attr_bit(level_attribute, ua.AccessLevel.HistoryRead)
        await self.history_manager.historize_data_change(node, period, count)
262
263
    async def disable_history_data_change(self, node):
        """
        Set attribute Historizing of node to False and stop storing data for history

        The HistoryRead bit is also cleared on the AccessLevel and
        UserAccessLevel attributes before the history manager is told to stop.
        """
        await node.write_attribute(ua.AttributeIds.Historizing, ua.DataValue(False))
        for level_attribute in (ua.AttributeIds.AccessLevel, ua.AttributeIds.UserAccessLevel):
            await node.unset_attr_bit(level_attribute, ua.AccessLevel.HistoryRead)
        await self.history_manager.dehistorize(node)
271
272
    async def enable_history_event(self, source, period=timedelta(days=7), count=0):
        """
        Start storing the events of ``source`` for history.

        Raises ``ua.UaError`` when the event notifier of ``source`` does not
        contain SubscribeToEvents. HistoryRead is added to the event notifier
        when missing, then ``period`` and ``count`` are handed to the history
        manager.
        """
        notifier = await source.read_event_notifier()
        if ua.EventNotifier.SubscribeToEvents not in notifier:
            raise ua.UaError('Node does not generate events', notifier)

        history_read = ua.EventNotifier.HistoryRead
        if history_read not in notifier:
            notifier.add(history_read)
            await source.set_event_notifier(notifier)

        await self.history_manager.historize_event(source, period, count)
283
284
    async def disable_history_event(self, source):
        """
        Clear the HistoryRead bit of the EventNotifier attribute of ``source``
        and stop storing its data for history.
        """
        history_read = ua.EventNotifier.HistoryRead
        await source.unset_attr_bit(ua.AttributeIds.EventNotifier, history_read)
        await self.history_manager.dehistorize(source)
290
291
    def subscribe_server_callback(self, event, handle):
        """
        Register ``handle`` as a listener for ``event`` on the callback service.
        """
        callbacks = self.callback_service
        callbacks.addListener(event, handle)
296
297
    def unsubscribe_server_callback(self, event, handle):
        """
        Remove ``handle`` from the listeners of ``event`` on the callback service.
        """
        callbacks = self.callback_service
        callbacks.removeListener(event, handle)
302
303
    async def write_attribute_value(self, nodeid, datavalue, attr=ua.AttributeIds.Value):
        """
        Write ``datavalue`` straight to attribute ``attr`` of ``nodeid`` in the
        address space, bypassing some checks and structure creation so it is a
        little faster. The Value attribute is written by default.
        """
        address_space = self.aspace
        await address_space.write_attribute_value(nodeid, attr, datavalue)
309
310
    def set_user_manager(self, user_manager):
        """
        Replace the user manager of this server with ``user_manager``.

        NOTE(review): the user manager is presumably the object deciding which
        users are allowed access; its interface is not visible from this method.
        """
        setattr(self, 'user_manager', user_manager)
316
317
    def check_user_token(self, isession, token):
        """
        Unpack the username and password for the benefit of the user defined user manager.

        :param isession: session whose ``nonce`` length is subtracted from the
            length prefix of a decrypted password
        :param token: identity token providing ``UserName``, ``Password`` and
            ``EncryptionAlgorithm``
        :return: ``(user_name, password)``. ``False`` is returned when the
            password is encrypted but crypto support is unavailable or the
            decryption fails. With an unknown encryption algorithm the password
            is returned as received, after a warning is logged.
        """
        user_name = token.UserName
        password = token.Password

        # TODO Support all Token Types
        # AnonimousIdentityToken
        # UserIdentityToken
        # UserNameIdentityToken
        # X509IdentityToken
        # IssuedIdentityToken

        if str(token.EncryptionAlgorithm) == "None":
            # Plain password: only normalise binary values to text. isinstance()
            # also covers bytes subclasses and bytearray, unlike a type() comparison.
            if isinstance(password, (bytes, bytearray)):
                password = password.decode('utf-8')
            return user_name, password

        # decrypt password if we can
        if not uacrypto:
            # raise  # Should I raise a significant exception?
            return False
        try:
            if token.EncryptionAlgorithm == "http://www.w3.org/2001/04/xmlenc#rsa-1_5":
                raw_pw = uacrypto.decrypt_rsa15(self.private_key, password)
            elif token.EncryptionAlgorithm == "http://www.w3.org/2001/04/xmlenc#rsa-oaep":
                raw_pw = uacrypto.decrypt_rsa_oaep(self.private_key, password)
            else:
                self.logger.warning("Unknown password encoding %s", token.EncryptionAlgorithm)
                # raise  # Should I raise a significant exception?
                return user_name, password
            # The decrypted data starts with a 4-byte little-endian length; the
            # session nonce length is subtracted from it to get the password length.
            length = unpack_from('<I', raw_pw)[0] - len(isession.nonce)
            password = raw_pw[4:4 + length].decode('utf-8')
        except Exception:
            # Deliberately broad: any failure while decrypting is logged with its
            # traceback and reported to the caller as a rejected token.
            self.logger.exception("Unable to decrypt password")
            return False

        return user_name, password
355