Passed
Pull Request — master (#45)
by
unknown
02:35
created

asyncua.server.internal_subscription.MonitoredItemService.datachange_callback()   A

Complexity

Conditions 4

Size

Total Lines 21
Code Lines 18

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 4
eloc 18
nop 4
dl 0
loc 21
rs 9.5
c 0
b 0
f 0

1 Method

Rating   Name   Duplication   Size   Complexity  
A asyncua.server.internal_subscription.InternalSubscription._enqueue_event() 0 11 4
1
"""
2
Server-side implementation of a subscription object.
3
"""
4
5
import logging
6
import asyncio
7
8
from typing import Union, Iterable, Dict, List
9
from asyncua import ua
10
from .monitored_item_service import MonitoredItemService
11
12
13
class InternalSubscription:
    """
    Server-side state of a single subscription.

    Notifications raised by the monitored items (data changes, events, status
    changes) are queued here and handed to the publish callback packed into a
    ``ua.PublishResult``. Publication happens cyclically from a background task
    when the revised publishing interval is positive; otherwise it is triggered
    directly when a status change or the first notification of a monitored item
    is enqueued.
    """

    def __init__(self, subservice, data: ua.CreateSubscriptionResult, addressspace, callback=None):
        """
        :param subservice: owning service; its ``loop`` attribute is used to schedule the publish task
        :param data: subscription parameters (id, revised publishing interval, keep-alive and lifetime counts)
        :param addressspace: address space handed over to the monitored item service
        :param callback: callable invoked with every ``ua.PublishResult`` produced by `publish_results`
        """
        self.logger = logging.getLogger(__name__)
        self.aspace = addressspace
        self.subservice = subservice
        self.data: ua.CreateSubscriptionResult = data
        self.pub_result_callback = callback
        self.monitored_item_srv = MonitoredItemService(self, addressspace)
        # Pending notifications, keyed by monitored item id.
        self._triggered_datachanges: Dict[int, List[ua.MonitoredItemNotification]] = {}
        self._triggered_events: Dict[int, List[ua.EventFieldList]] = {}
        # Pending status codes, oldest first.
        self._triggered_statuschanges: list = []
        # Sequence number given to the next result that carries notification data.
        self._notification_seq = 1
        # Results sent but not yet acknowledged, keyed by sequence number.
        self._not_acknowledged_results: Dict[int, ua.PublishResult] = {}
        # True until the first result has been built; forces an initial publication.
        self._startup = True
        # Number of consecutive publish checks that found nothing to send.
        self._keep_alive_count = 0
        # Number of results published since the last publish request (see `publish`).
        self._publish_cycles_count = 0
        # Background task running `_subscription_loop`, or None when no cyclic publishing is active.
        self._task = None

    def __str__(self):
        return f"Subscription(id:{self.data.SubscriptionId})"

    async def start(self):
        """Start the cyclic publish task when the revised publishing interval is positive."""
        self.logger.debug("starting subscription %s", self.data.SubscriptionId)
        if self.data.RevisedPublishingInterval > 0.0:
            self._task = self.subservice.loop.create_task(self._subscription_loop())

    async def stop(self):
        """
        Cancel the publish task, if there is one, and delete all monitored items.

        Safe to call when no task was ever started (non-positive publishing
        interval, `start` not called) and safe to call more than once.
        """
        self.logger.info("stopping internal subscription %s", self.data.SubscriptionId)
        if self._task is not None:
            self._task.cancel()
            try:
                await self._task
            except asyncio.CancelledError:
                # A task cancelled before its first step never reaches the
                # handler inside `_subscription_loop`, so the cancellation
                # surfaces here instead.
                pass
            self._task = None
        self.monitored_item_srv.delete_all_monitored_items()

    def _trigger_publish(self):
        """
        Publish immediately when no cyclic publish task is running and the
        revised publishing interval is not positive.
        """
        # `start` only creates the task for a positive interval, so the two
        # conditions describe the "no cyclic publishing" mode. Requiring a
        # running task here would make this branch unreachable.
        if not self._task and self.data.RevisedPublishingInterval <= 0.0:  # ToDo: check against Spec.
            self.publish_results()

    async def _subscription_loop(self):
        """Publication cycle: sleep for the revised publishing interval (ms), then publish."""
        try:
            while True:
                await asyncio.sleep(self.data.RevisedPublishingInterval / 1000.0)
                self.publish_results()
        except asyncio.CancelledError:
            self.logger.info('exiting _subscription_loop for %s', self.data.SubscriptionId)
        except Exception:
            # seems this except is necessary to log errors
            self.logger.exception("Exception in subscription loop")

    def has_published_results(self):
        """
        Tell whether a result has to be published now.

        Returns True on startup, when data changes or events are queued, or when
        the keep-alive count exceeded the revised maximum. Otherwise the
        keep-alive count is incremented and False is returned.
        """
        if self._startup or self._triggered_datachanges or self._triggered_events:
            return True
        if self._keep_alive_count > self.data.RevisedMaxKeepAliveCount:
            self.logger.debug("keep alive count %s is > than max keep alive count %s, sending publish event",
                self._keep_alive_count, self.data.RevisedMaxKeepAliveCount)
            return True
        self._keep_alive_count += 1
        return False

    def publish_results(self):
        """
        Publish all enqueued data changes, events and status changes through the callback.

        When the publish cycle count exceeded the revised lifetime count, a
        ``BadTimeout`` status change is triggered first. Nothing is sent when
        `has_published_results` reports that there is nothing to publish.
        """
        if self._publish_cycles_count > self.data.RevisedLifetimeCount:
            self.logger.warning("Subscription %s has expired, publish cycle count(%s) > lifetime count (%s)", self,
                self._publish_cycles_count, self.data.RevisedLifetimeCount)
            # FIXME this will never be send since we do not have publish request anyway
            self.monitored_item_srv.trigger_statuschange(ua.StatusCode(ua.StatusCodes.BadTimeout))
        result = None
        if self.has_published_results():
            self._publish_cycles_count += 1
            result = self._pop_publish_result()
        if result is not None:
            self.logger.info('publish_results for %s', self.data.SubscriptionId)
            self.pub_result_callback(result)

    def _pop_publish_result(self) -> ua.PublishResult:
        """
        Return a `PublishResult` built from the enqueued data changes, events and status changes.

        Resets the keep-alive count and the startup flag. The sequence number is
        only advanced, and the result only kept for acknowledgement, when the
        result actually carries notification data.
        """
        result = ua.PublishResult()
        result.SubscriptionId = self.data.SubscriptionId
        self._pop_triggered_datachanges(result)
        self._pop_triggered_events(result)
        self._pop_triggered_statuschanges(result)
        self._keep_alive_count = 0
        self._startup = False
        result.NotificationMessage.SequenceNumber = self._notification_seq
        if result.NotificationMessage.NotificationData:
            self._notification_seq += 1
            self._not_acknowledged_results[result.NotificationMessage.SequenceNumber] = result
        result.MoreNotifications = False
        result.AvailableSequenceNumbers = list(self._not_acknowledged_results.keys())
        return result

    def _pop_triggered_datachanges(self, result: ua.PublishResult):
        """Append all enqueued data changes to the given `PublishResult` and clear the queue."""
        if self._triggered_datachanges:
            notif = ua.DataChangeNotification()
            # Flatten the per-monitored-item lists into a single list.
            notif.MonitoredItems = [item for sublist in self._triggered_datachanges.values() for item in sublist]
            self._triggered_datachanges = {}
            self.logger.debug("sending datachanges notification with %s events", len(notif.MonitoredItems))
            result.NotificationMessage.NotificationData.append(notif)

    def _pop_triggered_events(self, result: ua.PublishResult):
        """Append all enqueued events to the given `PublishResult` and clear the queue."""
        if self._triggered_events:
            notif = ua.EventNotificationList()
            # Flatten the per-monitored-item lists into a single list.
            notif.Events = [item for sublist in self._triggered_events.values() for item in sublist]
            self._triggered_events = {}
            result.NotificationMessage.NotificationData.append(notif)
            self.logger.debug("sending event notification with %s events", len(notif.Events))

    def _pop_triggered_statuschanges(self, result: ua.PublishResult):
        """
        Append the oldest enqueued status change to the given `PublishResult`.

        Only one status change is removed per call; any further ones stay
        queued for later results.
        """
        if self._triggered_statuschanges:
            notif = ua.StatusChangeNotification()
            notif.Status = self._triggered_statuschanges.pop(0)
            result.NotificationMessage.NotificationData.append(notif)
            self.logger.debug("sending event notification %s", notif.Status)

    def publish(self, acks: Iterable[int]):
        """
        Reset publish cycle count, acknowledge PublishResults.

        Unknown sequence numbers are ignored.

        :param acks: Sequence number of the PublishResults to acknowledge
        """
        self.logger.info("publish request with acks %s", acks)
        self._publish_cycles_count = 0
        for nb in acks:
            self._not_acknowledged_results.pop(nb, None)

    def republish(self, nb):
        """
        Return the not yet acknowledged result stored under sequence number `nb`.

        The stored result is removed from the not-acknowledged results. When
        nothing is stored under `nb`, an empty ``ua.NotificationMessage`` is
        returned.

        :param nb: Sequence number of the result to send again
        """
        # NOTE(review): the hit path returns the stored ua.PublishResult while the
        # miss path returns a ua.NotificationMessage - confirm what callers expect.
        self.logger.info("re-publish request for ack %s in subscription %s", nb, self)
        notification_message = self._not_acknowledged_results.pop(nb, None)
        if notification_message:
            self.logger.info("re-publishing ack %s in subscription %s", nb, self)
            return notification_message
        self.logger.info("Error request to re-published non existing ack %s in subscription %s", nb, self)
        return ua.NotificationMessage()

    def enqueue_datachange_event(self, mid: int, eventdata: ua.MonitoredItemNotification, maxsize):
        """
        Enqueue a monitored item data change.

        :param mid: Monitored Item Id
        :param eventdata: Monitored Item Notification
        :param maxsize: maximum number of queued notifications for this item, 0 for no limit
        """
        self._enqueue_event(mid, eventdata, maxsize, self._triggered_datachanges)

    def enqueue_event(self, mid: int, eventdata: ua.EventFieldList, maxsize):
        """
        Enqueue an event.

        :param mid: Monitored Item Id
        :param eventdata: Event Field List
        :param maxsize: maximum number of queued events for this item, 0 for no limit
        """
        self._enqueue_event(mid, eventdata, maxsize, self._triggered_events)

    def enqueue_statuschange(self, code):
        """
        Enqueue a status change and trigger a publication if publishing is not cyclic.

        :param code: status code to report
        """
        self._triggered_statuschanges.append(code)
        self._trigger_publish()

    def _enqueue_event(self, mid: int, eventdata: Union[ua.MonitoredItemNotification, ua.EventFieldList], size,
                       queue: dict):
        """
        Append `eventdata` to the list stored under `mid` in `queue`.

        The first entry for a monitored item also triggers a publication (see
        `_trigger_publish`). For an already known item the oldest entry is
        dropped first when the list already holds `size` entries.

        :param mid: Monitored Item Id
        :param eventdata: notification to enqueue
        :param size: maximum number of entries kept per monitored item, 0 for no limit
        :param queue: dict of pending notifications, keyed by monitored item id
        """
        if mid not in queue:
            # New Monitored Item Id
            queue[mid] = [eventdata]
            self._trigger_publish()
            return
        if size != 0:
            if len(queue[mid]) >= size:
                # Queue is full: discard the oldest entry to make room.
                queue[mid].pop(0)
        queue[mid].append(eventdata)
196