Test Failed
Pull Request — master (#112)
by
unknown
02:26
created

SubscriptionService.condition_refresh2()   A

Complexity

Conditions 5

Size

Total Lines 11
Code Lines 11

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 5
eloc 11
nop 4
dl 0
loc 11
rs 9.3333
c 0
b 0
f 0
1
"""
2
server side implementation of subscription service
3
"""
4
5
import asyncio
6
import logging
7
from typing import Dict, Iterable
8
9
from asyncua import ua, uamethod
10
from .address_space import AddressSpace
11
from .internal_subscription import InternalSubscription
12
13
14
class SubscriptionService:
15
    """
16
    Manages subscriptions on the server side.
17
    There is one `SubscriptionService` instance for every `Server`/`InternalServer`.
18
    """
19
20
    def __init__(self, loop: asyncio.AbstractEventLoop, aspace: AddressSpace):
21
        self.logger = logging.getLogger(__name__)
22
        self.loop: asyncio.AbstractEventLoop = loop
23
        self.aspace: AddressSpace = aspace
24
        self.subscriptions: Dict[int, InternalSubscription] = {}
25
        self._sub_id_counter = 77
26
        self.standard_events = {}
27
28
    @property
29
    def active_subscription_ids(self):
30
        return self.subscriptions.keys()
31
32
    async def create_subscription(self, params, callback=None, external=False):
33
        self.logger.info("create subscription")
34
        result = ua.CreateSubscriptionResult()
35
        result.RevisedPublishingInterval = params.RequestedPublishingInterval
36
        result.RevisedLifetimeCount = params.RequestedLifetimeCount
37
        result.RevisedMaxKeepAliveCount = params.RequestedMaxKeepAliveCount
38
        self._sub_id_counter += 1
39
        result.SubscriptionId = self._sub_id_counter
40
        internal_sub = InternalSubscription(self.loop, result, self.aspace, callback=callback, no_acks=not external)
41
        await internal_sub.start()
42
        self.subscriptions[result.SubscriptionId] = internal_sub
43
        return result
44
45
    async def delete_subscriptions(self, ids):
46
        self.logger.info("delete subscriptions: %s", ids)
47
        res = []
48
        existing_subs = []
49
        for i in ids:
50
            sub = self.subscriptions.pop(i, None)
51
            if sub is None:
52
                res.append(ua.StatusCode(ua.StatusCodes.BadSubscriptionIdInvalid))
53
            else:
54
                existing_subs.append(sub)
55
                res.append(ua.StatusCode())
56
        await asyncio.gather(*[sub.stop() for sub in existing_subs])
57
        return res
58
59
    def publish(self, acks: Iterable[ua.SubscriptionAcknowledgement]):
60
        self.logger.info("publish request with acks %s", acks)
61
        for subid, sub in self.subscriptions.items():
62
            sub.publish([ack.SequenceNumber for ack in acks if ack.SubscriptionId == subid])
63
64
    async def create_monitored_items(self, params: ua.CreateMonitoredItemsParameters):
65
        self.logger.info("create monitored items")
66
        if params.SubscriptionId not in self.subscriptions:
67
            res = []
68
            for _ in params.ItemsToCreate:
69
                response = ua.MonitoredItemCreateResult()
70
                response.StatusCode = ua.StatusCode(ua.StatusCodes.BadSubscriptionIdInvalid)
71
                res.append(response)
72
            return res
73
        return await self.subscriptions[params.SubscriptionId].monitored_item_srv.create_monitored_items(params)
74
75
    def modify_monitored_items(self, params):
76
        self.logger.info("modify monitored items")
77
        if params.SubscriptionId not in self.subscriptions:
78
            res = []
79
            for _ in params.ItemsToModify:
80
                result = ua.MonitoredItemModifyResult()
81
                result.StatusCode = ua.StatusCode(ua.StatusCodes.BadSubscriptionIdInvalid)
82
                res.append(result)
83
            return res
84
        return self.subscriptions[params.SubscriptionId].monitored_item_srv.modify_monitored_items(params)
85
86
    def delete_monitored_items(self, params):
87
        self.logger.info("delete monitored items")
88
        if params.SubscriptionId not in self.subscriptions:
89
            res = []
90
            for _ in params.MonitoredItemIds:
91
                res.append(ua.StatusCode(ua.StatusCodes.BadSubscriptionIdInvalid))
92
            return res
93
        return self.subscriptions[params.SubscriptionId].monitored_item_srv.delete_monitored_items(
94
            params.MonitoredItemIds)
95
96
    def republish(self, params):
97
        if params.SubscriptionId not in self.subscriptions:
98
            # TODO: what should I do?
99
            return ua.NotificationMessage()
100
        return self.subscriptions[params.SubscriptionId].republish(params.RetransmitSequenceNumber)
101
102
    async def trigger_event(self, event):
103
        for sub in self.subscriptions.values():
104
            await sub.monitored_item_srv.trigger_event(event)
105
106
    @uamethod
107
    def condition_refresh(self, parent, sub_id):
108
        if sub_id not in self.subscriptions:
109
            return ua.StatusCode(ua.StatusCodes.BadSubscriptionIdInvalid)
110
        if ua.ObjectIds.RefreshStartEventType in self.standard_events:
111
            self.standard_events[ua.ObjectIds.RefreshStartEventType].trigger()
112
        self.subscriptions[sub_id].monitored_item_srv.condition_refresh()
113
        if ua.ObjectIds.RefreshEndEventType in self.standard_events:
114
            self.standard_events[ua.ObjectIds.RefreshEndEventType].trigger()
115
116
    @uamethod
117
    def condition_refresh2(self, parent, sub_id, mid):
118
        if sub_id not in self.subscriptions:
119
            return ua.StatusCode(ua.StatusCodes.BadSubscriptionIdInvalid)
120
        if mid not in self.subscriptions[sub_id].monitored_item_srv._monitored_items:
121
            return ua.StatusCode(ua.StatusCodes.BadMonitoredItemIdInvalid)
122
        if ua.ObjectIds.RefreshStartEventType in self.standard_events:
123
            self.standard_events[ua.ObjectIds.RefreshStartEventType].trigger()
124
        self.subscriptions[sub_id].monitored_item_srv.condition_refresh2(mid)
125
        if ua.ObjectIds.RefreshEndEventType in self.standard_events:
126
            self.standard_events[ua.ObjectIds.RefreshEndEventType].trigger()
127