Passed
Push — master ( 3fb232...b005e0 )
by Olivier
02:21
created

InternalSubscription.has_published_results()   A

Complexity

Conditions 5

Size

Total Lines 9
Code Lines 9

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 5
eloc 9
nop 1
dl 0
loc 9
rs 9.3333
c 0
b 0
f 0
1
"""
2
server side implementation of a subscription object
3
"""
4
5
import logging
6
import asyncio
7
8
from typing import Union, Iterable, Dict, List
9
from asyncua import ua
10
from .monitored_item_service import MonitoredItemService
11
from .address_space import AddressSpace
12
13
14
class InternalSubscription:
15
    """
16
17
    """
18
19
    def __init__(self, loop: asyncio.AbstractEventLoop, data: ua.CreateSubscriptionResult, aspace: AddressSpace,
                 callback=None):
        """
        Set up the server side state of one subscription.

        :param loop: Event loop on which the publication task is created
        :param data: Result of the CreateSubscription request (id, revised interval and counters)
        :param aspace: Address space handed over to the monitored item service
        :param callback: Callable invoked with every `PublishResult` that gets published
        """
        self.loop: asyncio.AbstractEventLoop = loop
        self.data: ua.CreateSubscriptionResult = data
        self.pub_result_callback = callback
        self.logger = logging.getLogger(__name__)
        # The service receives this subscription while it is still being initialised.
        self.monitored_item_srv = MonitoredItemService(self, aspace)

        # Publication bookkeeping
        self._startup = True
        self._notification_seq = 1
        self._keep_alive_count = 0
        self._publish_cycles_count = 0
        self._task = None

        # Notifications waiting for the next publication, keyed by monitored item id
        self._triggered_datachanges: Dict[int, List[ua.MonitoredItemNotification]] = {}
        self._triggered_events: Dict[int, List[ua.EventFieldList]] = {}
        self._triggered_statuschanges: list = []
        # Published results kept until acknowledged, keyed by sequence number
        self._not_acknowledged_results: Dict[int, ua.PublishResult] = {}
35
36
    def __str__(self):
37
        return f"Subscription(id:{self.data.SubscriptionId})"
38
39
    async def start(self):
40
        self.logger.debug("starting subscription %s", self.data.SubscriptionId)
41
        if self.data.RevisedPublishingInterval > 0.0:
42
            self._task = self.loop.create_task(self._subscription_loop())
43
44
    async def stop(self):
45
        self.logger.info("stopping internal subscription %s", self.data.SubscriptionId)
46
        self._task.cancel()
47
        await self._task
48
        self._task = None
49
        self.monitored_item_srv.delete_all_monitored_items()
50
51
    def _trigger_publish(self):
52
        """
53
        Trigger immediate publication (if requested by the PublishingInterval).
54
        """
55
        if self._task and self.data.RevisedPublishingInterval <= 0.0:
56
            # Publish immediately (as fast as possible)
57
            self.publish_results()
58
59
    async def _subscription_loop(self):
60
        """
61
        Start the publication loop running at the RevisedPublishingInterval.
62
        """
63
        try:
64
            while True:
65
                await asyncio.sleep(self.data.RevisedPublishingInterval / 1000.0)
66
                self.publish_results()
67
        except asyncio.CancelledError:
68
            self.logger.info('exiting _subscription_loop for %s', self.data.SubscriptionId)
69
            pass
70
        except Exception:
71
            # seems this except is necessary to log errors
72
            self.logger.exception("Exception in subscription loop")
73
74
    def has_published_results(self):
75
        if self._startup or self._triggered_datachanges or self._triggered_events:
76
            return True
77
        if self._keep_alive_count > self.data.RevisedMaxKeepAliveCount:
78
            self.logger.debug("keep alive count %s is > than max keep alive count %s, sending publish event",
79
                self._keep_alive_count, self.data.RevisedMaxKeepAliveCount)
80
            return True
81
        self._keep_alive_count += 1
82
        return False
83
84
    def publish_results(self):
85
        """
86
        Publish all enqueued data changes, events and status changes though the callback.
87
        """
88
        if self._publish_cycles_count > self.data.RevisedLifetimeCount:
89
            self.logger.warning("Subscription %s has expired, publish cycle count(%s) > lifetime count (%s)", self,
90
                self._publish_cycles_count, self.data.RevisedLifetimeCount)
91
            # FIXME this will never be send since we do not have publish request anyway
92
            self.monitored_item_srv.trigger_statuschange(ua.StatusCode(ua.StatusCodes.BadTimeout))
93
        result = None
94
        if self.has_published_results():
95
            self._publish_cycles_count += 1
96
            result = self._pop_publish_result()
97
        if result is not None:
98
            self.logger.info('publish_results for %s', self.data.SubscriptionId)
99
            self.pub_result_callback(result)
100
101
    def _pop_publish_result(self) -> ua.PublishResult:
        """
        Build a `PublishResult` from the enqueued data changes, events and status changes.

        Resets the keep alive counter and the startup flag. A result that carries
        notification data consumes a sequence number and is kept until acknowledged.
        """
        outcome = ua.PublishResult()
        outcome.SubscriptionId = self.data.SubscriptionId
        for drain in (self._pop_triggered_datachanges, self._pop_triggered_events,
                      self._pop_triggered_statuschanges):
            drain(outcome)
        self._keep_alive_count = 0
        self._startup = False
        seq = self._notification_seq
        outcome.NotificationMessage.SequenceNumber = seq
        if outcome.NotificationMessage.NotificationData:
            # Only results with content consume a sequence number and await an ack
            self._notification_seq = seq + 1
            self._not_acknowledged_results[seq] = outcome
        outcome.MoreNotifications = False
        outcome.AvailableSequenceNumbers = list(self._not_acknowledged_results)
        return outcome
120
121
    def _pop_triggered_datachanges(self, result: ua.PublishResult):
        """Move all enqueued data changes into the given `PublishResult` and start a fresh queue."""
        pending = self._triggered_datachanges
        if not pending:
            return
        notification = ua.DataChangeNotification()
        flattened = []
        for items in pending.values():
            flattened.extend(items)
        notification.MonitoredItems = flattened
        # Rebind to a new dict instead of clearing the old one in place
        self._triggered_datachanges = {}
        self.logger.debug("sending datachanges notification with %s events", len(notification.MonitoredItems))
        result.NotificationMessage.NotificationData.append(notification)
129
130
    def _pop_triggered_events(self, result: ua.PublishResult):
        """Move all enqueued events into the given `PublishResult` and start a fresh queue."""
        pending = self._triggered_events
        if not pending:
            return
        notification = ua.EventNotificationList()
        collected = []
        for fields in pending.values():
            collected.extend(fields)
        notification.Events = collected
        # Rebind to a new dict instead of clearing the old one in place
        self._triggered_events = {}
        result.NotificationMessage.NotificationData.append(notification)
        self.logger.debug("sending event notification with %s events", len(notification.Events))
138
139
    def _pop_triggered_statuschanges(self, result: ua.PublishResult):
        """
        Append the oldest enqueued status change to the given `PublishResult`.

        Only one status change is removed per call; any further ones stay enqueued
        for later publications.
        """
        if self._triggered_statuschanges:
            notif = ua.StatusChangeNotification()
            notif.Status = self._triggered_statuschanges.pop(0)
            result.NotificationMessage.NotificationData.append(notif)
            self.logger.debug("sending status change notification %s", notif.Status)
146
147
    def publish(self, acks: Iterable[int]):
148
        """
149
        Reset publish cycle count, acknowledge PublishResults.
150
        :param acks: Sequence number of the PublishResults to acknowledge
151
        """
152
        self.logger.info("publish request with acks %s", acks)
153
        self._publish_cycles_count = 0
154
        for nb in acks:
155
            self._not_acknowledged_results.pop(nb, None)
156
157
    def republish(self, nb):
158
        self.logger.info("re-publish request for ack %s in subscription %s", nb, self)
159
        notification_message = self._not_acknowledged_results.pop(nb, None)
160
        if notification_message:
161
            self.logger.info("re-publishing ack %s in subscription %s", nb, self)
162
            return notification_message
163
        self.logger.info("Error request to re-published non existing ack %s in subscription %s", nb, self)
164
        return ua.NotificationMessage()
165
166
    def enqueue_datachange_event(self, mid: int, eventdata: ua.MonitoredItemNotification, maxsize: int):
        """
        Add a data change of a monitored item to the data change queue.

        :param mid: Monitored Item Id
        :param eventdata: Monitored Item Notification
        :param maxsize: Max queue size (0: No limit)
        """
        self._enqueue_event(mid, eventdata, size=maxsize, queue=self._triggered_datachanges)
174
175
    def enqueue_event(self, mid: int, eventdata: ua.EventFieldList, maxsize: int):
        """
        Add an event of a monitored item to the event queue.

        :param mid: Monitored Item Id
        :param eventdata: Event Field List
        :param maxsize: Max queue size (0: No limit)
        """
        self._enqueue_event(mid, eventdata, size=maxsize, queue=self._triggered_events)
183
184
    def enqueue_statuschange(self, code):
185
        """
186
        Enqueue a status change.
187
        :param code:
188
        """
189
        self._triggered_statuschanges.append(code)
190
        self._trigger_publish()
191
192
    def _enqueue_event(self, mid: int, eventdata: Union[ua.MonitoredItemNotification, ua.EventFieldList], size: int,
                       queue: dict):
        """
        Append a notification to the per monitored item list of the given queue.

        The first notification of a monitored item creates its list and requests a
        publication. Later ones are appended, dropping the oldest entry first when
        the list already holds `size` entries.

        :param mid: Monitored Item Id
        :param eventdata: Notification to enqueue
        :param size: Max queue size (0: No limit)
        :param queue: Queue to update, mapping monitored item ids to notification lists
        """
        try:
            pending = queue[mid]
        except KeyError:
            # New Monitored Item Id
            queue[mid] = [eventdata]
            self._trigger_publish()
            return
        # Limit queue size
        if size != 0 and len(pending) >= size:
            pending.pop(0)
        pending.append(eventdata)
204