Passed
Pull Request — master (#183)
created by unknown at 02:12

asyncua.server.internal_subscription (Rating: B)

Complexity

Total Complexity 44

Size/Duplication

Total Lines 222
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 134
dl 0
loc 222
rs 8.8798
c 0
b 0
f 0
wmc 44

18 Methods

Rating   Name   Duplication   Size   Complexity  
A InternalSubscription.start() 0 4 2
A InternalSubscription.__init__() 0 24 1
A InternalSubscription.__str__() 0 2 1
A InternalSubscription._trigger_publish() 0 7 3
A InternalSubscription.stop() 0 7 2
A InternalSubscription._subscription_loop() 0 14 4
A InternalSubscription.has_published_results() 0 9 5
A InternalSubscription.enqueue_event() 0 8 1
A InternalSubscription.republish() 0 8 2
A InternalSubscription._pop_publish_result() 0 20 3
A InternalSubscription.enqueue_datachange_event() 0 8 1
A InternalSubscription._enqueue_event() 0 12 4
A InternalSubscription._pop_triggered_events() 0 7 2
A InternalSubscription._pop_triggered_statuschanges() 0 6 2
B InternalSubscription.publish_results() 0 23 6
A InternalSubscription._pop_triggered_datachanges() 0 8 2
A InternalSubscription.enqueue_statuschange() 0 7 1
A InternalSubscription.publish() 0 9 2

How to fix: Complexity

Complex classes like asyncua.server.internal_subscription often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to finding such a component is to look for fields and methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a subclass, Extract Subclass is also a candidate, and is often faster.
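
For illustration: in this class the _triggered_datachanges, _triggered_events and _triggered_statuschanges fields are only touched by the enqueue_* and _pop_triggered_* methods, so they form exactly such a cohesive component. Below is a minimal sketch of what an Extract Class refactoring could look like; the NotificationQueues name and its API are hypothetical and not part of asyncua:

from typing import Dict, List


class NotificationQueues:
    """Hypothetical class extracted from InternalSubscription: it owns the
    per-monitored-item queues and knows nothing about publishing or timing."""

    def __init__(self):
        self._datachanges: Dict[int, List] = {}
        self._events: Dict[int, List] = {}
        self._statuschanges: List = []

    def enqueue_datachange(self, mid: int, eventdata, maxsize: int) -> None:
        self._enqueue(self._datachanges, mid, eventdata, maxsize)

    def enqueue_event(self, mid: int, eventdata, maxsize: int) -> None:
        self._enqueue(self._events, mid, eventdata, maxsize)

    def enqueue_statuschange(self, code) -> None:
        self._statuschanges.append(code)

    def has_notifications(self) -> bool:
        return bool(self._datachanges or self._events or self._statuschanges)

    def pop_datachanges(self) -> List:
        # Flatten and clear the per-item queues, mirroring _pop_triggered_datachanges()
        items = [item for sublist in self._datachanges.values() for item in sublist]
        self._datachanges = {}
        return items

    @staticmethod
    def _enqueue(queue: Dict[int, List], mid: int, eventdata, maxsize: int) -> None:
        items = queue.setdefault(mid, [])
        if maxsize != 0 and len(items) >= maxsize:
            # Drop the oldest entry so the queue never grows past maxsize
            items.pop(0)
        items.append(eventdata)

InternalSubscription would then keep only the publishing loop, keep-alive and acknowledgement logic and delegate queue handling to this component, which lowers its weighted method count (wmc) without changing behaviour.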

"""
server side implementation of a subscription object
"""

import logging
import asyncio

from typing import Union, Iterable, Dict, List
from asyncua import ua
from .monitored_item_service import MonitoredItemService
from .address_space import AddressSpace


class InternalSubscription:
    """
    Server internal subscription.
    Runs the publication loop and stores the Publication Results until they are acknowledged.
    """

    def __init__(self, loop: asyncio.AbstractEventLoop, data: ua.CreateSubscriptionResult, aspace: AddressSpace,
                 callback=None, no_acks=False):
        """
        :param loop: Event loop instance
        :param data: Create Subscription Result
        :param aspace: Server Address Space
        :param callback: Callback for publishing
        :param no_acks: If True, no acknowledging is expected (for server internal subscriptions)
        """
        self.logger = logging.getLogger(__name__)
        self.loop: asyncio.AbstractEventLoop = loop
        self.data: ua.CreateSubscriptionResult = data
        self.pub_result_callback = callback
        self.monitored_item_srv = MonitoredItemService(self, aspace)
        self._triggered_datachanges: Dict[int, List[ua.MonitoredItemNotification]] = {}
        self._triggered_events: Dict[int, List[ua.EventFieldList]] = {}
        self._triggered_statuschanges: list = []
        self._notification_seq = 1
        self._not_acknowledged_results: Dict[int, ua.PublishResult] = {}
        self._startup = True
        self._keep_alive_count = 0
        self._publish_cycles_count = 0
        self._task = None
        self.no_acks = no_acks

    def __str__(self):
        return f"Subscription(id:{self.data.SubscriptionId})"

    async def start(self):
        self.logger.debug("starting subscription %s", self.data.SubscriptionId)
        if self.data.RevisedPublishingInterval > 0.0:
            self._task = self.loop.create_task(self._subscription_loop())

    async def stop(self):
        if self._task:
            self.logger.info("stopping internal subscription %s", self.data.SubscriptionId)
            self._task.cancel()
            await self._task
            self._task = None
        self.monitored_item_srv.delete_all_monitored_items()

    def _trigger_publish(self):
        """
        Trigger immediate publication (if requested by the PublishingInterval).
        """
        if not self._task and self.data.RevisedPublishingInterval <= 0.0:
            # Publish immediately (as fast as possible)
            self.publish_results()

    async def _subscription_loop(self):
        """
        Run the publication loop at the RevisedPublishingInterval.
        """
        try:
            while True:
                await asyncio.sleep(self.data.RevisedPublishingInterval / 1000.0)
                self.publish_results()
        except asyncio.CancelledError:
            self.logger.info('exiting _subscription_loop for %s', self.data.SubscriptionId)
        except Exception:
            # this except is necessary to log errors raised inside the loop
            self.logger.exception("Exception in subscription loop")

    def has_published_results(self):
        if self._startup or self._triggered_datachanges or self._triggered_events:
            return True
        if self._keep_alive_count > self.data.RevisedMaxKeepAliveCount:
            self.logger.debug("keep alive count %s is greater than max keep alive count %s, sending publish event",
                self._keep_alive_count, self.data.RevisedMaxKeepAliveCount)
            return True
        self._keep_alive_count += 1
        return False

    def publish_results(self):
        """
        Publish all enqueued data changes, events and status changes through the callback.
        """
        if self._publish_cycles_count > self.data.RevisedLifetimeCount:
            self.logger.warning("Subscription %s has expired, publish cycle count (%s) > lifetime count (%s)", self,
                self._publish_cycles_count, self.data.RevisedLifetimeCount)
            # FIXME this will never be sent since we do not have a publish request anyway
            self.monitored_item_srv.trigger_statuschange(ua.StatusCode(ua.StatusCodes.BadTimeout))
        result = None
        if self.has_published_results():
            if not self.no_acks:
                self._publish_cycles_count += 1
            result = self._pop_publish_result()
        if result is not None:
            #self.logger.info('publish_results for %s', self.data.SubscriptionId)
            # The callback can be:
            #    Subscription.publish_callback -> server internal subscription
            #    UaProcessor.forward_publish_response -> client subscription
            if asyncio.iscoroutinefunction(self.pub_result_callback):
                asyncio.create_task(self.pub_result_callback(result))
            else:
                self.loop.call_soon(self.pub_result_callback, result)

    def _pop_publish_result(self) -> ua.PublishResult:
        """
        Return a `PublishResult` with all enqueued data changes, events and status changes.
        Clear all queues.
        """
        result = ua.PublishResult()
        result.SubscriptionId = self.data.SubscriptionId
        self._pop_triggered_datachanges(result)
        self._pop_triggered_events(result)
        self._pop_triggered_statuschanges(result)
        self._keep_alive_count = 0
        self._startup = False
        result.NotificationMessage.SequenceNumber = self._notification_seq
        if result.NotificationMessage.NotificationData and not self.no_acks:
            # Acknowledgement is only expected when the Subscription is for a client.
            self._notification_seq += 1
            self._not_acknowledged_results[result.NotificationMessage.SequenceNumber] = result
        result.MoreNotifications = False
        result.AvailableSequenceNumbers = list(self._not_acknowledged_results.keys())
        return result

    def _pop_triggered_datachanges(self, result: ua.PublishResult):
        """Append all enqueued data changes to the given `PublishResult` and clear the queue."""
        if self._triggered_datachanges:
            notif = ua.DataChangeNotification()
            notif.MonitoredItems = [item for sublist in self._triggered_datachanges.values() for item in sublist]
            self._triggered_datachanges = {}
            #self.logger.debug("sending datachanges notification with %s events", len(notif.MonitoredItems))
            result.NotificationMessage.NotificationData.append(notif)

    def _pop_triggered_events(self, result: ua.PublishResult):
        """Append all enqueued events to the given `PublishResult` and clear the queue."""
        if self._triggered_events:
            notif = ua.EventNotificationList()
            notif.Events = [item for sublist in self._triggered_events.values() for item in sublist]
            self._triggered_events = {}
            result.NotificationMessage.NotificationData.append(notif)
            #self.logger.debug("sending event notification with %s events", len(notif.Events))

    def _pop_triggered_statuschanges(self, result: ua.PublishResult):
        """Append all enqueued status changes to the given `PublishResult` and clear the queue."""
        if self._triggered_statuschanges:
            notif = ua.StatusChangeNotification()
            notif.Status = self._triggered_statuschanges.pop(0)
            result.NotificationMessage.NotificationData.append(notif)
            #self.logger.debug("sending event notification %s", notif.Status)

    def publish(self, acks: Iterable[int]):
        """
        Reset the publish cycle count and acknowledge PublishResults.
        :param acks: Sequence numbers of the PublishResults to acknowledge
        """
        #self.logger.info("publish request with acks %s", acks)
        self._publish_cycles_count = 0
        for nb in acks:
            self._not_acknowledged_results.pop(nb, None)

    def republish(self, nb):
        #self.logger.info("re-publish request for ack %s in subscription %s", nb, self)
        result = self._not_acknowledged_results.pop(nb, None)
        if result:
            self.logger.info("re-publishing ack %s in subscription %s", nb, self)
            return result.NotificationMessage
        self.logger.info("request to re-publish non-existing ack %s in subscription %s", nb, self)
        return ua.NotificationMessage()

    def enqueue_datachange_event(self, mid: int, eventdata: ua.MonitoredItemNotification, maxsize: int):
        """
        Enqueue a monitored item data change.
        :param mid: Monitored Item Id
        :param eventdata: Monitored Item Notification
        :param maxsize: Max queue size (0: no limit)
        """
        self._enqueue_event(mid, eventdata, maxsize, self._triggered_datachanges)

    def enqueue_event(self, mid: int, eventdata: ua.EventFieldList, maxsize: int):
        """
        Enqueue an event.
        :param mid: Monitored Item Id
        :param eventdata: Event Field List
        :param maxsize: Max queue size (0: no limit)
        """
        self._enqueue_event(mid, eventdata, maxsize, self._triggered_events)

    def enqueue_statuschange(self, code):
        """
        Enqueue a status change.
        :param code: Status code of the status change
        """
        self._triggered_statuschanges.append(code)
        self._trigger_publish()

    def _enqueue_event(self, mid: int, eventdata: Union[ua.MonitoredItemNotification, ua.EventFieldList], size: int,
                       queue: dict):
        if mid not in queue:
            # New Monitored Item Id
            queue[mid] = [eventdata]
            self._trigger_publish()
            return
        if size != 0:
            # Limit queue size
            if len(queue[mid]) >= size:
                queue[mid].pop(0)
        queue[mid].append(eventdata)
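
For context, a rough lifecycle sketch of how a caller might drive this class; the parameter values and the demo function are made up for illustration (the real wiring lives elsewhere in asyncua's server code), but the ua types and methods used are the ones shown above:

import asyncio

from asyncua import ua
from asyncua.server.internal_subscription import InternalSubscription


async def demo(aspace, publish_callback):
    # Build the revised subscription parameters the server would normally negotiate
    data = ua.CreateSubscriptionResult()
    data.SubscriptionId = 1
    data.RevisedPublishingInterval = 500  # milliseconds
    data.RevisedLifetimeCount = 3000
    data.RevisedMaxKeepAliveCount = 10
    sub = InternalSubscription(asyncio.get_running_loop(), data, aspace, callback=publish_callback)
    await sub.start()  # interval > 0, so _subscription_loop is scheduled
    sub.enqueue_statuschange(ua.StatusCode(ua.StatusCodes.Good))  # published on the next cycle
    await asyncio.sleep(1)
    await sub.stop()   # cancels the loop and deletes all monitored items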