Passed
Pull Request — master (#45)
by
unknown
02:29
created

InternalSession.write()   A

Complexity

Conditions 1

Size

Total Lines 2
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 2
nop 2
dl 0
loc 2
rs 10
c 0
b 0
f 0
1
import asyncio
2
import logging
3
from enum import Enum
4
from typing import Coroutine
5
6
from asyncua import ua
7
from ..common.callback import CallbackType, ServerItemCallback
8
from ..common.utils import create_nonce, ServiceError
9
from .users import User
10
from .subscription_service import SubscriptionService
11
12
13
class SessionState(Enum):
    """
    Lifecycle stages of an InternalSession.
    """

    # initial state, assigned when the session object is constructed
    Created = 0
    # assigned by activate_session
    Activated = 1
    # assigned by close_session
    Closed = 2
17
18
19
class InternalSession:
20
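    """
    Server-side session: keeps the session state and forwards service
    requests to the internal server and to the subscription service.
    """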
23
    _counter = 10
24
    _auth_counter = 1000
25
26
    def __init__(self, internal_server, aspace, submgr: SubscriptionService, name, user=User.Anonymous, external=False):
        """
        Initialise a session in the ``Created`` state and give it fresh ids.

        :param internal_server: server object that service calls are forwarded to
        :param aspace: address space object, used by ``add_method_callback``
        :param submgr: subscription service used for all subscription calls
        :param name: session name, shown in log messages and in ``str(session)``
        :param user: user passed along with write and node management requests
        :param external: True if the session is external
        """
        self.logger = logging.getLogger(__name__)
        self.name = name
        self.user = user
        self.iserver = internal_server
        self.aspace = aspace
        self.subscription_service: SubscriptionService = submgr
        # external/internal matters because an internal session needs to copy some objects
        self.external = external
        # no nonce until create_session / activate_session generates one
        self.nonce = None
        self.state = SessionState.Created
        # both ids are taken from class-level counters, which are bumped right
        # after use so that the next session gets different values
        self.session_id = ua.NodeId(self._counter)
        InternalSession._counter += 1
        self.authentication_token = ua.NodeId(self._auth_counter)
        InternalSession._auth_counter += 1
        self.logger.info('Created internal session %s', self.name)
42
43
    def __str__(self):
44
        return 'InternalSession(name:{0}, user:{1}, id:{2}, auth_token:{3})'.format(
45
            self.name, self.user, self.session_id, self.authentication_token)
46
47
    async def get_endpoints(self, params=None, sockname=None):
        """
        Ask the internal server for its endpoints.

        :param params: passed through to the server's ``get_endpoints``
        :param sockname: passed through to the server's ``get_endpoints``
        :return: the awaited result of the server's ``get_endpoints``
        """
        endpoints = await self.iserver.get_endpoints(params, sockname)
        return endpoints
49
50
    async def create_session(self, params, sockname=None):
        """
        Build the result of a CreateSession request.

        A new server nonce (``create_nonce(32)``) is generated and stored on
        the session before the endpoints are fetched.

        :param params: request parameters; only ``RequestedSessionTimeout`` is
            read here and it is granted unchanged
        :param sockname: passed on to ``get_endpoints``
        :return: the filled ``ua.CreateSessionResult``
        """
        self.logger.info('Create session request')

        response = ua.CreateSessionResult()
        response.SessionId = self.session_id
        response.AuthenticationToken = self.authentication_token
        # the requested timeout is accepted as is
        response.RevisedSessionTimeout = params.RequestedSessionTimeout
        response.MaxRequestMessageSize = 65536
        server_nonce = create_nonce(32)
        self.nonce = server_nonce
        response.ServerNonce = server_nonce
        response.ServerEndpoints = await self.get_endpoints(sockname=sockname)
        return response
63
64
    async def close_session(self, delete_subs=True):
        """
        Mark the session as closed and delete every subscription currently
        held by the subscription service.

        :param delete_subs: accepted for interface compatibility.
            NOTE(review): the flag is not consulted, subscriptions are always
            deleted -- confirm whether it should be honoured.
        """
        # pass the name as a logging argument so the '%s' placeholder is filled
        self.logger.info('close session %s', self.name)
        self.state = SessionState.Closed
        # take a snapshot of the ids before handing them over for deletion
        subscription_ids = list(self.subscription_service.subscriptions.keys())
        await self.delete_subscriptions(subscription_ids)
68
69
    def activate_session(self, params):
        """
        Handle an ActivateSession request.

        A new server nonce is generated, one status code is returned per
        client software certificate and, for a user name identity token, the
        token is checked by the internal server. The session only switches to
        ``Activated`` once that check has passed, so a rejected login leaves
        the session in the ``Created`` state.

        :param params: request parameters; ``ClientSoftwareCertificates`` and
            ``UserIdentityToken`` are read
        :return: the filled ``ua.ActivateSessionResult``
        :raises ServiceError: ``BadSessionIdInvalid`` if the session is not in
            the ``Created`` state, ``BadUserAccessDenied`` if the server
            rejects the user token
        """
        self.logger.info('activate session')
        if self.state != SessionState.Created:
            raise ServiceError(ua.StatusCodes.BadSessionIdInvalid)
        result = ua.ActivateSessionResult()
        self.nonce = create_nonce(32)
        result.ServerNonce = self.nonce
        # one default-constructed status code per software certificate
        for _ in params.ClientSoftwareCertificates:
            result.Results.append(ua.StatusCode())
        id_token = params.UserIdentityToken
        if isinstance(id_token, ua.UserNameIdentityToken):
            # only an explicit False counts as a rejection
            if self.iserver.check_user_token(self, id_token) is False:
                raise ServiceError(ua.StatusCodes.BadUserAccessDenied)
        # switch state last: a failed token check must not activate the session
        self.state = SessionState.Activated
        self.logger.info("Activated internal session %s for user %s", self.name, self.user)
        return result
85
86
    async def read(self, params):
        """
        Forward a read request to the server's attribute service.

        :param params: read parameters, passed through untouched
        :return: whatever the attribute service returns for ``params``
        """
        return self.iserver.attribute_service.read(params)
89
90
    def history_read(self, params) -> Coroutine:
        """
        Hand a history read request to the server's history manager.

        Not declared ``async``: the object returned by ``read_history`` is
        returned as is (annotated as a coroutine for the caller to await).
        """
        history_manager = self.iserver.history_manager
        return history_manager.read_history(params)
92
93
    async def write(self, params):
        """
        Forward a write request to the server's attribute service, together
        with the session user.

        :return: whatever the attribute service returns
        """
        attribute_service = self.iserver.attribute_service
        return attribute_service.write(params, self.user)
95
96
    async def browse(self, params):
        """
        Forward a browse request to the server's view service.

        :return: whatever the view service returns
        """
        view_service = self.iserver.view_service
        return view_service.browse(params)
98
99
    async def translate_browsepaths_to_nodeids(self, params):
        """
        Forward a translate-browse-paths request to the server's view service.

        :return: whatever the view service returns
        """
        view_service = self.iserver.view_service
        return view_service.translate_browsepaths_to_nodeids(params)
101
102
    async def add_nodes(self, params):
        """
        Forward an add-nodes request to the server's node management service,
        together with the session user.

        :return: whatever the node management service returns
        """
        node_service = self.iserver.node_mgt_service
        return node_service.add_nodes(params, self.user)
104
105
    async def delete_nodes(self, params):
        """
        Forward a delete-nodes request to the server's node management
        service, together with the session user.

        :return: whatever the node management service returns
        """
        node_service = self.iserver.node_mgt_service
        return node_service.delete_nodes(params, self.user)
107
108
    async def add_references(self, params):
        """
        Forward an add-references request to the server's node management
        service, together with the session user.

        :return: whatever the node management service returns
        """
        node_service = self.iserver.node_mgt_service
        return node_service.add_references(params, self.user)
110
111
    async def delete_references(self, params):
        """
        Forward a delete-references request to the server's node management
        service, together with the session user.

        :return: whatever the node management service returns
        """
        node_service = self.iserver.node_mgt_service
        return node_service.delete_references(params, self.user)
113
114
    async def add_method_callback(self, methodid, callback):
        """
        Register ``callback`` for ``methodid`` in the address space.

        :return: whatever the address space's ``add_method_callback`` returns
        """
        address_space = self.aspace
        return address_space.add_method_callback(methodid, callback)
116
117
    def call(self, params):
        """
        Forward a method call request to the server's method service.

        Not declared ``async``: the object returned by the method service is
        handed back as is.
        NOTE(review): presumably that object is a coroutine the caller has to
        await -- confirm against the method service.
        """
        method_service = self.iserver.method_service
        return method_service.call(params)
120
121
    async def create_subscription(self, params, callback=None):
        """
        Create a subscription through the subscription service.

        :param params: subscription parameters, passed through untouched
        :param callback: passed through to the subscription service
        :return: the awaited result of the subscription service
        """
        return await self.subscription_service.create_subscription(params, callback)
124
125
    async def create_monitored_items(self, params):
        """
        Create monitored items through the subscription service and report
        the creation to the server callback dispatcher.

        :return: the awaited result of the subscription service
        """
        created = await self.subscription_service.create_monitored_items(params)
        dispatcher = self.iserver.server_callback_dispatcher
        # listeners get both the request parameters and the outcome
        dispatcher.dispatch(CallbackType.ItemSubscriptionCreated, ServerItemCallback(params, created))
        return created
131
132
    async def modify_monitored_items(self, params):
        """
        Modify monitored items through the subscription service and report
        the change to the server callback dispatcher.

        NOTE(review): unlike ``create_monitored_items`` the service call is
        not awaited here -- confirm that it is synchronous.

        :return: the result of the subscription service
        """
        modified = self.subscription_service.modify_monitored_items(params)
        dispatcher = self.iserver.server_callback_dispatcher
        # listeners get both the request parameters and the outcome
        dispatcher.dispatch(CallbackType.ItemSubscriptionModified, ServerItemCallback(params, modified))
        return modified
137
138
    def republish(self, params):
        """
        Forward a republish request to the subscription service.

        :return: whatever the subscription service returns
        """
        subscriptions = self.subscription_service
        return subscriptions.republish(params)
140
141
    async def delete_subscriptions(self, ids):
        """
        Delete the subscriptions with the given ids through the subscription
        service.

        :param ids: subscription ids, passed through untouched
        :return: the awaited result of the subscription service
        """
        # async for symmetry with the client code
        outcome = await self.subscription_service.delete_subscriptions(ids)
        return outcome
144
145
    async def delete_monitored_items(self, params):
        """
        Delete monitored items through the subscription service and report
        the deletion to the server callback dispatcher.

        :return: the result of the subscription service
        """
        # async for symmetry with the client code
        deleted = self.subscription_service.delete_monitored_items(params)
        dispatcher = self.iserver.server_callback_dispatcher
        # listeners get both the request parameters and the outcome
        dispatcher.dispatch(CallbackType.ItemSubscriptionDeleted, ServerItemCallback(params, deleted))
        return deleted
151
152
    def publish(self, acks=None):
        """
        Forward a publish request to the subscription service.

        :param acks: acknowledgements to pass on; ``None`` is replaced by a
            new empty list
        :return: whatever the subscription service returns
        """
        acknowledgements = [] if acks is None else acks
        return self.subscription_service.publish(acknowledgements)
160
161
162