Completed
Push — master ( 4bbb48...d16ca4 )
by Olivier
11:53 queued 09:32
created

InternalServer.write_attribute_value()   A

Complexity

Conditions 1

Size

Total Lines 6
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 2
nop 4
dl 0
loc 6
rs 10
c 0
b 0
f 0
1
"""
2
Internal server implementing opc-ua interface.
3
Can be used on server side or to implement binary/https opc-ua servers
4
"""
5
6
import asyncio
7
from datetime import datetime, timedelta
8
from copy import copy
9
from struct import unpack_from
10
import os
11
import logging
12
from urllib.parse import urlparse
13
from typing import Coroutine
14
15
from asyncua import ua
16
from ..common.callback import CallbackDispatcher
17
from ..common.node import Node
18
from .history import HistoryManager
19
from .address_space import AddressSpace, AttributeService, ViewService, NodeManagementService, MethodService
20
from .subscription_service import SubscriptionService
21
from .standard_address_space import standard_address_space
22
from .users import User
23
from .internal_session import InternalSession
24
25
try:
26
    from asyncua.crypto import uacrypto
27
except ImportError:
28
    logging.getLogger(__name__).warning("cryptography is not installed, use of crypto disabled")
29
    uacrypto = False
30
31
32
class ServerDesc:
    """
    Small record pairing a server description with optional extra data.

    :param serv: the server description object, stored as ``Server``
    :param cap: optional companion data stored as ``Capabilities``
        (``None`` when not supplied)
    """

    def __init__(self, serv, cap=None):
        # Attribute names are capitalised on purpose: callers read
        # ``desc.Server`` / ``desc.Capabilities`` directly.
        self.Server = serv
        self.Capabilities = cap
36
37
38
def default_user_manager(iserver, isession, username, password):
    """
    Default user_manager, does nothing much but check for admin

    Every login is accepted. When the server allows remote administration
    and the user name is "admin" or "Admin", the session user is switched
    to `User.Admin`. The password is not inspected.

    :param iserver: server object exposing an ``allow_remote_admin`` flag
    :param isession: session whose ``user`` attribute may be promoted
    :param username: user name supplied by the client
    :param password: password supplied by the client (ignored here)
    :return: always True
    """
    if iserver.allow_remote_admin and username in ("admin", "Admin"):
        isession.user = User.Admin
    return True
45
46
47
class InternalServer:
    """
    There is one `InternalServer` for every `Server`.

    It holds the address space, the services working on it, an internal
    admin session for server-side access and the table of known servers
    used when acting as a discovery server.
    """

    def __init__(self, loop: asyncio.AbstractEventLoop):
        self.loop: asyncio.AbstractEventLoop = loop
        self.logger = logging.getLogger(__name__)
        self.server_callback_dispatcher = CallbackDispatcher()
        self.endpoints = []
        self._channel_id_counter = 5
        self.allow_remote_admin = True
        self.disabled_clock = False  # for debugging we may want to disable clock that writes too much in log
        # handle of the pending clock callback; None while the clock is not running
        self._current_time_handle = None
        self._known_servers = {}  # used if we are a discovery server
        self.certificate = None
        self.private_key = None
        self.aspace = AddressSpace()
        self.attribute_service = AttributeService(self.aspace)
        self.view_service = ViewService(self.aspace)
        self.method_service = MethodService(self.aspace)
        self.node_mgt_service = NodeManagementService(self.aspace)
        self.asyncio_transports = []
        self.subscription_service: SubscriptionService = SubscriptionService(self.loop, self.aspace)
        self.history_manager = HistoryManager(self)
        self.user_manager = default_user_manager  # module-level default, replace with set_user_manager()
        # create a session to use on server side
        self.isession = InternalSession(self, self.aspace, self.subscription_service, "Internal", user=User.Admin)
        self.current_time_node = Node(self.isession, ua.NodeId(ua.ObjectIds.Server_ServerStatus_CurrentTime))

    async def init(self, shelffile=None):
        """
        Populate the address space, apply fixes, set up standard nodes and
        initialise the history manager, in that order.
        """
        await self.load_standard_address_space(shelffile)
        await self._address_space_fixes()
        await self.setup_nodes()
        await self.history_manager.init()

    async def setup_nodes(self):
        """
        Set up some nodes as defined by spec

        Writes the namespace array and sets every operation limit listed
        below to 10000. A failure on the first limit raises (as before);
        failures on the remaining limits are logged instead of being
        silently ignored.
        """
        uris = ['http://opcfoundation.org/UA/']
        ns_node = Node(self.isession, ua.NodeId(ua.ObjectIds.Server_NamespaceArray))
        await ns_node.write_value(uris)

        params = ua.WriteParameters()
        for nodeid in (ua.ObjectIds.Server_ServerCapabilities_OperationLimits_MaxNodesPerRead,
                       ua.ObjectIds.Server_ServerCapabilities_OperationLimits_MaxNodesPerHistoryReadData,
                       ua.ObjectIds.Server_ServerCapabilities_OperationLimits_MaxNodesPerHistoryReadEvents,
                       ua.ObjectIds.Server_ServerCapabilities_OperationLimits_MaxNodesPerWrite,
                       ua.ObjectIds.Server_ServerCapabilities_OperationLimits_MaxNodesPerHistoryUpdateData,
                       ua.ObjectIds.Server_ServerCapabilities_OperationLimits_MaxNodesPerHistoryUpdateEvents,
                       ua.ObjectIds.Server_ServerCapabilities_OperationLimits_MaxNodesPerMethodCall,
                       ua.ObjectIds.Server_ServerCapabilities_OperationLimits_MaxNodesPerBrowse,
                       ua.ObjectIds.Server_ServerCapabilities_OperationLimits_MaxNodesPerRegisterNodes,
                       ua.ObjectIds.Server_ServerCapabilities_OperationLimits_MaxNodesPerTranslateBrowsePathsToNodeIds,
                       ua.ObjectIds.Server_ServerCapabilities_OperationLimits_MaxNodesPerNodeManagement,
                       ua.ObjectIds.Server_ServerCapabilities_OperationLimits_MaxMonitoredItemsPerCall):
            attr = ua.WriteValue()
            attr.NodeId = ua.NodeId(nodeid)
            attr.AttributeId = ua.AttributeIds.Value
            attr.Value = ua.DataValue(ua.Variant(10000, ua.VariantType.UInt32), ua.StatusCode(ua.StatusCodes.Good))
            attr.Value.ServerTimestamp = datetime.utcnow()
            params.NodesToWrite.append(attr)
        results = await self.isession.write(params)
        # historical contract: only a failure of the first write aborts start-up
        results[0].check()
        for write_value, res in zip(params.NodesToWrite[1:], results[1:]):
            try:
                res.check()
            except ua.UaError:
                self.logger.warning("Could not write operation limit %s: %s", write_value.NodeId, res)

    async def load_standard_address_space(self, shelf_file=None):
        """
        Fill the address space, from a shelf file when one is available.

        If `shelf_file` (or `shelf_file` + '.db') exists it is loaded and
        nothing else is done. Otherwise the generated standard address space
        is used and, when a path was given, a shelf is created at that path.
        File-system work is run in the loop's default executor.
        """
        if shelf_file:
            is_file = (await self.loop.run_in_executor(None, os.path.isfile, shelf_file)
                       or await self.loop.run_in_executor(None, os.path.isfile, f'{shelf_file}.db'))
            if is_file:
                # import address space from shelf
                await self.loop.run_in_executor(None, self.aspace.load_aspace_shelf, shelf_file)
                return
        # import address space from code generated from xml
        # (importing directly from the xml nodeset is possible but has a performance impact, so it is not done)
        standard_address_space.fill_address_space(self.node_mgt_service)
        if shelf_file:
            # path was supplied, but file doesn't exist - create one for next start up
            await self.loop.run_in_executor(None, self.aspace.make_aspace_shelf, shelf_file)

    async def _address_space_fixes(self) -> None:
        """
        Looks like the xml definition of address space has some error. This is a good place to fix them

        Adds an inverse Organizes reference from BaseObjectType to
        ObjectTypesFolder and from BaseDataType to DataTypesFolder, and
        raises if any of the two additions is rejected.
        """
        items = []
        for source_id, target_id in ((ua.ObjectIds.BaseObjectType, ua.ObjectIds.ObjectTypesFolder),
                                     (ua.ObjectIds.BaseDataType, ua.ObjectIds.DataTypesFolder)):
            item = ua.AddReferencesItem()
            item.SourceNodeId = ua.NodeId(source_id)
            item.ReferenceTypeId = ua.NodeId(ua.ObjectIds.Organizes)
            item.IsForward = False
            item.TargetNodeId = ua.NodeId(target_id)
            item.TargetNodeClass = ua.NodeClass.Object
            items.append(item)

        results = await self.isession.add_references(items)
        for res in results:
            res.check()

    def load_address_space(self, path):
        """
        Load address space from path
        """
        self.aspace.load(path)

    def dump_address_space(self, path):
        """
        Dump current address space to path
        """
        self.aspace.dump(path)

    async def start(self):
        """
        Register our own endpoints as known servers, publish the Running
        state and start time, and start the one-second clock unless it is
        disabled or already running.
        """
        self.logger.info('starting internal server')
        for edp in self.endpoints:
            self._known_servers[edp.Server.ApplicationUri] = ServerDesc(edp.Server)
        await Node(self.isession, ua.NodeId(ua.ObjectIds.Server_ServerStatus_State)).write_value(ua.ServerState.Running, ua.VariantType.Int32)
        await Node(self.isession, ua.NodeId(ua.ObjectIds.Server_ServerStatus_StartTime)).write_value(datetime.utcnow())
        # guard against a second start() creating a second timer chain
        if not self.disabled_clock and self._current_time_handle is None:
            self._set_current_time()

    async def stop(self):
        """
        Stop the clock, the method service, the internal session and the
        history manager.
        """
        self.logger.info('stopping internal server')
        # cancel the pending clock callback first so that nothing is written
        # through the internal session once it has been closed
        if self._current_time_handle is not None:
            self._current_time_handle.cancel()
            self._current_time_handle = None
        self.method_service.stop()
        await self.isession.close_session()
        await self.history_manager.stop()

    def _set_current_time(self):
        """
        Write the current UTC time to the CurrentTime node and re-arm
        itself to run again in one second.
        """
        self.loop.create_task(self.current_time_node.write_value(datetime.utcnow()))
        # keep the handle so stop() can cancel the otherwise endless chain
        self._current_time_handle = self.loop.call_later(1, self._set_current_time)

    def get_new_channel_id(self):
        """
        Return the next channel id (a monotonically increasing counter).
        """
        self._channel_id_counter += 1
        return self._channel_id_counter

    def add_endpoint(self, endpoint):
        """
        Append an endpoint description to the list returned by get_endpoints().
        """
        self.endpoints.append(endpoint)

    async def get_endpoints(self, params=None, sockname=None):
        """
        Return the endpoint descriptions.

        :param params: unused
        :param sockname: optional (host, port, ...) tuple of the local socket
            the client is connected to. When given, shallow copies of the
            endpoints are returned with the network location of their URL
            replaced by that address, so the client gets an address it can
            reach. IPv6 literals are bracketed as required in URLs.
        :return: a new list of endpoint descriptions
        """
        self.logger.info('get endpoint')
        if not sockname:
            return self.endpoints[:]
        # return to client the ip address it has access to
        host, port = sockname[0], sockname[1]
        if ':' in host and not host.startswith('['):
            # a bare IPv6 literal would make "host:port" ambiguous
            host = f'[{host}]'
        netloc = f'{host}:{port}'
        edps = []
        for edp in self.endpoints:
            edp1 = copy(edp)
            edp1.EndpointUrl = urlparse(edp1.EndpointUrl)._replace(netloc=netloc).geturl()
            edps.append(edp1)
        return edps

    def find_servers(self, params):
        """
        Return the descriptions of known servers.

        Without `params.ServerUris` every known server is returned.
        Otherwise a server matches when the ':'-separated components of one
        of the requested uris are a prefix of the components of its
        ApplicationUri.
        """
        if not params.ServerUris:
            return [desc.Server for desc in self._known_servers.values()]
        wanted = [uri.split(':') for uri in params.ServerUris]
        servers = []
        for desc in self._known_servers.values():
            serv_uri = desc.Server.ApplicationUri.split(':')
            if any(serv_uri[:len(prefix)] == prefix for prefix in wanted):
                servers.append(desc.Server)
        return servers

    def register_server(self, server, conf=None):
        """
        Record `server` in the table of known servers, keyed by its
        ServerUri, replacing any previous entry for that uri.

        :param server: registration data (ServerUri, ProductUri, ServerNames,
            ServerType, DiscoveryUrls, GatewayServerUri are read)
        :param conf: optional configuration stored with the description
        """
        appdesc = ua.ApplicationDescription()
        appdesc.ApplicationUri = server.ServerUri
        appdesc.ProductUri = server.ProductUri
        # FIXME: select name from client locale
        appdesc.ApplicationName = server.ServerNames[0]
        appdesc.ApplicationType = server.ServerType
        appdesc.DiscoveryUrls = server.DiscoveryUrls
        # FIXME: select discovery uri using reachability from client network
        appdesc.GatewayServerUri = server.GatewayServerUri
        self._known_servers[server.ServerUri] = ServerDesc(appdesc, conf)

    def register_server2(self, params):
        """
        Same as register_server(), taking server and configuration from `params`.
        """
        return self.register_server(params.Server, params.DiscoveryConfiguration)

    def create_session(self, name, user=User.Anonymous, external=False):
        """
        Create a new session on this server's address space and subscription service.
        """
        return InternalSession(self, self.aspace, self.subscription_service, name, user=user, external=external)

    async def enable_history_data_change(self, node, period=timedelta(days=7), count=0):
        """
        Set attribute Historizing of node to True and start storing data for history
        """
        await node.write_attribute(ua.AttributeIds.Historizing, ua.DataValue(True))
        await node.set_attr_bit(ua.AttributeIds.AccessLevel, ua.AccessLevel.HistoryRead)
        await node.set_attr_bit(ua.AttributeIds.UserAccessLevel, ua.AccessLevel.HistoryRead)
        await self.history_manager.historize_data_change(node, period, count)

    async def disable_history_data_change(self, node):
        """
        Set attribute Historizing of node to False and stop storing data for history
        """
        await node.write_attribute(ua.AttributeIds.Historizing, ua.DataValue(False))
        await node.unset_attr_bit(ua.AttributeIds.AccessLevel, ua.AccessLevel.HistoryRead)
        await node.unset_attr_bit(ua.AttributeIds.UserAccessLevel, ua.AccessLevel.HistoryRead)
        await self.history_manager.dehistorize(node)

    async def enable_history_event(self, source, period=timedelta(days=7), count=0):
        """
        Set attribute History Read of object events to True and start storing data for history

        Raises ua.UaError when the source does not have SubscribeToEvents
        in its event notifier.
        """
        event_notifier = await source.read_event_notifier()
        if ua.EventNotifier.SubscribeToEvents not in event_notifier:
            raise ua.UaError('Node does not generate events', event_notifier)
        if ua.EventNotifier.HistoryRead not in event_notifier:
            event_notifier.add(ua.EventNotifier.HistoryRead)
            await source.set_event_notifier(event_notifier)
        await self.history_manager.historize_event(source, period, count)

    async def disable_history_event(self, source):
        """
        Set attribute History Read of node to False and stop storing data for history
        """
        await source.unset_attr_bit(ua.AttributeIds.EventNotifier, ua.EventNotifier.HistoryRead)
        await self.history_manager.dehistorize(source)

    def subscribe_server_callback(self, event, handle):
        """
        Create a subscription from event to handle
        """
        self.server_callback_dispatcher.addListener(event, handle)

    def unsubscribe_server_callback(self, event, handle):
        """
        Remove a subscription from event to handle
        """
        self.server_callback_dispatcher.removeListener(event, handle)

    def write_attribute_value(self, nodeid, datavalue, attr=ua.AttributeIds.Value):
        """
        directly write datavalue to the Attribute, bypassing some checks and structure creation
        so it is a little faster
        """
        self.aspace.write_attribute_value(nodeid, attr, datavalue)

    def set_user_manager(self, user_manager):
        """
        set up a function that will check for authorized users. The function is called as
        ``user_manager(iserver, isession, username, password)`` and returns True if the user
        is allowed access, False otherwise.
        """
        self.user_manager = user_manager

    def check_user_token(self, isession, token):
        """
        unpack the username and password for the benefit of the user defined user manager

        When the token names an encryption algorithm the password is
        decrypted with the server private key first.

        :return: the result of the user manager, or False when crypto is
            unavailable, the algorithm is unknown, or the password cannot be
            decrypted/decoded
        """
        user_name = token.UserName
        password = token.Password
        # decrypt password if we can
        if str(token.EncryptionAlgorithm) != "None":
            if not uacrypto:
                return False
            try:
                if token.EncryptionAlgorithm == "http://www.w3.org/2001/04/xmlenc#rsa-1_5":
                    raw_pw = uacrypto.decrypt_rsa15(self.private_key, password)
                elif token.EncryptionAlgorithm == "http://www.w3.org/2001/04/xmlenc#rsa-oaep":
                    raw_pw = uacrypto.decrypt_rsa_oaep(self.private_key, password)
                else:
                    self.logger.warning("Unknown password encoding %s", token.EncryptionAlgorithm)
                    return False
                # the decrypted blob starts with a 4-byte little-endian length;
                # that length minus the session nonce size is the password size
                length = unpack_from('<I', raw_pw)[0] - len(isession.nonce)
                if length < 0:
                    # a negative length would make the slice below return garbage
                    self.logger.warning("Malformed encrypted password: declared length is shorter than the nonce")
                    return False
                password = raw_pw[4:4 + length]
                password = password.decode('utf-8')
            except Exception:
                self.logger.exception("Unable to decrypt password")
                return False
        # call user_manager
        return self.user_manager(self, isession, user_name, password)
323