Passed
Pull Request — master (#45)
by
unknown
02:35
created

InternalSession.write()   A

Complexity

Conditions 1

Size

Total Lines 2
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 2
nop 2
dl 0
loc 2
rs 10
c 0
b 0
f 0
1
import logging
2
from enum import Enum
3
from typing import Coroutine, Iterable, Optional
4
5
from asyncua import ua
6
from ..common.callback import CallbackType, ServerItemCallback
7
from ..common.utils import create_nonce, ServiceError
8
from .users import User
9
from .subscription_service import SubscriptionService
10
11
12
class SessionState(Enum):
    """Lifecycle states an InternalSession moves through."""

    # Initial state assigned when the session object is constructed.
    Created = 0
    # Set once activate_session() has completed.
    Activated = 1
    # Set by close_session().
    Closed = 2
16
17
18
class InternalSession:
    """
    Session object that forwards OPC UA service calls to the internal server.

    Each instance holds a session id, an authentication token, the current
    user and a SessionState. Most methods are thin wrappers that delegate to
    a service object of the internal server (attribute, view, node management,
    method, history) or to the subscription service given to the constructor.
    """
    # Class-wide counters used to hand out a distinct session id and
    # authentication token to every new instance (incremented in __init__).
    _counter = 10
    _auth_counter = 1000

    def __init__(self, internal_server, aspace, submgr: SubscriptionService, name, user=User.Anonymous, external=False):
        """
        Create a session in the ``SessionState.Created`` state.

        :param internal_server: server object the service calls are delegated to
        :param aspace: address space, used by add_method_callback()
        :param submgr: subscription service used for all subscription calls
        :param name: session name, used in log messages and __str__
        :param user: user the session acts as (defaults to User.Anonymous)
        :param external: flag stored on the session, see comment below
        """
        self.logger = logging.getLogger(__name__)
        self.iserver = internal_server
        # define if session is external, we need to copy some objects if it is internal
        self.external = external
        self.aspace = aspace
        self.subscription_service: SubscriptionService = submgr
        self.name = name
        self.user = user
        self.nonce = None
        self.state = SessionState.Created
        # Read the shared counters through the instance, then bump them on the
        # class so that the next session gets fresh values.
        self.session_id = ua.NodeId(self._counter)
        InternalSession._counter += 1
        self.auth_token = ua.NodeId(self._auth_counter)
        InternalSession._auth_counter += 1
        self.logger.info('Created internal session %s', self.name)

    def __str__(self):
        """Return a readable summary with name, user, session id and auth token."""
        return f'InternalSession(name:{self.name}, user:{self.user}, id:{self.session_id}, auth_token:{self.auth_token})'

    async def get_endpoints(self, params=None, sockname=None):
        """Return the endpoints reported by the internal server."""
        return await self.iserver.get_endpoints(params, sockname)

    async def create_session(self, params, sockname=None):
        """
        Build the CreateSessionResult for this session.

        A new 32-byte server nonce is generated and stored on the session.
        The requested session timeout is echoed back unchanged.

        :param params: request parameters; only RequestedSessionTimeout is read
        :param sockname: forwarded to get_endpoints()
        :return: the populated ua.CreateSessionResult
        """
        self.logger.info('Create session request')
        result = ua.CreateSessionResult()
        result.SessionId = self.session_id
        result.AuthenticationToken = self.auth_token
        result.RevisedSessionTimeout = params.RequestedSessionTimeout
        result.MaxRequestMessageSize = 65536
        self.nonce = create_nonce(32)
        result.ServerNonce = self.nonce
        result.ServerEndpoints = await self.get_endpoints(sockname=sockname)

        return result

    async def close_session(self, delete_subs=True):
        """
        Mark the session as closed and delete the subscriptions.

        All subscription ids currently known to the subscription service are
        deleted.

        :param delete_subs: accepted for interface compatibility; it is not
            read, subscriptions are always deleted.
        """
        # NOTE(review): delete_subs is ignored. Honouring a False value would
        # leave subscriptions running after the session is gone - confirm the
        # intended behaviour before changing this.
        self.logger.info('close session %s', self.name)
        self.state = SessionState.Closed
        await self.delete_subscriptions(list(self.subscription_service.subscriptions.keys()))

    def activate_session(self, params):
        """
        Activate a session that is still in the Created state.

        :param params: request parameters; ClientSoftwareCertificates and
            UserIdentityToken are read
        :return: the populated ua.ActivateSessionResult
        :raises ServiceError: BadSessionIdInvalid if the session is not in the
            Created state, BadUserAccessDenied if a user name identity token
            is rejected by the internal server
        """
        self.logger.info('activate session')
        result = ua.ActivateSessionResult()
        if self.state != SessionState.Created:
            raise ServiceError(ua.StatusCodes.BadSessionIdInvalid)
        self.nonce = create_nonce(32)
        result.ServerNonce = self.nonce
        # One (default) status code per client software certificate.
        for _ in params.ClientSoftwareCertificates:
            result.Results.append(ua.StatusCode())
        id_token = params.UserIdentityToken
        if isinstance(id_token, ua.UserNameIdentityToken):
            if self.iserver.check_user_token(self, id_token) is False:
                raise ServiceError(ua.StatusCodes.BadUserAccessDenied)
        # Only switch to Activated once the identity token has been accepted,
        # so a rejected login never leaves the session marked as activated.
        self.state = SessionState.Activated
        self.logger.info("Activated internal session %s for user %s", self.name, self.user)
        return result

    async def read(self, params):
        """Return the result of the attribute service read for ``params``."""
        results = self.iserver.attribute_service.read(params)
        return results

    def history_read(self, params) -> Coroutine:
        """Return the (unawaited) result of history_manager.read_history(params)."""
        return self.iserver.history_manager.read_history(params)

    async def write(self, params):
        """Write via the attribute service on behalf of the session user."""
        return self.iserver.attribute_service.write(params, self.user)

    async def browse(self, params):
        """Return the result of the view service browse for ``params``."""
        return self.iserver.view_service.browse(params)

    async def translate_browsepaths_to_nodeids(self, params):
        """Delegate browse path translation to the view service."""
        return self.iserver.view_service.translate_browsepaths_to_nodeids(params)

    async def add_nodes(self, params):
        """Add nodes via the node management service as the session user."""
        return self.iserver.node_mgt_service.add_nodes(params, self.user)

    async def delete_nodes(self, params):
        """Delete nodes via the node management service as the session user."""
        return self.iserver.node_mgt_service.delete_nodes(params, self.user)

    async def add_references(self, params):
        """Add references via the node management service as the session user."""
        return self.iserver.node_mgt_service.add_references(params, self.user)

    async def delete_references(self, params):
        """Delete references via the node management service as the session user."""
        return self.iserver.node_mgt_service.delete_references(params, self.user)

    async def add_method_callback(self, methodid, callback):
        """Register ``callback`` for ``methodid`` on the address space."""
        return self.aspace.add_method_callback(methodid, callback)

    def call(self, params):
        """
        COROUTINE

        Return the result of method_service.call(params) without awaiting it;
        presumably the caller is expected to await it - confirm against the
        method service.
        """
        return self.iserver.method_service.call(params)

    async def create_subscription(self, params, callback=None):
        """Create a subscription through the subscription service and return its result."""
        result = await self.subscription_service.create_subscription(params, callback)
        return result

    async def create_monitored_items(self, params):
        """
        Create monitored items and notify the server callback dispatcher.

        The ItemSubscriptionCreated callback is dispatched with the request
        parameters and the awaited result, which is then returned.
        """
        subscription_result = await self.subscription_service.create_monitored_items(params)
        self.iserver.server_callback_dispatcher.dispatch(
            CallbackType.ItemSubscriptionCreated, ServerItemCallback(params, subscription_result))
        return subscription_result

    async def modify_monitored_items(self, params):
        """
        Modify monitored items and notify the server callback dispatcher.

        The ItemSubscriptionModified callback is dispatched with the request
        parameters and the result, which is then returned.
        """
        # The service call is not awaited here, so it is presumably
        # synchronous - confirm against SubscriptionService.
        subscription_result = self.subscription_service.modify_monitored_items(params)
        self.iserver.server_callback_dispatcher.dispatch(
            CallbackType.ItemSubscriptionModified, ServerItemCallback(params, subscription_result))
        return subscription_result

    def republish(self, params):
        """Delegate the republish request to the subscription service."""
        return self.subscription_service.republish(params)

    async def delete_subscriptions(self, ids):
        """Delete the subscriptions with the given ids and return the service result."""
        # This is an async method, due to symmetry with client code
        return await self.subscription_service.delete_subscriptions(ids)

    async def delete_monitored_items(self, params):
        """
        Delete monitored items and notify the server callback dispatcher.

        The ItemSubscriptionDeleted callback is dispatched with the request
        parameters and the result, which is then returned.
        """
        # This is an async method, due to symmetry with client code
        subscription_result = self.subscription_service.delete_monitored_items(params)
        self.iserver.server_callback_dispatcher.dispatch(
            CallbackType.ItemSubscriptionDeleted, ServerItemCallback(params, subscription_result))
        return subscription_result

    def publish(self, acks: Optional[Iterable[ua.SubscriptionAcknowledgement]] = None):
        """
        Forward a publish request to the subscription service.

        :param acks: subscription acknowledgements; None (or any empty value)
            is replaced by an empty list
        """
        return self.subscription_service.publish(acks or [])
151