Passed
Pull Request — master (#367)
by
unknown
02:46
created

reconciliator.Reconciliator.update_nodes()   D

Complexity

Conditions 11

Size

Total Lines 78
Code Lines 63

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 11
eloc 63
nop 4
dl 0
loc 78
rs 4.9909
c 0
b 0
f 0

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include Extract Method.

Complexity

Complex classes like reconciliator.Reconciliator.update_nodes() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
from __future__ import annotations
2
3
import asyncio
4
import inspect
5
import logging
6
import time
7
8
from collections import defaultdict
9
from dataclasses import astuple
10
from enum import Enum
11
from functools import partial
12
from typing import TYPE_CHECKING, Dict, Set, Union
13
from sortedcontainers import SortedDict
14
from asyncua import ua
15
16
from .utils import batch, event_wait, get_digest
17
from .virtual_subscription import VirtualSubscription
18
19
20
_logger = logging.getLogger(__name__)
21
22
23
if TYPE_CHECKING:
24
    from .ha_client import HaClient
25
26
27
class Method(Enum):
    """
    Reconciliation actions.

    Each member's value is the name of the lower level object method
    that carries out the action.
    """

    # subscription creation / monitored item creation
    ADD_SUB = "create_subscription"
    ADD_MI = "subscribe_data_change"
    # subscription deletion / monitored item deletion
    DEL_SUB = "delete_subscription"
    DEL_MI = "unsubscribe"
    # subscription mode setters
    MONITORING = "set_monitoring_mode"
    PUBLISHING = "set_publishing_mode"
38
39
40
class Reconciliator:
41
    """
42
    Reconciliator is a side-task of HaClient. It regularly
43
    applies the HaClient subscription configurations to actual
44
    OPC-UA objects.
45
46
    After a successful reconciliation and if all the client status
47
    are >= HEALTHY_STATE, the ideal_map is equal to the real_map.
48
    """
49
50
    BATCH_MI_SIZE = 1000
51
52
    def __init__(self, timer: int, ha_client: HaClient) -> None:
        """
        Bind the reconciliator to its HaClient and create its empty state.

        :param timer: delay between two reconciliation rounds (logged as
            seconds by run())
        :param ha_client: owning HaClient; its ``loop`` and ``urls``
            attributes are read here
        """
        self.timer = timer
        self.ha_client = ha_client
        self.loop = ha_client.loop
        self.is_running = False
        # asyncio.Event lost its ``loop`` argument in Python 3.10 and raises
        # TypeError when given one; keep the explicit loop where it is still
        # accepted and fall back to the plain constructor elsewhere.
        try:
            self.stop_event = asyncio.Event(loop=self.loop)
        except TypeError:
            self.stop_event = asyncio.Event()

        # full type: Dict[str, SortedDict[str, VirtualSubscription]]
        self.real_map: Dict[str, SortedDict] = {}
        for url in self.ha_client.urls:
            self.real_map[url] = SortedDict()

        # url -> {subscription name -> result of the create_subscription task}
        self.name_to_subscription = defaultdict(dict)
        # url -> {node id string -> handle returned by subscribe_data_change}
        self.node_to_handle = defaultdict(dict)
        self.init_hooks()
67
68
    def init_hooks(self):
69
        """
70
        Implement hooks for custom actions like collecting metrics
71
        or triggering external events.
72
        """
73
        hook_prefix = "hook_"
74
        hook_events = ["mi_request", "del_from_map", "add_to_map", "add_to_map_error"]
75
        hooks = [f"{hook_prefix}{evt}" for evt in hook_events]
76
        for hook in hooks:
77
            if getattr(self, hook, None):
78
                continue
79
            setattr(self, hook, lambda **kwargs: None)
80
81
    async def run(self) -> None:
        """
        Reconciliation loop.

        Each round resubscribes (under the HaClient ``_url_to_reset_lock``),
        reconciliates (under ``_ideal_map_lock``), dumps the instance
        attributes through debug_status() and logs the elapsed time. The
        loop then awaits event_wait(stop_event, timer) and exits when that
        call returns a truthy value.
        """
        _logger.info(f"Starting Reconciliator loop, checking every {self.timer}sec")
        self.is_running = True
        while self.is_running:
            began = time.time()
            async with self.ha_client._url_to_reset_lock:
                await self.resubscribe()
            async with self.ha_client._ideal_map_lock:
                await self.reconciliate()
            await self.debug_status()
            elapsed = time.time() - began
            _logger.info(f"[TIME] Reconciliation: {elapsed:.2f}sec")

            # presumably waits up to `timer` for the stop event -- see utils.event_wait
            stop_requested = await event_wait(self.stop_event, self.timer)
            if stop_requested:
                self.is_running = False
                break
98
99
    async def stop(self) -> None:
100
        self.stop_event.set()
101
102
    async def resubscribe(self) -> None:
        """
        Forget the local state of every url queued in
        ``ha_client.url_to_reset``.

        The subscriptions are only dropped from the real_map and from
        the lookup tables. Deleting them on the remote server would be
        pointless because they are tied to a deleted session; they
        should eventually time out there.
        """
        _logger.debug("In resubscribe")
        pending = self.ha_client.url_to_reset
        while pending:
            url = pending.pop()
            self.real_map[url].clear()
            # drop the per-url lookup tables, but only when they hold something
            for table in (self.name_to_subscription, self.node_to_handle):
                if table.get(url):
                    table.pop(url)
119
120
    async def reconciliate(self) -> None:
        """
        Identify the differences between the ideal_map and the real_map
        and take actual actions on the underlying OPC-UA objects.

        We only try to reconcile healthy clients, since most of the
        unhealthy clients will end up resubscribing and their map
        will be cleared anyway.

        Reconciliator steps are ordered this way:

          1 - Resubscribe newly reconnected clients
          2 - Identify gap with healthy client configurations
          3 - Remove/Add subscription
          4 - Add nodes to subscriptions
          5 - Update publishing/monitoring options
        """

        ideal_map = self.ha_client.ideal_map
        healthy, _ = await self.ha_client.group_clients_by_health()
        async with self.ha_client._client_lock:
            valid_urls = {self.ha_client.clients[h].url for h in healthy}
        real_map = self.real_map
        # A healthy url without any local state is a target by definition.
        # Give it an empty entry first: indexing real_map[url] before testing
        # membership raises a KeyError that the handler below does not catch,
        # and the update steps index real_map[url] as well.
        unknown_urls = {url for url in valid_urls if url not in real_map}
        for url in unknown_urls:
            real_map[url] = SortedDict()
        try:
            targets = set(unknown_urls)
            for url in valid_urls - unknown_urls:
                if get_digest(ideal_map[url]) != get_digest(real_map[url]):
                    targets.add(url)
            if not targets:
                _logger.info(
                    f"[PASS] No configuration difference for healthy targets: {valid_urls}"
                )
                return
            _logger.info(
                f"[WORK] Configuration difference found for healthy targets: {targets}"
            )
        except (AttributeError, TypeError) as e:
            # the digest shortcut failed: fall back to checking every healthy url
            _logger.warning(f"[WORK] Reconciliator performance impacted: {e}")
            targets = set(valid_urls)
        # add missing and delete unsubscribed subs
        await self.update_subscriptions(real_map, ideal_map, targets)
        # add and remove nodes
        await self.update_nodes(real_map, ideal_map, targets)
        # look for missing options (publish/monitoring) for existing subs
        await self.update_subscription_modes(real_map, ideal_map, targets)
167
168
    async def update_subscriptions(self, real_map, ideal_map, targets: Set[str]):
        """
        Delete the subscriptions that exist in real_map but not in
        ideal_map, and create the ones that are only in ideal_map.

        One task is scheduled per subscription; del_from_map / add_to_map
        are attached as done callbacks and update the local maps. The
        creation tasks are awaited first, then the deletion tasks, and
        exceptions are collected rather than raised.

        :param real_map: url -> subscriptions currently known locally
        :param ideal_map: url -> subscriptions that should exist
        :param targets: urls to process
        """
        _logger.debug("In update_subscriptions")
        creations = []
        deletions = []
        for url in targets:
            stale_names = set(real_map[url]) - set(ideal_map[url])
            if stale_names:
                _logger.info(f"Removing {len(stale_names)} subscriptions")
            for sub_name in stale_names:
                remote_sub = self.name_to_subscription[url][sub_name]
                del_task = self.loop.create_task(remote_sub.delete())
                on_deleted = partial(
                    self.del_from_map, url, Method.DEL_SUB, sub_name=sub_name
                )
                del_task.add_done_callback(on_deleted)
                deletions.append(del_task)

            new_names = set(ideal_map[url]) - set(real_map[url])
            if new_names:
                _logger.info(f"Adding {len(new_names)} subscriptions")
            client = self.ha_client.get_client_by_url(url)
            for sub_name in new_names:
                vs = ideal_map[url][sub_name]
                request = client.create_subscription(
                    vs.period, vs.handler, publishing=vs.publishing
                )
                add_task = self.loop.create_task(request)
                # the callback records the new subscription in the local maps
                on_created = partial(
                    self.add_to_map,
                    url,
                    Method.ADD_SUB,
                    period=vs.period,
                    handler=vs.handler,
                    publishing=vs.publishing,
                    monitoring=vs.monitoring,
                    sub_name=sub_name,
                )
                add_task.add_done_callback(on_created)
                creations.append(add_task)
        await asyncio.gather(*creations, return_exceptions=True)
        await asyncio.gather(*deletions, return_exceptions=True)
211
212
    async def update_nodes(self, real_map, ideal_map, targets: Set[str]):
        """
        Align the nodes of every existing subscription with the ideal_map.

        For each target url and each subscription name of the ideal_map,
        nodes that are only in the real subscription are unsubscribed and
        nodes that are only in the ideal one are subscribed. Subscriptions
        without an underlying object in name_to_subscription are skipped
        with a warning. The removal tasks are awaited first, then the
        addition tasks; exceptions are collected rather than raised.

        :param real_map: url -> subscriptions currently known locally
        :param ideal_map: url -> subscriptions that should exist
        :param targets: urls to process
        """
        _logger.debug("In update_nodes")
        tasks_add = []
        tasks_del = []
        for url in targets:
            client = self.ha_client.get_client_by_url(url)
            for sub_name in ideal_map[url]:
                real_sub = self.name_to_subscription[url].get(sub_name)
                # in case the previous create_subscription request failed
                if not real_sub:
                    _logger.warning(
                        f"Can't create nodes for {url} since underlying subscription for {sub_name} doesn't exist"
                    )
                    continue
                vs_real = real_map[url][sub_name]
                vs_ideal = ideal_map[url][sub_name]
                tasks_del.extend(
                    self._schedule_node_removals(
                        url, sub_name, real_sub, vs_real, vs_ideal
                    )
                )
                tasks_add.extend(
                    self._schedule_node_additions(
                        client, url, sub_name, real_sub, vs_real, vs_ideal
                    )
                )

        await asyncio.gather(*tasks_del, return_exceptions=True)
        await asyncio.gather(*tasks_add, return_exceptions=True)

    def _schedule_node_removals(self, url, sub_name, real_sub, vs_real, vs_ideal):
        """
        Schedule the unsubscribe tasks for the nodes of vs_real that are
        absent from vs_ideal and return them; always fires hook_mi_request.
        """
        tasks = []
        node_to_del = set(vs_real.nodes) - set(vs_ideal.nodes)
        if node_to_del:
            _logger.info(f"Removing {len(node_to_del)} Nodes")
            for batch_nodes in batch(node_to_del, self.BATCH_MI_SIZE):
                node_handles = [self.node_to_handle[url][node] for node in batch_nodes]
                task = self.loop.create_task(real_sub.unsubscribe(node_handles))
                task.add_done_callback(
                    partial(
                        self.del_from_map,
                        url,
                        Method.DEL_MI,
                        sub_name=sub_name,
                        nodes=batch_nodes,
                    )
                )
                tasks.append(task)
        # the hook is called even when there is nothing to remove
        # pyre-fixme[16]: `Reconciliator` has no attribute `hook_mi_request`.
        self.hook_mi_request(
            url=url, sub_name=sub_name, nodes=node_to_del, action=Method.DEL_MI
        )
        return tasks

    def _schedule_node_additions(
        self, client, url, sub_name, real_sub, vs_real, vs_ideal
    ):
        """
        Schedule the subscribe_data_change tasks for the nodes of vs_ideal
        that are absent from vs_real and return them; always fires
        hook_mi_request.
        """
        tasks = []
        monitoring = vs_real.monitoring
        node_to_add = set(vs_ideal.nodes) - set(vs_real.nodes)
        if node_to_add:
            _logger.info(f"Adding {len(node_to_add)} Nodes")
        # hack to group subscription by NodeAttributes
        attr_to_nodes = defaultdict(list)
        for node in node_to_add:
            node_attr = vs_ideal.nodes[node]
            node_obj = client.get_node(node)
            attr_to_nodes[node_attr].append(node_obj)
        for node_attr, nodes_obj in attr_to_nodes.items():
            # some servers are sensitive to the number of MI per request
            for batch_nodes_obj in batch(nodes_obj, self.BATCH_MI_SIZE):
                task = self.loop.create_task(
                    real_sub.subscribe_data_change(
                        batch_nodes_obj,
                        *astuple(node_attr),
                        monitoring=monitoring,
                    )
                )
                # the callback works with node id strings, not node objects
                nodes = [n.nodeid.to_string() for n in batch_nodes_obj]
                task.add_done_callback(
                    partial(
                        self.add_to_map,
                        url,
                        Method.ADD_MI,
                        sub_name=sub_name,
                        nodes=nodes,
                        node_attr=node_attr,
                        monitoring=monitoring,
                    )
                )
                tasks.append(task)
        # the hook is called even when there is nothing to add
        # pyre-fixme[16]: `Reconciliator` has no attribute `hook_mi_request`.
        self.hook_mi_request(
            url=url, sub_name=sub_name, nodes=node_to_add, action=Method.ADD_MI
        )
        return tasks
290
291
    async def update_subscription_modes(
        self, real_map, ideal_map, targets: Set[str]
    ) -> None:
        """
        Align the ``monitoring`` and ``publishing`` attributes of every
        subscription of real_map with the ideal_map.

        For each attribute that differs, the setter named by the matching
        Method member is called on the underlying subscription object in a
        task; change_mode is attached as done callback and stores the new
        value locally. Exceptions are collected rather than raised.

        :param real_map: url -> subscriptions currently known locally
        :param ideal_map: url -> subscriptions that should exist
        :param targets: urls to process
        """
        _logger.debug("In update_subscription_modes")
        pending = []
        for url in targets:
            for sub_name, vs_real in real_map[url].items():
                real_sub = self.name_to_subscription[url].get(sub_name)
                # in case the previous create_subscription request failed
                if not real_sub:
                    _logger.warning(
                        f"Can't change modes for {url} since underlying subscription for {sub_name} doesn't exist"
                    )
                    continue
                vs_ideal = ideal_map[url][sub_name]
                for mode in (Method.MONITORING, Method.PUBLISHING):
                    # the member name doubles as the attribute name
                    attr = mode.name.lower()
                    ideal_val = getattr(vs_ideal, attr)
                    real_val = getattr(vs_real, attr)
                    if ideal_val != real_val:
                        _logger.info(f"Changing {attr} for {sub_name} to {ideal_val}")
                        setter = getattr(real_sub, mode.value)
                        task = self.loop.create_task(setter(ideal_val))
                        on_done = partial(
                            self.change_mode,
                            url,
                            mode,
                            ideal_val,
                            sub_name=sub_name,
                        )
                        task.add_done_callback(on_done)
                        pending.append(task)
        await asyncio.gather(*pending, return_exceptions=True)
328
329
    def change_mode(
330
        self,
331
        url: str,
332
        action: Method,
333
        val: Union[bool, ua.MonitoringMode],
334
        fut: asyncio.Task,
335
        **kwargs,
336
    ):
337
        if fut.exception():
338
            _logger.warning(f"Can't {action.value} on {url}: {fut.exception()}")
339
            return
340
        sub_name = kwargs["sub_name"]
341
        vs = self.real_map[url][sub_name]
342
        setattr(vs, action.name.lower(), val)
343
344
    def add_to_map(self, url: str, action: Method, fut: asyncio.Task, **kwargs):
        """
        Done callback of a subscription or monitored item creation.

        On failure the exception is logged, hook_add_to_map_error is called
        and the local maps are left untouched. On success:

        - ADD_SUB: a VirtualSubscription built from the remaining kwargs is
          stored in real_map and the task result in name_to_subscription.
        - ADD_MI: the nodes are recorded on the real_map subscription, then
          each returned handle is stored in node_to_handle. A handle that
          is a ua.StatusCode means the node failed; it is removed again,
          and also removed from the ideal_map when the status name is
          "BadNodeIdUnknown".

        hook_add_to_map is called last, without ``sub_name`` in its kwargs.

        :param url: server url the request was sent to
        :param action: Method.ADD_SUB or Method.ADD_MI
        :param fut: finished task of the request
        :param kwargs: must contain ``sub_name``; ADD_MI also reads
            ``nodes`` and ``node_attr``
        """
        error = fut.exception()
        if error:
            _logger.warning(f"Can't {action.value} on {url}: {error}")
            # pyre-fixme[16]: `Reconciliator` has no attribute `hook_add_to_map_error`.
            self.hook_add_to_map_error(url=url, action=action, fut=fut, **kwargs)
            return

        sub_name = kwargs.pop("sub_name")
        if action == Method.ADD_SUB:
            self.real_map[url][sub_name] = VirtualSubscription(**kwargs)
            self.name_to_subscription[url][sub_name] = fut.result()
        elif action == Method.ADD_MI:
            nodes = kwargs["nodes"]
            vs = self.real_map[url][sub_name]
            vs.subscribe_data_change(nodes, *astuple(kwargs["node_attr"]))
            for node, handle in zip(nodes, fut.result()):
                if not isinstance(handle, ua.StatusCode):
                    self.node_to_handle[url][node] = handle
                    continue
                # a StatusCode is returned, the request has failed.
                vs.unsubscribe([node])
                _logger.info(f"Node {node} subscription failed: {handle}")
                # The node is invalid, remove it from both maps
                if handle.name == "BadNodeIdUnknown":
                    _logger.warning(
                        f"WARNING: Abandoning {node} because it returned {handle} from {url}"
                    )
                    ideal_vs = self.ha_client.ideal_map[url][sub_name]
                    ideal_vs.unsubscribe([node])
        # pyre-fixme[16]: `Reconciliator` has no attribute `hook_add_to_map`.
        self.hook_add_to_map(fut=fut, url=url, action=action, **kwargs)
377
378
    def del_from_map(self, url: str, action: Method, fut: asyncio.Task, **kwargs):
        """
        Done callback of a subscription or monitored item deletion.

        The local resources are removed whether or not the remote request
        succeeded; a failure is only logged.

        - DEL_SUB: the subscription is removed from real_map and from
          name_to_subscription.
        - DEL_MI: the nodes are removed from the real_map subscription and
          from node_to_handle.

        hook_del_from_map is called last in every case.

        :param url: server url the request was sent to
        :param action: Method.DEL_SUB or Method.DEL_MI
        :param fut: finished task of the request
        :param kwargs: must contain ``sub_name``; DEL_MI also reads ``nodes``
        """
        error = fut.exception()
        if error:
            # log exception but continues to delete local resources
            _logger.warning(f"Can't {action.value} on {url}: {error}")
        sub_name = kwargs["sub_name"]

        if action == Method.DEL_SUB:
            self.real_map[url].pop(sub_name)
            self.name_to_subscription[url].pop(sub_name)
            # fut.result() re-raises the task exception, which would abort the
            # callback before the hook runs; only read it when the task succeeded
            if not error:
                _logger.warning(f"In del_from_map del sub: {fut.result()}")

        if action == Method.DEL_MI:
            nodes = kwargs["nodes"]
            vs = self.real_map[url][sub_name]
            vs.unsubscribe(nodes)
            for node in nodes:
                self.node_to_handle[url].pop(node)
        # pyre-fixme[16]: `Reconciliator` has no attribute `hook_del_from_map`.
        self.hook_del_from_map(fut=fut, url=url, action=action, **kwargs)
397
398
    async def debug_status(self):
        """
        Log, at debug level, every member of the instance that is neither
        a bound method nor a double-underscore name, for troubleshooting
        purposes. Each member is logged as a (name, value) tuple.
        """
        for name, value in inspect.getmembers(self):
            if name.startswith("__") or inspect.ismethod(value):
                continue
            _logger.debug((name, value))
405