Passed
Pull Request — master (#45)
by
unknown
01:58
created

SubscriptionService.active_subscription_ids()   A

Complexity

Conditions 1

Size

Total Lines 3
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 3
nop 1
dl 0
loc 3
rs 10
c 0
b 0
f 0
1
"""
2
server side implementation of subscription service
3
"""
4
5
import asyncio
6
import logging
7
from typing import Dict, Iterable
8
9
from asyncua import ua
10
from .internal_subscription import InternalSubscription
11
12
13
class SubscriptionService:
14
    """
15
    Manages subscriptions on the server side.
16
    There is one `SubscriptionService` instance for every `Server`/`InternalServer`.
17
    """
18
19
    def __init__(self, loop, aspace):
20
        self.logger = logging.getLogger(__name__)
21
        self.loop = loop
22
        self.aspace = aspace
23
        self.subscriptions: Dict[int, InternalSubscription] = {}
24
        self._sub_id_counter = 77
25
26
    @property
27
    def active_subscription_ids(self):
28
        return self.subscriptions.keys()
29
30
    async def create_subscription(self, params, callback=None):
31
        self.logger.info("create subscription")
32
        result = ua.CreateSubscriptionResult()
33
        result.RevisedPublishingInterval = params.RequestedPublishingInterval
34
        result.RevisedLifetimeCount = params.RequestedLifetimeCount
35
        result.RevisedMaxKeepAliveCount = params.RequestedMaxKeepAliveCount
36
        self._sub_id_counter += 1
37
        result.SubscriptionId = self._sub_id_counter
38
        internal_sub = InternalSubscription(self, result, self.aspace, callback)
39
        await internal_sub.start()
40
        self.subscriptions[result.SubscriptionId] = internal_sub
41
        return result
42
43
    async def delete_subscriptions(self, ids):
44
        self.logger.info("delete subscriptions: %s", ids)
45
        res = []
46
        existing_subs = []
47
        for i in ids:
48
            sub = self.subscriptions.pop(i, None)
49
            if sub is None:
50
                res.append(ua.StatusCode(ua.StatusCodes.BadSubscriptionIdInvalid))
51
            else:
52
                existing_subs.append(sub)
53
                res.append(ua.StatusCode())
54
        await asyncio.gather(*[sub.stop() for sub in existing_subs])
55
        return res
56
57
    def publish(self, acks: Iterable[ua.SubscriptionAcknowledgement]):
58
        self.logger.info("publish request with acks %s", acks)
59
        for subid, sub in self.subscriptions.items():
60
            sub.publish([ack.SequenceNumber for ack in acks if ack.SubscriptionId == subid])
61
62
    async def create_monitored_items(self, params):
63
        self.logger.info("create monitored items")
64
        if params.SubscriptionId not in self.subscriptions:
65
            res = []
66
            for _ in params.ItemsToCreate:
67
                response = ua.MonitoredItemCreateResult()
68
                response.StatusCode = ua.StatusCode(ua.StatusCodes.BadSubscriptionIdInvalid)
69
                res.append(response)
70
            return res
71
        return await self.subscriptions[params.SubscriptionId].monitored_item_srv.create_monitored_items(params)
72
73
    def modify_monitored_items(self, params):
74
        self.logger.info("modify monitored items")
75
        if params.SubscriptionId not in self.subscriptions:
76
            res = []
77
            for _ in params.ItemsToModify:
78
                result = ua.MonitoredItemModifyResult()
79
                result.StatusCode = ua.StatusCode(ua.StatusCodes.BadSubscriptionIdInvalid)
80
                res.append(result)
81
            return res
82
        return self.subscriptions[params.SubscriptionId].monitored_item_srv.modify_monitored_items(params)
83
84
    def delete_monitored_items(self, params):
85
        self.logger.info("delete monitored items")
86
        if params.SubscriptionId not in self.subscriptions:
87
            res = []
88
            for _ in params.MonitoredItemIds:
89
                res.append(ua.StatusCode(ua.StatusCodes.BadSubscriptionIdInvalid))
90
            return res
91
        return self.subscriptions[params.SubscriptionId].monitored_item_srv.delete_monitored_items(
92
            params.MonitoredItemIds)
93
94
    def republish(self, params):
95
        if params.SubscriptionId not in self.subscriptions:
96
            # TODO: what should I do?
97
            return ua.NotificationMessage()
98
        return self.subscriptions[params.SubscriptionId].republish(params.RetransmitSequenceNumber)
99
100
    def trigger_event(self, event):
101
        for sub in self.subscriptions.values():
102
            sub.monitored_item_srv.trigger_event(event)
103