Completed
Pull Request — master (#184)
by
unknown
03:58
created

MonitoredItemService._modify_monitored_item()   A

Complexity

Conditions 4

Size

Total Lines 15

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 12
CRAP Score 4

Importance

Changes 3
Bugs 0 Features 1
Metric Value
cc 4
dl 0
loc 15
ccs 12
cts 12
cp 1
crap 4
rs 9.2
c 3
b 0
f 1
1
"""
2
server side implementation of a subscription object
3
"""
4
5 1
from threading import RLock
6 1
import logging
7 1
import copy
8
9 1
from opcua import ua
10
11
12 1
class MonitoredItemData(object):
    """
    plain record holding the server side state of one monitored item
    """

    def __init__(self):
        # identifiers: server-assigned item id, client-supplied handle and
        # the handle of the address space datachange callback (if any)
        self.monitored_item_id = None
        self.client_handle = None
        self.callback_handle = None
        # create/modify result object kept for later lookups (queue size)
        self.parameters = None
        self.mode = None
        # filter requested by the client and evaluator for event where clauses
        self.mfilter = None
        self.where_clause_evaluator = None
        # current/previous value pair
        self.mvalue = MonitoredItemValues()
23 1
24
class MonitoredItemValues(object):
    """
    remember the latest value and the one that preceded it
    """

    def __init__(self):
        self.old_value = None
        self.current_value = None

    def set_current_value(self, cur_val):
        """store cur_val as current value, the former current value becomes the old value"""
        self.old_value, self.current_value = self.current_value, cur_val

    def get_current_value(self):
        """return the value given to the last set_current_value call (None before any call)"""
        return self.current_value

    def get_old_value(self):
        """return the value that was current before the last set_current_value call"""
        return self.old_value
39 1
40 1
class MonitoredItemService(object):
    """
    implement monitoreditem service for 1 subscription

    Keeps three tables, all protected by one re-entrant lock:
    _monitored_items maps monitored item id -> MonitoredItemData,
    _monitored_events maps node id -> list of monitored item ids and
    _monitored_datachange maps address space callback handle -> monitored item id.
    """

    def __init__(self, isub, aspace):
        self.logger = logging.getLogger(__name__ + "." + str(isub.data.SubscriptionId))
        self.isub = isub
        self.aspace = aspace
        self._lock = RLock()
        self._monitored_items = {}
        self._monitored_events = {}
        self._monitored_datachange = {}
        self._monitored_item_counter = 111

    def delete_all_monitored_items(self):
        """delete every monitored item currently registered"""
        self.delete_monitored_items([mdata.monitored_item_id for mdata in self._monitored_items.values()])

    def create_monitored_items(self, params):
        """
        create one monitored item per entry of params.ItemsToCreate
        and return the list of create results, in request order
        """
        results = []
        for item in params.ItemsToCreate:
            with self._lock:
                if item.ItemToMonitor.AttributeId == ua.AttributeIds.EventNotifier:
                    result = self._create_events_monitored_item(item)
                else:
                    result = self._create_data_change_monitored_item(item)
            results.append(result)
        return results

    def modify_monitored_items(self, params):
        """modify every item of params.ItemsToModify and return the list of modify results"""
        return [self._modify_monitored_item(item) for item in params.ItemsToModify]

    def trigger_datachange(self, handle, nodeid, attr):
        """read the attribute from the address space and feed it to datachange_callback"""
        self.logger.debug("triggering datachange for handle %s, nodeid %s, and attribute %s", handle, nodeid, attr)
        variant = self.aspace.get_attribute_value(nodeid, attr)
        self.datachange_callback(handle, variant)

    def _modify_monitored_item(self, params):
        """
        apply the requested parameters to the monitored item params.MonitoredItemId

        Return a MonitoredItemModifyResult; its StatusCode is
        BadMonitoredItemIdInvalid when the id is unknown.
        """
        with self._lock:
            result = ua.MonitoredItemModifyResult()
            # _monitored_items is keyed by monitored item id (see
            # _commit_monitored_item), so no scan over the values is needed
            mdata = self._monitored_items.get(params.MonitoredItemId)
            if mdata is None:
                # the status code must be assigned, not called
                result.StatusCode = ua.StatusCode(ua.StatusCodes.BadMonitoredItemIdInvalid)
                return result
            requested = params.RequestedParameters
            self.isub.data.RevisedPublishingInterval = requested.SamplingInterval
            result.RevisedSamplingInterval = requested.SamplingInterval
            result.RevisedQueueSize = requested.QueueSize
            result.FilterResult = requested.Filter
            mdata.mfilter = result.FilterResult
            mdata.parameters = result
            return result

    def _commit_monitored_item(self, result, mdata):
        """register mdata under result.MonitoredItemId if the creation succeeded"""
        if result.StatusCode.is_good():
            self._monitored_items[result.MonitoredItemId] = mdata
            # the counter is also incremented in _make_monitored_item_common,
            # ids therefore advance by two for every committed item
            self._monitored_item_counter += 1

    def _make_monitored_item_common(self, params):
        """
        build the create result and the MonitoredItemData shared by
        event and datachange monitored items; return (result, mdata)
        """
        result = ua.MonitoredItemCreateResult()
        result.RevisedSamplingInterval = self.isub.data.RevisedPublishingInterval
        result.RevisedQueueSize = params.RequestedParameters.QueueSize
        self._monitored_item_counter += 1
        result.MonitoredItemId = self._monitored_item_counter
        self.logger.debug("Creating MonitoredItem with id %s", result.MonitoredItemId)

        mdata = MonitoredItemData()
        mdata.parameters = result
        mdata.mode = params.MonitoringMode
        mdata.client_handle = params.RequestedParameters.ClientHandle
        mdata.mfilter = params.RequestedParameters.Filter
        mdata.monitored_item_id = result.MonitoredItemId

        return result, mdata

    def _create_events_monitored_item(self, params):
        """
        create a monitored item for events of params.ItemToMonitor.NodeId

        The result carries BadServiceUnsupported (and nothing is registered)
        when the EventNotifier attribute of the node is None or has bit 0 cleared.
        """
        self.logger.info("request to subscribe to events for node %s and attribute %s",
                         params.ItemToMonitor.NodeId,
                         params.ItemToMonitor.AttributeId)

        result, mdata = self._make_monitored_item_common(params)
        ev_notify_byte = self.aspace.get_attribute_value(params.ItemToMonitor.NodeId, ua.AttributeIds.EventNotifier).Value.Value
        if ev_notify_byte is None or ev_notify_byte & 1 == 0:
            result.StatusCode = ua.StatusCode(ua.StatusCodes.BadServiceUnsupported)
            return result
        result.FilterResult = ua.EventFilterResult()
        # one (default) status code per requested select clause
        for _ in params.RequestedParameters.Filter.SelectClauses:
            result.FilterResult.SelectClauseResults.append(ua.StatusCode())
        # TODO: spec says we should check WhereClause here
        mdata.where_clause_evaluator = WhereClauseEvaluator(self.logger, self.aspace, mdata.mfilter.WhereClause)
        self._commit_monitored_item(result, mdata)
        if params.ItemToMonitor.NodeId not in self._monitored_events:
            self._monitored_events[params.ItemToMonitor.NodeId] = []
        self._monitored_events[params.ItemToMonitor.NodeId].append(result.MonitoredItemId)
        return result

    def _create_data_change_monitored_item(self, params):
        """
        create a monitored item for datachanges and register datachange_callback
        in the address space; on success an initial notification is generated
        """
        self.logger.info("request to subscribe to datachange for node %s and attribute %s",
                         params.ItemToMonitor.NodeId,
                         params.ItemToMonitor.AttributeId)

        result, mdata = self._make_monitored_item_common(params)
        result.FilterResult = params.RequestedParameters.Filter
        result.StatusCode, handle = self.aspace.add_datachange_callback(params.ItemToMonitor.NodeId, params.ItemToMonitor.AttributeId, self.datachange_callback)
        self.logger.debug("adding callback return status %s and handle %s", result.StatusCode, handle)
        mdata.callback_handle = handle
        self._commit_monitored_item(result, mdata)
        if result.StatusCode.is_good():
            self._monitored_datachange[handle] = result.MonitoredItemId
            # force data change event generation
            self.trigger_datachange(handle, params.ItemToMonitor.NodeId, params.ItemToMonitor.AttributeId)
        return result

    def delete_monitored_items(self, ids):
        """delete the monitored items with the given ids, return one status code per id"""
        self.logger.debug("delete monitored items %s", ids)
        with self._lock:
            return [self._delete_monitored_items(mid) for mid in ids]

    def _delete_monitored_items(self, mid):
        """
        remove one monitored item from all tables and unregister its
        datachange callback; return BadMonitoredItemIdInvalid for an unknown id
        """
        if mid not in self._monitored_items:
            return ua.StatusCode(ua.StatusCodes.BadMonitoredItemIdInvalid)
        # in both loops the dict is only modified right before leaving the
        # loop, so iterating over the live view is safe
        for k, v in self._monitored_events.items():
            if mid in v:
                v.remove(mid)
                if not v:
                    self._monitored_events.pop(k)
                break
        for k, v in self._monitored_datachange.items():
            if v == mid:
                self.aspace.delete_datachange_callback(k)
                self._monitored_datachange.pop(k)
                break
        self._monitored_items.pop(mid)
        return ua.StatusCode()

    def datachange_callback(self, handle, value, error=None):
        """
        called with a callback handle and the new data value

        When error is set a status change is queued instead. Otherwise the value
        is recorded and, if it passes the item filter (when one is set), a
        MonitoredItemNotification is enqueued on the subscription.
        """
        if error:
            self.logger.info("subscription %s: datachange callback called with handle '%s' and erorr '%s'", self,
                             handle, error)
            self.trigger_statuschange(error)
            return
        self.logger.info("subscription %s: datachange callback called with handle '%s' and value '%s'", self,
                         handle, value.Value)
        event = ua.MonitoredItemNotification()
        with self._lock:
            mid = self._monitored_datachange[handle]
            mdata = self._monitored_items[mid]
            mdata.mvalue.set_current_value(value.Value.Value)
            if mdata.mfilter is not None:
                deadband_flag_pass = self.deadband_callback(mdata.mvalue, mdata.mfilter)
            else:
                deadband_flag_pass = True
            if deadband_flag_pass:
                event.ClientHandle = mdata.client_handle
                event.Value = value
                self.isub.enqueue_datachange_event(mid, event, mdata.parameters.RevisedQueueSize)

    def deadband_callback(self, values, filter):
        """
        return True when the change between old and current value must be reported

        A change is reported when there is no old value yet, when the absolute
        difference exceeds filter.DeadbandValue, or when the values do not
        support the arithmetic needed for that comparison.
        """
        # NOTE(review): the caller records every sample before calling this, so
        # the comparison is against the previous sample and not against the last
        # reported value - confirm this is the intended deadband semantic
        old = values.get_old_value()
        if old is None:
            return True
        try:
            return abs(values.get_current_value() - old) > filter.DeadbandValue
        except TypeError:
            # non numeric values cannot be deadband filtered, report them
            return True

    def trigger_event(self, event):
        """
        dispatch event to every monitored item registered for event.SourceNode

        Return False when no monitored item watches that node (None otherwise).
        """
        with self._lock:
            if event.SourceNode not in self._monitored_events:
                self.logger.debug("%s has no subscription for events %s from node: %s",
                                  self, event, event.SourceNode)
                return False
            self.logger.debug("%s has subscription for events %s from node: %s",
                              self, event, event.SourceNode)
            mids = self._monitored_events[event.SourceNode]
            for mid in mids:
                self._trigger_event(event, mid)

    def _trigger_event(self, event, mid):
        """enqueue the event fields for monitored item mid if the event passes its where clause"""
        if mid not in self._monitored_items:
            self.logger.debug("Could not find monitored items for id %s for event %s in subscription %s", mid, event, self)
            return
        mdata = self._monitored_items[mid]
        if not mdata.where_clause_evaluator.eval(event):
            # the message needs one placeholder per argument, otherwise
            # logging reports a formatting error instead of the message
            self.logger.debug("Event %s does not fit WhereClause of monitored item %s in subscription %s, not generating event", event, mid, self)
            return
        fieldlist = ua.EventFieldList()
        fieldlist.ClientHandle = mdata.client_handle
        fieldlist.EventFields = self._get_event_fields(mdata.mfilter, event)
        self.isub.enqueue_event(mid, fieldlist, mdata.parameters.RevisedQueueSize)

    def _get_event_fields(self, evfilter, event):
        """
        return one Variant per select clause of evfilter

        The value is a deep copy of the event attribute named by the first
        browse path element, or by the clause attribute name when the clause
        has no browse path; an empty Variant is used for unknown attributes.
        """
        fields = []
        for sattr in evfilter.SelectClauses:
            try:
                if not sattr.BrowsePath:
                    name = sattr.Attribute.name
                else:
                    name = sattr.BrowsePath[0].Name
                val = copy.deepcopy(getattr(event, name))
                fields.append(ua.Variant(val))
            except AttributeError:
                fields.append(ua.Variant())
        return fields

    def trigger_statuschange(self, code):
        """queue a status change notification on the subscription"""
        self.isub.enqueue_statuschange(code)
254
255
256
class InternalSubscription(object):
    """
    server side state of one subscription

    Collects datachange, event and statuschange notifications produced by its
    MonitoredItemService and periodically hands them, packed in a
    PublishResult, to the callback given at construction.
    """

    def __init__(self, subservice, data, addressspace, callback):
        self.logger = logging.getLogger(__name__ + "." + str(data.SubscriptionId))
        self.aspace = addressspace
        self.subservice = subservice
        self.data = data
        self.callback = callback
        self.monitored_item_srv = MonitoredItemService(self, addressspace)
        self.task = None
        self._lock = RLock()
        # pending notifications: monitored item id -> list of notifications
        self._triggered_datachanges = {}
        self._triggered_events = {}
        self._triggered_statuschanges = []
        self._notification_seq = 1
        # sequence number -> PublishResult not yet acknowledged by the client
        self._not_acknowledged_results = {}
        self._startup = True
        self._keep_alive_count = 0
        self._publish_cycles_count = 0
        self._stopev = False

    def __str__(self):
        return "Subscription(id:{})".format(self.data.SubscriptionId)

    def start(self):
        """schedule the first publishing cycle"""
        self.logger.debug("starting subscription %s", self.data.SubscriptionId)
        self._subscription_loop()

    def stop(self):
        """stop the publishing cycle and delete all monitored items"""
        self.logger.debug("stopping subscription %s", self.data.SubscriptionId)
        self._stopev = True
        self.monitored_item_srv.delete_all_monitored_items()

    def _subscription_loop(self):
        """schedule the next _sub_loop call on the subservice loop unless stopped"""
        if not self._stopev:
            # RevisedPublishingInterval / 1000.0 is the delay given to call_later
            self.subservice.loop.call_later(self.data.RevisedPublishingInterval / 1000.0, self._sub_loop)

    def _sub_loop(self):
        """one publishing cycle: publish pending results then reschedule"""
        if self._stopev:
            return
        self.publish_results()
        self._subscription_loop()

    def has_published_results(self):
        """
        return True when a publish result must be sent in this cycle

        This is the case at startup, when notifications of any kind are
        pending, or when the keep alive counter exceeded
        RevisedMaxKeepAliveCount. Otherwise the keep alive counter is
        incremented and False is returned.
        """
        with self._lock:
            # queued status changes count as pending notifications too,
            # otherwise they would only leave with an unrelated notification
            # or with the next keep alive message
            if (self._startup or self._triggered_datachanges or self._triggered_events
                    or self._triggered_statuschanges):
                return True
            if self._keep_alive_count > self.data.RevisedMaxKeepAliveCount:
                self.logger.debug("keep alive count %s is > than max keep alive count %s, sending publish event", self._keep_alive_count, self.data.RevisedMaxKeepAliveCount)
                return True
            self._keep_alive_count += 1
            return False

    def publish_results(self):
        """
        send pending notifications to the callback

        When the publish cycle counter exceeded RevisedLifetimeCount a
        BadTimeout status change is queued and the subscription is flagged
        as stopped before the pending results are published.
        """
        if self._publish_cycles_count > self.data.RevisedLifetimeCount:
            self.logger.warning("Subscription %s has expired, publish cycle count(%s) > lifetime count (%s)", self, self._publish_cycles_count, self.data.RevisedLifetimeCount)
            # FIXME this will never be send since we do not have publish request anyway
            self.monitored_item_srv.trigger_statuschange(ua.StatusCode(ua.StatusCodes.BadTimeout))
            self._stopev = True
        result = None
        with self._lock:
            if self.has_published_results():  # FIXME: should we pop a publish request here? or we do not care?
                self._publish_cycles_count += 1
                result = self._pop_publish_result()
        # the callback is invoked outside of the lock
        if result is not None:
            self.callback(result)

    def _pop_publish_result(self):
        """
        move all pending notifications into a new PublishResult

        The sequence number is only consumed, and the result only kept for
        re-publishing, when the message actually carries notification data.
        """
        result = ua.PublishResult()
        result.SubscriptionId = self.data.SubscriptionId
        self._pop_triggered_datachanges(result)
        self._pop_triggered_events(result)
        self._pop_triggered_statuschanges(result)
        self._keep_alive_count = 0
        self._startup = False
        result.NotificationMessage.SequenceNumber = self._notification_seq
        if len(result.NotificationMessage.NotificationData) != 0:
            self._notification_seq += 1
            self._not_acknowledged_results[result.NotificationMessage.SequenceNumber] = result
        result.MoreNotifications = False
        result.AvailableSequenceNumbers = list(self._not_acknowledged_results.keys())
        return result

    def _pop_triggered_datachanges(self, result):
        """append all pending datachanges to result as one DataChangeNotification"""
        if self._triggered_datachanges:
            notif = ua.DataChangeNotification()
            notif.MonitoredItems = [item for sublist in self._triggered_datachanges.values() for item in sublist]
            self._triggered_datachanges = {}
            self.logger.debug("sending datachanges notification with %s events", len(notif.MonitoredItems))
            result.NotificationMessage.NotificationData.append(notif)

    def _pop_triggered_events(self, result):
        """append all pending events to result as one EventNotificationList"""
        if self._triggered_events:
            notif = ua.EventNotificationList()
            notif.Events = [item for sublist in self._triggered_events.values() for item in sublist]
            self._triggered_events = {}
            result.NotificationMessage.NotificationData.append(notif)
            self.logger.debug("sending event notification with %s events", len(notif.Events))

    def _pop_triggered_statuschanges(self, result):
        """append the oldest pending status change (one per call) to result"""
        if self._triggered_statuschanges:
            notif = ua.StatusChangeNotification()
            notif.Status = self._triggered_statuschanges.pop(0)
            result.NotificationMessage.NotificationData.append(notif)
            self.logger.debug("sending event notification %s", notif.Status)

    def publish(self, acks):
        """handle a publish request: reset the cycle counter and drop acknowledged results"""
        self.logger.info("publish request with acks %s", acks)
        with self._lock:
            self._publish_cycles_count = 0
            for nb in acks:
                self._not_acknowledged_results.pop(nb, None)

    def republish(self, nb):
        """
        return the NotificationMessage kept under sequence number nb,
        or an empty NotificationMessage when nb is unknown
        """
        self.logger.info("re-publish request for ack %s in subscription %s", nb, self)
        with self._lock:
            if nb in self._not_acknowledged_results:
                self.logger.info("re-publishing ack %s in subscription %s", nb, self)
                return self._not_acknowledged_results[nb].NotificationMessage
            self.logger.info("Error request to re-published non existing ack %s in subscription %s", nb, self)
            return ua.NotificationMessage()

    def enqueue_datachange_event(self, mid, eventdata, maxsize):
        """queue a datachange notification for monitored item mid"""
        self._enqueue_event(mid, eventdata, maxsize, self._triggered_datachanges)

    def enqueue_event(self, mid, eventdata, maxsize):
        """queue an event notification for monitored item mid"""
        self._enqueue_event(mid, eventdata, maxsize, self._triggered_events)

    def enqueue_statuschange(self, code):
        """queue a status change notification"""
        self._triggered_statuschanges.append(code)

    def _enqueue_event(self, mid, eventdata, size, queue):
        """
        append eventdata to the list kept for mid in queue

        When size is not 0 and the list already holds size entries or more,
        the oldest entry is discarded first; size 0 means no limit.
        """
        if mid not in queue:
            queue[mid] = [eventdata]
            return
        if size != 0 and len(queue[mid]) >= size:
            queue[mid].pop(0)
        queue[mid].append(eventdata)
398
399
400
class WhereClauseEvaluator(object):
    """
    evaluate the where clause of an event filter against events

    whereclause.Elements is a list of filter elements; every element has a
    FilterOperator and a list of FilterOperands. Operands are evaluated with
    _eval_op, which follows element operands to other elements of the list.
    """

    def __init__(self, logger, aspace, whereclause):
        self.logger = logger
        self.elements = whereclause.Elements
        self._aspace = aspace

    def eval(self, event):
        """
        return True if event fits the where clause

        An empty where clause accepts every event. Any exception raised
        during evaluation is logged and makes the event be rejected.
        """
        if not self.elements:
            return True
        # spec says we should only evaluate first element, which may use other elements
        try:
            res = self._eval_el(0, event)
        except Exception as ex:
            self.logger.warning("Exception while evaluating WhereClause %s for event %s: %s", self.elements, event, ex)
            return False
        return res

    def _eval_el(self, index, event):
        """
        evaluate the filter element at position index of the element list

        Raises NotImplementedError for operators that are not supported.
        """
        el = self.elements[index]
        ops = el.FilterOperands  # just to make code more readable
        # operands are objects, not element indices: they must always go
        # through _eval_op, which itself recurses here for element operands
        if el.FilterOperator == ua.FilterOperator.Equals:
            return self._eval_op(ops[0], event) == self._eval_op(ops[1], event)
        elif el.FilterOperator == ua.FilterOperator.IsNull:
            return self._eval_op(ops[0], event) is None  # FIXME: might be too strict
        elif el.FilterOperator == ua.FilterOperator.GreaterThan:
            return self._eval_op(ops[0], event) > self._eval_op(ops[1], event)
        elif el.FilterOperator == ua.FilterOperator.LessThan:
            return self._eval_op(ops[0], event) < self._eval_op(ops[1], event)
        elif el.FilterOperator == ua.FilterOperator.GreaterThanOrEqual:
            return self._eval_op(ops[0], event) >= self._eval_op(ops[1], event)
        elif el.FilterOperator == ua.FilterOperator.LessThanOrEqual:
            return self._eval_op(ops[0], event) <= self._eval_op(ops[1], event)
        elif el.FilterOperator == ua.FilterOperator.Like:
            return self._like_operator(self._eval_op(ops[0], event), self._eval_op(ops[1], event))
        elif el.FilterOperator == ua.FilterOperator.Not:
            return not self._eval_op(ops[0], event)
        elif el.FilterOperator == ua.FilterOperator.Between:
            # ops[1] is the lower bound, ops[2] the upper bound
            return self._eval_op(ops[2], event) >= self._eval_op(ops[0], event) >= self._eval_op(ops[1], event)
        elif el.FilterOperator == ua.FilterOperator.InList:
            return self._eval_op(ops[0], event) in [self._eval_op(op, event) for op in ops[1:]]
        elif el.FilterOperator == ua.FilterOperator.And:
            return self._eval_op(ops[0], event) and self._eval_op(ops[1], event)
        elif el.FilterOperator == ua.FilterOperator.Or:
            return self._eval_op(ops[0], event) or self._eval_op(ops[1], event)
        elif el.FilterOperator == ua.FilterOperator.Cast:
            self.logger.warning("Cast operand not implemented, assuming True")
            return True
        elif el.FilterOperator == ua.FilterOperator.OfType:
            self.logger.warning("OfType operand not implemented, assuming True")
            return True
        else:
            # TODO: implement missing operators
            self.logger.warning("WhereClause not implemented for element: %s", el)
            raise NotImplementedError

    def _like_operator(self, string, pattern):
        """pattern matching for the Like operator, not implemented yet"""
        raise NotImplementedError

    def _eval_op(self, op, event):
        """
        return the value of one filter operand for event

        Raises NotImplementedError for operand types that are not known.
        """
        # seems spec says we should return Null if issues
        if isinstance(op, ua.ElementOperand):
            return self._eval_el(op.Index, event)
        # SimpleAttributeOperand is tested before AttributeOperand so the more
        # specific handling wins should the two classes ever be related
        elif isinstance(op, ua.SimpleAttributeOperand):
            if op.BrowsePath:
                # we only support depth of 1
                return getattr(event, op.BrowsePath[0].Name)
            else:
                # TODO: write code for index range.... but doe it make any sense
                return self._aspace.get_attribute_value(event.EventType, op.AttributeId).Value.Value
        elif isinstance(op, ua.AttributeOperand):
            # FIXME: check, this is probably broken
            if op.BrowsePath:
                return getattr(event, op.BrowsePath.Elements[0].TargetName.Name)
            else:
                return self._aspace.get_attribute_value(event.EventType, op.AttributeId).Value.Value
        elif isinstance(op, ua.LiteralOperand):
            return op.Value.Value
        else:
            self.logger.warning("Where clause element %s is not of a known type", op)
            raise NotImplementedError
482
483
484
485