Passed
Pull Request — master (#45)
by
unknown
01:58
created

asyncua.server.internal_subscription.MonitoredItemService.deadband_callback()   B

Complexity

Conditions 6

Size

Total Lines 10
Code Lines 10

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 6
eloc 10
nop 3
dl 0
loc 10
rs 8.6666
c 0
b 0
f 0
1
"""
2
Server-side implementation of a subscription object.
3
"""
4
5
import logging
6
import asyncio
7
8
from typing import Union, Iterable, Dict, List
9
from asyncua import ua
10
from .monitored_item_service import MonitoredItemService
11
12
13
class InternalSubscription:
    """
    Server-side state of a single subscription.

    Queues the data changes, events and status changes reported for the
    subscription, bundles them into `ua.PublishResult` objects and hands each
    result to the callback given at construction time. When the revised
    publishing interval is positive, `start()` schedules a task that calls
    `publish_results()` once per interval.
    """

    def __init__(self, subservice, data: ua.CreateSubscriptionResult, addressspace, callback=None):
        """
        :param subservice: owning service; its `loop` attribute is used to schedule the publishing task
        :param data: subscription creation result (subscription id, revised interval and counts)
        :param addressspace: address space passed on to the `MonitoredItemService`
        :param callback: callable invoked with every `ua.PublishResult` that is published
        """
        self.logger = logging.getLogger(__name__)
        self.aspace = addressspace
        self.subservice = subservice
        self.data: ua.CreateSubscriptionResult = data
        self.pub_result_callback = callback
        self.monitored_item_srv = MonitoredItemService(self, addressspace)
        # Pending notifications, keyed by monitored item id.
        self._triggered_datachanges: Dict[int, List[ua.MonitoredItemNotification]] = {}
        self._triggered_events: Dict[int, List[ua.EventFieldList]] = {}
        self._triggered_statuschanges: list = []
        self._notification_seq = 1
        # Published results kept until acknowledged, keyed by sequence number.
        self._not_acknowledged_results: Dict[int, ua.PublishResult] = {}
        self._startup = True
        self._keep_alive_count = 0
        self._publish_cycles_count = 0
        self._task = None

    def __str__(self):
        return f"Subscription(id:{self.data.SubscriptionId})"

    async def start(self):
        """Schedule the publishing loop when the revised publishing interval is positive."""
        self.logger.debug("starting subscription %s", self.data.SubscriptionId)
        if self.data.RevisedPublishingInterval > 0.0:
            self._task = self.subservice.loop.create_task(self._subscription_loop())

    async def stop(self):
        """
        Cancel the publishing task, if any, and delete all monitored items.

        Safe to call when `start()` did not create a task (never started, or
        the publishing interval is not positive).
        """
        self.logger.info("stopping internal subscription %s", self.data.SubscriptionId)
        if self._task is not None:
            self._task.cancel()
            try:
                await self._task
            except asyncio.CancelledError:
                # The task was cancelled before its coroutine body ran, so the
                # handler inside `_subscription_loop` never saw the cancellation.
                pass
            self._task = None
        self.monitored_item_srv.delete_all_monitored_items()

    def _trigger_publish(self):
        """
        Trigger immediate publication (if requested by the PublishingInterval).
        """
        # NOTE(review): `start()` only creates `_task` when the interval is > 0, so
        # with the code visible here this condition looks unreachable - confirm the
        # intended check before changing it.
        if self._task and self.data.RevisedPublishingInterval <= 0.0:
            # Publish immediately (as fast as possible)
            self.publish_results()

    async def _subscription_loop(self):
        """
        Start the publication loop running at the RevisedPublishingInterval.
        """
        try:
            while True:
                # RevisedPublishingInterval is divided by 1000 to get seconds for sleep().
                await asyncio.sleep(self.data.RevisedPublishingInterval / 1000.0)
                self.publish_results()
        except asyncio.CancelledError:
            self.logger.info('exiting _subscription_loop for %s', self.data.SubscriptionId)
        except Exception:
            # seems this except is necessary to log errors
            self.logger.exception("Exception in subscription loop")

    def has_published_results(self):
        """
        Tell whether something should be published in the current cycle.

        Returns True on startup, when data changes or events are queued, or when
        the keep-alive counter exceeded `RevisedMaxKeepAliveCount`; otherwise
        increments the keep-alive counter and returns False.
        """
        if self._startup or self._triggered_datachanges or self._triggered_events:
            return True
        if self._keep_alive_count > self.data.RevisedMaxKeepAliveCount:
            self.logger.debug("keep alive count %s is > than max keep alive count %s, sending publish event",
                self._keep_alive_count, self.data.RevisedMaxKeepAliveCount)
            return True
        self._keep_alive_count += 1
        return False

    def publish_results(self):
        """
        Publish all enqueued data changes, events and status changes through the callback.
        """
        if self._publish_cycles_count > self.data.RevisedLifetimeCount:
            self.logger.warning("Subscription %s has expired, publish cycle count(%s) > lifetime count (%s)", self,
                self._publish_cycles_count, self.data.RevisedLifetimeCount)
            # FIXME this will never be send since we do not have publish request anyway
            self.monitored_item_srv.trigger_statuschange(ua.StatusCode(ua.StatusCodes.BadTimeout))
        result = None
        if self.has_published_results():
            self._publish_cycles_count += 1
            result = self._pop_publish_result()
        if result is not None:
            self.logger.info('publish_results for %s', self.data.SubscriptionId)
            self.pub_result_callback(result)

    def _pop_publish_result(self) -> ua.PublishResult:
        """
        Return a `PublishResult` with all enqueued data changes, events and status changes.
        Clear all queues.
        """
        result = ua.PublishResult()
        result.SubscriptionId = self.data.SubscriptionId
        self._pop_triggered_datachanges(result)
        self._pop_triggered_events(result)
        self._pop_triggered_statuschanges(result)
        self._keep_alive_count = 0
        self._startup = False
        result.NotificationMessage.SequenceNumber = self._notification_seq
        if result.NotificationMessage.NotificationData:
            # Only results that carry data consume a sequence number and are
            # kept until they are acknowledged.
            self._notification_seq += 1
            self._not_acknowledged_results[result.NotificationMessage.SequenceNumber] = result
        result.MoreNotifications = False
        result.AvailableSequenceNumbers = list(self._not_acknowledged_results.keys())
        return result

    def _pop_triggered_datachanges(self, result: ua.PublishResult):
        """Append all enqueued data changes to the given `PublishResult` and clear the queue."""
        if self._triggered_datachanges:
            notif = ua.DataChangeNotification()
            notif.MonitoredItems = [item for sublist in self._triggered_datachanges.values() for item in sublist]
            self._triggered_datachanges = {}
            self.logger.debug("sending datachanges notification with %s events", len(notif.MonitoredItems))
            result.NotificationMessage.NotificationData.append(notif)

    def _pop_triggered_events(self, result: ua.PublishResult):
        """Append all enqueued events to the given `PublishResult` and clear the queue."""
        if self._triggered_events:
            notif = ua.EventNotificationList()
            notif.Events = [item for sublist in self._triggered_events.values() for item in sublist]
            self._triggered_events = {}
            result.NotificationMessage.NotificationData.append(notif)
            self.logger.debug("sending event notification with %s events", len(notif.Events))

    def _pop_triggered_statuschanges(self, result: ua.PublishResult):
        """Append the oldest enqueued status change to the given `PublishResult` and remove it from the queue."""
        if self._triggered_statuschanges:
            notif = ua.StatusChangeNotification()
            # One status per notification: only the oldest entry is taken.
            notif.Status = self._triggered_statuschanges.pop(0)
            result.NotificationMessage.NotificationData.append(notif)
            self.logger.debug("sending status change notification %s", notif.Status)

    def publish(self, acks: Iterable[int]):
        """
        Reset publish cycle count, acknowledge PublishResults.
        :param acks: Sequence number of the PublishResults to acknowledge
        """
        self.logger.info("publish request with acks %s", acks)
        self._publish_cycles_count = 0
        for nb in acks:
            # Unknown sequence numbers are ignored.
            self._not_acknowledged_results.pop(nb, None)

    def republish(self, nb):
        """
        Return the not yet acknowledged result stored under sequence number `nb`.

        The stored result is removed from the pending results. When nothing is
        stored under `nb`, an empty `ua.NotificationMessage` is returned.
        :param nb: Sequence number of the result to re-publish
        """
        self.logger.info("re-publish request for ack %s in subscription %s", nb, self)
        notification_message = self._not_acknowledged_results.pop(nb, None)
        if notification_message:
            self.logger.info("re-publishing ack %s in subscription %s", nb, self)
            return notification_message
        self.logger.info("Error request to re-published non existing ack %s in subscription %s", nb, self)
        return ua.NotificationMessage()

    def enqueue_datachange_event(self, mid: int, eventdata: ua.MonitoredItemNotification, maxsize):
        """
        Enqueue a monitored item data change.
        :param mid: Monitored Item Id
        :param eventdata: Monitored Item Notification
        :param maxsize: maximum number of queued notifications for this item, 0 for no limit
        """
        self._enqueue_event(mid, eventdata, maxsize, self._triggered_datachanges)

    def enqueue_event(self, mid: int, eventdata: ua.EventFieldList, maxsize):
        """
        Enqueue an event.
        :param mid: Monitored Item Id
        :param eventdata: Event Field List
        :param maxsize: maximum number of queued events for this item, 0 for no limit
        """
        self._enqueue_event(mid, eventdata, maxsize, self._triggered_events)

    def enqueue_statuschange(self, code):
        """
        Enqueue a status change.
        :param code: status to send in a `ua.StatusChangeNotification`
        """
        self._triggered_statuschanges.append(code)
        self._trigger_publish()

    def _enqueue_event(self, mid: int, eventdata: Union[ua.MonitoredItemNotification, ua.EventFieldList], size,
                       queue: dict):
        """
        Append `eventdata` to the per-item list stored in `queue`.

        :param mid: Monitored Item Id
        :param eventdata: notification to store
        :param size: maximum list length for this item, 0 for no limit
        :param queue: dict mapping Monitored Item Id to its list of notifications
        """
        if mid not in queue:
            # New Monitored Item Id
            queue[mid] = [eventdata]
            self._trigger_publish()
            return
        if size != 0 and len(queue[mid]) >= size:
            # Queue is full: drop the oldest entry to make room.
            queue[mid].pop(0)
        queue[mid].append(eventdata)
202