Passed
Push — master ( bd9d8f...4a3463 )
by Olivier
01:00 queued 14s
created

asyncua.server.internal_subscription   B

Complexity

Total Complexity 43

Size/Duplication

Total Lines 219
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 132
dl 0
loc 219
rs 8.96
c 0
b 0
f 0
wmc 43

18 Methods

Rating  Name                                                 Duplication  Size  Complexity
A       InternalSubscription.start()                         0            4     2
A       InternalSubscription.__init__()                      0            24    1
A       InternalSubscription.__str__()                       0            2     1
A       InternalSubscription.enqueue_event()                 0            8     1
A       InternalSubscription.republish()                     0            8     2
A       InternalSubscription._pop_publish_result()           0            20    3
A       InternalSubscription.enqueue_datachange_event()      0            8     1
A       InternalSubscription._enqueue_event()                0            12    4
A       InternalSubscription._trigger_publish()              0            7     3
A       InternalSubscription._pop_triggered_events()         0            7     2
A       InternalSubscription.stop()                          0            7     2
A       InternalSubscription._pop_triggered_statuschanges()  0            6     2
A       InternalSubscription.publish_results()               0            20    5
A       InternalSubscription._pop_triggered_datachanges()    0            8     2
A       InternalSubscription.enqueue_statuschange()          0            7     1
A       InternalSubscription._subscription_loop()            0            14    4
A       InternalSubscription.has_published_results()         0            9     5
A       InternalSubscription.publish()                       0            9     2

How to fix: Complexity

Complex classes like the one in asyncua.server.internal_subscription often do a lot of different things. To break such a class down, we need to identify a cohesive component within it. A common way to find such a component is to look for fields and methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
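
As an illustration of Extract Class, the members sharing the `_triggered_` and `_pop_triggered_` prefixes suggest one candidate component: the per-monitored-item notification queue. The sketch below is a minimal, hypothetical version of such a component. The name `NotificationQueue` and its methods `enqueue` and `pop_all` are assumptions made for this example and are not part of asyncua.

```python
from typing import Dict, Generic, List, TypeVar

T = TypeVar("T")


class NotificationQueue(Generic[T]):
    """Hypothetical extracted class: notifications grouped by monitored item id."""

    def __init__(self) -> None:
        self._items: Dict[int, List[T]] = {}

    def __bool__(self) -> bool:
        # True while at least one monitored item has pending notifications
        return bool(self._items)

    def enqueue(self, mid: int, item: T, maxsize: int = 0) -> bool:
        """Queue an item; return True if `mid` had nothing pending before."""
        pending = self._items.get(mid)
        if pending is None:
            self._items[mid] = [item]
            return True
        if maxsize and len(pending) >= maxsize:
            # Queue is full: drop the oldest entry (maxsize 0 means no limit)
            pending.pop(0)
        pending.append(item)
        return False

    def pop_all(self) -> List[T]:
        """Return all pending items as one flat list and clear the queue."""
        items = [item for sublist in self._items.values() for item in sublist]
        self._items = {}
        return items


queue = NotificationQueue()
queue.enqueue(1, "a", maxsize=2)
queue.enqueue(1, "b", maxsize=2)
queue.enqueue(1, "c", maxsize=2)  # queue for id 1 is full, so "a" is dropped
queue.enqueue(2, "x")
drained = queue.pop_all()  # ["b", "c", "x"]
```

With a component like this, the subscription class could hold one queue for data changes and one for events instead of managing two dictionaries and a shared helper itself. Whether that split pays off for this module is a judgment call.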

"""
server side implementation of a subscription object
"""

import logging
import asyncio

from typing import Union, Iterable, Dict, List
from asyncua import ua
from .monitored_item_service import MonitoredItemService
from .address_space import AddressSpace


class InternalSubscription:
    """
    Server internal subscription.
    Runs the publication loop and stores the Publication Results until they are acknowledged.
    """

    def __init__(self, loop: asyncio.AbstractEventLoop, data: ua.CreateSubscriptionResult, aspace: AddressSpace,
                 callback=None, no_acks=False):
        """
        :param loop: Event loop instance
        :param data: Create Subscription Result
        :param aspace: Server Address Space
        :param callback: Callback for publishing
        :param no_acks: If True, no acknowledgement is expected (for server internal subscriptions)
        """
        self.logger = logging.getLogger(__name__)
        self.loop: asyncio.AbstractEventLoop = loop
        self.data: ua.CreateSubscriptionResult = data
        self.pub_result_callback = callback
        self.monitored_item_srv = MonitoredItemService(self, aspace)
        self._triggered_datachanges: Dict[int, List[ua.MonitoredItemNotification]] = {}
        self._triggered_events: Dict[int, List[ua.EventFieldList]] = {}
        self._triggered_statuschanges: list = []
        self._notification_seq = 1
        self._not_acknowledged_results: Dict[int, ua.PublishResult] = {}
        self._startup = True
        self._keep_alive_count = 0
        self._publish_cycles_count = 0
        self._task = None
        self.no_acks = no_acks

    def __str__(self):
        return f"Subscription(id:{self.data.SubscriptionId})"

    async def start(self):
        self.logger.debug("starting subscription %s", self.data.SubscriptionId)
        if self.data.RevisedPublishingInterval > 0.0:
            self._task = self.loop.create_task(self._subscription_loop())

    async def stop(self):
        if self._task:
            self.logger.info("stopping internal subscription %s", self.data.SubscriptionId)
            self._task.cancel()
            try:
                await self._task
            except asyncio.CancelledError:
                # Raised if the task is cancelled before the loop coroutine
                # reached its own try/except block
                pass
            self._task = None
        self.monitored_item_srv.delete_all_monitored_items()

    def _trigger_publish(self):
        """
        Trigger immediate publication (if requested by the PublishingInterval).
        """
        if not self._task and self.data.RevisedPublishingInterval <= 0.0:
            # Publish immediately (as fast as possible)
            self.publish_results()

    async def _subscription_loop(self):
        """
        Start the publication loop running at the RevisedPublishingInterval.
        """
        try:
            while True:
                await asyncio.sleep(self.data.RevisedPublishingInterval / 1000.0)
                self.publish_results()
        except asyncio.CancelledError:
            self.logger.info('exiting _subscription_loop for %s', self.data.SubscriptionId)
        except Exception:
            # seems this except is necessary to log errors
            self.logger.exception("Exception in subscription loop")

    def has_published_results(self):
        if self._startup or self._triggered_datachanges or self._triggered_events:
            return True
        if self._keep_alive_count > self.data.RevisedMaxKeepAliveCount:
            self.logger.debug("keep alive count %s is greater than max keep alive count %s, sending publish event",
                self._keep_alive_count, self.data.RevisedMaxKeepAliveCount)
            return True
        self._keep_alive_count += 1
        return False

    def publish_results(self):
        """
        Publish all enqueued data changes, events and status changes through the callback.
        """
        if self._publish_cycles_count > self.data.RevisedLifetimeCount:
            self.logger.warning("Subscription %s has expired, publish cycle count(%s) > lifetime count (%s)", self,
                self._publish_cycles_count, self.data.RevisedLifetimeCount)
            # FIXME this will never be sent since we do not have publish requests anyway
            self.monitored_item_srv.trigger_statuschange(ua.StatusCode(ua.StatusCodes.BadTimeout))
        result = None
        if self.has_published_results():
            if not self.no_acks:
                self._publish_cycles_count += 1
            result = self._pop_publish_result()
        if result is not None:
            #self.logger.info('publish_results for %s', self.data.SubscriptionId)
            # The callback can be:
            #    Subscription.publish_callback -> server internal subscription
            #    UaProcessor.forward_publish_response -> client subscription
            self.pub_result_callback(result)

    def _pop_publish_result(self) -> ua.PublishResult:
        """
        Return a `PublishResult` with all enqueued data changes, events and status changes.
        Clear all queues.
        """
        result = ua.PublishResult()
        result.SubscriptionId = self.data.SubscriptionId
        self._pop_triggered_datachanges(result)
        self._pop_triggered_events(result)
        self._pop_triggered_statuschanges(result)
        self._keep_alive_count = 0
        self._startup = False
        result.NotificationMessage.SequenceNumber = self._notification_seq
        if result.NotificationMessage.NotificationData and not self.no_acks:
            # Acknowledgement is only expected when the Subscription is for a client.
            self._notification_seq += 1
            self._not_acknowledged_results[result.NotificationMessage.SequenceNumber] = result
        result.MoreNotifications = False
        result.AvailableSequenceNumbers = list(self._not_acknowledged_results.keys())
        return result

    def _pop_triggered_datachanges(self, result: ua.PublishResult):
        """Append all enqueued data changes to the given `PublishResult` and clear the queue."""
        if self._triggered_datachanges:
            notif = ua.DataChangeNotification()
            notif.MonitoredItems = [item for sublist in self._triggered_datachanges.values() for item in sublist]
            self._triggered_datachanges = {}
            #self.logger.debug("sending datachanges notification with %s events", len(notif.MonitoredItems))
            result.NotificationMessage.NotificationData.append(notif)

    def _pop_triggered_events(self, result: ua.PublishResult):
        """Append all enqueued events to the given `PublishResult` and clear the queue."""
        if self._triggered_events:
            notif = ua.EventNotificationList()
            notif.Events = [item for sublist in self._triggered_events.values() for item in sublist]
            self._triggered_events = {}
            result.NotificationMessage.NotificationData.append(notif)
            #self.logger.debug("sending event notification with %s events", len(notif.Events))

    def _pop_triggered_statuschanges(self, result: ua.PublishResult):
        """Append all enqueued status changes to the given `PublishResult` and clear the queue."""
        if self._triggered_statuschanges:
            notif = ua.StatusChangeNotification()
            notif.Status = self._triggered_statuschanges.pop(0)
            result.NotificationMessage.NotificationData.append(notif)
            #self.logger.debug("sending event notification %s", notif.Status)

    def publish(self, acks: Iterable[int]):
        """
        Reset publish cycle count, acknowledge PublishResults.
        :param acks: Sequence numbers of the PublishResults to acknowledge
        """
        #self.logger.info("publish request with acks %s", acks)
        self._publish_cycles_count = 0
        for nb in acks:
            self._not_acknowledged_results.pop(nb, None)

    def republish(self, nb):
        """
        Return the not yet acknowledged result stored under the given sequence number
        and remove it from the store, or an empty NotificationMessage if none exists.
        :param nb: Sequence number of the result to re-publish
        """
        #self.logger.info("re-publish request for ack %s in subscription %s", nb, self)
        notification_message = self._not_acknowledged_results.pop(nb, None)
        if notification_message:
            self.logger.info("re-publishing ack %s in subscription %s", nb, self)
            return notification_message
        self.logger.info("Error: request to re-publish non-existing ack %s in subscription %s", nb, self)
        return ua.NotificationMessage()

    def enqueue_datachange_event(self, mid: int, eventdata: ua.MonitoredItemNotification, maxsize: int):
        """
        Enqueue a monitored item data change.
        :param mid: Monitored Item Id
        :param eventdata: Monitored Item Notification
        :param maxsize: Max queue size (0: No limit)
        """
        self._enqueue_event(mid, eventdata, maxsize, self._triggered_datachanges)

    def enqueue_event(self, mid: int, eventdata: ua.EventFieldList, maxsize: int):
        """
        Enqueue an event.
        :param mid: Monitored Item Id
        :param eventdata: Event Field List
        :param maxsize: Max queue size (0: No limit)
        """
        self._enqueue_event(mid, eventdata, maxsize, self._triggered_events)

    def enqueue_statuschange(self, code):
        """
        Enqueue a status change.
        :param code: Status code to enqueue
        """
        self._triggered_statuschanges.append(code)
        self._trigger_publish()

    def _enqueue_event(self, mid: int, eventdata: Union[ua.MonitoredItemNotification, ua.EventFieldList], size: int,
                       queue: dict):
        if mid not in queue:
            # New Monitored Item Id
            queue[mid] = [eventdata]
            self._trigger_publish()
            return
        if size != 0 and len(queue[mid]) >= size:
            # Limit queue size: drop the oldest entry
            queue[mid].pop(0)
        queue[mid].append(eventdata)
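
The interplay between `has_published_results()` and `_pop_publish_result()` determines when an otherwise idle subscription still publishes a keep-alive message. The sketch below reproduces only that counting logic in isolation so it can be run without a server. `KeepAliveTracker`, `should_publish`, `mark_published` and `simulate` are hypothetical names introduced for this example, and the sketch assumes that a publish always follows a positive decision, as it does in `publish_results()`.

```python
from typing import List


class KeepAliveTracker:
    """Hypothetical stand-in for the keep-alive counters of the listing above."""

    def __init__(self, max_keep_alive_count: int) -> None:
        self.max_keep_alive_count = max_keep_alive_count
        self.startup = True
        self.keep_alive_count = 0

    def should_publish(self, has_notifications: bool) -> bool:
        # Mirrors has_published_results(): publish on startup, on pending
        # notifications, or once the idle counter exceeds the maximum
        if self.startup or has_notifications:
            return True
        if self.keep_alive_count > self.max_keep_alive_count:
            return True
        self.keep_alive_count += 1
        return False

    def mark_published(self) -> None:
        # Mirrors the counter reset done in _pop_publish_result()
        self.startup = False
        self.keep_alive_count = 0


def simulate(cycles: int, max_keep_alive_count: int) -> List[bool]:
    """Run idle publishing cycles and record whether each one publishes."""
    tracker = KeepAliveTracker(max_keep_alive_count)
    decisions = []
    for _ in range(cycles):
        publish = tracker.should_publish(has_notifications=False)
        if publish:
            tracker.mark_published()
        decisions.append(publish)
    return decisions


decisions = simulate(9, 2)
# [True, False, False, False, True, False, False, False, True]
```

Because the comparison uses `>` and the counter is incremented only after the check, an idle subscription with a maximum keep-alive count of 2 publishes on every fourth cycle after startup in this sketch.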