Passed
Pull Request — master (#45)
by
unknown
02:35
created

MonitoredItemService.deadband_callback()   B

Complexity

Conditions 6

Size

Total Lines 10
Code Lines 10

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 6
eloc 10
nop 3
dl 0
loc 10
rs 8.6666
c 0
b 0
f 0
1
"""
2
server side implementation of a subscription object
3
"""
4
5
import logging
6
from asyncua import ua
7
from .address_space import AddressSpace
8
9
10
class MonitoredItemData:
11
    def __init__(self):
12
        self.client_handle = None
13
        self.callback_handle = None
14
        self.monitored_item_id = None
15
        self.mode = None
16
        self.filter = None
17
        self.mvalue = MonitoredItemValues()
18
        self.where_clause_evaluator = None
19
        self.queue_size = 0
20
21
22
class MonitoredItemValues:
23
    def __init__(self):
24
        self.current_value = None
25
        self.old_value = None
26
27
    def set_current_value(self, cur_val):
28
        self.old_value = self.current_value
29
        self.current_value = cur_val
30
31
    def get_current_value(self):
32
        return self.current_value
33
34
    def get_old_value(self):
35
        return self.old_value
36
37
38
class MonitoredItemService:
39
    """
40
    Implements monitored item service for one subscription
41
    """
42
43
    def __init__(self, isub, aspace: AddressSpace):
44
        self.logger = logging.getLogger(f"{__name__}.{isub.data.SubscriptionId}")
45
        self.isub = isub
46
        self.aspace: AddressSpace = aspace
47
        self._monitored_items = {}
48
        self._monitored_events = {}
49
        self._monitored_datachange = {}
50
        self._monitored_item_counter = 111
51
52
    def __str__(self):
53
        return f"MonitoredItemService({self.isub.data.SubscriptionId})"
54
55
    def delete_all_monitored_items(self):
56
        self.delete_monitored_items([mdata.monitored_item_id for mdata in self._monitored_items.values()])
57
58
    async def create_monitored_items(self, params):
59
        results = []
60
        for item in params.ItemsToCreate:
61
            if item.ItemToMonitor.AttributeId == ua.AttributeIds.EventNotifier:
62
                result = self._create_events_monitored_item(item)
63
            else:
64
                result = self._create_data_change_monitored_item(item)
65
            results.append(result)
66
        return results
67
68
    def modify_monitored_items(self, params):
69
        results = []
70
        for item in params.ItemsToModify:
71
            results.append(self._modify_monitored_item(item))
72
        return results
73
74
    def trigger_datachange(self, handle, nodeid, attr):
75
        self.logger.debug("triggering datachange for handle %s, nodeid %s, and attribute %s", handle, nodeid, attr)
76
        variant = self.aspace.get_attribute_value(nodeid, attr)
77
        self.datachange_callback(handle, variant)
78
79
    def _modify_monitored_item(self, params):
80
        for mdata in self._monitored_items.values():
81
            result = ua.MonitoredItemModifyResult()
82
            if mdata.monitored_item_id == params.MonitoredItemId:
83
                result.RevisedSamplingInterval = params.RequestedParameters.SamplingInterval
84
                result.RevisedQueueSize = params.RequestedParameters.QueueSize
85
                if params.RequestedParameters.Filter is not None:
86
                    mdata.filter = params.RequestedParameters.Filter
87
                mdata.queue_size = params.RequestedParameters.QueueSize
88
                return result
89
        result = ua.MonitoredItemModifyResult()
90
        result.StatusCode(ua.StatusCodes.BadMonitoredItemIdInvalid)
91
        return result
92
93
    def _commit_monitored_item(self, result, mdata):
94
        if result.StatusCode.is_good():
95
            self._monitored_items[result.MonitoredItemId] = mdata
96
            self._monitored_item_counter += 1
97
98
    def _make_monitored_item_common(self, params):
99
        result = ua.MonitoredItemCreateResult()
100
        result.RevisedSamplingInterval = self.isub.data.RevisedPublishingInterval
101
        result.RevisedQueueSize = params.RequestedParameters.QueueSize
102
        self._monitored_item_counter += 1
103
        result.MonitoredItemId = self._monitored_item_counter
104
        self.logger.debug("Creating MonitoredItem with id %s", result.MonitoredItemId)
105
        mdata = MonitoredItemData()
106
        mdata.mode = params.MonitoringMode
107
        mdata.client_handle = params.RequestedParameters.ClientHandle
108
        mdata.monitored_item_id = result.MonitoredItemId
109
        mdata.queue_size = params.RequestedParameters.QueueSize
110
        mdata.filter = params.RequestedParameters.Filter
111
        return result, mdata
112
113
    def _create_events_monitored_item(self, params):
114
        self.logger.info("request to subscribe to events for node %s and attribute %s", params.ItemToMonitor.NodeId,
115
                         params.ItemToMonitor.AttributeId)
116
117
        result, mdata = self._make_monitored_item_common(params)
118
        ev_notify_byte = self.aspace.get_attribute_value(params.ItemToMonitor.NodeId,
119
                                                         ua.AttributeIds.EventNotifier).Value.Value
120
121
        if ev_notify_byte is None or not ua.ua_binary.test_bit(ev_notify_byte, ua.EventNotifier.SubscribeToEvents):
122
            result.StatusCode = ua.StatusCode(ua.StatusCodes.BadServiceUnsupported)
123
            return result
124
        # result.FilterResult = ua.EventFilterResult()  # spec says we can ignore if not error
125
        mdata.where_clause_evaluator = WhereClauseEvaluator(self.logger, self.aspace, mdata.filter.WhereClause)
126
        self._commit_monitored_item(result, mdata)
127
        if params.ItemToMonitor.NodeId not in self._monitored_events:
128
            self._monitored_events[params.ItemToMonitor.NodeId] = []
129
        self._monitored_events[params.ItemToMonitor.NodeId].append(result.MonitoredItemId)
130
        return result
131
132
    def _create_data_change_monitored_item(self, params):
133
        self.logger.info("request to subscribe to datachange for node %s and attribute %s", params.ItemToMonitor.NodeId,
134
                         params.ItemToMonitor.AttributeId)
135
136
        result, mdata = self._make_monitored_item_common(params)
137
        result.FilterResult = params.RequestedParameters.Filter
138
        result.StatusCode, handle = self.aspace.add_datachange_callback(
139
            params.ItemToMonitor.NodeId, params.ItemToMonitor.AttributeId, self.datachange_callback)
140
141
        self.logger.debug("adding callback return status %s and handle %s", result.StatusCode, handle)
142
        mdata.callback_handle = handle
143
        self._commit_monitored_item(result, mdata)
144
        if result.StatusCode.is_good():
145
            self._monitored_datachange[handle] = result.MonitoredItemId
146
            # force data change event generation
147
            self.trigger_datachange(handle, params.ItemToMonitor.NodeId, params.ItemToMonitor.AttributeId)
148
        return result
149
150
    def delete_monitored_items(self, ids):
151
        self.logger.debug("delete monitored items %s", ids)
152
        # with self._lock:
153
        results = []
154
        for mid in ids:
155
            results.append(self._delete_monitored_items(mid))
156
        return results
157
158
    def _delete_monitored_items(self, mid):
159
        if mid not in self._monitored_items:
160
            return ua.StatusCode(ua.StatusCodes.BadMonitoredItemIdInvalid)
161
        for k, v in self._monitored_events.items():
162
            if mid in v:
163
                v.remove(mid)
164
                if not v:
165
                    self._monitored_events.pop(k)
166
                break
167
        for k, v in self._monitored_datachange.items():
168
            if v == mid:
169
                self.aspace.delete_datachange_callback(k)
170
                self._monitored_datachange.pop(k)
171
                break
172
        self._monitored_items.pop(mid)
173
        return ua.StatusCode()
174
175
    def datachange_callback(self, handle, value, error=None):
176
        if error:
177
            self.logger.info("subscription %s: datachange callback called with handle '%s' and error '%s'", self,
178
                             handle, error)
179
            self.trigger_statuschange(error)
180
        else:
181
            self.logger.info("subscription %s: datachange callback called with handle '%s' and value '%s'", self,
182
                             handle, value.Value)
183
            event = ua.MonitoredItemNotification()
184
            mid = self._monitored_datachange[handle]
185
            mdata = self._monitored_items[mid]
186
            mdata.mvalue.set_current_value(value.Value.Value)
187
            if mdata.filter:
188
                deadband_flag_pass = self.deadband_callback(mdata.mvalue, mdata.filter)
189
            else:
190
                deadband_flag_pass = True
191
            if deadband_flag_pass:
192
                event.ClientHandle = mdata.client_handle
193
                event.Value = value
194
                self.isub.enqueue_datachange_event(mid, event, mdata.queue_size)
195
196
    def deadband_callback(self, values, flt):
197
        if flt.DeadbandType == ua.DeadbandType.None_ or values.get_old_value() is None:
198
            return True
199
        if flt.DeadbandType == ua.DeadbandType.Absolute and \
200
                ((abs(values.get_current_value() - values.get_old_value())) > flt.DeadbandValue):
201
            return True
202
        if flt.DeadbandType == ua.DeadbandType.Percent:
203
            self.logger.warning("DeadbandType Percent is not implemented !")
204
            return True
205
        return False
206
207
    def trigger_event(self, event):
208
        if event.emitting_node not in self._monitored_events:
209
            self.logger.debug("%s has no subscription for events %s from node: %s", self, event, event.emitting_node)
210
            return False
211
        self.logger.debug("%s has subscription for events %s from node: %s", self, event, event.emitting_node)
212
        mids = self._monitored_events[event.emitting_node]
213
        for mid in mids:
214
            self._trigger_event(event, mid)
215
        return True
216
217
    def _trigger_event(self, event, mid):
218
        if mid not in self._monitored_items:
219
            self.logger.debug("Could not find monitored items for id %s for event %s in subscription %s", mid, event,
220
                              self)
221
            return
222
        mdata = self._monitored_items[mid]
223
        if not mdata.where_clause_evaluator.eval(event):
224
            self.logger.info("%s, %s, Event %s does not fit WhereClause, not generating event", self, mid, event)
225
            return
226
        fieldlist = ua.EventFieldList()
227
        fieldlist.ClientHandle = mdata.client_handle
228
        fieldlist.EventFields = event.to_event_fields(mdata.filter.SelectClauses)
229
        self.isub.enqueue_event(mid, fieldlist, mdata.queue_size)
230
231
    def trigger_statuschange(self, code):
232
        self.isub.enqueue_statuschange(code)
233
234
235
class WhereClauseEvaluator:
236
    def __init__(self, logger, aspace, whereclause):
237
        self.logger = logger
238
        self.elements = whereclause.Elements
239
        self._aspace = aspace
240
241
    def eval(self, event):
242
        if not self.elements:
243
            return True
244
        # spec says we should only evaluate first element, which may use other elements
245
        try:
246
            res = self._eval_el(0, event)
247
        except Exception as ex:
248
            self.logger.exception("Exception while evaluating WhereClause %s for event %s: %s", self.elements, event,
249
                                  ex)
250
            return False
251
        return res
252
253
    def _eval_el(self, index, event):
254
        el = self.elements[index]
255
        # ops = [self._eval_op(op, event) for op in el.FilterOperands]
256
        ops = el.FilterOperands  # just to make code more readable
257
        if el.FilterOperator == ua.FilterOperator.Equals:
258
            return self._eval_op(ops[0], event) == self._eval_el(ops[1], event)
259
        if el.FilterOperator == ua.FilterOperator.IsNull:
260
            return self._eval_op(ops[0], event) is None  # FIXME: might be too strict
261
        if el.FilterOperator == ua.FilterOperator.GreaterThan:
262
            return self._eval_op(ops[0], event) > self._eval_el(ops[1], event)
263
        if el.FilterOperator == ua.FilterOperator.LessThan:
264
            return self._eval_op(ops[0], event) < self._eval_el(ops[1], event)
265
        if el.FilterOperator == ua.FilterOperator.GreaterThanOrEqual:
266
            return self._eval_op(ops[0], event) >= self._eval_el(ops[1], event)
267
        if el.FilterOperator == ua.FilterOperator.LessThanOrEqual:
268
            return self._eval_op(ops[0], event) <= self._eval_el(ops[1], event)
269
        if el.FilterOperator == ua.FilterOperator.Like:
270
            return self._like_operator(self._eval_op(ops[0], event), self._eval_el(ops[1], event))
271
        if el.FilterOperator == ua.FilterOperator.Not:
272
            return not self._eval_op(ops[0], event)
273
        if el.FilterOperator == ua.FilterOperator.Between:
274
            return self._eval_el(ops[2], event) >= self._eval_op(ops[0], event) >= self._eval_el(ops[1], event)
275
        if el.FilterOperator == ua.FilterOperator.InList:
276
            return self._eval_op(ops[0], event) in [self._eval_op(op, event) for op in ops[1:]]
277
        if el.FilterOperator == ua.FilterOperator.And:
278
            self.elements(ops[0].Index)
279
            return self._eval_op(ops[0], event) and self._eval_op(ops[1], event)
280
        if el.FilterOperator == ua.FilterOperator.Or:
281
            return self._eval_op(ops[0], event) or self._eval_el(ops[1], event)
282
        if el.FilterOperator == ua.FilterOperator.Cast:
283
            self.logger.warn("Cast operand not implemented, assuming True")
284
            return True
285
        if el.FilterOperator == ua.FilterOperator.OfType:
286
            return event.EventType == self._eval_op(ops[0], event)
287
        # TODO: implement missing operators
288
        self.logger.warning("WhereClause not implemented for element: %s", el)
289
        raise NotImplementedError
290
291
    def _like_operator(self, string, pattern):
292
        raise NotImplementedError
293
294
    def _eval_op(self, op, event):
295
        # seems spec says we should return Null if issues
296
        if isinstance(op, ua.ElementOperand):
297
            return self._eval_el(op.Index, event)
298
        if isinstance(op, ua.AttributeOperand):
299
            if op.BrowsePath:
300
                return getattr(event, op.BrowsePath.Elements[0].TargetName.Name)
301
            return self._aspace.get_attribute_value(event.EventType, op.AttributeId).Value.Value
302
            # FIXME: check, this is probably broken
303
        if isinstance(op, ua.SimpleAttributeOperand):
304
            if op.BrowsePath:
305
                # we only support depth of 1
306
                return getattr(event, op.BrowsePath[0].Name)
307
            # TODO: write code for index range.... but doe it make any sense
308
            return self._aspace.get_attribute_value(event.EventType, op.AttributeId).Value.Value
309
        if isinstance(op, ua.LiteralOperand):
310
            return op.Value.Value
311
        self.logger.warning("Where clause element % is not of a known type", op)
312
        raise NotImplementedError
313