Passed
Pull Request — master (#120)
by Olivier
02:31
created

InternalSession.activate_session()   B

Complexity

Conditions 6

Size

Total Lines 19
Code Lines 19

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 6
eloc 19
nop 2
dl 0
loc 19
rs 8.5166
c 0
b 0
f 0
1
import logging
2
from enum import Enum
3
from typing import Coroutine, Iterable, Optional
4
5
from asyncua import ua
6
from ..common.callback import CallbackType, ServerItemCallback
7
from ..common.utils import create_nonce, ServiceError
8
from .address_space import AddressSpace
9
from .users import User
10
from .subscription_service import SubscriptionService
11
12
13
class SessionState(Enum):
    """Lifecycle states of an InternalSession."""

    # Initial state assigned when the session object is constructed.
    Created = 0
    # Set once activate_session() has succeeded.
    Activated = 1
    # Set by close_session().
    Closed = 2
17
18
19
class InternalSession:
    """
    Server-side session object.

    Holds the identity of one session (session id, authentication token,
    nonce), tracks its lifecycle state and forwards service requests to the
    services exposed by the internal server it was created with.
    """
    # Upper bound on simultaneously activated sessions (checked in activate_session).
    max_connections = 1000
    # Number of currently activated sessions, shared by all instances.
    _current_connections = 0
    # Next numeric identifier used to build a session id NodeId.
    _counter = 10
    # Next numeric identifier used to build an authentication token NodeId.
    _auth_counter = 1000

    def __init__(self, internal_server, aspace: AddressSpace, submgr: SubscriptionService, name, user=User.Anonymous,
                 external=False):
        """
        Create a session in the ``Created`` state.

        :param internal_server: server object whose services the session forwards to
        :param aspace: address space used for method callback registration
        :param submgr: subscription service used for all subscription requests
        :param name: session name, used in log messages and ``__str__``
        :param user: user associated with the session
        :param external: passed to the subscription service when creating subscriptions
        """
        self.logger = logging.getLogger(__name__)
        self.iserver = internal_server
        # define if session is external, we need to copy some objects if it is internal
        self.external = external
        self.aspace: AddressSpace = aspace
        self.subscription_service: SubscriptionService = submgr
        self.name = name
        self.user = user
        self.nonce = None
        self.state = SessionState.Created
        # Session id and auth token are taken from class-wide counters which
        # are advanced for the next session.
        self.session_id = ua.NodeId(self._counter)
        InternalSession._counter += 1
        # Ids of the subscriptions created through this session.
        self.subscriptions = []
        self.auth_token = ua.NodeId(self._auth_counter)
        InternalSession._auth_counter += 1
        self.logger.info('Created internal session %s', self.name)

    def __str__(self):
        return f'InternalSession(name:{self.name}, user:{self.user}, id:{self.session_id}, auth_token:{self.auth_token})'

    async def get_endpoints(self, params=None, sockname=None):
        """Return the endpoints reported by the internal server."""
        return await self.iserver.get_endpoints(params, sockname)

    async def create_session(self, params, sockname=None):
        """
        Build the CreateSession result for this session.

        A fresh 32 byte nonce is generated and stored on the session, and the
        requested session timeout is echoed back as the revised timeout.
        """
        self.logger.info('Create session request')
        result = ua.CreateSessionResult()
        result.SessionId = self.session_id
        result.AuthenticationToken = self.auth_token
        result.RevisedSessionTimeout = params.RequestedSessionTimeout
        result.MaxRequestMessageSize = 65536
        self.nonce = create_nonce(32)
        result.ServerNonce = self.nonce
        result.ServerEndpoints = await self.get_endpoints(sockname=sockname)

        return result

    async def close_session(self, delete_subs=True):
        """
        Mark the session as closed and delete its subscriptions.

        The shared connection counter is only decremented when the session
        had actually been activated.

        NOTE(review): ``delete_subs`` is accepted for interface compatibility
        but is not consulted; subscriptions are always deleted.
        """
        self.logger.info('close session %s', self.name)
        if self.state == SessionState.Activated:
            InternalSession._current_connections -= 1
        # Never let the shared counter drop below zero.
        InternalSession._current_connections = max(InternalSession._current_connections, 0)
        self.state = SessionState.Closed
        await self.delete_subscriptions(self.subscriptions)

    def activate_session(self, params):
        """
        Activate the session and return the ActivateSession result.

        :raises ServiceError: ``BadSessionIdInvalid`` if the session is not in
            the ``Created`` state, ``BadMaxConnectionsReached`` if the
            connection limit is reached, ``BadUserAccessDenied`` if a user
            name identity token is rejected by the server.
        """
        self.logger.info('activate session')
        if self.state != SessionState.Created:
            raise ServiceError(ua.StatusCodes.BadSessionIdInvalid)
        if InternalSession._current_connections >= InternalSession.max_connections:
            raise ServiceError(ua.StatusCodes.BadMaxConnectionsReached)
        result = ua.ActivateSessionResult()
        self.nonce = create_nonce(32)
        result.ServerNonce = self.nonce
        # One status code per client software certificate.
        for _ in params.ClientSoftwareCertificates:
            result.Results.append(ua.StatusCode())
        id_token = params.UserIdentityToken
        if isinstance(id_token, ua.UserNameIdentityToken):
            if self.iserver.check_user_token(self, id_token) is False:
                raise ServiceError(ua.StatusCodes.BadUserAccessDenied)
        # Only mark the session as activated (and consume a connection slot)
        # once the identity token has been accepted, so that a rejected
        # activation does not leave the session in the Activated state.
        self.state = SessionState.Activated
        InternalSession._current_connections += 1
        self.logger.info("Activated internal session %s for user %s", self.name, self.user)
        return result

    async def read(self, params):
        """Forward a read request to the attribute service."""
        results = self.iserver.attribute_service.read(params)
        return results

    def history_read(self, params) -> Coroutine:
        """Forward a history read request to the history manager (not awaited here)."""
        return self.iserver.history_manager.read_history(params)

    async def write(self, params):
        """Forward a write request to the attribute service on behalf of the session user."""
        return self.iserver.attribute_service.write(params, self.user)

    async def browse(self, params):
        """Forward a browse request to the view service."""
        return self.iserver.view_service.browse(params)

    async def translate_browsepaths_to_nodeids(self, params):
        """Forward a browse path translation request to the view service."""
        return self.iserver.view_service.translate_browsepaths_to_nodeids(params)

    async def add_nodes(self, params):
        """Forward an add nodes request to the node management service."""
        return self.iserver.node_mgt_service.add_nodes(params, self.user)

    async def delete_nodes(self, params):
        """Forward a delete nodes request to the node management service."""
        return self.iserver.node_mgt_service.delete_nodes(params, self.user)

    async def add_references(self, params):
        """Forward an add references request to the node management service."""
        return self.iserver.node_mgt_service.add_references(params, self.user)

    async def delete_references(self, params):
        """Forward a delete references request to the node management service."""
        return self.iserver.node_mgt_service.delete_references(params, self.user)

    def add_method_callback(self, methodid, callback):
        """Register ``callback`` for ``methodid`` in the address space."""
        return self.aspace.add_method_callback(methodid, callback)

    def call(self, params):
        """
        Forward a call request to the method service.

        The value returned by the method service is handed back without being
        awaited here (the original documentation marks this as a coroutine).
        """
        return self.iserver.method_service.call(params)

    async def create_subscription(self, params, callback=None):
        """Create a subscription and remember its id on the session."""
        result = await self.subscription_service.create_subscription(params, callback, external=self.external)
        self.subscriptions.append(result.SubscriptionId)
        return result

    async def create_monitored_items(self, params: ua.CreateMonitoredItemsParameters):
        """Create monitored items and dispatch the ItemSubscriptionCreated server callback."""
        subscription_result = await self.subscription_service.create_monitored_items(params)
        self.iserver.server_callback_dispatcher.dispatch(CallbackType.ItemSubscriptionCreated,
            ServerItemCallback(params, subscription_result))
        return subscription_result

    async def modify_monitored_items(self, params):
        """Modify monitored items and dispatch the ItemSubscriptionModified server callback."""
        subscription_result = self.subscription_service.modify_monitored_items(params)
        self.iserver.server_callback_dispatcher.dispatch(CallbackType.ItemSubscriptionModified,
            ServerItemCallback(params, subscription_result))
        return subscription_result

    def republish(self, params):
        """Forward a republish request to the subscription service."""
        return self.subscription_service.republish(params)

    async def delete_subscriptions(self, ids):
        """Delete the subscriptions with the given ids."""
        # This is an async method, due to symmetry with client code
        return await self.subscription_service.delete_subscriptions(ids)

    async def delete_monitored_items(self, params):
        """Delete monitored items and dispatch the ItemSubscriptionDeleted server callback."""
        # This is an async method, due to symmetry with client code
        subscription_result = self.subscription_service.delete_monitored_items(params)
        self.iserver.server_callback_dispatcher.dispatch(CallbackType.ItemSubscriptionDeleted,
            ServerItemCallback(params, subscription_result))
        return subscription_result

    def publish(self, acks: Optional[Iterable[ua.SubscriptionAcknowledgement]] = None):
        """Forward a publish request, using an empty list when no acknowledgements are given."""
        return self.subscription_service.publish(acks or [])
165