Completed
Push — master ( 5a9df5...39e3a6 )
by Olivier
223:12 queued 212:11
created

opcua.server.InternalSubscription.publish()   A

Complexity

Conditions 3

Size

Total Lines 5

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 5
CRAP Score 3
Metric Value
cc 3
dl 0
loc 5
ccs 5
cts 5
cp 1
crap 3
rs 9.4286
1
"""
2
server side implementation of a subscription object
3
"""
4
5 1
from threading import RLock
6 1
import logging
7 1
import copy
8
9 1
from opcua import ua
10
11
12 1
class MonitoredItemData(object):
    """
    Plain record holding the server-side state of one monitored item.

    All fields start as None and are filled in by MonitoredItemService
    when the item is created.
    """

    def __init__(self):
        # handle chosen by the client, echoed back in notifications
        self.client_handle = None
        # handle returned by the address space when a datachange callback is registered
        self.callback_handle = None
        # id the server assigned to this monitored item
        self.monitored_item_id = None
        # result object whose RevisedQueueSize is read when events are enqueued
        self.parameters = None
        # monitoring mode requested by the client
        self.mode = None
        # filter requested by the client; declared here so that reading it
        # never raises AttributeError on a record that was not fully populated
        self.mfilter = None
20
21
22 1
class MonitoredItemService(object):

    """
    implement monitoreditem service for 1 subscription
    """

    def __init__(self, isub, aspace):
        """
        isub: the owning subscription object; its ``data`` and ``enqueue_*``
              methods are used by this service
        aspace: address space object used to read attributes and to
                register/unregister datachange callbacks
        """
        self.logger = logging.getLogger(__name__ + str(isub.data.SubscriptionId))
        self.isub = isub
        self.aspace = aspace
        self._lock = RLock()
        # monitored item id -> MonitoredItemData
        self._monitored_items = {}
        # node id -> monitored item id (event subscriptions)
        self._monitored_events = {}
        # address space callback handle -> monitored item id
        self._monitored_datachange = {}
        # incremented before use, so the first id handed out is 112
        self._monitored_item_counter = 111

    def delete_all_monitored_items(self):
        """Delete every monitored item currently registered."""
        self.delete_monitored_items([mdata.monitored_item_id for mdata in self._monitored_items.values()])

    def create_monitored_items(self, params):
        """Create one monitored item per entry of params.ItemsToCreate, return the list of results."""
        return [self._create_monitored_item(item) for item in params.ItemsToCreate]

    def modify_monitored_items(self, params):
        """Modify one monitored item per entry of params.ItemsToModify, return the list of results."""
        return [self._modify_monitored_item(item) for item in params.ItemsToModify]

    def trigger_datachange(self, handle, nodeid, attr):
        """Read the current attribute value and push it through datachange_callback."""
        self.logger.debug("triggering datachange for handle %s, nodeid %s, and attribute %s", handle, nodeid, attr)
        variant = self.aspace.get_attribute_value(nodeid, attr)
        self.datachange_callback(handle, variant)

    def _modify_monitored_item(self, params):
        """
        Apply the requested parameters to an existing monitored item.

        Returns a result object; its StatusCode is set to
        BadMonitoredItemIdInvalid when params.MonitoredItemId is unknown.
        """
        with self._lock:
            result = ua.MonitoredItemCreateResult()
            # _monitored_items is keyed by monitored item id, so a direct
            # lookup replaces scanning all values
            mdata = self._monitored_items.get(params.MonitoredItemId)
            if mdata is not None:
                result.RevisedSamplingInterval = self.isub.data.RevisedPublishingInterval
                result.RevisedQueueSize = params.RequestedParameters.QueueSize
                result.FilterResult = params.RequestedParameters.Filter
                mdata.parameters = result
                return result
            # FIXME modify event subscriptions
            # assign the status code (it was previously *called*, which raised
            # instead of reporting the invalid id)
            result.StatusCode = ua.StatusCode(ua.StatusCodes.BadMonitoredItemIdInvalid)
            return result

    def _create_monitored_item(self, params):
        """
        Create a single monitored item (event or datachange subscription).

        On failure nothing stays registered in the lookup tables and the
        id counter is rolled back. Returns the result object in both cases.
        """
        with self._lock:
            result = ua.MonitoredItemCreateResult()
            result.RevisedSamplingInterval = self.isub.data.RevisedPublishingInterval
            result.RevisedQueueSize = params.RequestedParameters.QueueSize
            self._monitored_item_counter += 1
            result.MonitoredItemId = self._monitored_item_counter
            self.logger.debug("Creating MonitoredItem with id %s", result.MonitoredItemId)

            mdata = MonitoredItemData()
            mdata.parameters = result
            mdata.mode = params.MonitoringMode
            mdata.client_handle = params.RequestedParameters.ClientHandle
            mdata.mfilter = params.RequestedParameters.Filter
            mdata.monitored_item_id = result.MonitoredItemId

            # must be registered before trigger_datachange() below, which
            # looks the item up through datachange_callback()
            self._monitored_items[result.MonitoredItemId] = mdata

            target = params.ItemToMonitor
            if target.AttributeId == ua.AttributeIds.EventNotifier:
                self.logger.info("request to subscribe to events for node %s and attribute %s", params.ItemToMonitor.NodeId, params.ItemToMonitor.AttributeId)
                if self.aspace.get_attribute_value(target.NodeId, ua.AttributeIds.EventNotifier).Value.Value != 1:
                    result.StatusCode = ua.StatusCode(ua.StatusCodes.BadServiceUnsupported)
                result.FilterResult = ua.EventFilterResult()
                for _ in params.RequestedParameters.Filter.SelectClauses:
                    result.FilterResult.SelectClauseResults.append(ua.StatusCode())
                # FIXME: where clause result
                # only register successful subscriptions, so a failed request
                # neither leaves a stale entry nor overwrites a working one
                if result.StatusCode.is_good():
                    self._monitored_events[target.NodeId] = result.MonitoredItemId
            else:
                self.logger.info("request to subscribe to datachange for node %s and attribute %s", params.ItemToMonitor.NodeId, params.ItemToMonitor.AttributeId)
                result.FilterResult = params.RequestedParameters.Filter
                result.StatusCode, handle = self.aspace.add_datachange_callback(target.NodeId, target.AttributeId, self.datachange_callback)
                self.logger.debug("adding callback return status %s and handle %s", result.StatusCode, handle)
                mdata.callback_handle = handle
                if result.StatusCode.is_good():
                    self._monitored_datachange[handle] = result.MonitoredItemId
                    # force data change event generation
                    self.trigger_datachange(handle, target.NodeId, target.AttributeId)

            if not result.StatusCode.is_good():
                # roll back the registration and the id that was reserved
                del self._monitored_items[result.MonitoredItemId]
                self._monitored_item_counter -= 1

            return result

    def delete_monitored_items(self, ids):
        """Delete the monitored items listed in ids, return one status code per id."""
        self.logger.debug("delete monitored items %s", ids)
        with self._lock:
            return [self._delete_monitored_items(mid) for mid in ids]

    def _delete_monitored_items(self, mid):
        """
        Remove one monitored item and its event/datachange registration.

        Returns BadMonitoredItemIdInvalid when mid is unknown, a default
        status code otherwise.
        """
        if mid not in self._monitored_items:
            return ua.StatusCode(ua.StatusCodes.BadMonitoredItemIdInvalid)
        # iterate over snapshots so removing an entry can never disturb the loop
        for node, item_id in list(self._monitored_events.items()):
            if item_id == mid:
                self._monitored_events.pop(node)
                break
        for handle, item_id in list(self._monitored_datachange.items()):
            if item_id == mid:
                self.aspace.delete_datachange_callback(handle)
                self._monitored_datachange.pop(handle)
                break
        self._monitored_items.pop(mid)
        return ua.StatusCode()

    def datachange_callback(self, handle, value):
        """
        Called with the callback handle and the new value; wraps the value
        in a notification and queues it on the subscription.
        """
        self.logger.info("subscription %s: datachange callback called with handle '%s' and value '%s'", self, handle, value.Value)
        event = ua.MonitoredItemNotification()
        with self._lock:
            mid = self._monitored_datachange[handle]
            mdata = self._monitored_items[mid]
            event.ClientHandle = mdata.client_handle
            event.Value = value
            self.isub.enqueue_datachange_event(mid, event, mdata.parameters.RevisedQueueSize)

    def trigger_event(self, event):
        """
        Queue an event on the subscription if its source node is monitored.

        Returns True when the event was queued, False otherwise.
        """
        with self._lock:
            if event.SourceNode not in self._monitored_events:
                self.logger.debug("%s has no subscription for events %s from node: %s", self, event, event.SourceNode)
                return False
            self.logger.debug("%s has subscription for events %s from node: %s", self, event, event.SourceNode)
            mid = self._monitored_events[event.SourceNode]
            if mid not in self._monitored_items:
                self.logger.debug("Could not find monitored items for id %s for event %s in subscription %s", mid, event, self)
                return False
            mdata = self._monitored_items[mid]
            fieldlist = ua.EventFieldList()
            fieldlist.ClientHandle = mdata.client_handle
            fieldlist.EventFields = self._get_event_fields(mdata.mfilter, event)
            self.isub.enqueue_event(mid, fieldlist, mdata.parameters.RevisedQueueSize)
            return True

    def _get_event_fields(self, evfilter, event):
        """
        Build one Variant per select clause of evfilter from the attributes
        of event. A clause that cannot be resolved yields an empty Variant.
        """
        fields = []
        for sattr in evfilter.SelectClauses:
            try:
                # without a browse path the field is named after the attribute,
                # otherwise after the first browse path element
                if not sattr.BrowsePath:
                    name = sattr.Attribute.name
                else:
                    name = sattr.BrowsePath[0].Name
                val = copy.deepcopy(getattr(event, name))
                fields.append(ua.Variant(val))
            except AttributeError:
                fields.append(ua.Variant())
        return fields

    def trigger_statuschange(self, code):
        """Forward a status change code to the owning subscription."""
        self.isub.enqueue_statuschange(code)
187
188
189 1
class InternalSubscription(object):
    """
    Server-side state of one subscription: queues of triggered
    notifications, the periodic publishing loop and the results that
    have not been acknowledged yet.
    """

    def __init__(self, subservice, data, addressspace, callback):
        """
        subservice: owner object; its ``loop.call_later`` schedules publishing
        data: subscription parameters (SubscriptionId, RevisedPublishingInterval,
              RevisedMaxKeepAliveCount, RevisedLifetimeCount are read here)
        addressspace: passed on to the MonitoredItemService
        callback: called with each publish result
        """
        self.logger = logging.getLogger(__name__ + str(data.SubscriptionId))
        self.aspace = addressspace
        self.subservice = subservice
        self.data = data
        self.callback = callback
        self.monitored_item_srv = MonitoredItemService(self, addressspace)
        self.task = None
        self._lock = RLock()
        # monitored item id -> list of queued notifications
        self._triggered_datachanges = {}
        self._triggered_events = {}
        self._triggered_statuschanges = []
        self._notification_seq = 1
        # sequence number -> publish result awaiting acknowledgement
        self._not_acknowledged_results = {}
        self._startup = True
        self._keep_alive_count = 0
        self._publish_cycles_count = 0
        self._stopev = False

    def __str__(self):
        return "Subscription(id:{})".format(self.data.SubscriptionId)

    def start(self):
        """Schedule the first publishing cycle."""
        self.logger.debug("starting subscription %s", self.data.SubscriptionId)
        self._subscription_loop()

    def stop(self):
        """Stop rescheduling the publishing loop and delete all monitored items."""
        self.logger.debug("stopping subscription %s", self.data.SubscriptionId)
        self._stopev = True
        self.monitored_item_srv.delete_all_monitored_items()

    def _subscription_loop(self):
        """Schedule _sub_loop after one publishing interval, unless stopped."""
        if not self._stopev:
            # RevisedPublishingInterval is divided by 1000 before being handed to call_later
            self.subservice.loop.call_later(self.data.RevisedPublishingInterval / 1000.0, self._sub_loop)

    def _sub_loop(self):
        """One publishing cycle: publish pending results, then reschedule."""
        if self._stopev:
            return
        self.publish_results()
        self._subscription_loop()

    def has_published_results(self):
        """
        Return True when something should be published now: first cycle,
        pending datachanges/events, or keep-alive count above its maximum.
        Otherwise increment the keep-alive count and return False.
        """
        with self._lock:
            if self._startup or self._triggered_datachanges or self._triggered_events:
                return True
            if self._keep_alive_count > self.data.RevisedMaxKeepAliveCount:
                self.logger.debug("keep alive count %s is > than max keep alive count %s, sending publish event", self._keep_alive_count, self.data.RevisedMaxKeepAliveCount)
                return True
            self._keep_alive_count += 1
            return False

    def publish_results(self):
        """
        Flag the subscription as expired when the publish cycle count exceeds
        the lifetime count, then hand pending results (if any) to the callback.
        """
        if self._publish_cycles_count > self.data.RevisedLifetimeCount:
            self.logger.warning("Subscription %s has expired, publish cycle count(%s) > lifetime count (%s)", self, self._publish_cycles_count, self.data.RevisedLifetimeCount)
            # FIXME this will never be send since we do not have publish request anyway
            self.monitored_item_srv.trigger_statuschange(ua.StatusCode(ua.StatusCodes.BadTimeout))
            self._stopev = True
        with self._lock:
            if self.has_published_results():  # FIXME: should we pop a publish request here? or we do not care?
                self._publish_cycles_count += 1
                result = self._pop_publish_result()
                self.callback(result)

    def _pop_publish_result(self):
        """
        Drain all notification queues into a new publish result.

        The sequence number only advances, and the result is only kept for
        acknowledgement, when the message carries notification data.
        """
        result = ua.PublishResult()
        result.SubscriptionId = self.data.SubscriptionId
        self._pop_triggered_datachanges(result)
        self._pop_triggered_events(result)
        self._pop_triggered_statuschanges(result)
        self._keep_alive_count = 0
        self._startup = False
        result.NotificationMessage.SequenceNumber = self._notification_seq
        if len(result.NotificationMessage.NotificationData) != 0:
            self._notification_seq += 1
            self._not_acknowledged_results[result.NotificationMessage.SequenceNumber] = result
        result.MoreNotifications = False
        result.AvailableSequenceNumbers = list(self._not_acknowledged_results.keys())
        return result

    def _pop_triggered_datachanges(self, result):
        """Move all queued datachange notifications into result."""
        if self._triggered_datachanges:
            notif = ua.DataChangeNotification()
            notif.MonitoredItems = [item for sublist in self._triggered_datachanges.values() for item in sublist]
            self._triggered_datachanges = {}
            self.logger.debug("sending datachanges notification with %s events", len(notif.MonitoredItems))
            result.NotificationMessage.NotificationData.append(notif)

    def _pop_triggered_events(self, result):
        """Move all queued event notifications into result."""
        if self._triggered_events:
            notif = ua.EventNotificationList()
            notif.Events = [item for sublist in self._triggered_events.values() for item in sublist]
            self._triggered_events = {}
            result.NotificationMessage.NotificationData.append(notif)
            self.logger.debug("sending event notification with %s events", len(notif.Events))

    def _pop_triggered_statuschanges(self, result):
        """Move the oldest queued status change (one per call) into result."""
        if self._triggered_statuschanges:
            notif = ua.StatusChangeNotification()
            notif.Status = self._triggered_statuschanges.pop(0)
            result.NotificationMessage.NotificationData.append(notif)
            # log the status itself; the previous len(notif.Status) was applied
            # to a single status code rather than a collection
            self.logger.debug("sending event notification %s", notif.Status)

    def publish(self, nb):
        """Reset the publish cycle count and drop the acknowledged result nb, if present."""
        with self._lock:
            self._publish_cycles_count = 0
            self._not_acknowledged_results.pop(nb, None)

    def republish(self, nb):
        """
        Return the notification message stored under sequence number nb,
        or an empty NotificationMessage when no such result is pending.
        """
        self.logger.info("re-publish request for ack %s in subscription %s", nb, self)
        with self._lock:
            if nb in self._not_acknowledged_results:
                self.logger.info("re-publishing ack %s in subscription %s", nb, self)
                return self._not_acknowledged_results[nb].NotificationMessage
            self.logger.info("Error request to re-published non existing ack %s in subscription %s", nb, self)
            return ua.NotificationMessage()

    def enqueue_datachange_event(self, mid, eventdata, maxsize):
        """Queue a datachange notification for monitored item mid."""
        self._enqueue_event(mid, eventdata, maxsize, self._triggered_datachanges)

    def enqueue_event(self, mid, eventdata, maxsize):
        """Queue an event notification for monitored item mid."""
        self._enqueue_event(mid, eventdata, maxsize, self._triggered_events)

    def enqueue_statuschange(self, code):
        """Queue a status change code for the next publish result."""
        self._triggered_statuschanges.append(code)

    def _enqueue_event(self, mid, eventdata, size, queue):
        """
        Append eventdata to queue[mid], creating the list on first use.

        A size of 0 means unbounded; otherwise the oldest entry is dropped
        when the list already holds size entries.
        """
        if mid not in queue:
            queue[mid] = [eventdata]
            return
        if size != 0:
            if len(queue[mid]) >= size:
                queue[mid].pop(0)
        queue[mid].append(eventdata)
327