Completed
Push — master ( 6c504f...d79d20 )
by Olivier
04:07 queued 01:36
created

InternalSession.modify_subscription()   A

Complexity

Conditions 1

Size

Total Lines 2
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 2
nop 3
dl 0
loc 2
rs 10
c 0
b 0
f 0
1
import logging
2
from enum import Enum
3
from typing import Coroutine, Iterable, Optional
4
5
from asyncua import ua
6
from ..common.callback import CallbackType, ServerItemCallback
7
from ..common.utils import create_nonce, ServiceError
8
from .address_space import AddressSpace
9
from .users import User, UserRole
10
from .subscription_service import SubscriptionService
11
12
13
class SessionState(Enum):
    """Lifecycle states of an InternalSession."""

    # Initial state assigned when the session object is constructed.
    Created = 0
    # Set once activate_session() has completed.
    Activated = 1
    # Set by close_session().
    Closed = 2
17
18
19
class InternalSession:
    """
    Server-side session that forwards service requests to the internal server.

    Every service method delegates to the matching service object held by
    ``internal_server`` (attribute, view, node management, method, history)
    or to the subscription service passed to the constructor. The class also
    keeps a process-wide count of activated sessions so that
    ``activate_session`` can enforce ``max_connections``.
    """
    # Upper bound on simultaneously activated sessions (checked in activate_session).
    max_connections = 1000
    # Number of sessions currently in the Activated state; shared by all instances.
    _current_connections = 0
    # Next numeric identifier used to build a session NodeId.
    _counter = 10
    # Next numeric identifier used to build an authentication-token NodeId.
    _auth_counter = 1000

    # NOTE(review): the ``user`` default below is evaluated once, when the class
    # is defined, so every session created without an explicit user shares the
    # same User object. Nothing in this class mutates it, but confirm no caller does.
    def __init__(self, internal_server, aspace: AddressSpace, submgr: SubscriptionService, name,
                 user=User(role=UserRole.Anonymous), external=False):
        """
        Create a session in the ``Created`` state.

        :param internal_server: server object whose services the session delegates to
        :param aspace: address space used for registering method callbacks
        :param submgr: subscription service handling all subscription requests
        :param name: session name, used in log messages and ``__str__``
        :param user: user the session runs as until ``activate_session`` replaces it
        :param external: forwarded to the subscription service when creating subscriptions
        """
        self.logger = logging.getLogger(__name__)
        self.iserver = internal_server
        # define if session is external, we need to copy some objects if it is internal
        self.external = external
        self.aspace: AddressSpace = aspace
        self.subscription_service: SubscriptionService = submgr
        self.name = name
        self.user = user
        self.nonce = None
        self.state = SessionState.Created
        # Session ids and auth tokens are drawn from class-level counters so
        # that every instance gets a distinct value.
        self.session_id = ua.NodeId(self._counter)
        InternalSession._counter += 1
        # Ids of the subscriptions created through this session.
        self.subscriptions = []
        self.auth_token = ua.NodeId(self._auth_counter)
        InternalSession._auth_counter += 1
        self.logger.info('Created internal session %s', self.name)

    def __str__(self):
        return f'InternalSession(name:{self.name},' \
               f' user:{self.user}, id:{self.session_id}, auth_token:{self.auth_token})'

    async def get_endpoints(self, params=None, sockname=None):
        """Return the endpoints reported by the internal server."""
        return await self.iserver.get_endpoints(params, sockname)

    async def create_session(self, params, sockname=None):
        """
        Build the CreateSession response for this session.

        A fresh 32-byte nonce is generated and stored on the session; the
        requested session timeout is echoed back unchanged.
        """
        self.logger.info('Create session request')
        result = ua.CreateSessionResult()
        result.SessionId = self.session_id
        result.AuthenticationToken = self.auth_token
        result.RevisedSessionTimeout = params.RequestedSessionTimeout
        result.MaxRequestMessageSize = 65536
        self.nonce = create_nonce(32)
        result.ServerNonce = self.nonce
        result.ServerEndpoints = await self.get_endpoints(sockname=sockname)

        return result

    async def close_session(self, delete_subs=True):
        """
        Mark the session as closed and delete its subscriptions.

        The shared connection counter is decremented only when the session
        had been activated, and is clamped so that it never drops below zero.

        NOTE(review): ``delete_subs`` is accepted but not consulted; the
        subscriptions are always deleted. Kept as-is to preserve existing
        behaviour - confirm whether retaining subscriptions should be supported.
        """
        self.logger.info('close session %s', self.name)
        if self.state == SessionState.Activated:
            InternalSession._current_connections -= 1
        if InternalSession._current_connections < 0:
            InternalSession._current_connections = 0
        self.state = SessionState.Closed
        await self.delete_subscriptions(self.subscriptions)

    def activate_session(self, params, peer_certificate):
        """
        Activate the session and, if a user manager is configured, authenticate.

        :param params: activation parameters; ``ClientSoftwareCertificates`` and
            ``UserIdentityToken`` are read
        :param peer_certificate: certificate handed to the user manager
        :return: ``ua.ActivateSessionResult`` carrying a fresh server nonce
        :raises ServiceError: ``BadSessionIdInvalid`` when the session is not in
            the ``Created`` state, ``BadMaxConnectionsReached`` when the
            connection limit is hit, ``BadUserAccessDenied`` when the user
            manager returns no user
        """
        self.logger.info('activate session')
        result = ua.ActivateSessionResult()
        if self.state != SessionState.Created:
            raise ServiceError(ua.StatusCodes.BadSessionIdInvalid)
        if InternalSession._current_connections >= InternalSession.max_connections:
            raise ServiceError(ua.StatusCodes.BadMaxConnectionsReached)
        self.nonce = create_nonce(32)
        result.ServerNonce = self.nonce
        for _ in params.ClientSoftwareCertificates:
            result.Results.append(ua.StatusCode())
        id_token = params.UserIdentityToken
        if self.iserver.user_manager is not None:
            if isinstance(id_token, ua.UserNameIdentityToken):
                username = id_token.UserName
                password = id_token.Password
            else:
                username, password = None, None

            user = self.iserver.user_manager.get_user(self.iserver, username=username, password=password,
                                                      certificate=peer_certificate)
            if user is None:
                raise ServiceError(ua.StatusCodes.BadUserAccessDenied)
            self.user = user
        # Flip the state and take a connection slot only after authentication
        # has succeeded, so a rejected login neither leaves the session
        # activated nor leaks a slot in the shared counter.
        self.state = SessionState.Activated
        InternalSession._current_connections += 1
        self.logger.info("Activated internal session %s for user %s", self.name, self.user)
        return result

    async def read(self, params):
        """Read attributes, dispatching PreRead/PostRead callbacks around the read."""
        # Callbacks always receive a User object, even when the session has none.
        user = User() if self.user is None else self.user
        await self.iserver.callback_service.dispatch(CallbackType.PreRead,
                                                     ServerItemCallback(params, None, user))
        results = self.iserver.attribute_service.read(params)
        await self.iserver.callback_service.dispatch(CallbackType.PostRead,
                                                     ServerItemCallback(params, results, user))
        return results

    async def history_read(self, params):
        """Forward a history read to the server's history manager."""
        return await self.iserver.history_manager.read_history(params)

    async def write(self, params):
        """Write attributes, dispatching PreWrite/PostWrite callbacks around the write."""
        # Callbacks and the attribute service always receive a User object.
        user = User() if self.user is None else self.user
        await self.iserver.callback_service.dispatch(CallbackType.PreWrite,
                                                     ServerItemCallback(params, None, user))
        write_result = await self.iserver.attribute_service.write(params, user=user)
        await self.iserver.callback_service.dispatch(CallbackType.PostWrite,
                                                     ServerItemCallback(params, write_result, user))
        return write_result

    async def browse(self, params):
        """Forward a browse request to the view service."""
        return self.iserver.view_service.browse(params)

    async def translate_browsepaths_to_nodeids(self, params):
        """Forward a browse-path translation request to the view service."""
        return self.iserver.view_service.translate_browsepaths_to_nodeids(params)

    async def add_nodes(self, params):
        """Forward an add-nodes request, with the session user, to node management."""
        return self.iserver.node_mgt_service.add_nodes(params, self.user)

    async def delete_nodes(self, params):
        """Forward a delete-nodes request, with the session user, to node management."""
        return self.iserver.node_mgt_service.delete_nodes(params, self.user)

    async def add_references(self, params):
        """Forward an add-references request, with the session user, to node management."""
        return self.iserver.node_mgt_service.add_references(params, self.user)

    async def delete_references(self, params):
        """Forward a delete-references request, with the session user, to node management."""
        return self.iserver.node_mgt_service.delete_references(params, self.user)

    def add_method_callback(self, methodid, callback):
        """Register ``callback`` for ``methodid`` in the address space."""
        return self.aspace.add_method_callback(methodid, callback)

    async def call(self, params):
        """Forward a method call request to the method service and await its result."""
        return await self.iserver.method_service.call(params)

    async def create_subscription(self, params, callback=None):
        """Create a subscription and remember its id on this session."""
        result = await self.subscription_service.create_subscription(params, callback, external=self.external)
        self.subscriptions.append(result.SubscriptionId)
        return result

    async def create_monitored_items(self, params: ua.CreateMonitoredItemsParameters):
        """Create monitored items, then dispatch the ItemSubscriptionCreated callback."""
        subscription_result = await self.subscription_service.create_monitored_items(params)
        await self.iserver.callback_service.dispatch(CallbackType.ItemSubscriptionCreated,
                                                     ServerItemCallback(params, subscription_result))
        return subscription_result

    async def modify_monitored_items(self, params):
        """Modify monitored items, then dispatch the ItemSubscriptionModified callback."""
        subscription_result = self.subscription_service.modify_monitored_items(params)
        await self.iserver.callback_service.dispatch(CallbackType.ItemSubscriptionModified,
                                                     ServerItemCallback(params, subscription_result))
        return subscription_result

    def republish(self, params):
        """Forward a republish request to the subscription service."""
        return self.subscription_service.republish(params)

    async def delete_subscriptions(self, ids):
        """Delete the subscriptions with the given ids."""
        # This is an async method, due to symmetry with client code
        return await self.subscription_service.delete_subscriptions(ids)

    async def delete_monitored_items(self, params):
        """Delete monitored items, then dispatch the ItemSubscriptionDeleted callback."""
        # This is an async method, due to symmetry with client code
        subscription_result = self.subscription_service.delete_monitored_items(params)
        await self.iserver.callback_service.dispatch(CallbackType.ItemSubscriptionDeleted,
                                                     ServerItemCallback(params, subscription_result))

        return subscription_result

    def publish(self, acks: Optional[Iterable[ua.SubscriptionAcknowledgement]] = None):
        """Forward acknowledgements to the subscription service (empty list when none given)."""
        return self.subscription_service.publish(acks or [])

    def modify_subscription(self, params, callback):
        """Forward a modify-subscription request to the subscription service."""
        return self.subscription_service.modify_subscription(params, callback)
196