Passed
Pull Request — master (#118)
by
unknown
02:21
created

InternalSubscription.__str__()   A

Complexity

Conditions 1

Size

Total Lines 2
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 2
nop 1
dl 0
loc 2
rs 10
c 0
b 0
f 0
1
"""
2
server side implementation of a subscription object
3
"""
4
5
import logging
6
import asyncio
7
8
from typing import Union, Iterable, Dict, List
9
from asyncua import ua
10
from .monitored_item_service import MonitoredItemService
11
from .address_space import AddressSpace
12
13
14
class InternalSubscription:
15
    """
16
    Server internal subscription.
17
    Runs the publication loop and stores the Publication Results until they are acknowledged.
18
    """
19
20
    def __init__(self, loop: asyncio.AbstractEventLoop, data: ua.CreateSubscriptionResult, aspace: AddressSpace,
                 callback=None, no_acks=False):
        """
        Set up the bookkeeping of a server internal subscription.

        :param loop: Event loop instance on which the publication task is created
        :param data: Create Subscription Result (provides the revised subscription parameters)
        :param aspace: Server Address Space, handed over to the monitored item service
        :param callback: Callable invoked with every `PublishResult` that gets published
        :param no_acks: If true no acknowledging will be expected (for server internal subscriptions)
        """
        self.logger = logging.getLogger(__name__)
        self.loop: asyncio.AbstractEventLoop = loop
        self.data: ua.CreateSubscriptionResult = data
        self.pub_result_callback = callback
        # The service receives this subscription itself, so the attributes above are set first.
        self.monitored_item_srv = MonitoredItemService(self, aspace)
        self.no_acks = no_acks
        # Publication task; stays None while no publication loop is running.
        self._task = None
        # True until the first PublishResult has been built.
        self._startup = True
        # Counters driving keep-alive publication and lifetime expiry.
        self._keep_alive_count = 0
        self._publish_cycles_count = 0
        # Sequence number given to the next notification message.
        self._notification_seq = 1
        # Pending notifications, keyed by monitored item id.
        self._triggered_datachanges: Dict[int, List[ua.MonitoredItemNotification]] = {}
        self._triggered_events: Dict[int, List[ua.EventFieldList]] = {}
        self._triggered_statuschanges: list = []
        # Published results waiting for acknowledgement, keyed by sequence number.
        self._not_acknowledged_results: Dict[int, ua.PublishResult] = {}
44
45
    def __str__(self):
46
        return f"Subscription(id:{self.data.SubscriptionId})"
47
48
    async def start(self):
49
        self.logger.debug("starting subscription %s", self.data.SubscriptionId)
50
        if self.data.RevisedPublishingInterval > 0.0:
51
            self._task = self.loop.create_task(self._subscription_loop())
52
53
    async def stop(self):
54
        if self._task:
55
            self.logger.info("stopping internal subscription %s", self.data.SubscriptionId)
56
            self._task.cancel()
57
            await self._task
58
            self._task = None
59
        else:
60
            self.logger.debug(f"internal subscription has no task to stop")
61
        self.monitored_item_srv.delete_all_monitored_items()
62
63
    def _trigger_publish(self):
64
        """
65
        Trigger immediate publication (if requested by the PublishingInterval).
66
        """
67
        if not self._task and self.data.RevisedPublishingInterval <= 0.0:
68
            # Publish immediately (as fast as possible)
69
            self.publish_results()
70
71
    async def _subscription_loop(self):
72
        """
73
        Start the publication loop running at the RevisedPublishingInterval.
74
        """
75
        try:
76
            while True:
77
                await asyncio.sleep(self.data.RevisedPublishingInterval / 1000.0)
78
                self.publish_results()
79
        except asyncio.CancelledError:
80
            self.logger.info('exiting _subscription_loop for %s', self.data.SubscriptionId)
81
            pass
82
        except Exception:
83
            # seems this except is necessary to log errors
84
            self.logger.exception("Exception in subscription loop")
85
86
    def has_published_results(self):
87
        if self._startup or self._triggered_datachanges or self._triggered_events:
88
            return True
89
        if self._keep_alive_count > self.data.RevisedMaxKeepAliveCount:
90
            self.logger.debug("keep alive count %s is > than max keep alive count %s, sending publish event",
91
                self._keep_alive_count, self.data.RevisedMaxKeepAliveCount)
92
            return True
93
        self._keep_alive_count += 1
94
        return False
95
96
    def publish_results(self):
97
        """
98
        Publish all enqueued data changes, events and status changes though the callback.
99
        """
100
        if self._publish_cycles_count > self.data.RevisedLifetimeCount:
101
            self.logger.warning("Subscription %s has expired, publish cycle count(%s) > lifetime count (%s)", self,
102
                self._publish_cycles_count, self.data.RevisedLifetimeCount)
103
            # FIXME this will never be send since we do not have publish request anyway
104
            self.monitored_item_srv.trigger_statuschange(ua.StatusCode(ua.StatusCodes.BadTimeout))
105
        result = None
106
        if self.has_published_results():
107
            if not self.no_acks:
108
                self._publish_cycles_count += 1
109
            result = self._pop_publish_result()
110
        if result is not None:
111
            #self.logger.info('publish_results for %s', self.data.SubscriptionId)
112
            # The callback can be:
113
            #    Subscription.publish_callback -> server internal subscription
114
            #    UaProcessor.forward_publish_response -> client subscription
115
            self.pub_result_callback(result)
116
117
    def _pop_publish_result(self) -> ua.PublishResult:
        """
        Build a `PublishResult` from the enqueued data changes, events and status changes.

        Resets the keep alive counter and the startup flag. A result that carries
        notification data is remembered for acknowledgement unless `no_acks` is set.
        """
        result = ua.PublishResult()
        result.SubscriptionId = self.data.SubscriptionId
        for drain in (self._pop_triggered_datachanges,
                      self._pop_triggered_events,
                      self._pop_triggered_statuschanges):
            drain(result)
        self._startup = False
        self._keep_alive_count = 0
        message = result.NotificationMessage
        sequence_number = self._notification_seq
        message.SequenceNumber = sequence_number
        if message.NotificationData and not self.no_acks:
            # Acknowledgement is only expected when the Subscription is for a client.
            self._notification_seq = sequence_number + 1
            self._not_acknowledged_results[sequence_number] = result
        result.MoreNotifications = False
        result.AvailableSequenceNumbers = list(self._not_acknowledged_results)
        return result
137
138
    def _pop_triggered_datachanges(self, result: ua.PublishResult):
        """
        Move all enqueued data changes into the given `PublishResult` and reset the queue.

        Nothing is appended when no data change is pending.
        """
        if not self._triggered_datachanges:
            return
        notification = ua.DataChangeNotification()
        # Flatten the per-monitored-item queues into one list, keeping their order.
        monitored_items = []
        for per_item_queue in self._triggered_datachanges.values():
            monitored_items.extend(per_item_queue)
        notification.MonitoredItems = monitored_items
        self._triggered_datachanges = {}
        result.NotificationMessage.NotificationData.append(notification)
146
147
    def _pop_triggered_events(self, result: ua.PublishResult):
        """
        Move all enqueued events into the given `PublishResult` and reset the queue.

        Nothing is appended when no event is pending.
        """
        if not self._triggered_events:
            return
        notification = ua.EventNotificationList()
        # Flatten the per-monitored-item queues into one list, keeping their order.
        events = []
        for per_item_queue in self._triggered_events.values():
            events.extend(per_item_queue)
        notification.Events = events
        self._triggered_events = {}
        result.NotificationMessage.NotificationData.append(notification)
155
156
    def _pop_triggered_statuschanges(self, result: ua.PublishResult):
        """
        Move the oldest enqueued status change into the given `PublishResult`.

        Only one status change is consumed per call; any further ones stay enqueued.
        """
        if not self._triggered_statuschanges:
            return
        notification = ua.StatusChangeNotification()
        oldest_status = self._triggered_statuschanges.pop(0)
        notification.Status = oldest_status
        result.NotificationMessage.NotificationData.append(notification)
163
164
    def publish(self, acks: Iterable[int]):
165
        """
166
        Reset publish cycle count, acknowledge PublishResults.
167
        :param acks: Sequence number of the PublishResults to acknowledge
168
        """
169
        #self.logger.info("publish request with acks %s", acks)
170
        self._publish_cycles_count = 0
171
        for nb in acks:
172
            self._not_acknowledged_results.pop(nb, None)
173
174
    def republish(self, nb):
175
        #self.logger.info("re-publish request for ack %s in subscription %s", nb, self)
176
        notification_message = self._not_acknowledged_results.pop(nb, None)
177
        if notification_message:
178
            self.logger.info("re-publishing ack %s in subscription %s", nb, self)
179
            return notification_message
180
        self.logger.info("Error request to re-published non existing ack %s in subscription %s", nb, self)
181
        return ua.NotificationMessage()
182
183
    def enqueue_datachange_event(self, mid: int, eventdata: ua.MonitoredItemNotification, maxsize: int):
        """
        Add a monitored item data change to the pending data changes.

        :param mid: Monitored Item Id
        :param eventdata: Monitored Item Notification
        :param maxsize: Max queue size (0: No limit)
        """
        datachange_queue = self._triggered_datachanges
        self._enqueue_event(mid, eventdata, maxsize, datachange_queue)
191
192
    def enqueue_event(self, mid: int, eventdata: ua.EventFieldList, maxsize: int):
        """
        Add an event to the pending events.

        :param mid: Monitored Item Id
        :param eventdata: Event Field List
        :param maxsize: Max queue size (0: No limit)
        """
        event_queue = self._triggered_events
        self._enqueue_event(mid, eventdata, maxsize, event_queue)
200
201
    def enqueue_statuschange(self, code):
202
        """
203
        Enqueue a status change.
204
        :param code:
205
        """
206
        self._triggered_statuschanges.append(code)
207
        self._trigger_publish()
208
209
    def _enqueue_event(self, mid: int, eventdata: Union[ua.MonitoredItemNotification, ua.EventFieldList], size: int,
                       queue: dict):
        """
        Store a notification in the per-monitored-item queue.

        The first notification for a monitored item id creates its queue and triggers a
        publication attempt. Later notifications are appended; when `size` is non-zero and
        the queue already holds `size` entries, the oldest entry is dropped first.

        :param mid: Monitored Item Id
        :param eventdata: Notification to store
        :param size: Max queue size (0: No limit)
        :param queue: Dictionary of pending notifications, keyed by Monitored Item Id
        """
        if mid not in queue:
            # New Monitored Item Id
            queue[mid] = [eventdata]
            self._trigger_publish()
            return
        pending = queue[mid]
        if size != 0 and len(pending) >= size:
            # Limit queue size: make room by discarding the oldest entry.
            pending.pop(0)
        pending.append(eventdata)
221