asyncua.server.monitored_item_service   F
last analyzed

Complexity

Total Complexity 80

Size/Duplication

Total Lines 313
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 255
dl 0
loc 313
rs 2
c 0
b 0
f 0
wmc 80

28 Methods

Rating   Name   Duplication   Size   Complexity  
A WhereClauseEvaluator._like_operator() 0 2 1
B MonitoredItemService.deadband_callback() 0 10 6
A MonitoredItemService.trigger_statuschange() 0 2 1
B WhereClauseEvaluator._eval_op() 0 19 7
B MonitoredItemService._delete_monitored_items() 0 16 7
A MonitoredItemService.delete_monitored_items() 0 7 2
A MonitoredItemService._create_events_monitored_item() 0 18 4
A MonitoredItemService._create_data_change_monitored_item() 0 17 2
A WhereClauseEvaluator.__init__() 0 4 1
A MonitoredItemService._make_monitored_item_common() 0 14 1
F WhereClauseEvaluator._eval_el() 0 37 15
A MonitoredItemService.trigger_event() 0 9 3
A MonitoredItemService._trigger_event() 0 13 3
A WhereClauseEvaluator.eval() 0 11 3
A MonitoredItemService.delete_all_monitored_items() 0 2 1
A MonitoredItemService.modify_monitored_items() 0 5 2
A MonitoredItemService.__init__() 0 8 1
A MonitoredItemService.trigger_datachange() 0 4 1
A MonitoredItemService.__str__() 0 2 1
A MonitoredItemValues.set_current_value() 0 3 1
A MonitoredItemService.create_monitored_items() 0 9 3
A MonitoredItemService._modify_monitored_item() 0 13 4
A MonitoredItemValues.get_current_value() 0 2 1
A MonitoredItemValues.get_old_value() 0 2 1
A MonitoredItemValues.__init__() 0 3 1
A MonitoredItemData.__init__() 0 9 1
A MonitoredItemService._commit_monitored_item() 0 3 2
A MonitoredItemService.datachange_callback() 0 20 4

How to fix   Complexity   

Complexity

Complex classes like asyncua.server.monitored_item_service often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""
2
server side implementation of a subscription object
3
"""
4
5
import logging
6
from asyncua import ua
7
from typing import Dict
8
from .address_space import AddressSpace
9
10
11
class MonitoredItemData:
12
    def __init__(self):
13
        self.client_handle = None
14
        self.callback_handle = None
15
        self.monitored_item_id = None
16
        self.mode = None
17
        self.filter = None
18
        self.mvalue = MonitoredItemValues()
19
        self.where_clause_evaluator = None
20
        self.queue_size = 0
21
22
23
class MonitoredItemValues:
24
    def __init__(self):
25
        self.current_value = None
26
        self.old_value = None
27
28
    def set_current_value(self, cur_val):
29
        self.old_value = self.current_value
30
        self.current_value = cur_val
31
32
    def get_current_value(self):
33
        return self.current_value
34
35
    def get_old_value(self):
36
        return self.old_value
37
38
39
class MonitoredItemService:
40
    """
41
    Implements monitored item service for one subscription
42
    """
43
44
    def __init__(self, isub, aspace: AddressSpace):
45
        self.logger = logging.getLogger(f"{__name__}.{isub.data.SubscriptionId}")
46
        self.isub = isub
47
        self.aspace: AddressSpace = aspace
48
        self._monitored_items: Dict[int, MonitoredItemData] = {}
49
        self._monitored_events = {}
50
        self._monitored_datachange: Dict[int, int] = {}
51
        self._monitored_item_counter = 111
52
53
    def __str__(self):
54
        return f"MonitoredItemService({self.isub.data.SubscriptionId})"
55
56
    def delete_all_monitored_items(self):
57
        self.delete_monitored_items([mdata.monitored_item_id for mdata in self._monitored_items.values()])
58
59
    async def create_monitored_items(self, params: ua.CreateMonitoredItemsParameters):
60
        results = []
61
        for item in params.ItemsToCreate:
62
            if item.ItemToMonitor.AttributeId == ua.AttributeIds.EventNotifier:
63
                result = self._create_events_monitored_item(item)
64
            else:
65
                result = await self._create_data_change_monitored_item(item)
66
            results.append(result)
67
        return results
68
69
    def modify_monitored_items(self, params: ua.ModifyMonitoredItemsParameters):
70
        results = []
71
        for item in params.ItemsToModify:
72
            results.append(self._modify_monitored_item(item))
73
        return results
74
75
    async def trigger_datachange(self, handle, nodeid, attr):
76
        self.logger.debug("triggering datachange for handle %s, nodeid %s, and attribute %s", handle, nodeid, attr)
77
        dv = self.aspace.read_attribute_value(nodeid, attr)
78
        await self.datachange_callback(handle, dv)
79
80
    def _modify_monitored_item(self, params: ua.MonitoredItemModifyRequest):
81
        for mdata in self._monitored_items.values():
82
            result = ua.MonitoredItemModifyResult()
83
            if mdata.monitored_item_id == params.MonitoredItemId:
84
                result.RevisedSamplingInterval = params.RequestedParameters.SamplingInterval
85
                result.RevisedQueueSize = params.RequestedParameters.QueueSize
86
                if params.RequestedParameters.Filter is not None:
87
                    mdata.filter = params.RequestedParameters.Filter
88
                mdata.queue_size = params.RequestedParameters.QueueSize
89
                return result
90
        result = ua.MonitoredItemModifyResult()
91
        result.StatusCode(ua.StatusCodes.BadMonitoredItemIdInvalid)
92
        return result
93
94
    def _commit_monitored_item(self, result, mdata: MonitoredItemData):
95
        if result.StatusCode.is_good():
96
            self._monitored_items[result.MonitoredItemId] = mdata
97
98
    def _make_monitored_item_common(self, params):
99
        result = ua.MonitoredItemCreateResult()
100
        result.RevisedSamplingInterval = self.isub.data.RevisedPublishingInterval
101
        result.RevisedQueueSize = params.RequestedParameters.QueueSize
102
        self._monitored_item_counter += 1
103
        result.MonitoredItemId = self._monitored_item_counter
104
        self.logger.debug("Creating MonitoredItem with id %s", result.MonitoredItemId)
105
        mdata = MonitoredItemData()
106
        mdata.mode = params.MonitoringMode
107
        mdata.client_handle = params.RequestedParameters.ClientHandle
108
        mdata.monitored_item_id = result.MonitoredItemId
109
        mdata.queue_size = params.RequestedParameters.QueueSize
110
        mdata.filter = params.RequestedParameters.Filter
111
        return result, mdata
112
113
    def _create_events_monitored_item(self, params: ua.MonitoredItemCreateRequest):
114
        self.logger.info("request to subscribe to events for node %s and attribute %s", params.ItemToMonitor.NodeId,
115
                         params.ItemToMonitor.AttributeId)
116
117
        result, mdata = self._make_monitored_item_common(params)
118
        ev_notify_byte = self.aspace.read_attribute_value(params.ItemToMonitor.NodeId,
119
                                                          ua.AttributeIds.EventNotifier).Value.Value
120
121
        if ev_notify_byte is None or not ua.ua_binary.test_bit(ev_notify_byte, ua.EventNotifier.SubscribeToEvents):
122
            result.StatusCode = ua.StatusCode(ua.StatusCodes.BadServiceUnsupported)
123
            return result
124
        # result.FilterResult = ua.EventFilterResult()  # spec says we can ignore if not error
125
        mdata.where_clause_evaluator = WhereClauseEvaluator(self.logger, self.aspace, mdata.filter.WhereClause)
126
        self._commit_monitored_item(result, mdata)
127
        if params.ItemToMonitor.NodeId not in self._monitored_events:
128
            self._monitored_events[params.ItemToMonitor.NodeId] = []
129
        self._monitored_events[params.ItemToMonitor.NodeId].append(result.MonitoredItemId)
130
        return result
131
132
    async def _create_data_change_monitored_item(self, params: ua.MonitoredItemCreateRequest):
133
        self.logger.info("request to subscribe to datachange for node %s and attribute %s", params.ItemToMonitor.NodeId,
134
                         params.ItemToMonitor.AttributeId)
135
136
        result, mdata = self._make_monitored_item_common(params)
137
        result.FilterResult = params.RequestedParameters.Filter
138
        result.StatusCode, handle = self.aspace.add_datachange_callback(
139
            params.ItemToMonitor.NodeId, params.ItemToMonitor.AttributeId, self.datachange_callback)
140
141
        self.logger.debug("adding callback return status %s and handle %s", result.StatusCode, handle)
142
        mdata.callback_handle = handle
143
        self._commit_monitored_item(result, mdata)
144
        if result.StatusCode.is_good():
145
            self._monitored_datachange[handle] = result.MonitoredItemId
146
            # force data change event generation
147
            await self.trigger_datachange(handle, params.ItemToMonitor.NodeId, params.ItemToMonitor.AttributeId)
148
        return result
149
150
    def delete_monitored_items(self, ids):
151
        self.logger.debug("delete monitored items %s", ids)
152
        # with self._lock:
153
        results = []
154
        for mid in ids:
155
            results.append(self._delete_monitored_items(mid))
156
        return results
157
158
    def _delete_monitored_items(self, mid: int):
159
        if mid not in self._monitored_items:
160
            return ua.StatusCode(ua.StatusCodes.BadMonitoredItemIdInvalid)
161
        for k, v in self._monitored_events.items():
162
            if mid in v:
163
                v.remove(mid)
164
                if not v:
165
                    self._monitored_events.pop(k)
166
                break
167
        for k, v in self._monitored_datachange.items():
168
            if v == mid:
169
                self.aspace.delete_datachange_callback(k)
170
                self._monitored_datachange.pop(k)
171
                break
172
        self._monitored_items.pop(mid)
173
        return ua.StatusCode()
174
175
    async def datachange_callback(self, handle: int, value, error=None):
176
        if error:
177
            self.logger.info("subscription %s: datachange callback called with handle '%s' and error '%s'", self,
178
                             handle, error)
179
            await self.trigger_statuschange(error)
180
        else:
181
            # self.logger.info(f"subscription {self}: datachange callback called "
182
            #                 f"with handle '{handle}' and value '{value.Value}'")
183
            event = ua.MonitoredItemNotification()
184
            mid = self._monitored_datachange[handle]
185
            mdata = self._monitored_items[mid]
186
            mdata.mvalue.set_current_value(value.Value.Value)
187
            if mdata.filter:
188
                deadband_flag_pass = self.deadband_callback(mdata.mvalue, mdata.filter)
189
            else:
190
                deadband_flag_pass = True
191
            if deadband_flag_pass:
192
                event.ClientHandle = mdata.client_handle
193
                event.Value = value
194
                await self.isub.enqueue_datachange_event(mid, event, mdata.queue_size)
195
196
    def deadband_callback(self, values, flt):
197
        if flt.DeadbandType == ua.DeadbandType.None_ or values.get_old_value() is None:
198
            return True
199
        if flt.DeadbandType == ua.DeadbandType.Absolute and \
200
                ((abs(values.get_current_value() - values.get_old_value())) > flt.DeadbandValue):
201
            return True
202
        if flt.DeadbandType == ua.DeadbandType.Percent:
203
            self.logger.warning("DeadbandType Percent is not implemented !")
204
            return True
205
        return False
206
207
    async def trigger_event(self, event):
208
        if event.emitting_node not in self._monitored_events:
209
            self.logger.debug("%s has NO subscription for events %s from node: %s", self, event, event.emitting_node)
210
            return False
211
        self.logger.debug("%s has subscription for events %s from node: %s", self, event, event.emitting_node)
212
        mids = self._monitored_events[event.emitting_node]
213
        for mid in mids:
214
            await self._trigger_event(event, mid)
215
        return True
216
217
    async def _trigger_event(self, event, mid: int):
218
        if mid not in self._monitored_items:
219
            self.logger.debug("Could not find monitored items for id %s for event %s in subscription %s", mid, event,
220
                              self)
221
            return
222
        mdata = self._monitored_items[mid]
223
        if not mdata.where_clause_evaluator.eval(event):
224
            self.logger.info("%s, %s, Event %s does not fit WhereClause, not generating event", self, mid, event)
225
            return
226
        fieldlist = ua.EventFieldList()
227
        fieldlist.ClientHandle = mdata.client_handle
228
        fieldlist.EventFields = event.to_event_fields(mdata.filter.SelectClauses)
229
        await self.isub.enqueue_event(mid, fieldlist, mdata.queue_size)
230
231
    async def trigger_statuschange(self, code):
232
        await self.isub.enqueue_statuschange(code)
233
234
235
class WhereClauseEvaluator:
236
    def __init__(self, logger, aspace: AddressSpace, whereclause):
237
        self.logger = logger
238
        self.elements = whereclause.Elements
239
        self._aspace = aspace
240
241
    def eval(self, event):
242
        if not self.elements:
243
            return True
244
        # spec says we should only evaluate first element, which may use other elements
245
        try:
246
            res = self._eval_el(0, event)
247
        except Exception as ex:
248
            self.logger.exception("Exception while evaluating WhereClause %s for event %s: %s", self.elements, event,
249
                                  ex)
250
            return False
251
        return res
252
253
    def _eval_el(self, index, event):
254
        el = self.elements[index]
255
        # ops = [self._eval_op(op, event) for op in el.FilterOperands]
256
        ops = el.FilterOperands  # just to make code more readable
257
        if el.FilterOperator == ua.FilterOperator.Equals:
258
            return self._eval_op(ops[0], event) == self._eval_op(ops[1], event)
259
        if el.FilterOperator == ua.FilterOperator.IsNull:
260
            return self._eval_op(ops[0], event) is None  # FIXME: might be too strict
261
        if el.FilterOperator == ua.FilterOperator.GreaterThan:
262
            return self._eval_op(ops[0], event) > self._eval_op(ops[1], event)
263
        if el.FilterOperator == ua.FilterOperator.LessThan:
264
            return self._eval_op(ops[0], event) < self._eval_op(ops[1], event)
265
        if el.FilterOperator == ua.FilterOperator.GreaterThanOrEqual:
266
            return self._eval_op(ops[0], event) >= self._eval_op(ops[1], event)
267
        if el.FilterOperator == ua.FilterOperator.LessThanOrEqual:
268
            return self._eval_op(ops[0], event) <= self._eval_op(ops[1], event)
269
        if el.FilterOperator == ua.FilterOperator.Like:
270
            return self._like_operator(self._eval_op(ops[0], event), self._eval_op(ops[1], event))
271
        if el.FilterOperator == ua.FilterOperator.Not:
272
            return not self._eval_op(ops[0], event)
273
        if el.FilterOperator == ua.FilterOperator.Between:
274
            return self._eval_op(ops[2], event) >= self._eval_op(ops[0], event) >= self._eval_op(ops[1], event)
275
        if el.FilterOperator == ua.FilterOperator.InList:
276
            return self._eval_op(ops[0], event) in [self._eval_op(op, event) for op in ops[1:]]
277
        if el.FilterOperator == ua.FilterOperator.And:
278
            self.elements(ops[0].Index)
279
            return self._eval_op(ops[0], event) and self._eval_op(ops[1], event)
280
        if el.FilterOperator == ua.FilterOperator.Or:
281
            return self._eval_op(ops[0], event) or self._eval_op(ops[1], event)
282
        if el.FilterOperator == ua.FilterOperator.Cast:
283
            self.logger.warn("Cast operand not implemented, assuming True")
284
            return True
285
        if el.FilterOperator == ua.FilterOperator.OfType:
286
            return event.EventType == self._eval_op(ops[0], event)
287
        # TODO: implement missing operators
288
        self.logger.warning("WhereClause not implemented for element: %s", el)
289
        raise NotImplementedError
290
291
    def _like_operator(self, string, pattern):
292
        raise NotImplementedError
293
294
    def _eval_op(self, op, event):
295
        # seems spec says we should return Null if issues
296
        if isinstance(op, ua.ElementOperand):
297
            return self._eval_el(op.Index, event)
298
        if isinstance(op, ua.AttributeOperand):
299
            if op.BrowsePath:
300
                return getattr(event, op.BrowsePath.Elements[0].TargetName.Name)
301
            return self._aspace.read_attribute_value(event.EventType, op.AttributeId).Value.Value
302
            # FIXME: check, this is probably broken
303
        if isinstance(op, ua.SimpleAttributeOperand):
304
            if op.BrowsePath:
305
                # we only support depth of 1
306
                return getattr(event, op.BrowsePath[0].Name)
307
            # TODO: write code for index range.... but doe it make any sense
308
            return self._aspace.read_attribute_value(event.EventType, op.AttributeId).Value.Value
309
        if isinstance(op, ua.LiteralOperand):
310
            return op.Value.Value
311
        self.logger.warning("Where clause element % is not of a known type", op)
312
        raise NotImplementedError
313