Passed
Pull Request — master (#367)
by
unknown
02:49
created

Reconciliator.update_subscription_modes()   B

Complexity

Conditions 6

Size

Total Lines 37
Code Lines 32

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 6
eloc 32
nop 4
dl 0
loc 37
rs 8.1786
c 0
b 0
f 0
1
import asyncio
2
import inspect
3
import logging
4
import time
5
6
from collections import defaultdict
7
from dataclasses import astuple
8
from enum import Enum
9
from functools import partial
10
from typing import TYPE_CHECKING, Dict, Set, Union
11
from sortedcontainers import SortedDict
12
from asyncua import ua
13
from pickle import PicklingError
14
15
from .utils import batch, event_wait, get_digest
16
from .virtual_subscription import VirtualSubscription
17
18
19
_logger = logging.getLogger(__name__)
20
21
22
if TYPE_CHECKING:
23
    from .ha_client import HaClient
24
25
26
class Method(Enum):
    """
    Reconciliation actions.

    Each member's value is the name of the lower level object method
    that performs the action. For MONITORING and PUBLISHING the
    lower-cased member name is also used as an attribute name on the
    virtual subscriptions, so these names must not be changed lightly.
    """

    # create a subscription / add monitored items to it
    ADD_SUB = "create_subscription"
    ADD_MI = "subscribe_data_change"
    # delete a subscription / remove monitored items from it
    DEL_SUB = "delete_subscription"
    DEL_MI = "unsubscribe"
    # change the modes of an existing subscription
    MONITORING = "set_monitoring_mode"
    PUBLISHING = "set_publishing_mode"
37
38
39
class Reconciliator:
    """
    Reconciliator is a side-task of HaClient. It regularly
    applies the HaClient subscription configurations to actual
    OPC-UA objects.

    After a successful reconciliation and if all the client status
    are >= HEALTHY_STATE, the ideal_map is equal to the real_map.

    State kept per server url:

      - real_map: subscription name -> VirtualSubscription mirroring
        what the requests issued so far have achieved
      - name_to_subscription: subscription name -> object returned by
        the client's ``create_subscription``
      - node_to_handle: node id string -> handle returned by
        ``subscribe_data_change``
    """

    # Upper bound of nodes sent in a single (un)subscribe request,
    # some servers are sensitive to the number of MI per request.
    BATCH_MI_SIZE = 1000

    def __init__(self, timer: int, ha_client: "HaClient") -> None:
        """
        :param timer: delay in seconds between two reconciliation rounds
        :param ha_client: the HaClient whose ideal_map is applied
        """
        self.timer = timer
        self.ha_client = ha_client
        self.loop = ha_client.loop
        self.is_running = False
        try:
            self.stop_event = asyncio.Event(loop=self.loop)
        except TypeError:
            # the ``loop`` argument was removed in Python 3.10
            self.stop_event = asyncio.Event()

        # full type: Dict[str, SortedDict[str, VirtualSubscription]]
        self.real_map: Dict[str, SortedDict] = {
            url: SortedDict() for url in self.ha_client.urls
        }

        self.name_to_subscription = defaultdict(dict)
        self.node_to_handle = defaultdict(dict)
        self.init_hooks()

    def init_hooks(self):
        """
        Implement hooks for custom actions like collecting metrics
        or triggering external events.

        Every ``hook_<event>`` attribute that is missing (or falsy) is
        replaced by a no-op accepting any keyword argument, so hooks
        already provided, e.g. by a subclass, are left untouched.
        """
        for event in ("mi_request", "del_from_map", "add_to_map", "add_to_map_error"):
            hook = f"hook_{event}"
            if not getattr(self, hook, None):
                setattr(self, hook, lambda **kwargs: None)

    async def run(self) -> None:
        """
        Reconciliation loop: resubscribe, reconciliate and log the
        status, then wait on the stop event for up to ``timer`` before
        the next round. The loop ends once ``event_wait`` returns a
        truthy value (presumably when the stop event was set in time).
        """
        _logger.info(f"Starting Reconciliator loop, checking every {self.timer}sec")
        self.is_running = True
        while self.is_running:
            # monotonic clock: the duration can't be skewed by clock changes
            start = time.monotonic()
            async with self.ha_client._url_to_reset_lock:
                await self.resubscribe()
            async with self.ha_client._ideal_map_lock:
                await self.reconciliate()
            await self.debug_status()
            elapsed = time.monotonic() - start
            _logger.info(f"[TIME] Reconciliation: {elapsed:.2f}sec")

            if await event_wait(self.stop_event, self.timer):
                self.is_running = False
                break

    async def stop(self) -> None:
        """
        Ask the loop started by run() to end by setting the stop event.
        """
        self.stop_event.set()

    async def resubscribe(self) -> None:
        """
        Remove all the subscriptions from the real_map.

        Deleting them from the remote server would be
        pointless because they are tied to a deleted session,
        however they should eventually time out.
        """
        _logger.debug("In resubscribe")
        url_to_reset = self.ha_client.url_to_reset
        while url_to_reset:
            url = url_to_reset.pop()
            self.real_map[url].clear()
            # both are defaultdicts, the entries come back on next access
            self.name_to_subscription.pop(url, None)
            self.node_to_handle.pop(url, None)

    async def reconciliate(self) -> None:
        """
        Identify the differences between the ideal and the real_map
        and take actual actions on the underlying OPC-UA objects.

        We only try to reconciliate healthy clients, since most of the
        unhealthy clients will end up resubscribing and their map
        will be cleared anyway.

        Reconciliator steps are ordered this way:

          1 - Resubscribe newly reconnected clients
          2 - Identify gap with healthy client configurations
          3 - Remove/Add subscription
          4 - Add nodes to subscriptions
          5 - Update publishing/monitoring options
        """

        ideal_map = self.ha_client.ideal_map
        healthy, _unhealthy = await self.ha_client.group_clients_by_health()
        async with self.ha_client._client_lock:
            valid_urls = {self.ha_client.clients[h].url for h in healthy}
        real_map = self.real_map
        for url in valid_urls:
            # a url without real state yet starts from an empty map, so it
            # is compared (and later updated) like any other target
            real_map.setdefault(url, SortedDict())
        try:
            targets = {
                url
                for url in valid_urls
                if get_digest(ideal_map[url]) != get_digest(real_map[url])
            }
        except (AttributeError, TypeError, PicklingError) as e:
            # digests can't be computed: fall back on checking every url
            _logger.warning(f"[WORK] Reconciliator performance impacted: {e}")
            targets = set(valid_urls)
        else:
            if not targets:
                _logger.info(
                    f"[PASS] No configuration difference for healthy targets: {valid_urls}"
                )
                return
            _logger.info(
                f"[WORK] Configuration difference found for healthy targets: {targets}"
            )
        # add missing and delete unsubscribed subs
        await self.update_subscriptions(real_map, ideal_map, targets)
        # add and remove nodes
        await self.update_nodes(real_map, ideal_map, targets)
        # look for missing options (publish/monitoring) for existing subs
        await self.update_subscription_modes(real_map, ideal_map, targets)

    async def update_subscriptions(self, real_map, ideal_map, targets: Set[str]):
        """
        Delete the subscriptions only present in the real_map and
        create those only present in the ideal_map, for each target url.

        The maps are updated by the del_from_map / add_to_map callbacks
        attached to each request.
        """
        _logger.debug("In update_subscriptions")
        tasks_add = []
        tasks_del = []
        for url in targets:
            real_names = set(real_map[url])
            ideal_names = set(ideal_map[url])

            sub_to_del = real_names - ideal_names
            if sub_to_del:
                _logger.info(f"Removing {len(sub_to_del)} subscriptions")
            for sub_name in sub_to_del:
                sub_handle = self.name_to_subscription[url][sub_name]
                task = self.loop.create_task(sub_handle.delete())
                task.add_done_callback(
                    partial(self.del_from_map, url, Method.DEL_SUB, sub_name=sub_name)
                )
                tasks_del.append(task)

            sub_to_add = ideal_names - real_names
            if sub_to_add:
                _logger.info(f"Adding {len(sub_to_add)} subscriptions")
            client = self.ha_client.get_client_by_url(url)
            for sub_name in sub_to_add:
                vs = ideal_map[url][sub_name]
                task = self.loop.create_task(
                    client.create_subscription(
                        vs.period, vs.handler, publishing=vs.publishing
                    )
                )
                # the callback registers the new subscription in real_map
                task.add_done_callback(
                    partial(
                        self.add_to_map,
                        url,
                        Method.ADD_SUB,
                        period=vs.period,
                        handler=vs.handler,
                        publishing=vs.publishing,
                        monitoring=vs.monitoring,
                        sub_name=sub_name,
                    )
                )
                tasks_add.append(task)
        await asyncio.gather(*tasks_add, return_exceptions=True)
        await asyncio.gather(*tasks_del, return_exceptions=True)

    async def update_nodes(self, real_map, ideal_map, targets: Set[str]):
        """
        Unsubscribe the nodes only present in the real subscriptions and
        subscribe those only present in the ideal ones, in batches of at
        most BATCH_MI_SIZE nodes per request.

        hook_mi_request is called once per subscription and per action,
        even when there is no node to add or remove.
        """
        _logger.debug("In update_nodes")
        tasks_add = []
        tasks_del = []
        for url in targets:
            client = self.ha_client.get_client_by_url(url)
            for sub_name in ideal_map[url]:
                real_sub = self.name_to_subscription[url].get(sub_name)
                # in case the previous create_subscription request failed
                if not real_sub:
                    _logger.warning(
                        f"Can't create nodes for {url} since underlying subscription for {sub_name} doesn't exist"
                    )
                    continue
                vs_real = real_map[url][sub_name]
                vs_ideal = ideal_map[url][sub_name]
                real_nodes = set(vs_real.nodes)
                ideal_nodes = set(vs_ideal.nodes)

                node_to_del = real_nodes - ideal_nodes
                if node_to_del:
                    _logger.info(f"Removing {len(node_to_del)} Nodes")
                    for batch_nodes in batch(node_to_del, self.BATCH_MI_SIZE):
                        node_handles = [
                            self.node_to_handle[url][node] for node in batch_nodes
                        ]
                        task = self.loop.create_task(real_sub.unsubscribe(node_handles))
                        task.add_done_callback(
                            partial(
                                self.del_from_map,
                                url,
                                Method.DEL_MI,
                                sub_name=sub_name,
                                nodes=batch_nodes,
                            )
                        )
                        tasks_del.append(task)
                # pyre-fixme[16]: `Reconciliator` has no attribute `hook_mi_request`.
                self.hook_mi_request(
                    url=url, sub_name=sub_name, nodes=node_to_del, action=Method.DEL_MI
                )

                # new nodes are created with the mode currently applied on
                # the real subscription
                monitoring = vs_real.monitoring
                node_to_add = ideal_nodes - real_nodes
                if node_to_add:
                    _logger.info(f"Adding {len(node_to_add)} Nodes")
                # hack to group subscription by NodeAttributes
                attr_to_nodes = defaultdict(list)
                for node in node_to_add:
                    attr_to_nodes[vs_ideal.nodes[node]].append(client.get_node(node))
                for node_attr, nodes_obj in attr_to_nodes.items():
                    # some servers are sensitive to the number of MI per request
                    for batch_nodes_obj in batch(nodes_obj, self.BATCH_MI_SIZE):
                        task = self.loop.create_task(
                            real_sub.subscribe_data_change(
                                batch_nodes_obj,
                                *astuple(node_attr),
                                monitoring=monitoring,
                            )
                        )
                        nodes = [n.nodeid.to_string() for n in batch_nodes_obj]
                        task.add_done_callback(
                            partial(
                                self.add_to_map,
                                url,
                                Method.ADD_MI,
                                sub_name=sub_name,
                                nodes=nodes,
                                node_attr=node_attr,
                                monitoring=monitoring,
                            )
                        )
                        tasks_add.append(task)
                # pyre-fixme[16]: `Reconciliator` has no attribute `hook_mi_request`.
                self.hook_mi_request(
                    url=url, sub_name=sub_name, nodes=node_to_add, action=Method.ADD_MI
                )

        await asyncio.gather(*tasks_del, return_exceptions=True)
        await asyncio.gather(*tasks_add, return_exceptions=True)

    async def update_subscription_modes(
        self, real_map, ideal_map, targets: Set[str]
    ) -> None:
        """
        Align the monitoring and publishing modes of the real
        subscriptions with the ideal ones.

        For both modes the lower-cased Method name is the attribute
        compared on the virtual subscriptions, and the Method value is
        the name of the setter called on the underlying subscription.
        """
        _logger.debug("In update_subscription_modes")
        modes = (Method.MONITORING, Method.PUBLISHING)
        tasks = []
        for url in targets:
            for sub_name in real_map[url]:
                real_sub = self.name_to_subscription[url].get(sub_name)
                # in case the previous create_subscription request failed
                if not real_sub:
                    _logger.warning(
                        f"Can't change modes for {url} since underlying subscription for {sub_name} doesn't exist"
                    )
                    continue
                vs_real = real_map[url][sub_name]
                vs_ideal = ideal_map[url][sub_name]
                for action in modes:
                    attr = action.name.lower()
                    ideal_val = getattr(vs_ideal, attr)
                    if ideal_val == getattr(vs_real, attr):
                        continue
                    _logger.info(f"Changing {attr} for {sub_name} to {ideal_val}")
                    set_func = getattr(real_sub, action.value)
                    task = self.loop.create_task(set_func(ideal_val))
                    task.add_done_callback(
                        partial(
                            self.change_mode,
                            url,
                            action,
                            ideal_val,
                            sub_name=sub_name,
                        )
                    )
                    tasks.append(task)
        await asyncio.gather(*tasks, return_exceptions=True)

    def change_mode(
        self,
        url: str,
        action: Method,
        val: Union[bool, ua.MonitoringMode],
        fut: asyncio.Task,
        **kwargs,
    ):
        """
        Done-callback of a mode change request: on success, record the
        new value on the real virtual subscription named by the
        ``sub_name`` keyword. A failed request is only logged.
        """
        error = fut.exception()
        if error:
            _logger.warning(f"Can't {action.value} on {url}: {error}")
            return
        vs = self.real_map[url][kwargs["sub_name"]]
        setattr(vs, action.name.lower(), val)

    def add_to_map(self, url: str, action: Method, fut: asyncio.Task, **kwargs):
        """
        Done-callback of a create_subscription (ADD_SUB) or
        subscribe_data_change (ADD_MI) request.

        On failure only hook_add_to_map_error is called. On success the
        real_map and the lookup tables are updated, then hook_add_to_map
        is called with the remaining keyword arguments (``sub_name`` is
        not part of them).
        """
        if fut.exception():
            _logger.warning(f"Can't {action.value} on {url}: {fut.exception()}")
            # pyre-fixme[16]: `Reconciliator` has no attribute `hook_add_to_map_error`.
            self.hook_add_to_map_error(url=url, action=action, fut=fut, **kwargs)
            return

        sub_name = kwargs.pop("sub_name")
        if action == Method.ADD_SUB:
            # the remaining kwargs are the VirtualSubscription fields
            vs = VirtualSubscription(**kwargs)
            self.real_map[url][sub_name] = vs
            self.name_to_subscription[url][sub_name] = fut.result()

        if action == Method.ADD_MI:
            nodes = kwargs["nodes"]
            vs = self.real_map[url][sub_name]
            # register every node first, the rejected ones are removed below
            vs.subscribe_data_change(nodes, *astuple(kwargs["node_attr"]))
            for node, handle in zip(nodes, fut.result()):
                if isinstance(handle, ua.StatusCode):
                    # a StatusCode is returned, the request has failed.
                    vs.unsubscribe([node])
                    _logger.info(f"Node {node} subscription failed: {handle}")
                    # The node is invalid, remove it from both maps
                    if handle.name == "BadNodeIdUnknown":
                        _logger.warning(
                            f"WARNING: Abandoning {node} because it returned {handle} from {url}"
                        )
                        ideal_vs = self.ha_client.ideal_map[url][sub_name]
                        ideal_vs.unsubscribe([node])
                    continue
                self.node_to_handle[url][node] = handle
        # pyre-fixme[16]: `Reconciliator` has no attribute `hook_add_to_map`.
        self.hook_add_to_map(fut=fut, url=url, action=action, **kwargs)

    def del_from_map(self, url: str, action: Method, fut: asyncio.Task, **kwargs):
        """
        Done-callback of a subscription delete (DEL_SUB) or unsubscribe
        (DEL_MI) request.

        The local resources are removed and hook_del_from_map is called
        whether the request succeeded or not.
        """
        error = fut.exception()
        if error:
            # log exception but continues to delete local resources
            _logger.warning(f"Can't {action.value} on {url}: {error}")
        sub_name = kwargs["sub_name"]

        if action == Method.DEL_SUB:
            self.real_map[url].pop(sub_name)
            self.name_to_subscription[url].pop(sub_name)
            if not error:
                # result() would re-raise the exception of a failed task
                _logger.warning(f"In del_from_map del sub: {fut.result()}")

        if action == Method.DEL_MI:
            nodes = kwargs["nodes"]
            vs = self.real_map[url][sub_name]
            vs.unsubscribe(nodes)
            for node in nodes:
                self.node_to_handle[url].pop(node)
        # pyre-fixme[16]: `Reconciliator` has no attribute `hook_del_from_map`.
        self.hook_del_from_map(fut=fut, url=url, action=action, **kwargs)

    async def debug_status(self):
        """
        Log, at debug level, every member of the instance that is
        neither a dunder nor a bound method, for troubleshooting purposes
        """
        for member in inspect.getmembers(self):
            name, value = member
            if not name.startswith("__") and not inspect.ismethod(value):
                _logger.debug(member)
404