Passed
Pull Request — master (#365)
by
unknown
02:53
created

SubscriptionService.condition_refresh2()   A

Complexity

Conditions 5

Size

Total Lines 11
Code Lines 11

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 5
eloc 11
nop 4
dl 0
loc 11
rs 9.3333
c 0
b 0
f 0
1
"""
2
Server-side implementation of the subscription service.
3
"""
4
5
import asyncio
6
import logging
7
from typing import Dict, Iterable
8
9
from asyncua import ua, uamethod
10
from asyncua.common import utils
11
from .address_space import AddressSpace
12
from .internal_subscription import InternalSubscription
13
14
15
class SubscriptionService:
    """
    Manages subscriptions on the server side.
    There is one `SubscriptionService` instance for every `Server`/`InternalServer`.

    Subscriptions are kept in ``self.subscriptions`` keyed by subscription id.
    Most methods look up the target subscription by id and forward the request
    to it (or to its ``monitored_item_srv``); unknown ids are answered with
    ``BadSubscriptionIdInvalid``.
    """

    def __init__(self, loop: asyncio.AbstractEventLoop, aspace: AddressSpace):
        """
        :param loop: event loop handed to every `InternalSubscription` created here
        :param aspace: address space handed to every `InternalSubscription` created here
        """
        self.logger = logging.getLogger(__name__)
        self.loop: asyncio.AbstractEventLoop = loop
        self.aspace: AddressSpace = aspace
        # Registered subscriptions, keyed by subscription id.
        self.subscriptions: Dict[int, InternalSubscription] = {}
        # Last id handed out; incremented before use, so the first id issued is 78.
        self._sub_id_counter = 77
        # Looked up by `ua.ObjectIds` key in the condition refresh methods.
        # This class only reads it; it is never filled from within this class.
        self.standard_events = {}

    @property
    def active_subscription_ids(self):
        """Return a live view of the ids of all registered subscriptions."""
        return self.subscriptions.keys()

    @staticmethod
    def _invalid_subscription_results(result_type, items):
        """Build one `result_type` with BadSubscriptionIdInvalid status per entry of `items`."""
        results = []
        for _ in items:
            result = result_type()
            result.StatusCode = ua.StatusCode(ua.StatusCodes.BadSubscriptionIdInvalid)
            results.append(result)
        return results

    def _trigger_standard_event(self, event_type):
        """Call ``trigger()`` on the standard event registered under `event_type`, if any."""
        if event_type in self.standard_events:
            # NOTE(review): the return value of trigger() is discarded. If trigger()
            # is a coroutine function it is never awaited here - confirm against
            # the event generator implementation.
            self.standard_events[event_type].trigger()

    async def create_subscription(self, params, callback=None, external=False):
        """
        Create, start and register a new subscription.

        The requested publishing interval, lifetime count and keep-alive count
        are accepted unchanged as the revised values.

        :param params: request carrying the ``Requested*`` subscription settings
        :param callback: forwarded to `InternalSubscription`
        :param external: when false, the subscription is created with ``no_acks=True``
        :return: `ua.CreateSubscriptionResult` holding the new subscription id
        """
        self.logger.info("create subscription")
        result = ua.CreateSubscriptionResult()
        result.RevisedPublishingInterval = params.RequestedPublishingInterval
        result.RevisedLifetimeCount = params.RequestedLifetimeCount
        result.RevisedMaxKeepAliveCount = params.RequestedMaxKeepAliveCount
        self._sub_id_counter += 1
        result.SubscriptionId = self._sub_id_counter
        internal_sub = InternalSubscription(self.loop, result, self.aspace, callback=callback, no_acks=not external)
        # The subscription is only registered once it has been started.
        await internal_sub.start()
        self.subscriptions[result.SubscriptionId] = internal_sub
        return result

    def modify_subscription(self, params, callback):
        """
        Answer a modify request with the settings fixed at creation time.

        Requested params are ignored, result = params set during create_subscription.

        :param params: request; only ``SubscriptionId`` is read
        :param callback: only logged
        :return: `ua.ModifySubscriptionResult` with the current revised values
        :raises utils.ServiceError: BadSubscriptionIdInvalid if the id is unknown
        """
        self.logger.info("modify subscription with callback: %s", callback)
        # Keep the try body to the lookup so unrelated KeyErrors are not masked.
        try:
            sub = self.subscriptions[params.SubscriptionId]
        except KeyError:
            raise utils.ServiceError(ua.StatusCodes.BadSubscriptionIdInvalid) from None
        result = ua.ModifySubscriptionResult()
        result.RevisedPublishingInterval = sub.data.RevisedPublishingInterval
        result.RevisedLifetimeCount = sub.data.RevisedLifetimeCount
        result.RevisedMaxKeepAliveCount = sub.data.RevisedMaxKeepAliveCount
        return result

    async def delete_subscriptions(self, ids):
        """
        Unregister and stop the subscriptions with the given ids.

        :param ids: iterable of subscription ids
        :return: one `ua.StatusCode` per id, in order: BadSubscriptionIdInvalid
            for ids that are not registered (including repeated ids), otherwise
            a default-constructed status code
        """
        self.logger.info("delete subscriptions: %s", ids)
        res = []
        existing_subs = []
        for sub_id in ids:
            # pop() unregisters immediately, before anything is awaited.
            sub = self.subscriptions.pop(sub_id, None)
            if sub is None:
                res.append(ua.StatusCode(ua.StatusCodes.BadSubscriptionIdInvalid))
            else:
                existing_subs.append(sub)
                res.append(ua.StatusCode())
        # Stop all removed subscriptions concurrently.
        await asyncio.gather(*[sub.stop() for sub in existing_subs])
        return res

    def publish(self, acks: Iterable[ua.SubscriptionAcknowledgement]):
        """
        Forward a publish request to every registered subscription.

        Each subscription receives the list of acknowledged sequence numbers
        addressed to it (an empty list when there are none).

        :param acks: acknowledgements; consumed exactly once, so one-shot
            iterables such as generators are handled correctly
        """
        self.logger.info("publish request with acks %s", acks)
        # Group in a single pass: avoids re-scanning `acks` for every
        # subscription and works when `acks` can only be iterated once.
        acked_by_sub = {}
        for ack in acks:
            acked_by_sub.setdefault(ack.SubscriptionId, []).append(ack.SequenceNumber)
        for subid, sub in self.subscriptions.items():
            sub.publish(acked_by_sub.get(subid, []))

    async def create_monitored_items(self, params: ua.CreateMonitoredItemsParameters):
        """
        Create monitored items on the subscription named by ``params.SubscriptionId``.

        :return: the target subscription's result, or one
            `ua.MonitoredItemCreateResult` with BadSubscriptionIdInvalid per
            requested item when the subscription is unknown
        """
        self.logger.info("create monitored items")
        sub = self.subscriptions.get(params.SubscriptionId)
        if sub is None:
            return self._invalid_subscription_results(ua.MonitoredItemCreateResult, params.ItemsToCreate)
        return await sub.monitored_item_srv.create_monitored_items(params)

    def modify_monitored_items(self, params):
        """
        Modify monitored items on the subscription named by ``params.SubscriptionId``.

        :return: the target subscription's result, or one
            `ua.MonitoredItemModifyResult` with BadSubscriptionIdInvalid per
            item when the subscription is unknown
        """
        self.logger.info("modify monitored items")
        sub = self.subscriptions.get(params.SubscriptionId)
        if sub is None:
            return self._invalid_subscription_results(ua.MonitoredItemModifyResult, params.ItemsToModify)
        return sub.monitored_item_srv.modify_monitored_items(params)

    def delete_monitored_items(self, params):
        """
        Delete monitored items from the subscription named by ``params.SubscriptionId``.

        :return: the target subscription's result, or one BadSubscriptionIdInvalid
            `ua.StatusCode` per monitored item id when the subscription is unknown
        """
        self.logger.info("delete monitored items")
        sub = self.subscriptions.get(params.SubscriptionId)
        if sub is None:
            return [ua.StatusCode(ua.StatusCodes.BadSubscriptionIdInvalid) for _ in params.MonitoredItemIds]
        return sub.monitored_item_srv.delete_monitored_items(params.MonitoredItemIds)

    def republish(self, params):
        """
        Ask the subscription to republish ``params.RetransmitSequenceNumber``.

        :return: the subscription's republish result, or an empty
            `ua.NotificationMessage` when the subscription is unknown
        """
        sub = self.subscriptions.get(params.SubscriptionId)
        if sub is None:
            # TODO: decide how an unknown subscription should be reported; an
            # empty message is returned for now.
            # NOTE(review): a BadSubscriptionIdInvalid error may be the expected
            # answer here - confirm against the OPC UA Republish service definition.
            return ua.NotificationMessage()
        return sub.republish(params.RetransmitSequenceNumber)

    async def trigger_event(self, event):
        """
        Pass `event` to the monitored item service of every registered subscription.

        Subscriptions are processed one after another.
        """
        # Iterate over a snapshot: subscriptions may be created or deleted by
        # other tasks while we are suspended in the await below, and mutating a
        # dict during iteration raises RuntimeError.
        for sub_id, sub in list(self.subscriptions.items()):
            if self.subscriptions.get(sub_id) is not sub:
                # Removed (or replaced) while an earlier subscription was being served.
                continue
            await sub.monitored_item_srv.trigger_event(event)

    @uamethod
    async def condition_refresh(self, parent, sub_id):
        """
        Refresh the conditions of subscription `sub_id`.

        Triggers the RefreshStart standard event (if registered), runs the
        subscription's condition refresh, then triggers the RefreshEnd standard
        event (if registered).

        :param parent: not used by this method
        :param sub_id: id of the subscription to refresh
        :return: BadSubscriptionIdInvalid `ua.StatusCode` for an unknown id,
            otherwise None
        """
        if sub_id not in self.subscriptions:
            return ua.StatusCode(ua.StatusCodes.BadSubscriptionIdInvalid)
        self._trigger_standard_event(ua.ObjectIds.RefreshStartEventType)
        await self.subscriptions[sub_id].monitored_item_srv.condition_refresh()
        self._trigger_standard_event(ua.ObjectIds.RefreshEndEventType)

    @uamethod
    def condition_refresh2(self, parent, sub_id, mid):
        """
        Refresh the conditions of monitored item `mid` in subscription `sub_id`.

        Triggers the RefreshStart standard event (if registered), runs the
        refresh for the single monitored item, then triggers the RefreshEnd
        standard event (if registered).

        :param parent: not used by this method
        :param sub_id: id of the subscription owning the monitored item
        :param mid: id of the monitored item to refresh
        :return: BadSubscriptionIdInvalid or BadMonitoredItemIdInvalid
            `ua.StatusCode` when the respective id is unknown, otherwise None
        """
        if sub_id not in self.subscriptions:
            return ua.StatusCode(ua.StatusCodes.BadSubscriptionIdInvalid)
        monitored_item_srv = self.subscriptions[sub_id].monitored_item_srv
        if mid not in monitored_item_srv._monitored_items:
            return ua.StatusCode(ua.StatusCodes.BadMonitoredItemIdInvalid)
        self._trigger_standard_event(ua.ObjectIds.RefreshStartEventType)
        # NOTE(review): unlike condition_refresh(), this call is not awaited.
        # If monitored_item_srv.condition_refresh2 is a coroutine function it
        # never runs - confirm against the monitored item service.
        monitored_item_srv.condition_refresh2(mid)
        self._trigger_standard_event(ua.ObjectIds.RefreshEndEventType)
142