Completed
Pull Request — master (#184)
by
unknown
03:54
created

MonitoredItemService.__init__()   A

Complexity

Conditions 1

Size

Total Lines 9

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 9
CRAP Score 1
Metric Value
cc 1
dl 0
loc 9
rs 9.6666
ccs 9
cts 9
cp 1
crap 1
1
"""
2
server side implementation of a subscription object
3
"""
4
5 1
from threading import RLock
6 1
import logging
7 1
import copy
8
9 1
from opcua import ua
10
11
12 1
class MonitoredItemData(object):
    """
    Plain record holding the server-side bookkeeping of one monitored item.

    Every field starts as None and is filled in by the code that creates
    the monitored item.
    """

    def __init__(self):
        # identifiers: client supplied handle, address space callback
        # handle and the id of the monitored item itself
        self.client_handle = self.callback_handle = self.monitored_item_id = None
        # result object of the create/modify call, the monitoring mode
        # and the filter requested for the item
        self.parameters = self.mode = self.mfilter = None
21
22
23 1
class MonitoredItemService(object):

    """
    implement monitoreditem service for 1 subscription

    The service keeps three maps, all guarded by ``self._lock``:

    * ``_monitored_items``: monitored item id -> MonitoredItemData
    * ``_monitored_events``: node id -> monitored item id (event items)
    * ``_monitored_datachange``: address space callback handle ->
      monitored item id (data change items)
    """

    def __init__(self, isub, aspace):
        """
        :param isub: owning subscription; its ``data`` attribute and its
            ``enqueue_*`` methods are used by this service
        :param aspace: address space used to read attributes and to
            register data change callbacks
        """
        self.logger = logging.getLogger(__name__ + "." + str(isub.data.SubscriptionId))
        self.isub = isub
        self.aspace = aspace
        self._lock = RLock()
        self._monitored_items = {}
        self._monitored_events = {}
        self._monitored_datachange = {}
        self._monitored_item_counter = 111

    def delete_all_monitored_items(self):
        """Delete every monitored item currently registered in this service."""
        self.delete_monitored_items([mdata.monitored_item_id for mdata in self._monitored_items.values()])

    def create_monitored_items(self, params):
        """
        Create one monitored item per entry of ``params.ItemsToCreate``.

        Items monitoring the EventNotifier attribute become event items,
        everything else becomes a data change item.

        :return: list with one create result per requested item, in order
        """
        results = []
        for item in params.ItemsToCreate:
            # the lock is taken per item, not around the whole request
            with self._lock:
                if item.ItemToMonitor.AttributeId == ua.AttributeIds.EventNotifier:
                    result = self._create_events_monitored_item(item)
                else:
                    result = self._create_data_change_monitored_item(item)
            results.append(result)
        return results

    def modify_monitored_items(self, params):
        """
        Apply every entry of ``params.ItemsToModify``.

        :return: list with one modify result per requested item, in order
        """
        return [self._modify_monitored_item(item) for item in params.ItemsToModify]

    def trigger_datachange(self, handle, nodeid, attr):
        """
        Read the current value of ``attr`` on ``nodeid`` and push it through
        datachange_callback, using it as both the new and the old value.
        """
        self.logger.debug("triggering datachange for handle %s, nodeid %s, and attribute %s", handle, nodeid, attr)
        variant = self.aspace.get_attribute_value(nodeid, attr)
        self.datachange_callback(handle, variant, variant)

    def _modify_monitored_item(self, params):
        """
        Update sampling interval, queue size and filter of one monitored item.

        :return: a ua.MonitoredItemModifyResult; its StatusCode is
            BadMonitoredItemIdInvalid when the id is unknown
        """
        with self._lock:
            result = ua.MonitoredItemModifyResult()
            # _monitored_items is keyed by monitored item id (see
            # _commit_monitored_item), so a direct lookup replaces the scan
            mdata = self._monitored_items.get(params.MonitoredItemId)
            if mdata is None:
                # assign a status code object; the previous code *called*
                # result.StatusCode instead of setting it
                result.StatusCode = ua.StatusCode(ua.StatusCodes.BadMonitoredItemIdInvalid)
                return result
            # NOTE: the requested sampling interval is written to the
            # publishing interval of the whole subscription
            self.isub.data.RevisedPublishingInterval = params.RequestedParameters.SamplingInterval
            result.RevisedSamplingInterval = self.isub.data.RevisedPublishingInterval
            result.RevisedQueueSize = params.RequestedParameters.QueueSize
            result.FilterResult = params.RequestedParameters.Filter
            mdata.mfilter = result.FilterResult
            mdata.parameters = result
            return result

    def _commit_monitored_item(self, result, mdata):
        """
        Register ``mdata`` under ``result.MonitoredItemId`` when the creation
        succeeded; failed creations are not stored.
        """
        if result.StatusCode.is_good():
            self._monitored_items[result.MonitoredItemId] = mdata
            # the counter was already advanced when the id was allocated in
            # _make_monitored_item_common; it is advanced once more here
            self._monitored_item_counter += 1

    def _make_monitored_item_common(self, params):
        """
        Allocate a new monitored item id and build the objects shared by
        event and data change items.

        :return: tuple (ua.MonitoredItemCreateResult, MonitoredItemData)
        """
        result = ua.MonitoredItemCreateResult()
        result.RevisedSamplingInterval = self.isub.data.RevisedPublishingInterval
        result.RevisedQueueSize = params.RequestedParameters.QueueSize
        self._monitored_item_counter += 1
        result.MonitoredItemId = self._monitored_item_counter
        self.logger.debug("Creating MonitoredItem with id %s", result.MonitoredItemId)

        mdata = MonitoredItemData()
        mdata.parameters = result
        mdata.mode = params.MonitoringMode
        mdata.client_handle = params.RequestedParameters.ClientHandle
        mdata.mfilter = params.RequestedParameters.Filter
        mdata.monitored_item_id = result.MonitoredItemId

        return result, mdata

    def _create_events_monitored_item(self, params):
        """
        Create a monitored item delivering events of ``params.ItemToMonitor.NodeId``.

        :return: the create result; BadServiceUnsupported when the
            EventNotifier value of the node is None or has bit 0 cleared
        """
        self.logger.info("request to subscribe to events for node %s and attribute %s", params.ItemToMonitor.NodeId, params.ItemToMonitor.AttributeId)

        result, mdata = self._make_monitored_item_common(params)
        ev_notify_byte = self.aspace.get_attribute_value(params.ItemToMonitor.NodeId, ua.AttributeIds.EventNotifier).Value.Value
        if ev_notify_byte is None or ev_notify_byte & 1 == 0:
            result.StatusCode = ua.StatusCode(ua.StatusCodes.BadServiceUnsupported)
            return result
        result.FilterResult = ua.EventFilterResult()
        # one default status code per requested select clause
        for _ in params.RequestedParameters.Filter.SelectClauses:
            result.FilterResult.SelectClauseResults.append(ua.StatusCode())
        # FIXME: where clause result
        self._commit_monitored_item(result, mdata)
        # a later subscription on the same node replaces this mapping
        self._monitored_events[params.ItemToMonitor.NodeId] = result.MonitoredItemId
        return result

    def _create_data_change_monitored_item(self, params):
        """
        Create a monitored item delivering data changes of one node attribute.

        Registers datachange_callback in the address space and, on success,
        immediately triggers one notification with the current value.

        :return: the create result carrying the status of the registration
        """
        self.logger.info("request to subscribe to datachange for node %s and attribute %s", params.ItemToMonitor.NodeId, params.ItemToMonitor.AttributeId)

        result, mdata = self._make_monitored_item_common(params)
        result.FilterResult = params.RequestedParameters.Filter
        result.StatusCode, handle = self.aspace.add_datachange_callback(params.ItemToMonitor.NodeId, params.ItemToMonitor.AttributeId, self.datachange_callback)
        self.logger.debug("adding callback return status %s and handle %s", result.StatusCode, handle)
        mdata.callback_handle = handle
        self._commit_monitored_item(result, mdata)
        if result.StatusCode.is_good():
            self._monitored_datachange[handle] = result.MonitoredItemId
            # force data change event generation
            self.trigger_datachange(handle, params.ItemToMonitor.NodeId, params.ItemToMonitor.AttributeId)
        return result

    def delete_monitored_items(self, ids):
        """
        Delete the monitored items whose ids are listed in ``ids``.

        :return: list with one status code per id, in order
        """
        self.logger.debug("delete monitored items %s", ids)
        with self._lock:
            return [self._delete_monitored_items(mid) for mid in ids]

    def _delete_monitored_items(self, mid):
        """
        Remove one monitored item and its event / data change registration.

        :return: a default ua.StatusCode() on success,
            BadMonitoredItemIdInvalid when ``mid`` is unknown
        """
        if mid not in self._monitored_items:
            return ua.StatusCode(ua.StatusCodes.BadMonitoredItemIdInvalid)
        # iterate over snapshots so that removing an entry never depends on
        # leaving the loop right away
        for node, item_id in list(self._monitored_events.items()):
            if item_id == mid:
                del self._monitored_events[node]
                break
        for handle, item_id in list(self._monitored_datachange.items()):
            if item_id == mid:
                self.aspace.delete_datachange_callback(handle)
                del self._monitored_datachange[handle]
                break
        del self._monitored_items[mid]
        return ua.StatusCode()

    def datachange_callback(self, handle, value, value_old, error=None):
        """
        Callback registered in the address space for data change items.

        With ``error`` set, the error is forwarded as a status change.
        Otherwise a notification for the item registered under ``handle``
        is queued in the subscription, unless the item has a filter and
        deadband_callback rejects the change.
        """
        if error:
            self.logger.info("subscription %s: datachange callback called with handle '%s' and erorr '%s'", self,
                             handle, error)
            self.trigger_statuschange(error)
        else:
            self.logger.info("subscription %s: datachange callback called with handle '%s' and value '%s'", self,
                             handle, value.Value)
            event = ua.MonitoredItemNotification()
            with self._lock:
                mid = self._monitored_datachange[handle]
                mdata = self._monitored_items[mid]
                if mdata.mfilter is not None:
                    deadband_flag_pass = self.deadband_callback(value, value_old, mdata.mfilter)
                else:
                    deadband_flag_pass = True
                if deadband_flag_pass:
                    event.ClientHandle = mdata.client_handle
                    event.Value = value
                    self.isub.enqueue_datachange_event(mid, event, mdata.parameters.RevisedQueueSize)

    def deadband_callback(self, value, value_old, filter):
        """
        Tell whether the change from ``value_old`` to ``value`` is strictly
        larger than ``filter.DeadbandValue``.

        Both values are read through ``.Value.Value``.
        """
        return abs(value.Value.Value - value_old.Value.Value) > filter.DeadbandValue

    def trigger_event(self, event):
        """
        Queue ``event`` for the monitored item watching ``event.SourceNode``.

        :return: True when the event was queued, False when no monitored
            item is registered for the source node
        """
        with self._lock:
            if event.SourceNode not in self._monitored_events:
                self.logger.debug("%s has no subscription for events %s from node: %s", self, event, event.SourceNode)
                return False
            self.logger.debug("%s has subscription for events %s from node: %s", self, event, event.SourceNode)
            mid = self._monitored_events[event.SourceNode]
            if mid not in self._monitored_items:
                self.logger.debug("Could not find monitored items for id %s for event %s in subscription %s", mid, event, self)
                return False
            mdata = self._monitored_items[mid]
            fieldlist = ua.EventFieldList()
            fieldlist.ClientHandle = mdata.client_handle
            fieldlist.EventFields = self._get_event_fields(mdata.mfilter, event)
            self.isub.enqueue_event(mid, fieldlist, mdata.parameters.RevisedQueueSize)
            return True

    def _get_event_fields(self, evfilter, event):
        """
        Build one ua.Variant per select clause of ``evfilter``.

        The attribute of ``event`` is chosen by the first browse path
        element when a browse path is given, by the attribute name
        otherwise. A clause that raises AttributeError yields an empty
        variant.
        """
        fields = []
        for sattr in evfilter.SelectClauses:
            try:
                if sattr.BrowsePath:
                    name = sattr.BrowsePath[0].Name
                else:
                    name = sattr.Attribute.name
                # copy so the queued notification does not share state
                # with the event object
                val = copy.deepcopy(getattr(event, name))
                fields.append(ua.Variant(val))
            except AttributeError:
                fields.append(ua.Variant())
        return fields

    def trigger_statuschange(self, code):
        """Forward a status change ``code`` to the owning subscription."""
        self.isub.enqueue_statuschange(code)
220
221
222 1
class InternalSubscription(object):
    """
    Server-side state of one subscription.

    Collects triggered data changes, events and status changes, and
    periodically hands a publish result to ``callback``.
    """

    def __init__(self, subservice, data, addressspace, callback):
        """
        :param subservice: owner whose ``loop.call_later`` schedules the
            publishing cycle
        :param data: subscription parameters (id, intervals, counts)
        :param addressspace: address space handed to the monitored item service
        :param callback: called with every publish result that is produced
        """
        self.logger = logging.getLogger(".".join((__name__, str(data.SubscriptionId))))
        self.aspace = addressspace
        self.subservice = subservice
        # data must be set before the monitored item service is built,
        # its constructor reads the subscription id from it
        self.data = data
        self.callback = callback
        self.monitored_item_srv = MonitoredItemService(self, addressspace)
        self.task = None
        self._lock = RLock()
        self._triggered_datachanges = {}
        self._triggered_events = {}
        self._triggered_statuschanges = []
        self._notification_seq = 1
        self._not_acknowledged_results = {}
        self._startup = True
        self._keep_alive_count = 0
        self._publish_cycles_count = 0
        self._stopev = False

    def __str__(self):
        return "Subscription(id:{})".format(self.data.SubscriptionId)

    def start(self):
        """Schedule the first publishing cycle."""
        self.logger.debug("starting subscription %s", self.data.SubscriptionId)
        self._subscription_loop()

    def stop(self):
        """Stop the publishing cycle and delete all monitored items."""
        self.logger.debug("stopping subscription %s", self.data.SubscriptionId)
        self._stopev = True
        self.monitored_item_srv.delete_all_monitored_items()

    def _subscription_loop(self):
        """Schedule the next cycle unless the subscription was stopped."""
        if self._stopev:
            return
        # the publishing interval is divided by 1000 before being handed
        # to call_later
        delay = self.data.RevisedPublishingInterval / 1000.0
        self.subservice.loop.call_later(delay, self._sub_loop)

    def _sub_loop(self):
        """One publishing cycle: publish, then schedule the next cycle."""
        if not self._stopev:
            self.publish_results()
            self._subscription_loop()

    def has_published_results(self):
        """
        Tell whether a publish result should be produced in this cycle.

        Returns True on startup, when data changes or events are pending,
        or when the keep alive counter exceeded its maximum. Otherwise the
        keep alive counter is incremented and False is returned.
        """
        with self._lock:
            pending = self._startup or self._triggered_datachanges or self._triggered_events
            if pending:
                return True
            max_count = self.data.RevisedMaxKeepAliveCount
            if self._keep_alive_count > max_count:
                self.logger.debug("keep alive count %s is > than max keep alive count %s, sending publish event", self._keep_alive_count, max_count)
                return True
            self._keep_alive_count += 1
            return False

    def publish_results(self):
        """
        Produce a publish result when one is due and pass it to the callback.

        When the publish cycle count exceeds the lifetime count, a
        BadTimeout status change is queued and the subscription is flagged
        as stopped before the result is produced.
        """
        expired = self._publish_cycles_count > self.data.RevisedLifetimeCount
        if expired:
            self.logger.warning("Subscription %s has expired, publish cycle count(%s) > lifetime count (%s)", self, self._publish_cycles_count, self.data.RevisedLifetimeCount)
            # FIXME this will never be send since we do not have publish request anyway
            self.monitored_item_srv.trigger_statuschange(ua.StatusCode(ua.StatusCodes.BadTimeout))
            self._stopev = True
        with self._lock:
            if self.has_published_results():  # FIXME: should we pop a publish request here? or we do not care?
                self._publish_cycles_count += 1
                result = self._pop_publish_result()
            else:
                result = None
        # the callback is invoked outside of the lock
        if result is not None:
            self.callback(result)

    def _pop_publish_result(self):
        """
        Drain the pending notifications into a new ua.PublishResult.

        Results carrying notification data consume a sequence number and
        are kept until they are acknowledged through publish().
        """
        result = ua.PublishResult()
        result.SubscriptionId = self.data.SubscriptionId
        for drain in (self._pop_triggered_datachanges,
                      self._pop_triggered_events,
                      self._pop_triggered_statuschanges):
            drain(result)
        self._keep_alive_count = 0
        self._startup = False
        message = result.NotificationMessage
        message.SequenceNumber = self._notification_seq
        if len(message.NotificationData) != 0:
            self._notification_seq += 1
            self._not_acknowledged_results[message.SequenceNumber] = result
        result.MoreNotifications = False
        result.AvailableSequenceNumbers = list(self._not_acknowledged_results.keys())
        return result

    def _pop_triggered_datachanges(self, result):
        """Move all pending data change notifications into ``result``."""
        if not self._triggered_datachanges:
            return
        notif = ua.DataChangeNotification()
        items = []
        for sublist in self._triggered_datachanges.values():
            items.extend(sublist)
        notif.MonitoredItems = items
        self._triggered_datachanges = {}
        self.logger.debug("sending datachanges notification with %s events", len(notif.MonitoredItems))
        result.NotificationMessage.NotificationData.append(notif)

    def _pop_triggered_events(self, result):
        """Move all pending event notifications into ``result``."""
        if not self._triggered_events:
            return
        notif = ua.EventNotificationList()
        events = []
        for sublist in self._triggered_events.values():
            events.extend(sublist)
        notif.Events = events
        self._triggered_events = {}
        result.NotificationMessage.NotificationData.append(notif)
        self.logger.debug("sending event notification with %s events", len(notif.Events))

    def _pop_triggered_statuschanges(self, result):
        """Move the oldest pending status change, if any, into ``result``."""
        if not self._triggered_statuschanges:
            return
        notif = ua.StatusChangeNotification()
        notif.Status = self._triggered_statuschanges.pop(0)
        result.NotificationMessage.NotificationData.append(notif)
        self.logger.debug("sending event notification %s", notif.Status)

    def publish(self, acks):
        """
        Handle a publish request: reset the publish cycle counter and
        forget every stored result whose sequence number is in ``acks``.
        """
        self.logger.info("publish request with acks %s", acks)
        with self._lock:
            self._publish_cycles_count = 0
            for nb in acks:
                # unknown sequence numbers are silently ignored
                self._not_acknowledged_results.pop(nb, None)

    def republish(self, nb):
        """
        Return the notification message stored under sequence number ``nb``,
        or an empty ua.NotificationMessage when there is none.
        """
        self.logger.info("re-publish request for ack %s in subscription %s", nb, self)
        with self._lock:
            if nb not in self._not_acknowledged_results:
                self.logger.info("Error request to re-published non existing ack %s in subscription %s", nb, self)
                return ua.NotificationMessage()
            self.logger.info("re-publishing ack %s in subscription %s", nb, self)
            return self._not_acknowledged_results[nb].NotificationMessage

    def enqueue_datachange_event(self, mid, eventdata, maxsize):
        """Queue a data change notification for monitored item ``mid``."""
        self._enqueue_event(mid, eventdata, maxsize, self._triggered_datachanges)

    def enqueue_event(self, mid, eventdata, maxsize):
        """Queue an event notification for monitored item ``mid``."""
        self._enqueue_event(mid, eventdata, maxsize, self._triggered_events)

    def enqueue_statuschange(self, code):
        """Queue a status change notification."""
        self._triggered_statuschanges.append(code)

    def _enqueue_event(self, mid, eventdata, size, queue):
        """
        Append ``eventdata`` to the list stored under ``mid`` in ``queue``.

        A ``size`` of 0 means no limit; otherwise the oldest entry is
        dropped first when the list already holds ``size`` entries.
        """
        if mid in queue:
            pending = queue[mid]
            if size != 0 and len(pending) >= size:
                pending.pop(0)
            pending.append(eventdata)
        else:
            queue[mid] = [eventdata]
364