Passed
Push — master ( 3fb232...b005e0 )
by Olivier
02:21
created

asyncua.server.subscription_service   A

Complexity

Total Complexity 21

Size/Duplication

Total Lines 104
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 82
dl 0
loc 104
rs 10
c 0
b 0
f 0
wmc 21

10 Methods

Rating   Name   Duplication   Size   Complexity  
A SubscriptionService.modify_monitored_items() 0 10 3
A SubscriptionService.delete_subscriptions() 0 13 3
A SubscriptionService.delete_monitored_items() 0 9 3
A SubscriptionService.publish() 0 4 2
A SubscriptionService.republish() 0 5 2
A SubscriptionService.active_subscription_ids() 0 3 1
A SubscriptionService.create_subscription() 0 12 1
A SubscriptionService.trigger_event() 0 3 2
A SubscriptionService.create_monitored_items() 0 10 3
A SubscriptionService.__init__() 0 6 1
1
"""
2
Server-side implementation of the subscription service.
3
"""
4
5
import asyncio
6
import logging
7
from typing import Dict, Iterable
8
9
from asyncua import ua
10
from .address_space import AddressSpace
11
from .internal_subscription import InternalSubscription
12
13
14
class SubscriptionService:
    """
    Manages subscriptions on the server side.
    There is one `SubscriptionService` instance for every `Server`/`InternalServer`.
    """

    def __init__(self, loop: asyncio.AbstractEventLoop, aspace: AddressSpace):
        self.logger = logging.getLogger(__name__)
        self.loop: asyncio.AbstractEventLoop = loop
        self.aspace: AddressSpace = aspace
        # Live subscriptions, keyed by the id assigned in create_subscription().
        self.subscriptions: Dict[int, InternalSubscription] = {}
        # Last id handed out; it is incremented before use, so the first
        # subscription created by this service gets id 78.
        self._sub_id_counter = 77

    @property
    def active_subscription_ids(self):
        """
        Ids of all currently registered subscriptions.

        This is the live key view of `self.subscriptions`, not a snapshot.
        """
        return self.subscriptions.keys()

    async def create_subscription(self, params, callback=None):
        """
        Create, start and register a new subscription.

        The revised interval, lifetime count and keep-alive count are copied
        unchanged from the requested values in `params`. `callback` is handed
        to the `InternalSubscription` as-is.

        :return: the `ua.CreateSubscriptionResult` carrying the new subscription id
        """
        self.logger.info("create subscription")
        result = ua.CreateSubscriptionResult()
        result.RevisedPublishingInterval = params.RequestedPublishingInterval
        result.RevisedLifetimeCount = params.RequestedLifetimeCount
        result.RevisedMaxKeepAliveCount = params.RequestedMaxKeepAliveCount
        self._sub_id_counter += 1
        result.SubscriptionId = self._sub_id_counter
        internal_sub = InternalSubscription(self.loop, result, self.aspace, callback)
        # Register only once start() has completed, so a subscription whose
        # start raised is never visible in `self.subscriptions`.
        await internal_sub.start()
        self.subscriptions[result.SubscriptionId] = internal_sub
        return result

    async def delete_subscriptions(self, ids):
        """
        Remove and stop the subscriptions with the given ids.

        :return: one `ua.StatusCode` per entry of `ids`, in the same order:
            `BadSubscriptionIdInvalid` for an id that is not registered,
            a default-constructed `ua.StatusCode()` for one that was removed.
        """
        self.logger.info("delete subscriptions: %s", ids)
        res = []
        existing_subs = []
        for i in ids:
            # pop() unregisters the subscription immediately; an id repeated in
            # `ids` is therefore reported as invalid the second time.
            sub = self.subscriptions.pop(i, None)
            if sub is None:
                res.append(ua.StatusCode(ua.StatusCodes.BadSubscriptionIdInvalid))
            else:
                existing_subs.append(sub)
                res.append(ua.StatusCode())
        # Stop all removed subscriptions concurrently.
        await asyncio.gather(*[sub.stop() for sub in existing_subs])
        return res

    def publish(self, acks: Iterable[ua.SubscriptionAcknowledgement]):
        """
        Forward a publish request to every registered subscription.

        Each subscription's `publish()` is called with the list of acknowledged
        sequence numbers addressed to it (an empty list when there are none).
        Acknowledgements for unknown subscription ids are ignored.
        """
        self.logger.info("publish request with acks %s", acks)
        # Walk `acks` exactly once and group by subscription id. Re-scanning it
        # for every subscription would exhaust a one-shot iterator after the
        # first subscription and cost O(subscriptions * acks).
        acked_by_sub = {}
        for ack in () if acks is None else acks:
            acked_by_sub.setdefault(ack.SubscriptionId, []).append(ack.SequenceNumber)
        for subid, sub in self.subscriptions.items():
            sub.publish(acked_by_sub.get(subid, []))

    async def create_monitored_items(self, params: ua.CreateMonitoredItemsParameters):
        """
        Create monitored items on the subscription named by `params.SubscriptionId`.

        :return: the result of the subscription's monitored item service, or,
            when the subscription id is unknown, one `ua.MonitoredItemCreateResult`
            with status `BadSubscriptionIdInvalid` per entry of `params.ItemsToCreate`.
        """
        self.logger.info("create monitored items")
        sub = self.subscriptions.get(params.SubscriptionId)
        if sub is None:
            res = []
            for _ in params.ItemsToCreate:
                response = ua.MonitoredItemCreateResult()
                response.StatusCode = ua.StatusCode(ua.StatusCodes.BadSubscriptionIdInvalid)
                res.append(response)
            return res
        return await sub.monitored_item_srv.create_monitored_items(params)

    def modify_monitored_items(self, params):
        """
        Modify monitored items on the subscription named by `params.SubscriptionId`.

        :return: the result of the subscription's monitored item service, or,
            when the subscription id is unknown, one `ua.MonitoredItemModifyResult`
            with status `BadSubscriptionIdInvalid` per entry of `params.ItemsToModify`.
        """
        self.logger.info("modify monitored items")
        sub = self.subscriptions.get(params.SubscriptionId)
        if sub is None:
            res = []
            for _ in params.ItemsToModify:
                result = ua.MonitoredItemModifyResult()
                result.StatusCode = ua.StatusCode(ua.StatusCodes.BadSubscriptionIdInvalid)
                res.append(result)
            return res
        return sub.monitored_item_srv.modify_monitored_items(params)

    def delete_monitored_items(self, params):
        """
        Delete monitored items from the subscription named by `params.SubscriptionId`.

        :return: the result of the subscription's monitored item service, or,
            when the subscription id is unknown, one `BadSubscriptionIdInvalid`
            status code per entry of `params.MonitoredItemIds`.
        """
        self.logger.info("delete monitored items")
        sub = self.subscriptions.get(params.SubscriptionId)
        if sub is None:
            return [
                ua.StatusCode(ua.StatusCodes.BadSubscriptionIdInvalid)
                for _ in params.MonitoredItemIds
            ]
        return sub.monitored_item_srv.delete_monitored_items(params.MonitoredItemIds)

    def republish(self, params):
        """
        Ask the subscription named by `params.SubscriptionId` to republish the
        message with sequence number `params.RetransmitSequenceNumber`.

        For an unknown subscription id an empty `ua.NotificationMessage` is returned.
        """
        sub = self.subscriptions.get(params.SubscriptionId)
        if sub is None:
            # TODO: what should I do?
            return ua.NotificationMessage()
        return sub.republish(params.RetransmitSequenceNumber)

    def trigger_event(self, event):
        """
        Hand `event` to the monitored item service of every registered subscription.
        """
        for sub in self.subscriptions.values():
            sub.monitored_item_srv.trigger_event(event)
104