Completed
Push — master ( 6c504f...d79d20 )
by Olivier
04:07 queued 01:36
created

SubscriptionService.modify_subscription()   A

Complexity

Conditions 2

Size

Total Lines 13
Code Lines 11

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 2
eloc 11
nop 3
dl 0
loc 13
rs 9.85
c 0
b 0
f 0
1
"""
2
server side implementation of subscription service
3
"""
4
5
import asyncio
6
import logging
7
from typing import Dict, Iterable
8
9
from asyncua import ua
10
from asyncua.common import utils
11
from .address_space import AddressSpace
12
from .internal_subscription import InternalSubscription
13
14
15
class SubscriptionService:
16
    """
17
    Manages subscriptions on the server side.
18
    There is one `SubscriptionService` instance for every `Server`/`InternalServer`.
19
    """
20
21
    def __init__(self, loop: asyncio.AbstractEventLoop, aspace: AddressSpace):
22
        self.logger = logging.getLogger(__name__)
23
        self.loop: asyncio.AbstractEventLoop = loop
24
        self.aspace: AddressSpace = aspace
25
        self.subscriptions: Dict[int, InternalSubscription] = {}
26
        self._sub_id_counter = 77
27
28
    @property
29
    def active_subscription_ids(self):
30
        return self.subscriptions.keys()
31
32
    async def create_subscription(self, params, callback=None, external=False):
33
        self.logger.info("create subscription")
34
        result = ua.CreateSubscriptionResult()
35
        result.RevisedPublishingInterval = params.RequestedPublishingInterval
36
        result.RevisedLifetimeCount = params.RequestedLifetimeCount
37
        result.RevisedMaxKeepAliveCount = params.RequestedMaxKeepAliveCount
38
        self._sub_id_counter += 1
39
        result.SubscriptionId = self._sub_id_counter
40
        internal_sub = InternalSubscription(self.loop, result, self.aspace, callback=callback, no_acks=not external)
41
        await internal_sub.start()
42
        self.subscriptions[result.SubscriptionId] = internal_sub
43
        return result
44
45
    def modify_subscription(self, params, callback):
46
        # Requested params are ignored, result = params set during create_subscription.
47
        self.logger.info("modify subscription with callback: %s", callback)
48
        result = ua.ModifySubscriptionResult()
49
        try:
50
            sub = self.subscriptions[params.SubscriptionId]
51
            result.RevisedPublishingInterval = sub.data.RevisedPublishingInterval
52
            result.RevisedLifetimeCount = sub.data.RevisedLifetimeCount
53
            result.RevisedMaxKeepAliveCount = sub.data.RevisedMaxKeepAliveCount
54
55
            return result
56
        except KeyError:
57
            raise utils.ServiceError(ua.StatusCodes.BadSubscriptionIdInvalid)
58
59
    async def delete_subscriptions(self, ids):
60
        self.logger.info("delete subscriptions: %s", ids)
61
        res = []
62
        existing_subs = []
63
        for i in ids:
64
            sub = self.subscriptions.pop(i, None)
65
            if sub is None:
66
                res.append(ua.StatusCode(ua.StatusCodes.BadSubscriptionIdInvalid))
67
            else:
68
                existing_subs.append(sub)
69
                res.append(ua.StatusCode())
70
        await asyncio.gather(*[sub.stop() for sub in existing_subs])
71
        return res
72
73
    def publish(self, acks: Iterable[ua.SubscriptionAcknowledgement]):
74
        self.logger.info("publish request with acks %s", acks)
75
        for subid, sub in self.subscriptions.items():
76
            sub.publish([ack.SequenceNumber for ack in acks if ack.SubscriptionId == subid])
77
78
    async def create_monitored_items(self, params: ua.CreateMonitoredItemsParameters):
79
        self.logger.info("create monitored items")
80
        if params.SubscriptionId not in self.subscriptions:
81
            res = []
82
            for _ in params.ItemsToCreate:
83
                response = ua.MonitoredItemCreateResult()
84
                response.StatusCode = ua.StatusCode(ua.StatusCodes.BadSubscriptionIdInvalid)
85
                res.append(response)
86
            return res
87
        return await self.subscriptions[params.SubscriptionId].monitored_item_srv.create_monitored_items(params)
88
89
    def modify_monitored_items(self, params):
90
        self.logger.info("modify monitored items")
91
        if params.SubscriptionId not in self.subscriptions:
92
            res = []
93
            for _ in params.ItemsToModify:
94
                result = ua.MonitoredItemModifyResult()
95
                result.StatusCode = ua.StatusCode(ua.StatusCodes.BadSubscriptionIdInvalid)
96
                res.append(result)
97
            return res
98
        return self.subscriptions[params.SubscriptionId].monitored_item_srv.modify_monitored_items(params)
99
100
    def delete_monitored_items(self, params):
101
        self.logger.info("delete monitored items")
102
        if params.SubscriptionId not in self.subscriptions:
103
            res = []
104
            for _ in params.MonitoredItemIds:
105
                res.append(ua.StatusCode(ua.StatusCodes.BadSubscriptionIdInvalid))
106
            return res
107
        return self.subscriptions[params.SubscriptionId].monitored_item_srv.delete_monitored_items(
108
            params.MonitoredItemIds)
109
110
    def republish(self, params):
111
        if params.SubscriptionId not in self.subscriptions:
112
            # TODO: what should I do?
113
            return ua.NotificationMessage()
114
        return self.subscriptions[params.SubscriptionId].republish(params.RetransmitSequenceNumber)
115
116
    async def trigger_event(self, event):
117
        for sub in self.subscriptions.values():
118
            await sub.monitored_item_srv.trigger_event(event)
119