Passed
Pull Request — master (#45)
by
unknown
02:29
created

asyncua.server.internal_subscription.MonitoredItemService.deadband_callback()   B

Complexity

Conditions 6

Size

Total Lines 10
Code Lines 10

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 6
eloc 10
nop 3
dl 0
loc 10
rs 8.6666
c 0
b 0
f 0
1
"""
2
server side implementation of a subscription object
3
"""
4
5
import logging
6
import asyncio
7
8
from asyncua import ua
9
from .monitored_item_service import MonitoredItemService
10
11
12
class InternalSubscription:
13
    """
14
15
    """
16
    def __init__(self, subservice, data, addressspace, callback=None):
17
        self.logger = logging.getLogger(__name__)
18
        self.aspace = addressspace
19
        self.subservice = subservice
20
        self.data = data
21
        self.pub_result_callback = callback
22
        self.monitored_item_srv = MonitoredItemService(self, addressspace)
23
        self.task = None
24
        self._triggered_datachanges = {}
25
        self._triggered_events = {}
26
        self._triggered_statuschanges = []
27
        self._notification_seq = 1
28
        self._not_acknowledged_results = {}
29
        self._startup = True
30
        self._keep_alive_count = 0
31
        self._publish_cycles_count = 0
32
        self._task = None
33
34
    def __str__(self):
35
        return f"Subscription(id:{self.data.SubscriptionId})"
36
37
    async def start(self):
38
        self.logger.debug("starting subscription %s", self.data.SubscriptionId)
39
        if self.data.RevisedPublishingInterval > 0.0:
40
            self._task = self.subservice.loop.create_task(self._subscription_loop())
41
42
    async def stop(self):
43
        self.logger.info("stopping internal subscription %s", self.data.SubscriptionId)
44
        self._task.cancel()
45
        await self._task
46
        self.monitored_item_srv.delete_all_monitored_items()
47
48
    def _trigger_publish(self):
49
        if self._task and self.data.RevisedPublishingInterval <= 0.0:
50
            self.publish_results()
51
52
    async def _subscription_loop(self):
53
        try:
54
            while True:
55
                await asyncio.sleep(self.data.RevisedPublishingInterval / 1000.0)
56
                self.publish_results()
57
        except asyncio.CancelledError:
58
            self.logger.info('exiting _subscription_loop for %s', self.data.SubscriptionId)
59
            pass
60
        except Exception:
61
            # seems this except is necessary to print errors
62
            self.logger.exception("Exception in subscription loop")
63
64
    def has_published_results(self):
65
        if self._startup or self._triggered_datachanges or self._triggered_events:
66
            return True
67
        if self._keep_alive_count > self.data.RevisedMaxKeepAliveCount:
68
            self.logger.debug("keep alive count %s is > than max keep alive count %s, sending publish event",
69
                              self._keep_alive_count, self.data.RevisedMaxKeepAliveCount)
70
            return True
71
        self._keep_alive_count += 1
72
        return False
73
74
    def publish_results(self):
75
        if self._publish_cycles_count > self.data.RevisedLifetimeCount:
76
            self.logger.warning("Subscription %s has expired, publish cycle count(%s) > lifetime count (%s)", self,
77
                                self._publish_cycles_count, self.data.RevisedLifetimeCount)
78
            # FIXME this will never be send since we do not have publish request anyway
79
            self.monitored_item_srv.trigger_statuschange(ua.StatusCode(ua.StatusCodes.BadTimeout))
80
        result = None
81
        if self.has_published_results():
82
            # FIXME: should we pop a publish request here? or we do not care?
83
            self._publish_cycles_count += 1
84
            result = self._pop_publish_result()
85
        if result is not None:
86
            self.logger.info('publish_results for %s', self.data.SubscriptionId)
87
            self.pub_result_callback(result)
88
89
    def _pop_publish_result(self):
90
        result = ua.PublishResult()
91
        result.SubscriptionId = self.data.SubscriptionId
92
        self._pop_triggered_datachanges(result)
93
        self._pop_triggered_events(result)
94
        self._pop_triggered_statuschanges(result)
95
        self._keep_alive_count = 0
96
        self._startup = False
97
        result.NotificationMessage.SequenceNumber = self._notification_seq
98
        if result.NotificationMessage.NotificationData:
99
            self._notification_seq += 1
100
            self._not_acknowledged_results[result.NotificationMessage.SequenceNumber] = result
101
        result.MoreNotifications = False
102
        result.AvailableSequenceNumbers = list(self._not_acknowledged_results.keys())
103
        return result
104
105
    def _pop_triggered_datachanges(self, result):
106
        if self._triggered_datachanges:
107
            notif = ua.DataChangeNotification()
108
            notif.MonitoredItems = [item for sublist in self._triggered_datachanges.values() for item in sublist]
109
            self._triggered_datachanges = {}
110
            self.logger.debug("sending datachanges notification with %s events", len(notif.MonitoredItems))
111
            result.NotificationMessage.NotificationData.append(notif)
112
113
    def _pop_triggered_events(self, result):
114
        if self._triggered_events:
115
            notif = ua.EventNotificationList()
116
            notif.Events = [item for sublist in self._triggered_events.values() for item in sublist]
117
            self._triggered_events = {}
118
            result.NotificationMessage.NotificationData.append(notif)
119
            self.logger.debug("sending event notification with %s events", len(notif.Events))
120
121
    def _pop_triggered_statuschanges(self, result):
122
        if self._triggered_statuschanges:
123
            notif = ua.StatusChangeNotification()
124
            notif.Status = self._triggered_statuschanges.pop(0)
125
            result.NotificationMessage.NotificationData.append(notif)
126
            self.logger.debug("sending event notification %s", notif.Status)
127
128
    def publish(self, acks):
129
        """
130
        :param acks:
131
        :return:
132
        """
133
        self.logger.info("publish request with acks %s", acks)
134
        self._publish_cycles_count = 0
135
        for nb in acks:
136
            self._not_acknowledged_results.pop(nb, None)
137
138
    def republish(self, nb):
139
        self.logger.info("re-publish request for ack %s in subscription %s", nb, self)
140
        notificationMessage = self._not_acknowledged_results.pop(nb, None)
141
        if notificationMessage:
142
            self.logger.info("re-publishing ack %s in subscription %s", nb, self)
143
            return notificationMessage
144
        self.logger.info("Error request to re-published non existing ack %s in subscription %s", nb, self)
145
        return ua.NotificationMessage()
146
147
    def enqueue_datachange_event(self, mid, eventdata, maxsize):
148
        self._enqueue_event(mid, eventdata, maxsize, self._triggered_datachanges)
149
150
    def enqueue_event(self, mid, eventdata, maxsize):
151
        self._enqueue_event(mid, eventdata, maxsize, self._triggered_events)
152
153
    def enqueue_statuschange(self, code):
154
        self._triggered_statuschanges.append(code)
155
        self._trigger_publish()
156
157
    def _enqueue_event(self, mid, eventdata, size, queue):
158
        if mid not in queue:
159
            queue[mid] = [eventdata]
160
            self._trigger_publish()
161
            return
162
        if size != 0:
163
            if len(queue[mid]) >= size:
164
                queue[mid].pop(0)
165
        queue[mid].append(eventdata)
166
167