Passed
Pull Request — master (#367)
by
unknown
02:46
created

ha_client.HaClient.subscribe_data_change()   A

Complexity

Conditions 5

Size

Total Lines 20
Code Lines 17

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 5
eloc 17
nop 5
dl 0
loc 20
rs 9.0833
c 0
b 0
f 0
1
from __future__ import annotations
2
3
import asyncio
4
import inspect
5
import logging
6
7
from dataclasses import dataclass, field
8
from enum import IntEnum
9
from functools import partial
10
from itertools import chain
11
from sortedcontainers import SortedDict
12
from asyncua import Node, ua, Client
13
from asyncua.client.ua_client import UASocketProtocol
14
from asyncua.ua.uaerrors import BadSessionClosed, BadSessionNotActivated
15
from typing import Dict, Generator, Iterable, List, Optional, Set, Tuple, Type, Union
16
17
from .reconciliator import Reconciliator
18
from .utils import ClientNotFound, event_wait
19
from .virtual_subscription import TypeSubHandler, VirtualSubscription
20
from ...crypto.uacrypto import CertProperties
21
22
23
# Module-level logger used by HaClient, KeepAlive and HaManager below.
_logger = logging.getLogger(__name__)
24
25
26
class HaMode(IntEnum):
    """
    Client failover modes for non-transparent redundancy, as listed in
    OPC UA Part 4 - 6.6.2.4.5.
    """

    # Cold (OPC UA Part 4 - 6.6.2.4.5.2):
    # a connection exists only to the active_client; failing over means
    # promoting another client of the pool to active_client.
    COLD = 0

    # Warm (OPC UA Part 4 - 6.6.2.4.5.3):
    # the active client behaves as in cold mode, while the secondary
    # clients create their MonitoredItems with sampling and publishing
    # disabled.
    WARM = 1

    # Hot A (OPC UA Part 4 - 6.6.2.4.5.4):
    # the Client connects to multiple Servers and establishes
    # subscription(s) in each; a single one is Reporting and the
    # others are Sampling only.
    HOT_A = 2

    # Hot B (OPC UA Part 4 - 6.6.2.4.5.4):
    # the Client connects to multiple Servers and establishes
    # subscription(s) in each, all of them Reporting. The Client is
    # responsible for handling/processing the multiple subscription
    # streams concurrently.
    HOT_B = 3
48
49
50
class ConnectionStates(IntEnum):
    """
    Named ServiceLevel thresholds.

    Reference: OPC UA Part 4 - Services Release,
    Section 6.6.2.4.2 ServiceLevel.
    """

    # Members are integers so they can be ordered against each other
    # and against plain integer service levels.
    IN_MAINTENANCE = 0
    NO_DATA = 1
    DEGRADED = 2
    HEALTHY = 200
60
61
62
@dataclass
class ServerInfo:
    """
    Mutable record pairing a server URL with its connection state.
    """

    # Endpoint URL of the server.
    url: str
    # Connection state; defaults to NO_DATA (integer value 1).
    status: ConnectionStates = ConnectionStates.NO_DATA
66
67
68
@dataclass(eq=True, frozen=True)
class HaSecurityConfig:
    """
    Immutable bundle of security settings; every field defaults to None.
    """

    # Security policy class.
    policy: Optional[Type[ua.SecurityPolicy]] = None
    # Client certificate.
    certificate: Optional[CertProperties] = None
    # Client private key.
    private_key: Optional[CertProperties] = None
    # Server certificate.
    server_certificate: Optional[CertProperties] = None
    # Message security mode.
    mode: Optional[ua.MessageSecurityMode] = None
75
76
77
@dataclass(eq=True, frozen=True)
class HaConfig:
    """
    Immutable parameters handed to the HaClient constructor.

    All timers and timeouts are expressed in seconds.
    """

    # Redundancy mode; the only field without a default.
    ha_mode: HaMode
    # Timer for the keepalive loop.
    keepalive_timer: int = 15
    # Timer for the manager loop.
    manager_timer: int = 15
    # Timer for the reconciliator loop.
    reconciliator_timer: int = 15
    # Session timeout.
    session_timeout: int = 60
    # Request timeout.
    request_timeout: int = 30
    # Session name.
    session_name: str = "HaClient"
    # Server URLs; each instance gets its own fresh list.
    urls: List[str] = field(default_factory=list)
92
93
94
class HaClient:
    """
    The HaClient is responsible for managing non-transparent server redundancy.
    The two servers must have:
        - Identical NodeIds
        - Identical browse path and AddressSpace structure
        - Identical Service Level logic
        - However nodes in the server local namespace can differ
        - Time synchronization (e.g NTP)
    It starts the OPC-UA clients and connect to the server that
    fits in the HaMode selected.
    """

    # Override this if your servers require custom ServiceLevels
    # i.e: You're using an OPC-UA proxy
    HEALTHY_STATE = ConnectionStates.HEALTHY

    def __init__(
        self, config: HaConfig, security: Optional[HaSecurityConfig] = None, loop=None
    ) -> None:
        """
        Create one Client per configured URL plus the manager and the
        reconciliator. No connection is attempted here.

        :param config: HA parameters (mode, timers, URLs, ...).
        :param security: optional security settings; an empty
            HaSecurityConfig is used when omitted. It can also be set
            later through set_security().
        :param loop: event loop to schedule tasks on; defaults to
            asyncio.get_event_loop().
        :raises NotImplementedError: when config.ha_mode is not HaMode.WARM.
        """
        self._config: HaConfig = config
        self._keepalive_task: Dict[KeepAlive, asyncio.Task] = {}
        self._manager_task: Dict[HaManager, asyncio.Task] = {}
        self._reconciliator_task: Dict[Reconciliator, asyncio.Task] = {}
        self._gen_sub: Generator[str, None, None] = self.generate_sub_name()

        self.loop: asyncio.AbstractEventLoop = loop or asyncio.get_event_loop()
        # asyncio primitives reject the ``loop`` argument on Python >= 3.10,
        # so it is only forwarded on older interpreters.
        lock_kwargs = self._loop_kwargs(self.loop)
        self._url_to_reset_lock: asyncio.Lock = asyncio.Lock(**lock_kwargs)
        self._ideal_map_lock: asyncio.Lock = asyncio.Lock(**lock_kwargs)
        self._client_lock: asyncio.Lock = asyncio.Lock(**lock_kwargs)

        self.clients: Dict[Client, ServerInfo] = {}
        self.active_client: Optional[Client] = None
        # full type: Dict[str, SortedDict[str, VirtualSubscription]]
        self.ideal_map: Dict[str, SortedDict] = {}
        self.sub_names: Set[str] = set()
        self.url_to_reset: List[str] = []
        self.is_running = False

        if config.ha_mode != HaMode.WARM:
            # TODO
            # Check if transparent redundancy support exist for the server (nodeid=2035)
            # and prevent using HaClient with such servers.
            raise NotImplementedError(
                f"{config.ha_mode} not currently supported by HaClient"
            )

        for url in self.urls:
            c = Client(url, timeout=self._config.request_timeout, loop=self.loop)
            # timeouts for the session and secure channel are in ms
            c.session_timeout = self._config.session_timeout * 1000
            c.secure_channel_timeout = self._config.request_timeout * 1000
            c.description = self._config.session_name
            self.clients[c] = ServerInfo(url)
            self.ideal_map[url] = SortedDict()

        # security can also be set via the set_security method
        self.security_config: HaSecurityConfig = (
            security if security else HaSecurityConfig()
        )
        self.manager = HaManager(self, self._config.manager_timer)
        self.reconciliator = Reconciliator(self._config.reconciliator_timer, self)

    @staticmethod
    def _loop_kwargs(loop) -> dict:
        """
        Return the keyword arguments used to bind an asyncio primitive
        to `loop`: ``{"loop": loop}`` before Python 3.10, empty afterwards.
        """
        import sys

        return {"loop": loop} if sys.version_info < (3, 10) else {}

    async def start(self) -> None:
        """
        Schedule one KeepAlive task per client, then the manager and
        the reconciliator tasks, and flag the HaClient as running.
        """
        for client, server in self.clients.items():
            keepalive = KeepAlive(client, server, self._config.keepalive_timer)
            task = self.loop.create_task(keepalive.run())
            self._keepalive_task[keepalive] = task

        task = self.loop.create_task(self.manager.run())
        self._manager_task[self.manager] = task

        task = self.loop.create_task(self.reconciliator.run())
        self._reconciliator_task[self.reconciliator] = task

        self.is_running = True

    async def stop(self):
        """
        Ask every background worker to stop, disconnect all clients
        (disconnection errors are ignored), then cancel and await the
        worker tasks.
        """
        # iterating the dicts yields the worker objects (the keys)
        workers = chain(
            self._keepalive_task, self._manager_task, self._reconciliator_task
        )
        await asyncio.gather(*[worker.stop() for worker in workers])

        disco = [c.disconnect() for c in self.clients]
        await asyncio.gather(*disco, return_exceptions=True)

        tasks = list(
            chain(
                self._keepalive_task.values(),
                self._manager_task.values(),
                self._reconciliator_task.values(),
            )
        )

        for task in tasks:
            task.cancel()
        for task in tasks:
            try:
                await task
            except asyncio.CancelledError:
                pass
        self.is_running = False

    def set_security(
        self,
        policy: Type[ua.SecurityPolicy],
        certificate: CertProperties,
        private_key: CertProperties,
        server_certificate: Optional[CertProperties] = None,
        mode: ua.MessageSecurityMode = ua.MessageSecurityMode.SignAndEncrypt,
    ) -> None:
        """
        Replace the stored security configuration. reconnect() applies
        it to a client before connecting.
        """
        self.security_config = HaSecurityConfig(
            policy, certificate, private_key, server_certificate, mode
        )

    async def create_subscription(self, period: int, handler: TypeSubHandler) -> str:
        """
        Register a virtual subscription for every client in ideal_map.

        The subscription of the active client is publishing/Reporting,
        the ones of the other clients are created non-publishing with
        monitoring Disabled.

        :return: the generated subscription name.
        """
        async with self._ideal_map_lock:
            sub_name = next(self._gen_sub)
            for client in self.clients:
                is_active = client == self.active_client
                vs = VirtualSubscription(
                    period=period,
                    handler=handler,
                    publishing=is_active,
                    monitoring=(
                        ua.MonitoringMode.Reporting
                        if is_active
                        else ua.MonitoringMode.Disabled
                    ),
                )
                url = client.server_url.geturl()
                self.ideal_map[url][sub_name] = vs
            return sub_name

    async def subscribe_data_change(
        self,
        sub_name: str,
        nodes: Union[Iterable[Node], Iterable[str]],
        attr=ua.AttributeIds.Value,
        queuesize=0,
    ) -> None:
        """
        Add `nodes` to the virtual subscription `sub_name` of every URL
        and call hook_on_subscribe() once per URL.

        Node objects are converted to their string NodeId. If the
        subscription is missing for a URL, a warning is logged and the
        method returns without processing the remaining URLs.
        """
        async with self._ideal_map_lock:
            # materialised once so the same list is reused for every URL
            nodes = [n.nodeid.to_string() if isinstance(n, Node) else n for n in nodes]
            for url in self.urls:
                vs = self.ideal_map[url].get(sub_name)
                if not vs:
                    _logger.warning(
                        f"The subscription specified for the data_change: {sub_name} doesn't exist in ideal_map"
                    )
                    return
                vs.subscribe_data_change(nodes, attr, queuesize)
                await self.hook_on_subscribe(
                    nodes=nodes, attr=attr, queuesize=queuesize, url=url
                )

    async def delete_subscriptions(self, sub_names: List[str]) -> None:
        """
        Remove the named virtual subscriptions from every URL of
        ideal_map and release their names.

        Unknown names only produce a warning; they no longer raise
        KeyError when the name is absent from sub_names.
        """
        async with self._ideal_map_lock:
            for sub_name in sub_names:
                for url in self.urls:
                    subs = self.ideal_map[url]
                    if sub_name in subs:
                        del subs[sub_name]
                    else:
                        _logger.warning(
                            f"No subscription named {sub_name} in ideal_map"
                        )
                # discard: the name may be unknown (already warned above)
                self.sub_names.discard(sub_name)

    async def reconnect(self, client: Client) -> None:
        """
        Reconnect a client of the HA set and
        add its URL to the reset list.

        The disconnection is best effort (failures are only logged at
        debug level). hook_on_reconnect() is awaited and the security
        configuration applied (when a policy is set) before connecting.
        """
        async with self._url_to_reset_lock:
            url = client.server_url.geturl()
            self.url_to_reset.append(url)
        try:
            await client.disconnect()
        except Exception as exc:
            # best effort: the client may already be disconnected
            _logger.debug(f"Ignoring error while disconnecting {client}: {exc!r}")
        await self.hook_on_reconnect(client=client)
        if self.security_config.policy:
            await client.set_security(**self.security_config.__dict__)
        await client.connect()

    async def unsubscribe(self, nodes: Union[Iterable[Node], Iterable[str]]) -> None:
        """
        Remove `nodes` from every virtual subscription monitoring them
        and call hook_on_unsubscribe() for each (url, subscription) hit.

        The subscriptions of the first URL are used to work out which
        subscription holds which node. Nothing happens when no URL is
        configured.
        """
        async with self._ideal_map_lock:
            if not self.urls:
                return
            # computed once: `nodes` may be a one-shot iterable
            node_set = {
                n.nodeid.to_string() if isinstance(n, Node) else n for n in nodes
            }
            sub_to_nodes = {}
            first_url = self.urls[0]
            for sub_name, vs in self.ideal_map[first_url].items():
                to_del = node_set & vs.get_nodes()
                if to_del:
                    sub_to_nodes[sub_name] = to_del
            for url in self.urls:
                for sub_name, str_nodes in sub_to_nodes.items():
                    vs = self.ideal_map[url][sub_name]
                    vs.unsubscribe(str_nodes)
                    await self.hook_on_unsubscribe(url=url, nodes=str_nodes)

    async def failover_warm(
        self, primary: Optional[Client], secondaries: Iterable[Client]
    ) -> None:
        """
        Make `primary` the active client (Reporting + publishing) and
        switch `secondaries` to Disabled + non-publishing.

        `primary` may be None, in which case there is no active client.
        """
        # iterated twice below, so a one-shot iterable must be materialised
        secondaries = list(secondaries)
        async with self._ideal_map_lock:
            if primary:
                self._set_monitoring_mode(
                    ua.MonitoringMode.Reporting, clients=[primary]
                )
                self._set_publishing_mode(True, clients=[primary])
            self.active_client = primary
            self._set_monitoring_mode(ua.MonitoringMode.Disabled, clients=secondaries)
            self._set_publishing_mode(False, clients=secondaries)

    def _set_monitoring_mode(
        self, monitoring: ua.MonitoringMode, clients: Iterable[Client]
    ) -> None:
        """Set `monitoring` on every virtual subscription of `clients`."""
        for client in clients:
            url = client.server_url.geturl()
            for vs in self.ideal_map[url].values():
                vs.monitoring = monitoring

    def _set_publishing_mode(self, publishing: bool, clients: Iterable[Client]) -> None:
        """Set `publishing` on every virtual subscription of `clients`."""
        for client in clients:
            url = client.server_url.geturl()
            for vs in self.ideal_map[url].values():
                vs.publishing = publishing

    async def group_clients_by_health(self) -> Tuple[List[Client], List[Client]]:
        """
        Split the clients in two lists: those whose server status is at
        least HEALTHY_STATE, and the others.

        :return: (healthy, unhealthy)
        """
        healthy = []
        unhealthy = []
        async with self._client_lock:
            for client, server in self.clients.items():
                if server.status >= self.HEALTHY_STATE:
                    healthy.append(client)
                else:
                    unhealthy.append(client)
            return healthy, unhealthy

    async def get_serving_client(
        self, clients: List[Client], serving_client: Optional[Client]
    ) -> Optional[Client]:
        """
        Returns the client with the higher service level.

        The service level reference is taken from the active_client,
        thus we prevent failing over when multiple clients
        return the same number.

        :return: the selected client, or None when the best service
            level is below HEALTHY_STATE.
        """
        async with self._client_lock:
            if serving_client:
                max_slevel = self.clients[serving_client].status
            else:
                max_slevel = ConnectionStates.NO_DATA

            for c in clients:
                c_slevel = self.clients[c].status
                # strict comparison: a tie keeps the current serving client
                if c_slevel > max_slevel:
                    serving_client = c
                    max_slevel = c_slevel
            return serving_client if max_slevel >= self.HEALTHY_STATE else None

    async def debug_status(self):
        """
        Log the non-dunder, non-method members of the instance at debug
        level, for troubleshooting purposes.
        """
        # skip the introspection entirely when nothing would be logged
        if not _logger.isEnabledFor(logging.DEBUG):
            return
        for a in inspect.getmembers(self):
            if not a[0].startswith("__") and not inspect.ismethod(a[1]):
                _logger.debug(a)

    def get_client_warm_mode(self) -> List[Client]:
        """Return every client of the HA set (all are used in warm mode)."""
        return list(self.clients)

    def get_clients(self) -> List[Client]:
        """
        Return the clients used by the current HA mode, by dispatching
        to the `get_client_<ha_mode>_mode` method. The result is always
        a list.
        """
        ha_mode = self.ha_mode
        func = f"get_client_{ha_mode}_mode"
        get_clients = getattr(self, func)
        active_clients = get_clients()
        if not isinstance(active_clients, list):
            active_clients = [active_clients]
        return active_clients

    def get_client_by_url(self, url) -> Client:
        """
        Return the client whose ServerInfo url equals `url`.

        :raises ClientNotFound: when no client matches.
        """
        for client, srv_info in self.clients.items():
            if srv_info.url == url:
                return client
        raise ClientNotFound(f"{url} not managed by HaClient")

    @property
    def session_timeout(self) -> int:
        """Session timeout taken from the configuration (seconds)."""
        return self._config.session_timeout

    @property
    def ha_mode(self) -> str:
        """Lower-cased name of the configured HaMode (e.g. "warm")."""
        return self._config.ha_mode.name.lower()

    @property
    def urls(self) -> List[str]:
        """Server URLs taken from the configuration."""
        return self._config.urls

    def generate_sub_name(self) -> Generator[str, None, None]:
        """
        Asyncio unsafe - yield names for subscriptions.

        Names are "sub_0" .. "sub_9998"; names already present in
        sub_names are skipped and each yielded name is recorded there.
        NOTE(review): if all 9999 names are taken the generator loops
        without ever yielding.
        """
        while True:
            for i in range(9999):
                sub_name = f"sub_{i}"
                if sub_name in self.sub_names:
                    continue
                self.sub_names.add(sub_name)
                yield sub_name

    async def hook_on_reconnect(self, **kwargs):
        """No-op hook awaited by reconnect() with `client`; override to customise."""
        pass

    async def hook_on_subscribe(self, **kwargs):
        """
        No-op hook awaited by subscribe_data_change() with `nodes`,
        `attr`, `queuesize` and `url`; override to customise.
        """
        pass

    async def hook_on_unsubscribe(self, **kwargs):
        """No-op hook awaited by unsubscribe() with `url` and `nodes`; override to customise."""
        pass
425
426
427
class KeepAlive:
    """
    Periodically read the server state and service level of one
    server and store the outcome in its ServerInfo.
    """

    def __init__(self, client, server, timer) -> None:
        self.client: Client = client
        self.server: ServerInfo = server
        # delay between two checks, in seconds
        self.timer: int = timer
        self.stop_event: asyncio.locks.Event = asyncio.Event()
        self.is_running: bool = False

    async def stop(self) -> None:
        """Signal the run() loop to exit."""
        self.stop_event.set()

    async def run(self) -> None:
        """
        Check the server every `timer` seconds until stop() is called.

        The ServerInfo status receives the ServiceLevel when the server
        state is Running, and NO_DATA in every other case (server not
        running or any error during the read).
        """
        nodes_to_read = [
            self.client.nodes.server_state,
            self.client.nodes.service_level,
        ]
        info = self.server
        ua_client = self.client
        # wait for HaManager to connect clients
        await asyncio.sleep(3)
        self.is_running = True
        _logger.info(
            f"Starting keepalive loop for {info.url}, checking every {self.timer}sec"
        )
        while self.is_running:
            # pessimistic default, only replaced after a successful read
            level = ConnectionStates.NO_DATA
            try:
                state, reported_level = await ua_client.read_values(nodes_to_read)
                if state == ua.ServerState.Running:
                    level = reported_level
                else:
                    _logger.info("ServerState is not running")
            except BadSessionNotActivated:
                _logger.warning("Session is not yet activated.")
            except BadSessionClosed:
                _logger.warning("Session is closed.")
            # NOTE(review): CancelledError is swallowed here, presumably
            # because a pending request can be cancelled on connection
            # loss - confirm this is not hiding task cancellation.
            except (asyncio.TimeoutError, asyncio.CancelledError):
                _logger.warning("Timeout when fetching state")
            except Exception:
                _logger.exception("Unknown exception during keepalive liveness check")
            info.status = level

            _logger.info(f"ServiceLevel for {info.url}: {info.status}")
            stop_requested = await event_wait(self.stop_event, self.timer)
            if stop_requested:
                self.is_running = False
                return
478
479
480
class HaManager:
    """
    The manager handles individual client connections
    according to the selected HaMode
    """

    def __init__(self, ha_client: HaClient, timer: Optional[int] = None) -> None:
        """
        :param ha_client: the HaClient to manage; its loop is reused.
        :param timer: seconds between two manager iterations; falls
            back to the HaClient session timeout when falsy.
        """
        import sys

        self.ha_client = ha_client
        self.loop = ha_client.loop
        self.timer = self.set_loop_timer(timer)
        # asyncio.Event rejects the ``loop`` argument on Python >= 3.10,
        # so it is only forwarded on older interpreters.
        event_kwargs = {"loop": self.loop} if sys.version_info < (3, 10) else {}
        self.stop_event = asyncio.Event(**event_kwargs)
        self.is_running = False

    def set_loop_timer(self, timer: Optional[int]):
        """Return `timer`, or the HaClient session timeout when `timer` is falsy."""
        return timer if timer else int(self.ha_client.session_timeout)

    async def run(self) -> None:
        """
        Manager loop: update the failover state, reconnect clients and
        dump the HaClient status, every `timer` seconds until stop().

        The per-mode handlers are resolved by name from the HA mode
        (`update_state_<mode>` and `reconnect_<mode>`).
        """
        ha_mode = self.ha_client.ha_mode
        update_state = getattr(self, f"update_state_{ha_mode}")
        reconnect = getattr(self, f"reconnect_{ha_mode}")
        self.is_running = True

        _logger.info(f"Starting HaManager loop, checking every {self.timer}sec")
        while self.is_running:

            # failover happens here
            await update_state()
            await reconnect()
            await self.ha_client.debug_status()

            if await event_wait(self.stop_event, self.timer):
                self.is_running = False
                break

    async def stop(self) -> None:
        """Signal the run() loop to exit."""
        self.stop_event.set()

    async def update_state_warm(self) -> None:
        """
        Elect the serving client and trigger a warm failover when it
        differs from the current active client.
        """
        active_client = self.ha_client.active_client
        clients = self.ha_client.get_clients()
        primary_client = await self.ha_client.get_serving_client(
            list(self.ha_client.clients), active_client
        )
        if primary_client != active_client:
            # disable monitoring and reporting when the service_level goes below 200
            _logger.info(
                f"Failing over active client from {active_client} to {primary_client}"
            )
            # every client except the new primary (all of them if there is none)
            secondaries = (
                set(clients) - {primary_client} if primary_client else set(clients)
            )
            await self.ha_client.failover_warm(
                primary=primary_client, secondaries=secondaries
            )

    async def reconnect_warm(self) -> None:
        """
        Reconnect disconnected clients

        Unhealthy clients are always reconnected; healthy ones only
        when they have no protocol or their protocol is CLOSED.
        Reconnection failures are logged, never raised.
        """
        healthy, unhealthy = await self.ha_client.group_clients_by_health()

        async def reco_resub(client: Client, force: bool):
            protocol = client.uaclient.protocol
            if (
                force
                or not protocol
                # pyre-fixme[16]: `Optional` has no attribute `state`.
                or protocol.state == UASocketProtocol.CLOSED
            ):
                _logger.info(f"Virtually reconnecting and resubscribing {client}")
                await self.ha_client.reconnect(client=client)

        def log_exception(client: Client, fut: asyncio.Task):
            # exception() raises CancelledError on a cancelled task, which
            # would blow up inside the done-callback
            if fut.cancelled():
                _logger.warning(f"Reconnection of {client} was cancelled")
                return
            exc = fut.exception()
            if exc is not None:
                _logger.warning(f"Error when reconnecting {client}: {exc}")

        tasks = []
        # healthy clients first, then unhealthy ones with a forced reconnect
        for group, force in ((healthy, False), (unhealthy, True)):
            for client in group:
                task = self.loop.create_task(reco_resub(client, force=force))
                task.add_done_callback(partial(log_exception, client))
                tasks.append(task)
        await asyncio.gather(*tasks, return_exceptions=True)
569
570