Passed
Pull Request — master (#112)
by
unknown
02:30
created

asyncua.server.internal_server   C

Complexity

Total Complexity 54

Size/Duplication

Total Lines 344
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 241
dl 0
loc 344
rs 6.4799
c 0
b 0
f 0
wmc 54

1 Function

Rating   Name   Duplication   Size   Complexity  
A default_user_manager() 0 7 3

28 Methods

Rating   Name   Duplication   Size   Complexity  
A InternalServer.__init__() 0 24 1
A InternalServer.init() 0 7 2
A ServerDesc.__init__() 0 3 1
A InternalServer.get_endpoints() 0 13 3
A InternalServer.load_address_space() 0 5 1
B InternalServer.check_user_token() 0 26 6
A InternalServer.dump_address_space() 0 5 1
A InternalServer.unsubscribe_server_callback() 0 5 1
A InternalServer.enable_history_data_change() 0 8 1
A InternalServer.subscribe_server_callback() 0 5 1
A InternalServer.add_endpoint() 0 2 1
A InternalServer.load_standard_address_space() 0 16 4
A InternalServer.create_session() 0 2 1
A InternalServer.disable_history_data_change() 0 8 1
A InternalServer.get_new_channel_id() 0 3 1
A InternalServer.start() 0 8 3
A InternalServer.register_server() 0 11 1
A InternalServer.enable_history_event() 0 11 3
A InternalServer.set_user_manager() 0 6 1
A InternalServer.set_attribute_value() 0 6 1
A InternalServer.disable_history_event() 0 6 1
A InternalServer._address_space_fixes() 0 21 2
A InternalServer._set_current_time() 0 3 1
A InternalServer.bind_standard_methods() 0 16 3
A InternalServer.stop() 0 5 1
A InternalServer.setup_nodes() 0 29 2
A InternalServer.find_servers() 0 12 5
A InternalServer.register_server2() 0 2 1

How to fix   Complexity   

Complexity

Complex classes like asyncua.server.internal_server often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""
2
Internal server implementing opcu-ua interface.
3
Can be used on server side or to implement binary/https opc-ua servers
4
"""
5
6
import asyncio
7
from datetime import datetime, timedelta
8
from copy import copy
9
from struct import unpack_from
10
import os
11
import logging
12
from urllib.parse import urlparse
13
from typing import Coroutine
14
15
from asyncua import ua
16
from ..common.callback import CallbackDispatcher
17
from ..common.node import Node
18
from .history import HistoryManager
19
from .address_space import AddressSpace, AttributeService, ViewService, NodeManagementService, MethodService
20
from .subscription_service import SubscriptionService
21
from .standard_address_space import standard_address_space
22
from .users import User
23
from .internal_session import InternalSession
24
from .event_generator import EventGenerator
25
26
try:
27
    from asyncua.crypto import uacrypto
28
except ImportError:
29
    logging.getLogger(__name__).warning("cryptography is not installed, use of crypto disabled")
30
    uacrypto = False
31
32
33
class ServerDesc:
34
    def __init__(self, serv, cap=None):
35
        self.Server = serv
36
        self.Capabilities = cap
37
38
39
def default_user_manager(iserver, isession, username, password):
40
    """
41
    Default user_manager, does nothing much but check for admin
42
    """
43
    if iserver.allow_remote_admin and username in ("admin", "Admin"):
44
        isession.user = User.Admin
45
    return True
46
47
48
class InternalServer:
49
    """
50
    There is one `InternalServer` for every `Server`.
51
    """
52
53
    def __init__(self, loop: asyncio.AbstractEventLoop):
54
        self.loop: asyncio.AbstractEventLoop = loop
55
        self.logger = logging.getLogger(__name__)
56
        self.server_callback_dispatcher = CallbackDispatcher()
57
        self.endpoints = []
58
        self._channel_id_counter = 5
59
        self.allow_remote_admin = True
60
        self.bind_condition_methods = False
61
        self.disabled_clock = False  # for debugging we may want to disable clock that writes too much in log
62
        self._known_servers = {}  # used if we are a discovery server
63
        self.certificate = None
64
        self.private_key = None
65
        self.aspace = AddressSpace()
66
        self.attribute_service = AttributeService(self.aspace)
67
        self.view_service = ViewService(self.aspace)
68
        self.method_service = MethodService(self.aspace)
69
        self.node_mgt_service = NodeManagementService(self.aspace)
70
        self.asyncio_transports = []
71
        self.subscription_service: SubscriptionService = SubscriptionService(self.loop, self.aspace)
72
        self.history_manager = HistoryManager(self)
73
        self.user_manager = default_user_manager  # defined at the end of this file
74
        # create a session to use on server side
75
        self.isession = InternalSession(self, self.aspace, self.subscription_service, "Internal", user=User.Admin)
76
        self.current_time_node = Node(self.isession, ua.NodeId(ua.ObjectIds.Server_ServerStatus_CurrentTime))
77
78
    async def init(self, shelffile=None):
79
        await self.load_standard_address_space(shelffile)
80
        await self._address_space_fixes()
81
        await self.setup_nodes()
82
        await self.history_manager.init()
83
        if self.bind_condition_methods:
84
            await self.bind_standard_methods()
85
86
    async def bind_standard_methods(self):
87
        refresh_start_event_type = EventGenerator(self.isession)
88
        await refresh_start_event_type.init(ua.ObjectIds.RefreshStartEventType)
89
        if isinstance(self.bind_condition_methods, int):
90
            refresh_start_event_type.event.Severity = self.bind_condition_methods
91
        self.subscription_service.standard_events[ua.ObjectIds.RefreshStartEventType] = refresh_start_event_type
92
        refresh_end_event_type = EventGenerator(self.isession)
93
        await refresh_end_event_type.init(ua.ObjectIds.RefreshEndEventType)
94
        if isinstance(self.bind_condition_methods, int):
95
            refresh_end_event_type.event.Severity = self.bind_condition_methods
96
        self.subscription_service.standard_events[ua.ObjectIds.RefreshEndEventType] = refresh_end_event_type
97
        condition_refresh_method = Node(self.isession, ua.NodeId(ua.ObjectIds.ConditionType_ConditionRefresh))
98
        self.isession.add_method_callback(condition_refresh_method.nodeid, self.subscription_service.condition_refresh)
99
        condition_refresh2_method = Node(self.isession, ua.NodeId(ua.ObjectIds.ConditionType_ConditionRefresh2))
100
        self.isession.add_method_callback(condition_refresh2_method.nodeid,
101
                                          self.subscription_service.condition_refresh2)
102
103
    async def setup_nodes(self):
104
        """
105
        Set up some nodes as defined by spec
106
        """
107
        uries = ['http://opcfoundation.org/UA/']
108
        ns_node = Node(self.isession, ua.NodeId(ua.ObjectIds.Server_NamespaceArray))
109
        await ns_node.set_value(uries)
110
111
        params = ua.WriteParameters()
112
        for nodeid in (ua.ObjectIds.Server_ServerCapabilities_OperationLimits_MaxNodesPerRead,
113
                       ua.ObjectIds.Server_ServerCapabilities_OperationLimits_MaxNodesPerHistoryReadData,
114
                       ua.ObjectIds.Server_ServerCapabilities_OperationLimits_MaxNodesPerHistoryReadEvents,
115
                       ua.ObjectIds.Server_ServerCapabilities_OperationLimits_MaxNodesPerWrite,
116
                       ua.ObjectIds.Server_ServerCapabilities_OperationLimits_MaxNodesPerHistoryUpdateData,
117
                       ua.ObjectIds.Server_ServerCapabilities_OperationLimits_MaxNodesPerHistoryUpdateEvents,
118
                       ua.ObjectIds.Server_ServerCapabilities_OperationLimits_MaxNodesPerMethodCall,
119
                       ua.ObjectIds.Server_ServerCapabilities_OperationLimits_MaxNodesPerBrowse,
120
                       ua.ObjectIds.Server_ServerCapabilities_OperationLimits_MaxNodesPerRegisterNodes,
121
                       ua.ObjectIds.Server_ServerCapabilities_OperationLimits_MaxNodesPerTranslateBrowsePathsToNodeIds,
122
                       ua.ObjectIds.Server_ServerCapabilities_OperationLimits_MaxNodesPerNodeManagement,
123
                       ua.ObjectIds.Server_ServerCapabilities_OperationLimits_MaxMonitoredItemsPerCall):
124
            attr = ua.WriteValue()
125
            attr.NodeId = ua.NodeId(nodeid)
126
            attr.AttributeId = ua.AttributeIds.Value
127
            attr.Value = ua.DataValue(ua.Variant(10000, ua.VariantType.UInt32), ua.StatusCode(ua.StatusCodes.Good))
128
            attr.Value.ServerTimestamp = datetime.utcnow()
129
            params.NodesToWrite.append(attr)
130
        result = await self.isession.write(params)
131
        result[0].check()
132
133
    async def load_standard_address_space(self, shelf_file=None):
134
        if shelf_file:
135
            is_file = (await self.loop.run_in_executor(None, os.path.isfile, shelf_file)
136
                       or await self.loop.run_in_executor(None, os.path.isfile, f'{shelf_file}.db'))
137
            if is_file:
138
                # import address space from shelf
139
                await self.loop.run_in_executor(None, self.aspace.load_aspace_shelf, shelf_file)
140
                return
141
        # import address space from code generated from xml
142
        standard_address_space.fill_address_space(self.node_mgt_service)
143
        # import address space directly from xml, this has performance impact so disabled
144
        # importer = xmlimporter.XmlImporter(self.node_mgt_service)
145
        # importer.import_xml("/path/to/python-asyncua/schemas/Opc.Ua.NodeSet2.xml", self)
146
        if shelf_file:
147
            # path was supplied, but file doesn't exist - create one for next start up
148
            await self.loop.run_in_executor(None, self.aspace.make_aspace_shelf, shelf_file)
149
150
    async def _address_space_fixes(self) -> Coroutine:
151
        """
152
        Looks like the xml definition of address space has some error. This is a good place to fix them
153
        """
154
        it = ua.AddReferencesItem()
155
        it.SourceNodeId = ua.NodeId(ua.ObjectIds.BaseObjectType)
156
        it.ReferenceTypeId = ua.NodeId(ua.ObjectIds.Organizes)
157
        it.IsForward = False
158
        it.TargetNodeId = ua.NodeId(ua.ObjectIds.ObjectTypesFolder)
159
        it.TargetNodeClass = ua.NodeClass.Object
160
161
        it2 = ua.AddReferencesItem()
162
        it2.SourceNodeId = ua.NodeId(ua.ObjectIds.BaseDataType)
163
        it2.ReferenceTypeId = ua.NodeId(ua.ObjectIds.Organizes)
164
        it2.IsForward = False
165
        it2.TargetNodeId = ua.NodeId(ua.ObjectIds.DataTypesFolder)
166
        it2.TargetNodeClass = ua.NodeClass.Object
167
168
        results = await self.isession.add_references([it, it2])
169
        for res in results:
170
            res.check()
171
172
    def load_address_space(self, path):
173
        """
174
        Load address space from path
175
        """
176
        self.aspace.load(path)
177
178
    def dump_address_space(self, path):
179
        """
180
        Dump current address space to path
181
        """
182
        self.aspace.dump(path)
183
184
    async def start(self):
185
        self.logger.info('starting internal server')
186
        for edp in self.endpoints:
187
            self._known_servers[edp.Server.ApplicationUri] = ServerDesc(edp.Server)
188
        await Node(self.isession, ua.NodeId(ua.ObjectIds.Server_ServerStatus_State)).set_value(0, ua.VariantType.Int32)
189
        await Node(self.isession, ua.NodeId(ua.ObjectIds.Server_ServerStatus_StartTime)).set_value(datetime.utcnow())
190
        if not self.disabled_clock:
191
            self._set_current_time()
192
193
    async def stop(self):
194
        self.logger.info('stopping internal server')
195
        self.method_service.stop()
196
        await self.isession.close_session()
197
        await self.history_manager.stop()
198
199
    def _set_current_time(self):
200
        self.loop.create_task(self.current_time_node.set_value(datetime.utcnow()))
201
        self.loop.call_later(1, self._set_current_time)
202
203
    def get_new_channel_id(self):
204
        self._channel_id_counter += 1
205
        return self._channel_id_counter
206
207
    def add_endpoint(self, endpoint):
208
        self.endpoints.append(endpoint)
209
210
    async def get_endpoints(self, params=None, sockname=None):
211
        self.logger.info('get endpoint')
212
        if sockname:
213
            # return to client the ip address it has access to
214
            edps = []
215
            for edp in self.endpoints:
216
                edp1 = copy(edp)
217
                url = urlparse(edp1.EndpointUrl)
218
                url = url._replace(netloc=sockname[0] + ':' + str(sockname[1]))
219
                edp1.EndpointUrl = url.geturl()
220
                edps.append(edp1)
221
            return edps
222
        return self.endpoints[:]
223
224
    def find_servers(self, params):
225
        if not params.ServerUris:
226
            return [desc.Server for desc in self._known_servers.values()]
227
        servers = []
228
        for serv in self._known_servers.values():
229
            serv_uri = serv.Server.ApplicationUri.split(':')
230
            for uri in params.ServerUris:
231
                uri = uri.split(':')
232
                if serv_uri[:len(uri)] == uri:
233
                    servers.append(serv.Server)
234
                    break
235
        return servers
236
237
    def register_server(self, server, conf=None):
238
        appdesc = ua.ApplicationDescription()
239
        appdesc.ApplicationUri = server.ServerUri
240
        appdesc.ProductUri = server.ProductUri
241
        # FIXME: select name from client locale
242
        appdesc.ApplicationName = server.ServerNames[0]
243
        appdesc.ApplicationType = server.ServerType
244
        appdesc.DiscoveryUrls = server.DiscoveryUrls
245
        # FIXME: select discovery uri using reachability from client network
246
        appdesc.GatewayServerUri = server.GatewayServerUri
247
        self._known_servers[server.ServerUri] = ServerDesc(appdesc, conf)
248
249
    def register_server2(self, params):
250
        return self.register_server(params.Server, params.DiscoveryConfiguration)
251
252
    def create_session(self, name, user=User.Anonymous, external=False):
253
        return InternalSession(self, self.aspace, self.subscription_service, name, user=user, external=external)
254
255
    async def enable_history_data_change(self, node, period=timedelta(days=7), count=0):
256
        """
257
        Set attribute Historizing of node to True and start storing data for history
258
        """
259
        await node.set_attribute(ua.AttributeIds.Historizing, ua.DataValue(True))
260
        await node.set_attr_bit(ua.AttributeIds.AccessLevel, ua.AccessLevel.HistoryRead)
261
        await node.set_attr_bit(ua.AttributeIds.UserAccessLevel, ua.AccessLevel.HistoryRead)
262
        await self.history_manager.historize_data_change(node, period, count)
263
264
    async def disable_history_data_change(self, node):
265
        """
266
        Set attribute Historizing of node to False and stop storing data for history
267
        """
268
        await node.set_attribute(ua.AttributeIds.Historizing, ua.DataValue(False))
269
        await node.unset_attr_bit(ua.AttributeIds.AccessLevel, ua.AccessLevel.HistoryRead)
270
        await node.unset_attr_bit(ua.AttributeIds.UserAccessLevel, ua.AccessLevel.HistoryRead)
271
        await self.history_manager.dehistorize(node)
272
273
    async def enable_history_event(self, source, period=timedelta(days=7), count=0):
274
        """
275
        Set attribute History Read of object events to True and start storing data for history
276
        """
277
        event_notifier = await source.get_event_notifier()
278
        if ua.EventNotifier.SubscribeToEvents not in event_notifier:
279
            raise ua.UaError('Node does not generate events', event_notifier)
280
        if ua.EventNotifier.HistoryRead not in event_notifier:
281
            event_notifier.add(ua.EventNotifier.HistoryRead)
282
            await source.set_event_notifier(event_notifier)
283
        await self.history_manager.historize_event(source, period, count)
284
285
    async def disable_history_event(self, source):
286
        """
287
        Set attribute History Read of node to False and stop storing data for history
288
        """
289
        await source.unset_attr_bit(ua.AttributeIds.EventNotifier, ua.EventNotifier.HistoryRead)
290
        await self.history_manager.dehistorize(source)
291
292
    def subscribe_server_callback(self, event, handle):
293
        """
294
        Create a subscription from event to handle
295
        """
296
        self.server_callback_dispatcher.addListener(event, handle)
297
298
    def unsubscribe_server_callback(self, event, handle):
299
        """
300
        Remove a subscription from event to handle
301
        """
302
        self.server_callback_dispatcher.removeListener(event, handle)
303
304
    def set_attribute_value(self, nodeid, datavalue, attr=ua.AttributeIds.Value):
305
        """
306
        directly write datavalue to the Attribute, bypassing some checks and structure creation
307
        so it is a little faster
308
        """
309
        self.aspace.set_attribute_value(nodeid, attr, datavalue)
310
311
    def set_user_manager(self, user_manager):
312
        """
313
        set up a function which that will check for authorize users. Input function takes username
314
        and password as parameters and returns True of user is allowed access, False otherwise.
315
        """
316
        self.user_manager = user_manager
317
318
    def check_user_token(self, isession, token):
319
        """
320
        unpack the username and password for the benefit of the user defined user manager
321
        """
322
        user_name = token.UserName
323
        password = token.Password
324
        # decrypt password if we can
325
        if str(token.EncryptionAlgorithm) != "None":
326
            if not uacrypto:
327
                return False
328
            try:
329
                if token.EncryptionAlgorithm == "http://www.w3.org/2001/04/xmlenc#rsa-1_5":
330
                    raw_pw = uacrypto.decrypt_rsa15(self.private_key, password)
331
                elif token.EncryptionAlgorithm == "http://www.w3.org/2001/04/xmlenc#rsa-oaep":
332
                    raw_pw = uacrypto.decrypt_rsa_oaep(self.private_key, password)
333
                else:
334
                    self.logger.warning("Unknown password encoding %s", token.EncryptionAlgorithm)
335
                    return False
336
                length = unpack_from('<I', raw_pw)[0] - len(isession.nonce)
337
                password = raw_pw[4:4 + length]
338
                password = password.decode('utf-8')
339
            except Exception:
340
                self.logger.exception("Unable to decrypt password")
341
                return False
342
        # call user_manager
343
        return self.user_manager(self, isession, user_name, password)
344