Completed
Pull Request — master (#509)
by
unknown
03:15
created

MonitoredItemData   A

Complexity

Total Complexity 1

Size/Duplication

Total Lines 11
Duplicated Lines 0 %

Test Coverage

Coverage 20%

Importance

Changes 0
Metric Value
c 0
b 0
f 0
dl 0
loc 11
ccs 2
cts 10
cp 0.2
rs 10
wmc 1

1 Method

Rating   Name   Duplication   Size   Complexity  
A __init__() 0 9 1
1
"""
2
server side implementation of a subscription object
3
"""
4
5 1
from threading import RLock
6 1
import logging
7
# import copy
8
# import traceback
9
10 1
from opcua import ua
11
12
13 1
class MonitoredItemData(object):
14
15 1
    def __init__(self):
16
        self.client_handle = None
17
        self.callback_handle = None
18
        self.monitored_item_id = None
19
        self.mode = None
20
        self.filter = None
21
        self.mvalue = MonitoredItemValues()
22
        self.where_clause_evaluator = None
23
        self.queue_size = 0
24
25
26 1
class MonitoredItemValues(object):
27
28 1
    def __init__(self):
29
        self.current_value = None
30
        self.old_value = None
31
32 1
    def set_current_value(self, cur_val):
33
        self.old_value = self.current_value
34
        self.current_value = cur_val
35
36 1
    def get_current_value(self):
37
        return self.current_value
38
39 1
    def get_old_value(self):
40
        return self.old_value
41
42
43 1
class MonitoredItemService(object):
44
45
    """
46
    implement monitoreditem service for 1 subscription
47
    """
48
49 1
    def __init__(self, isub, aspace):
50
        self.logger = logging.getLogger(__name__ + "." + str(isub.data.SubscriptionId))
51
        self.isub = isub
52
        self.aspace = aspace
53
        self._lock = RLock()
54
        self._monitored_items = {}
55
        self._monitored_events = {}
56
        self._monitored_datachange = {}
57
        self._monitored_item_counter = 111
58
59 1
    def delete_all_monitored_items(self):
60
        self.delete_monitored_items([mdata.monitored_item_id for mdata in self._monitored_items.values()])
61
62 1
    def create_monitored_items(self, params):
63
        results = []
64
        for item in params.ItemsToCreate:
65
            with self._lock:
66
                if item.ItemToMonitor.AttributeId == ua.AttributeIds.EventNotifier:
67
                    result = self._create_events_monitored_item(item)
68
                else:
69
                    result = self._create_data_change_monitored_item(item)
70
            results.append(result)
71
        return results
72
73 1
    def modify_monitored_items(self, params):
74
        results = []
75
        for item in params.ItemsToModify:
76
            results.append(self._modify_monitored_item(item))
77
        return results
78
79 1
    def trigger_datachange(self, handle, nodeid, attr):
80
        self.logger.debug("triggering datachange for handle %s, nodeid %s, and attribute %s", handle, nodeid, attr)
81
        variant = self.aspace.get_attribute_value(nodeid, attr)
82
        self.datachange_callback(handle, variant)
83
84 1
    def _modify_monitored_item(self, params):
85
        with self._lock:
86
            for mdata in self._monitored_items.values():
87
                result = ua.MonitoredItemModifyResult()
88
                if mdata.monitored_item_id == params.MonitoredItemId:
89
                    result.RevisedSamplingInterval = params.RequestedParameters.SamplingInterval
90
                    result.RevisedQueueSize = params.RequestedParameters.QueueSize
91
                    if params.RequestedParameters.Filter is not None:
92
                        mdata.filter = params.RequestedParameters.Filter
93
                    mdata.queue_size = params.RequestedParameters.QueueSize
94
                    return result
95
            result = ua.MonitoredItemModifyResult()
96
            result.StatusCode(ua.StatusCodes.BadMonitoredItemIdInvalid)
97
            return result
98
99 1
    def _commit_monitored_item(self, result, mdata):
100
        if result.StatusCode.is_good():
101
            self._monitored_items[result.MonitoredItemId] = mdata
102
            self._monitored_item_counter += 1
103
104 1
    def _make_monitored_item_common(self, params):
105
        result = ua.MonitoredItemCreateResult()
106
        result.RevisedSamplingInterval = self.isub.data.RevisedPublishingInterval
107
        result.RevisedQueueSize = params.RequestedParameters.QueueSize
108
        self._monitored_item_counter += 1
109
        result.MonitoredItemId = self._monitored_item_counter
110
        self.logger.debug("Creating MonitoredItem with id %s", result.MonitoredItemId)
111
112
        mdata = MonitoredItemData()
113
        mdata.mode = params.MonitoringMode
114
        mdata.client_handle = params.RequestedParameters.ClientHandle
115
        mdata.monitored_item_id = result.MonitoredItemId
116
        mdata.queue_size = params.RequestedParameters.QueueSize
117
        mdata.filter = params.RequestedParameters.Filter
118
119
        return result, mdata
120
121 1
    def _create_events_monitored_item(self, params):
122
        self.logger.info("request to subscribe to events for node %s and attribute %s",
123
                         params.ItemToMonitor.NodeId,
124
                         params.ItemToMonitor.AttributeId)
125
126
        result, mdata = self._make_monitored_item_common(params)
127
        ev_notify_byte = self.aspace.get_attribute_value(
128
            params.ItemToMonitor.NodeId, ua.AttributeIds.EventNotifier).Value.Value
129
130
        if ev_notify_byte is None or not ua.ua_binary.test_bit(ev_notify_byte, ua.EventNotifier.SubscribeToEvents):
131
            result.StatusCode = ua.StatusCode(ua.StatusCodes.BadServiceUnsupported)
132
            return result
133
        # result.FilterResult = ua.EventFilterResult()  # spec says we can ignore if not error
134
        mdata.where_clause_evaluator = WhereClauseEvaluator(self.logger, self.aspace, mdata.filter.WhereClause)
135
        self._commit_monitored_item(result, mdata)
136
        if params.ItemToMonitor.NodeId not in self._monitored_events:
137
            self._monitored_events[params.ItemToMonitor.NodeId] = []
138
        self._monitored_events[params.ItemToMonitor.NodeId].append(result.MonitoredItemId)
139
        return result
140
141 1
    def _create_data_change_monitored_item(self, params):
142
        self.logger.info("request to subscribe to datachange for node %s and attribute %s",
143
                         params.ItemToMonitor.NodeId,
144
                         params.ItemToMonitor.AttributeId)
145
146
        result, mdata = self._make_monitored_item_common(params)
147
        result.FilterResult = params.RequestedParameters.Filter
148
        result.StatusCode, handle = self.aspace.add_datachange_callback(
149
            params.ItemToMonitor.NodeId, params.ItemToMonitor.AttributeId, self.datachange_callback)
150
151
        self.logger.debug("adding callback return status %s and handle %s", result.StatusCode, handle)
152
        mdata.callback_handle = handle
153
        self._commit_monitored_item(result, mdata)
154
        if result.StatusCode.is_good():
155
            self._monitored_datachange[handle] = result.MonitoredItemId
156
            # force data change event generation
157
            self.trigger_datachange(handle, params.ItemToMonitor.NodeId, params.ItemToMonitor.AttributeId)
158
        return result
159
160 1
    def delete_monitored_items(self, ids):
161
        self.logger.debug("delete monitored items %s", ids)
162
        with self._lock:
163
            results = []
164
            for mid in ids:
165
                results.append(self._delete_monitored_items(mid))
166
            return results
167
168 1
    def _delete_monitored_items(self, mid):
169
        if mid not in self._monitored_items:
170
            return ua.StatusCode(ua.StatusCodes.BadMonitoredItemIdInvalid)
171
        for k, v in self._monitored_events.items():
172
            if mid in v:
173
                v.remove(mid)
174
                if not v:
175
                    self._monitored_events.pop(k)
176
                break
177
        for k, v in self._monitored_datachange.items():
178
            if v == mid:
179
                self.aspace.delete_datachange_callback(k)
180
                self._monitored_datachange.pop(k)
181
                break
182
        self._monitored_items.pop(mid)
183
        return ua.StatusCode()
184
185 1
    def datachange_callback(self, handle, value, error=None):
186
        if error:
187
            self.logger.info("subscription %s: datachange callback called with handle '%s' and erorr '%s'", self,
188
                             handle, error)
189
            self.trigger_statuschange(error)
190
        else:
191
            self.logger.info("subscription %s: datachange callback called with handle '%s' and value '%s'", self,
192
                             handle, value.Value)
193
            event = ua.MonitoredItemNotification()
194
            with self._lock:
195
                mid = self._monitored_datachange[handle]
196
                mdata = self._monitored_items[mid]
197
                mdata.mvalue.set_current_value(value.Value.Value)
198
                if mdata.filter:
199
                    deadband_flag_pass = self.deadband_callback(mdata.mvalue, mdata.filter)
200
                else:
201
                    deadband_flag_pass = True
202
                if deadband_flag_pass:
203
                    event.ClientHandle = mdata.client_handle
204
                    event.Value = value
205
                    self.isub.enqueue_datachange_event(mid, event, mdata.queue_size)
206
207 1
    def deadband_callback(self, values, flt):
208
        if flt.DeadbandType == ua.DeadbandType.None_ or values.get_old_value() is None:
209
            return True
210
        elif flt.DeadbandType == ua.DeadbandType.Absolute and \
211
                ((abs(values.get_current_value() - values.get_old_value())) > flt.DeadbandValue):
212
            return True
213
        elif flt.DeadbandType == ua.DeadbandType.Percent:
214
            self.logger.warn("DeadbandType Percent is not implemented !")
215
            return True
216
        else:
217
            return False
218
219 1
    def trigger_event(self, event):
220
        with self._lock:
221
            if event.SourceNode not in self._monitored_events:
222
                self.logger.debug("%s has no subscription for events %s from node: %s",
223
                                  self, event, event.SourceNode)
224
                return False
225
            self.logger.debug("%s has subscription for events %s from node: %s",
226
                              self, event, event.SourceNode)
227
            mids = self._monitored_events[event.SourceNode]
228
            for mid in mids:
229
                self._trigger_event(event, mid)
230
231 1
    def _trigger_event(self, event, mid):
232
        if mid not in self._monitored_items:
233
            self.logger.debug("Could not find monitored items for id %s for event %s in subscription %s",
234
                              mid, event, self)
235
            return
236
        mdata = self._monitored_items[mid]
237
        if not mdata.where_clause_evaluator.eval(event):
238
            self.logger.info("%s, %s, Event %s does not fit WhereClause, not generating event", self, mid, event)
239
            return
240
        fieldlist = ua.EventFieldList()
241
        fieldlist.ClientHandle = mdata.client_handle
242
        fieldlist.EventFields = event.to_event_fields(mdata.filter.SelectClauses)
243
        self.isub.enqueue_event(mid, fieldlist, mdata.queue_size)
244
245 1
    def trigger_statuschange(self, code):
246
        self.isub.enqueue_statuschange(code)
247
248
249 1
class InternalSubscription(object):
250
251 1
    def __init__(self, subservice, data, addressspace, callback):
252
        self.logger = logging.getLogger(__name__)
253
        self.aspace = addressspace
254
        self.subservice = subservice
255
        self.data = data
256
        self.callback = callback
257
        self.monitored_item_srv = MonitoredItemService(self, addressspace)
258
        self.task = None
259
        self._lock = RLock()
260
        self._triggered_datachanges = {}
261
        self._triggered_events = {}
262
        self._triggered_statuschanges = []
263
        self._notification_seq = 1
264
        self._not_acknowledged_results = {}
265
        self._startup = True
266
        self._keep_alive_count = 0
267
        self._publish_cycles_count = 0
268
        self._stopev = False
269
270 1
    def __str__(self):
271
        return "Subscription(id:{0})".format(self.data.SubscriptionId)
272
273 1
    def start(self):
274
        self.logger.debug("starting subscription %s", self.data.SubscriptionId)
275
        self._subscription_loop()
276
277 1
    def stop(self):
278
        self.logger.debug("stopping subscription %s", self.data.SubscriptionId)
279
        self._stopev = True
280
        self.monitored_item_srv.delete_all_monitored_items()
281
282 1
    def _subscription_loop(self):
283
        if not self._stopev:
284
            self.subservice.loop.call_later(self.data.RevisedPublishingInterval / 1000.0, self._sub_loop)
285
286 1
    def _sub_loop(self):
287
        if self._stopev:
288
            return
289
        self.publish_results()
290
        self._subscription_loop()
291
292 1
    def has_published_results(self):
293
        with self._lock:
294
            if self._startup or self._triggered_datachanges or self._triggered_events:
295
                return True
296
            if self._keep_alive_count > self.data.RevisedMaxKeepAliveCount:
297
                self.logger.debug("keep alive count %s is > than max keep alive count %s, sending publish event",
298
                                  self._keep_alive_count, self.data.RevisedMaxKeepAliveCount)
299
                return True
300
            self._keep_alive_count += 1
301
            return False
302
303 1
    def publish_results(self):
304
        if self._publish_cycles_count > self.data.RevisedLifetimeCount:
305
            self.logger.warning("Subscription %s has expired, publish cycle count(%s) > lifetime count (%s)",
306
                                self, self._publish_cycles_count, self.data.RevisedLifetimeCount)
307
            # FIXME this will never be send since we do not have publish request anyway
308
            self.monitored_item_srv.trigger_statuschange(ua.StatusCode(ua.StatusCodes.BadTimeout))
309
            self._stopev = True
310
        result = None
311
        with self._lock:
312
            if self.has_published_results():
313
                # FIXME: should we pop a publish request here? or we do not care?
314
                self._publish_cycles_count += 1
315
                result = self._pop_publish_result()
316
        if result is not None:
317
            self.callback(result)
318
319 1
    def _pop_publish_result(self):
320
        result = ua.PublishResult()
321
        result.SubscriptionId = self.data.SubscriptionId
322
        self._pop_triggered_datachanges(result)
323
        self._pop_triggered_events(result)
324
        self._pop_triggered_statuschanges(result)
325
        self._keep_alive_count = 0
326
        self._startup = False
327
        result.NotificationMessage.SequenceNumber = self._notification_seq
328
        if len(result.NotificationMessage.NotificationData) != 0:
329
            self._notification_seq += 1
330
            self._not_acknowledged_results[result.NotificationMessage.SequenceNumber] = result
331
        result.MoreNotifications = False
332
        result.AvailableSequenceNumbers = list(self._not_acknowledged_results.keys())
333
        return result
334
335 1
    def _pop_triggered_datachanges(self, result):
336
        if self._triggered_datachanges:
337
            notif = ua.DataChangeNotification()
338
            notif.MonitoredItems = [item for sublist in self._triggered_datachanges.values() for item in sublist]
339
            self._triggered_datachanges = {}
340
            self.logger.debug("sending datachanges notification with %s events", len(notif.MonitoredItems))
341
            result.NotificationMessage.NotificationData.append(notif)
342
343 1
    def _pop_triggered_events(self, result):
344
        if self._triggered_events:
345
            notif = ua.EventNotificationList()
346
            notif.Events = [item for sublist in self._triggered_events.values() for item in sublist]
347
            self._triggered_events = {}
348
            result.NotificationMessage.NotificationData.append(notif)
349
            self.logger.debug("sending event notification with %s events", len(notif.Events))
350
351 1
    def _pop_triggered_statuschanges(self, result):
352
        if self._triggered_statuschanges:
353
            notif = ua.StatusChangeNotification()
354
            notif.Status = self._triggered_statuschanges.pop(0)
355
            result.NotificationMessage.NotificationData.append(notif)
356
            self.logger.debug("sending event notification %s", notif.Status)
357
358 1
    def publish(self, acks):
359
        self.logger.info("publish request with acks %s", acks)
360
        with self._lock:
361
            self._publish_cycles_count = 0
362
            for nb in acks:
363
                if nb in self._not_acknowledged_results:
364
                    self._not_acknowledged_results.pop(nb)
365
366 1
    def republish(self, nb):
367
        self.logger.info("re-publish request for ack %s in subscription %s", nb, self)
368
        with self._lock:
369
            if nb in self._not_acknowledged_results:
370
                self.logger.info("re-publishing ack %s in subscription %s", nb, self)
371
                return self._not_acknowledged_results[nb].NotificationMessage
372
            else:
373
                self.logger.info("Error request to re-published non existing ack %s in subscription %s", nb, self)
374
                return ua.NotificationMessage()
375
376 1
    def enqueue_datachange_event(self, mid, eventdata, maxsize):
377
        self._enqueue_event(mid, eventdata, maxsize, self._triggered_datachanges)
378
379 1
    def enqueue_event(self, mid, eventdata, maxsize):
380
        self._enqueue_event(mid, eventdata, maxsize, self._triggered_events)
381
382 1
    def enqueue_statuschange(self, code):
383
        self._triggered_statuschanges.append(code)
384
385 1
    def _enqueue_event(self, mid, eventdata, size, queue):
386
        if mid not in queue:
387
            queue[mid] = [eventdata]
388
            return
389
        if size != 0:
390
            if len(queue[mid]) >= size:
391
                queue[mid].pop(0)
392
        queue[mid].append(eventdata)
393
394
395 1
class WhereClauseEvaluator(object):
396 1
    def __init__(self, logger, aspace, whereclause):
397 1
        self.logger = logger
398 1
        self.elements = whereclause.Elements
399 1
        self._aspace = aspace
400
401 1
    def eval(self, event):
402 1
        if not self.elements:
403
            return True
404
        # spec says we should only evaluate first element, which may use other elements
405 1
        try:
406 1
            res = self._eval_el(0, event)
407
        except Exception as ex:
408
            self.logger.exception("Exception while evaluating WhereClause %s for event %s: %s",
409
                                  self.elements, event, ex)
410
            return False
411 1
        return res
412
413 1
    def _eval_el(self, index, event):
414 1
        el = self.elements[index]
415
        # ops = [self._eval_op(op, event) for op in el.FilterOperands]
416 1
        ops = el.FilterOperands  # just to make code more readable
417 1
        if el.FilterOperator == ua.FilterOperator.Equals:
418
            return self._eval_op(ops[0], event) == self._eval_el(ops[1], event)
419 1
        elif el.FilterOperator == ua.FilterOperator.IsNull:
420
            return self._eval_op(ops[0], event) is None  # FIXME: might be too strict
421 1
        elif el.FilterOperator == ua.FilterOperator.GreaterThan:
422
            return self._eval_op(ops[0], event) > self._eval_el(ops[1], event)
423 1
        elif el.FilterOperator == ua.FilterOperator.LessThan:
424
            return self._eval_op(ops[0], event) < self._eval_el(ops[1], event)
425 1
        elif el.FilterOperator == ua.FilterOperator.GreaterThanOrEqual:
426
            return self._eval_op(ops[0], event) >= self._eval_el(ops[1], event)
427 1
        elif el.FilterOperator == ua.FilterOperator.LessThanOrEqual:
428
            return self._eval_op(ops[0], event) <= self._eval_el(ops[1], event)
429 1
        elif el.FilterOperator == ua.FilterOperator.Like:
430
            return self._likeoperator(self._eval_op(ops[0], event), self._eval_el(ops[1], event))
431 1
        elif el.FilterOperator == ua.FilterOperator.Not:
432
            return not self._eval_op(ops[0], event)
433 1
        elif el.FilterOperator == ua.FilterOperator.Between:
434
            return self._eval_el(ops[2], event) >= self._eval_op(ops[0], event) >= self._eval_el(ops[1], event)
435 1
        elif el.FilterOperator == ua.FilterOperator.InList:
436 1
            return self._eval_op(ops[0], event) in [self._eval_op(op, event) for op in ops[1:]]
437
        elif el.FilterOperator == ua.FilterOperator.And:
438
            self.elements(ops[0].Index)
439
            return self._eval_op(ops[0], event) and self._eval_op(ops[1], event)
440
        elif el.FilterOperator == ua.FilterOperator.Or:
441
            return self._eval_op(ops[0], event) or self._eval_el(ops[1], event)
442
        elif el.FilterOperator == ua.FilterOperator.Cast:
443
            self.logger.warn("Cast operand not implemented, assuming True")
444
            return True
445
        elif el.FilterOperator == ua.FilterOperator.OfType:
446
            return event.EventType == self._eval_op(ops[0], event)
447
        else:
448
            # TODO: implement missing operators
449
            self.logger.warning("WhereClause not implemented for element: %s", el)
450
            raise NotImplementedError
451
452 1
    def _like_operator(self, string, pattern):
453
        raise NotImplementedError
454
455 1
    def _eval_op(self, op, event):
456
        # seems spec says we should return Null if issues
457 1
        if type(op) is ua.ElementOperand:
458
            return self._eval_el(op.Index, event)
459 1
        elif type(op) is ua.AttributeOperand:
460
            if op.BrowsePath:
461
                return getattr(event, op.BrowsePath.Elements[0].TargetName.Name)
462
            else:
463
                return self._aspace.get_attribute_value(event.EventType, op.AttributeId).Value.Value
464
            # FIXME: check, this is probably broken
465 1
        elif type(op) is ua.SimpleAttributeOperand:
466 1
            if op.BrowsePath:
467
                # we only support depth of 1
468 1
                return getattr(event, op.BrowsePath[0].Name)
469
            else:
470
                # TODO: write code for index range.... but doe it make any sense
471
                return self._aspace.get_attribute_value(event.EventType, op.AttributeId).Value.Value
472 1
        elif type(op) is ua.LiteralOperand:
473 1
            return op.Value.Value
474
        else:
475
            self.logger.warning("Where clause element % is not of a known type", op)
476
            raise NotImplementedError
477
478
479
480