Passed
Pull Request — master (#365)
by
unknown
02:54
created

WhereClauseEvaluator._eval_op()   B

Complexity

Conditions 7

Size

Total Lines 19
Code Lines 15

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 7
eloc 15
nop 3
dl 0
loc 19
rs 8
c 0
b 0
f 0
1
"""
2
server side implementation of a subscription object
3
"""
4
5
import logging
6
from asyncua import ua
7
from typing import Dict
8
from .address_space import AddressSpace
9
10
11
class MonitoredItemData:
    """
    Per-item bookkeeping record kept by the monitored item service.

    All fields start unset (``None`` / ``0``) and are filled in by the
    service while it builds the monitored item.
    """

    def __init__(self):
        # identifiers
        self.monitored_item_id = None
        self.client_handle = None
        self.callback_handle = None
        # monitoring configuration
        self.mode = None
        self.filter = None
        self.queue_size = 0
        self.where_clause_evaluator = None
        # holder for the current and the previous value
        self.mvalue = MonitoredItemValues()
21
22
23
class MonitoredItemValues:
    """
    Remembers the most recent value together with the one it replaced.
    """

    def __init__(self):
        self.old_value = None
        self.current_value = None

    def set_current_value(self, cur_val):
        """Store ``cur_val`` as current; the former current value becomes the old one."""
        self.old_value, self.current_value = self.current_value, cur_val

    def get_current_value(self):
        """Return the value passed to the latest ``set_current_value`` call (``None`` if never set)."""
        return self.current_value

    def get_old_value(self):
        """Return the value that was current before the latest ``set_current_value`` call."""
        return self.old_value
37
38
39
class MonitoredItemService:
    """
    Implements monitored item service for one subscription

    The service creates, modifies and deletes monitored items and forwards
    data change and event notifications to the owning subscription
    (``isub``) through its ``enqueue_*`` coroutines.
    """

    def __init__(self, isub, aspace: AddressSpace):
        self.logger = logging.getLogger(f"{__name__}.{isub.data.SubscriptionId}")
        self.isub = isub
        self.aspace: AddressSpace = aspace
        # monitored item id -> bookkeeping record
        self._monitored_items: Dict[int, MonitoredItemData] = {}
        # node id -> ids of the event monitored items registered for that node
        self._monitored_events = {}
        # address space callback handle -> monitored item id
        self._monitored_datachange: Dict[int, int] = {}
        # incremented before use, so the first id handed out is 112
        self._monitored_item_counter = 111
        # emitting node -> last event from that node that had Retain set
        self._events_with_retain = {}
        # monitored item id -> events with Retain set that were routed to it
        self._mid_with_retain = {}

    def __str__(self):
        return f"MonitoredItemService({self.isub.data.SubscriptionId})"

    def delete_all_monitored_items(self):
        """Delete every monitored item currently known to this service."""
        # the id list is built up front because deletion mutates the dict
        self.delete_monitored_items([mdata.monitored_item_id for mdata in self._monitored_items.values()])

    async def create_monitored_items(self, params: ua.CreateMonitoredItemsParameters):
        """
        Create one monitored item per entry of ``params.ItemsToCreate``.

        Items on the EventNotifier attribute become event monitored items,
        everything else becomes a data change monitored item.

        :return: list of creation results, in request order
        """
        results = []
        for item in params.ItemsToCreate:
            if item.ItemToMonitor.AttributeId == ua.AttributeIds.EventNotifier:
                result = self._create_events_monitored_item(item)
            else:
                result = await self._create_data_change_monitored_item(item)
            results.append(result)
        return results

    def modify_monitored_items(self, params: ua.ModifyMonitoredItemsParameters):
        """
        Apply every request of ``params.ItemsToModify``.

        :return: list of modification results, in request order
        """
        return [self._modify_monitored_item(item) for item in params.ItemsToModify]

    async def trigger_datachange(self, handle, nodeid, attr):
        """Read the attribute from the address space and push it through ``datachange_callback``."""
        self.logger.debug("triggering datachange for handle %s, nodeid %s, and attribute %s", handle, nodeid, attr)
        dv = self.aspace.read_attribute_value(nodeid, attr)
        await self.datachange_callback(handle, dv)

    def _modify_monitored_item(self, params: ua.MonitoredItemModifyRequest):
        """
        Update queue size and (optionally) filter of one monitored item.

        :return: the modification result; its status is
                 BadMonitoredItemIdInvalid when the id is unknown
        """
        result = ua.MonitoredItemModifyResult()
        # _monitored_items is keyed by monitored item id, so no scan is needed
        mdata = self._monitored_items.get(params.MonitoredItemId)
        if mdata is None:
            # the status has to be assigned; calling result.StatusCode(...) raised
            result.StatusCode = ua.StatusCode(ua.StatusCodes.BadMonitoredItemIdInvalid)
            return result
        requested = params.RequestedParameters
        result.RevisedSamplingInterval = requested.SamplingInterval
        result.RevisedQueueSize = requested.QueueSize
        if requested.Filter is not None:
            mdata.filter = requested.Filter
            if mdata.where_clause_evaluator is not None:
                # event item: keep the evaluator in sync with the new filter,
                # otherwise the old where clause would still be applied
                mdata.where_clause_evaluator = WhereClauseEvaluator(
                    self.logger, self.aspace, mdata.filter.WhereClause)
        mdata.queue_size = requested.QueueSize
        return result

    def _commit_monitored_item(self, result, mdata: MonitoredItemData):
        """Register ``mdata`` under the result's id, but only if creation succeeded."""
        if result.StatusCode.is_good():
            self._monitored_items[result.MonitoredItemId] = mdata

    def _make_monitored_item_common(self, params):
        """
        Build the creation result and the bookkeeping record shared by both
        kinds of monitored items. Allocates a new monitored item id.

        :return: tuple ``(result, mdata)``
        """
        result = ua.MonitoredItemCreateResult()
        result.RevisedSamplingInterval = self.isub.data.RevisedPublishingInterval
        result.RevisedQueueSize = params.RequestedParameters.QueueSize
        self._monitored_item_counter += 1
        result.MonitoredItemId = self._monitored_item_counter
        self.logger.debug("Creating MonitoredItem with id %s", result.MonitoredItemId)
        mdata = MonitoredItemData()
        mdata.mode = params.MonitoringMode
        mdata.client_handle = params.RequestedParameters.ClientHandle
        mdata.monitored_item_id = result.MonitoredItemId
        mdata.queue_size = params.RequestedParameters.QueueSize
        mdata.filter = params.RequestedParameters.Filter
        return result, mdata

    def _create_events_monitored_item(self, params: ua.MonitoredItemCreateRequest):
        """
        Create an event monitored item for the requested node.

        The item is registered for the node itself and for every node found
        by following references of type 41 and 48 from it.

        :return: the creation result; its status is BadServiceUnsupported when
                 the node's EventNotifier attribute does not allow subscribing
        """
        self.logger.info("request to subscribe to events for node %s and attribute %s", params.ItemToMonitor.NodeId,
                         params.ItemToMonitor.AttributeId)

        result, mdata = self._make_monitored_item_common(params)
        ev_notify_byte = self.aspace.read_attribute_value(params.ItemToMonitor.NodeId,
                                                          ua.AttributeIds.EventNotifier).Value.Value

        if ev_notify_byte is None or not ua.ua_binary.test_bit(ev_notify_byte, ua.EventNotifier.SubscribeToEvents):
            result.StatusCode = ua.StatusCode(ua.StatusCodes.BadServiceUnsupported)
            return result
        # result.FilterResult = ua.EventFilterResult()  # spec says we can ignore if not error
        mdata.where_clause_evaluator = WhereClauseEvaluator(self.logger, self.aspace, mdata.filter.WhereClause)
        self._commit_monitored_item(result, mdata)

        mid = result.MonitoredItemId
        visited = set()

        def register(node_id):
            # never list the same item twice for a node, it would be notified twice
            mids = self._monitored_events.setdefault(node_id, [])
            if mid not in mids:
                mids.append(mid)

        def find_events(node_id):
            # guard against reference cycles, which would otherwise recurse forever
            if node_id in visited:
                return
            visited.add(node_id)
            for ref in self.aspace.get(node_id).references:
                # 41 / 48: GeneratesEvent / HasNotifier in the standard OPC UA nodeset
                if ref.ReferenceTypeId == ua.NumericNodeId(41):
                    register(ref.NodeId)
                elif ref.ReferenceTypeId == ua.NumericNodeId(48):
                    find_events(ref.NodeId)

        find_events(params.ItemToMonitor.NodeId)
        register(params.ItemToMonitor.NodeId)
        return result

    async def _create_data_change_monitored_item(self, params: ua.MonitoredItemCreateRequest):
        """
        Create a data change monitored item and, on success, immediately
        generate a first notification with the current value.

        :return: the creation result, carrying the status returned by the
                 address space when registering the callback
        """
        self.logger.info("request to subscribe to datachange for node %s and attribute %s", params.ItemToMonitor.NodeId,
                         params.ItemToMonitor.AttributeId)

        result, mdata = self._make_monitored_item_common(params)
        result.FilterResult = params.RequestedParameters.Filter
        result.StatusCode, handle = self.aspace.add_datachange_callback(
            params.ItemToMonitor.NodeId, params.ItemToMonitor.AttributeId, self.datachange_callback)

        self.logger.debug("adding callback return status %s and handle %s", result.StatusCode, handle)
        mdata.callback_handle = handle
        self._commit_monitored_item(result, mdata)
        if result.StatusCode.is_good():
            self._monitored_datachange[handle] = result.MonitoredItemId
            # force data change event generation
            await self.trigger_datachange(handle, params.ItemToMonitor.NodeId, params.ItemToMonitor.AttributeId)
        return result

    def delete_monitored_items(self, ids):
        """
        Delete the monitored items with the given ids.

        :return: list with one status code per id, in request order
        """
        self.logger.debug("delete monitored items %s", ids)
        return [self._delete_monitored_items(mid) for mid in ids]

    def _delete_monitored_items(self, mid: int):
        """
        Delete a single monitored item and everything registered for it.

        :return: good status code, or BadMonitoredItemIdInvalid for an unknown id
        """
        if mid not in self._monitored_items:
            return ua.StatusCode(ua.StatusCodes.BadMonitoredItemIdInvalid)
        # An event item is registered under several nodes, so every list has to
        # be cleaned. Lists are replaced instead of mutated so that a
        # trigger_event() suspended in the middle of one is not disturbed.
        for node_id, mids in list(self._monitored_events.items()):
            if mid in mids:
                remaining = [other for other in mids if other != mid]
                if remaining:
                    self._monitored_events[node_id] = remaining
                else:
                    del self._monitored_events[node_id]
        for handle, item_id in self._monitored_datachange.items():
            if item_id == mid:
                self.aspace.delete_datachange_callback(handle)
                del self._monitored_datachange[handle]
                break  # mandatory: the dict was just modified
        # retained events of a deleted item can never be delivered again
        self._mid_with_retain.pop(mid, None)
        self._monitored_items.pop(mid)
        return ua.StatusCode()

    async def datachange_callback(self, handle: int, value, error=None):
        """
        Callback registered in the address space for data change items.

        With ``error`` set, a status change is enqueued on the subscription.
        Otherwise the value is enqueued as a data change notification, unless
        the item's filter rejects it.
        """
        if error:
            self.logger.info("subscription %s: datachange callback called with handle '%s' and error '%s'", self,
                             handle, error)
            await self.trigger_statuschange(error)
        else:
            # self.logger.info(f"subscription {self}: datachange callback called "
            #                 f"with handle '{handle}' and value '{value.Value}'")
            event = ua.MonitoredItemNotification()
            mid = self._monitored_datachange[handle]
            mdata = self._monitored_items[mid]
            # NOTE(review): the value is stored before the deadband check, so
            # the check compares against the previously *received* value, not
            # the previously *reported* one - confirm this is intended
            mdata.mvalue.set_current_value(value.Value.Value)
            if mdata.filter:
                deadband_flag_pass = self.deadband_callback(mdata.mvalue, mdata.filter)
            else:
                deadband_flag_pass = True
            if deadband_flag_pass:
                event.ClientHandle = mdata.client_handle
                event.Value = value
                await self.isub.enqueue_datachange_event(mid, event, mdata.queue_size)

    def deadband_callback(self, values, flt):
        """
        Decide whether a new value passes the deadband filter ``flt``.

        :param values: holder exposing ``get_current_value()`` / ``get_old_value()``
        :param flt: filter exposing ``DeadbandType`` and ``DeadbandValue``
        :return: True if a notification must be generated
        """
        if flt.DeadbandType == ua.DeadbandType.None_ or values.get_old_value() is None:
            return True
        if flt.DeadbandType == ua.DeadbandType.Absolute and \
                ((abs(values.get_current_value() - values.get_old_value())) > flt.DeadbandValue):
            return True
        if flt.DeadbandType == ua.DeadbandType.Percent:
            self.logger.warning("DeadbandType Percent is not implemented !")
            return True
        return False

    async def trigger_event(self, event):
        """
        Route ``event`` to every monitored item registered for its emitting node.

        Events exposing a ``Retain`` attribute are remembered (or forgotten)
        per emitting node for ``condition_refresh``.

        :return: True if at least one item is registered for the node, else False
        """
        if hasattr(event, 'Retain'):
            if event.Retain:
                self._events_with_retain[event.emitting_node] = event
            else:
                # the node may never have emitted a retained event before
                self._events_with_retain.pop(event.emitting_node, None)
        if event.emitting_node not in self._monitored_events:
            self.logger.debug("%s has NO subscription for events %s from node: %s", self, event, event.emitting_node)
            return False

        self.logger.debug("%s has subscription for events %s from node: %s", self, event, event.emitting_node)
        # snapshot: items can be created or deleted while we are suspended in await
        mids = list(self._monitored_events[event.emitting_node])

        for mid in mids:
            await self._trigger_event(event, mid)
        return True

    async def _trigger_event(self, event, mid: int):
        """
        Deliver ``event`` to the monitored item ``mid`` if it fits the item's
        where clause. Unknown ids are ignored.
        """
        if mid not in self._monitored_items:
            self.logger.debug("Could not find monitored items for id %s for event %s in subscription %s", mid, event,
                              self)
            return
        if hasattr(event, 'Retain'):
            if event.Retain:
                retained = self._mid_with_retain.setdefault(mid, [])
                if event not in retained:
                    retained.append(event)
            else:
                # nothing may have been retained for this item, or not this event
                retained = self._mid_with_retain.get(mid)
                if retained is not None:
                    if event in retained:
                        retained.remove(event)
                    if not retained:
                        del self._mid_with_retain[mid]
        mdata = self._monitored_items[mid]
        if not mdata.where_clause_evaluator.eval(event):
            self.logger.info("%s, %s, Event %s does not fit WhereClause, not generating event", self, mid, event)
            return
        fieldlist = ua.EventFieldList()
        fieldlist.ClientHandle = mdata.client_handle
        fieldlist.EventFields = event.to_event_fields(mdata.filter.SelectClauses)

        await self.isub.enqueue_event(mid, fieldlist, mdata.queue_size)

    async def trigger_statuschange(self, code):
        """Enqueue a status change notification on the subscription."""
        await self.isub.enqueue_statuschange(code)

    async def condition_refresh(self):
        """Re-send the retained event of every emitting node."""
        # snapshot: trigger_event() may add or drop entries of the dict
        for event in list(self._events_with_retain.values()):
            await self.trigger_event(event)

    def condition_refresh2(self, mid):
        """Re-send the retained events of the monitored item ``mid``."""
        # NOTE(review): _trigger_event() is a coroutine function and its result
        # is neither awaited nor scheduled here, so nothing gets enqueued.
        # Fixing it means turning this method into ``async def`` and awaiting
        # it in the callers, which are not part of this file.
        if mid in self._mid_with_retain:
            for event in self._mid_with_retain[mid]:
                self._trigger_event(event, mid)
274
275
276
class WhereClauseEvaluator:
    """
    Evaluates the where clause of an event filter against events.

    Only the first element of the clause is evaluated directly; other
    elements are reached through the element operands that reference them.
    """

    def __init__(self, logger, aspace: AddressSpace, whereclause):
        self.logger = logger
        self.elements = whereclause.Elements
        self._aspace = aspace

    def eval(self, event):
        """
        Tell whether ``event`` fits the where clause.

        :return: True for a clause without elements, otherwise the result of
                 the first element; False if the evaluation raised
        """
        if not self.elements:
            return True
        # spec says we should only evaluate first element, which may use other elements
        try:
            res = self._eval_el(0, event)
        except Exception as ex:
            self.logger.exception("Exception while evaluating WhereClause %s for event %s: %s", self.elements, event,
                                  ex)
            return False
        return res

    def _eval_el(self, index, event):
        """
        Evaluate the filter element at ``index`` for ``event``.

        :raises NotImplementedError: for operators that are not supported
        """
        el = self.elements[index]
        ops = el.FilterOperands  # just to make code more readable
        operator = el.FilterOperator

        def operand(position):
            # operands are evaluated lazily so that And/Or keep short-circuiting
            return self._eval_op(ops[position], event)

        if operator == ua.FilterOperator.Equals:
            return operand(0) == operand(1)
        if operator == ua.FilterOperator.IsNull:
            return operand(0) is None  # FIXME: might be too strict
        if operator == ua.FilterOperator.GreaterThan:
            return operand(0) > operand(1)
        if operator == ua.FilterOperator.LessThan:
            return operand(0) < operand(1)
        if operator == ua.FilterOperator.GreaterThanOrEqual:
            return operand(0) >= operand(1)
        if operator == ua.FilterOperator.LessThanOrEqual:
            return operand(0) <= operand(1)
        if operator == ua.FilterOperator.Like:
            return self._like_operator(operand(0), operand(1))
        if operator == ua.FilterOperator.Not:
            return not operand(0)
        if operator == ua.FilterOperator.Between:
            # operand 0 must lie within [operand 1, operand 2]
            return operand(2) >= operand(0) >= operand(1)
        if operator == ua.FilterOperator.InList:
            return operand(0) in [self._eval_op(op, event) for op in ops[1:]]
        if operator == ua.FilterOperator.And:
            # a stray ``self.elements(...)`` call used to raise TypeError here,
            # which made every And clause evaluate to False
            return operand(0) and operand(1)
        if operator == ua.FilterOperator.Or:
            return operand(0) or operand(1)
        if operator == ua.FilterOperator.Cast:
            self.logger.warning("Cast operand not implemented, assuming True")
            return True
        if operator == ua.FilterOperator.OfType:
            return event.EventType == operand(0)
        # TODO: implement missing operators
        self.logger.warning("WhereClause not implemented for element: %s", el)
        raise NotImplementedError

    def _like_operator(self, string, pattern):
        """Placeholder for the Like operator, which is not implemented yet."""
        raise NotImplementedError

    def _eval_op(self, op, event):
        """
        Resolve one filter operand to a value.

        Element operands are evaluated recursively, attribute operands are
        read from the event (browse path given) or from the address space,
        literal operands yield their value.

        :raises NotImplementedError: for an operand of unknown type
        """
        # seems spec says we should return Null if issues
        if isinstance(op, ua.ElementOperand):
            return self._eval_el(op.Index, event)
        if isinstance(op, ua.AttributeOperand):
            # FIXME: check, this is probably broken
            if op.BrowsePath:
                return getattr(event, op.BrowsePath.Elements[0].TargetName.Name)
            return self._aspace.read_attribute_value(event.EventType, op.AttributeId).Value.Value
        if isinstance(op, ua.SimpleAttributeOperand):
            if op.BrowsePath:
                # we only support depth of 1
                return getattr(event, op.BrowsePath[0].Name)
            # TODO: write code for index range.... but does it make any sense
            return self._aspace.read_attribute_value(event.EventType, op.AttributeId).Value.Value
        if isinstance(op, ua.LiteralOperand):
            return op.Value.Value
        # "%s": the former "% is" was parsed as an integer conversion
        self.logger.warning("Where clause element %s is not of a known type", op)
        raise NotImplementedError
354