Passed
Pull Request — master (#367)
by
unknown
02:49
created

ha_client.HaClient.get_serving_client()   B

Complexity

Conditions 6

Size

Total Lines 22
Code Lines 12

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 6
eloc 12
nop 3
dl 0
loc 22
rs 8.6666
c 0
b 0
f 0
1
import asyncio
2
import inspect
3
import logging
4
5
from dataclasses import dataclass, field
6
from enum import IntEnum
7
from functools import partial
8
from itertools import chain
9
from sortedcontainers import SortedDict
10
from asyncua import Node, ua, Client
11
from asyncua.client.ua_client import UASocketProtocol
12
from asyncua.ua.uaerrors import BadSessionClosed, BadSessionNotActivated
13
from typing import Dict, Generator, Iterable, List, Optional, Set, Tuple, Type, Union
14
15
from .reconciliator import Reconciliator
16
from .utils import ClientNotFound, event_wait
17
from .virtual_subscription import TypeSubHandler, VirtualSubscription
18
from ...crypto.uacrypto import CertProperties
19
20
21
_logger = logging.getLogger(__name__)
22
23
24
class HaMode(IntEnum):
    """
    Redundancy modes a client can operate in, numbered in the order
    they are described in OPC UA Part 4 - 6.6.2.4.5.
    """

    # Cold (OPC UA Part 4 - 6.6.2.4.5.2):
    # only connect to the active_client, failover is managed by
    # promoting another client of the pool to active_client.
    COLD = 0

    # Warm (OPC UA Part 4 - 6.6.2.4.5.3):
    # enable the active client similarly to the cold mode.
    # Secondary clients create the MonitoredItems,
    # but disable sampling and publishing.
    WARM = 1

    # Hot A (OPC UA Part 4 - 6.6.2.4.5.4):
    # client connects to multiple Servers and establishes
    # subscription(s) in each where only one is Reporting;
    # the others are Sampling only.
    HOT_A = 2

    # Hot B (OPC UA Part 4 - 6.6.2.4.5.4):
    # client connects to multiple Servers and establishes
    # subscription(s) in each where all subscriptions are Reporting.
    # The Client is responsible for handling/processing
    # multiple subscription streams concurrently.
    HOT_B = 3
46
47
48
class ConnectionStates(IntEnum):
    """
    Reference ServiceLevel values used to classify a server.

    See OPC UA Part 4 - Services Release,
    Section 6.6.2.4.2 ServiceLevel.
    """

    IN_MAINTENANCE = 0
    NO_DATA = 1
    DEGRADED = 2
    # IntEnum members compare as plain integers, so any level >= 200
    # compares as at least HEALTHY.
    HEALTHY = 200
58
59
60
@dataclass
class ServerInfo:
    """
    URL of a server together with its last recorded status.
    """

    # Address the matching client was created with.
    url: str
    # Starts as NO_DATA (value 1) until a status is recorded.
    status: ConnectionStates = ConnectionStates.NO_DATA
64
65
66
@dataclass(eq=True, frozen=True)
class HaSecurityConfig:
    """
    Immutable bundle of security settings; every field defaults to None.

    The field order matters: instances are also built positionally.
    """

    # Security policy class; None means no policy is configured.
    policy: Optional[Type[ua.SecurityPolicy]] = None
    # Client certificate.
    certificate: Optional[CertProperties] = None
    # Private key matching the client certificate.
    private_key: Optional[CertProperties] = None
    # Certificate of the server, when one is pinned.
    server_certificate: Optional[CertProperties] = None
    # Message security mode.
    mode: Optional[ua.MessageSecurityMode] = None
73
74
75
@dataclass(eq=True, frozen=True)
class HaConfig:
    """
    Parameters for the HaClient constructor.
    Timers and timeouts are all in seconds.
    """

    # Redundancy mode, the only field without a default.
    ha_mode: HaMode
    # Pause between two keepalive checks.
    keepalive_timer: int = 15
    # Pause between two HaManager iterations.
    manager_timer: int = 15
    # Timer handed to the Reconciliator.
    reconciliator_timer: int = 15
    session_timeout: int = 60
    request_timeout: int = 30
    session_name: str = "HaClient"
    # One entry per server; a fresh list is created for each instance.
    urls: List[str] = field(default_factory=list)
90
91
92
class HaClient:
93
    """
94
    The HaClient is responsible for managing non-transparent server redundancy.
95
    The two servers must have:
96
        - Identical NodeIds
97
        - Identical browse path and AddressSpace structure
98
        - Identical Service Level logic
99
        - However nodes in the server local namespace can differ
100
        - Time synchronization (e.g NTP)
101
    It starts the OPC-UA clients and connect to the server that
102
    fits in the HaMode selected.
103
    """
104
105
    # Override this if your servers require custom ServiceLevels
106
    # i.e: You're using an OPC-UA proxy
107
    HEALTHY_STATE = ConnectionStates.HEALTHY
108
109
    def __init__(
        self, config: HaConfig, security: Optional[HaSecurityConfig] = None, loop=None
    ) -> None:
        """
        Create one Client per configured URL plus the HaManager and the
        Reconciliator. Nothing is connected or scheduled here, the
        background tasks are created by `start`.

        :param config: settings of the HaClient, only HaMode.WARM is accepted
        :param security: optional security settings, they can also be
            provided later through `set_security`
        :param loop: event loop used to schedule the background tasks,
            defaults to asyncio.get_event_loop()
        :raises NotImplementedError: when config.ha_mode is not HaMode.WARM
        """
        self._config: HaConfig = config
        self._keepalive_task: Dict[KeepAlive, asyncio.Task] = {}
        self._manager_task: Dict[HaManager, asyncio.Task] = {}
        self._reconciliator_task: Dict[Reconciliator, asyncio.Task] = {}
        self._gen_sub: Generator[str, None, None] = self.generate_sub_name()

        self.loop: asyncio.AbstractEventLoop = loop or asyncio.get_event_loop()
        # The `loop` argument of the asyncio primitives was deprecated in
        # Python 3.8 and removed in 3.10 (passing it raises TypeError), so
        # the locks are created without it, like the Event in KeepAlive.
        self._url_to_reset_lock: asyncio.Lock = asyncio.Lock()
        self._ideal_map_lock: asyncio.Lock = asyncio.Lock()
        self._client_lock: asyncio.Lock = asyncio.Lock()

        self.clients: Dict[Client, ServerInfo] = {}
        self.active_client: Optional[Client] = None
        # full type: Dict[str, SortedDict[str, VirtualSubscription]]
        self.ideal_map: Dict[str, SortedDict] = {}
        self.sub_names: Set[str] = set()
        self.url_to_reset: List[str] = []
        self.is_running = False

        if config.ha_mode != HaMode.WARM:
            # TODO
            # Check if transparent redundancy support exist for the server (nodeid=2035)
            # and prevent using HaClient with such servers.
            raise NotImplementedError(
                f"{config.ha_mode} not currently supported by HaClient"
            )

        for url in self.urls:
            # NOTE(review): `loop` is still forwarded to Client as before;
            # confirm the installed asyncua version accepts this keyword.
            c = Client(url, timeout=self._config.request_timeout, loop=self.loop)
            # timeouts for the session and secure channel are in ms
            c.session_timeout = self._config.session_timeout * 1000
            c.secure_channel_timeout = self._config.request_timeout * 1000
            c.description = self._config.session_name
            self.clients[c] = ServerInfo(url)
            self.ideal_map[url] = SortedDict()

        # security can also be set via the set_security method
        self.security_config: HaSecurityConfig = (
            security if security else HaSecurityConfig()
        )
        self.manager = HaManager(self, self._config.manager_timer)
        self.reconciliator = Reconciliator(self._config.reconciliator_timer, self)
157
158
    async def start(self) -> None:
        """
        Schedule on self.loop one KeepAlive task per client, then the
        HaManager task and the Reconciliator task, and flag the HaClient
        as running. Each task is stored under the object that runs it.
        """
        schedule = self.loop.create_task

        for ua_client, info in self.clients.items():
            probe = KeepAlive(ua_client, info, self._config.keepalive_timer)
            self._keepalive_task[probe] = schedule(probe.run())

        self._manager_task[self.manager] = schedule(self.manager.run())
        self._reconciliator_task[self.reconciliator] = schedule(
            self.reconciliator.run()
        )

        self.is_running = True
171
172
    async def stop(self):
        """
        Ask every background loop to stop, disconnect all the clients,
        then cancel and await the tasks created by `start`.

        A task that ended with an error is logged instead of aborting
        the shutdown, so the remaining tasks are always awaited. The task
        registries are emptied so a later `start`/`stop` cycle does not
        handle stale tasks again.
        """
        workers = chain(
            self._keepalive_task, self._manager_task, self._reconciliator_task
        )
        await asyncio.gather(*[w.stop() for w in workers])

        # best effort: a client that fails to disconnect must not block the others
        await asyncio.gather(
            *[c.disconnect() for c in self.clients], return_exceptions=True
        )

        tasks = [
            *self._keepalive_task.values(),
            *self._manager_task.values(),
            *self._reconciliator_task.values(),
        ]
        for task in tasks:
            task.cancel()
        # return_exceptions=True turns cancellations and failures into
        # results, every task is therefore awaited.
        outcomes = await asyncio.gather(*tasks, return_exceptions=True)
        for outcome in outcomes:
            if isinstance(outcome, Exception) and not isinstance(
                outcome, asyncio.CancelledError
            ):
                _logger.warning(f"Background task ended with an error: {outcome!r}")

        self._keepalive_task.clear()
        self._manager_task.clear()
        self._reconciliator_task.clear()
        self.is_running = False
198
199
    def set_security(
        self,
        policy: Type[ua.SecurityPolicy],
        certificate: CertProperties,
        private_key: CertProperties,
        server_certificate: Optional[CertProperties] = None,
        mode: ua.MessageSecurityMode = ua.MessageSecurityMode.SignAndEncrypt,
    ) -> None:
        """
        Replace the stored security settings with a new HaSecurityConfig
        built from the arguments. `reconnect` reads this configuration.
        """
        self.security_config = HaSecurityConfig(
            policy=policy,
            certificate=certificate,
            private_key=private_key,
            server_certificate=server_certificate,
            mode=mode,
        )
211
212
    async def create_subscription(self, period: int, handler: TypeSubHandler) -> str:
        """
        Register a new VirtualSubscription for every client in ideal_map
        and return the generated subscription name.

        The entry of the active client is publishing with Reporting
        monitoring; all the others are created non-publishing with
        monitoring Disabled.
        """
        async with self._ideal_map_lock:
            sub_name = next(self._gen_sub)
            for client in self.clients:
                if client == self.active_client:
                    publishing, monitoring = True, ua.MonitoringMode.Reporting
                else:
                    publishing, monitoring = False, ua.MonitoringMode.Disabled
                self.ideal_map[client.server_url.geturl()][sub_name] = (
                    VirtualSubscription(
                        period=period,
                        handler=handler,
                        publishing=publishing,
                        monitoring=monitoring,
                    )
                )
            return sub_name
233
234
    async def subscribe_data_change(
        self,
        sub_name: str,
        nodes: Union[Iterable[Node], Iterable[str]],
        attr=ua.AttributeIds.Value,
        queuesize=0,
    ) -> None:
        """
        Add nodes to the subscription `sub_name` in ideal_map for every URL
        and call hook_on_subscribe once per URL.

        Node objects are converted to their string NodeId first. As soon as
        one URL has no subscription with that name, a warning is logged
        and the method returns without processing the remaining URLs.
        """
        async with self._ideal_map_lock:
            node_ids = [
                item.nodeid.to_string() if isinstance(item, Node) else item
                for item in nodes
            ]
            for url in self.urls:
                vs = self.ideal_map[url].get(sub_name)
                if vs:
                    vs.subscribe_data_change(node_ids, attr, queuesize)
                    await self.hook_on_subscribe(
                        nodes=node_ids, attr=attr, queuesize=queuesize, url=url
                    )
                    continue
                _logger.warning(
                    f"The subscription specified for the data_change: {sub_name} doesn't exist in ideal_map"
                )
                return
255
256
    async def delete_subscriptions(self, sub_names: List[str]) -> None:
        """
        Remove the given subscriptions from ideal_map for every URL and
        release their names.

        A name missing for a URL is reported with a warning. Releasing a
        name that is not registered in sub_names is a no-op, so an
        unknown or repeated name no longer raises KeyError after the
        warning has been logged.
        """
        async with self._ideal_map_lock:
            for sub_name in sub_names:
                for url in self.urls:
                    # pop with a default: presence is what matters, not
                    # the truthiness of the stored subscription
                    if self.ideal_map[url].pop(sub_name, None) is None:
                        _logger.warning(
                            f"No subscription named {sub_name} in ideal_map"
                        )
                self.sub_names.discard(sub_name)
267
268
    async def reconnect(self, client: Client) -> None:
        """
        Reconnect a client of the HA set and
        add its URL to the reset list.

        The disconnection is best effort: a failure is logged at debug
        level and the reconnection goes on. hook_on_reconnect is awaited
        before connecting, and the security settings are applied only
        when a policy is configured. Errors raised by the hook,
        set_security or connect are propagated to the caller.
        """
        async with self._url_to_reset_lock:
            url = client.server_url.geturl()
            self.url_to_reset.append(url)
        try:
            await client.disconnect()
        except Exception:
            # the client is possibly already disconnected, keep going
            _logger.debug(
                f"Ignoring error while disconnecting {url}", exc_info=True
            )
        await self.hook_on_reconnect(client=client)
        if self.security_config.policy:
            await client.set_security(**self.security_config.__dict__)
        await client.connect()
284
285
    async def unsubscribe(self, nodes: Union[Iterable[Node], Iterable[str]]) -> None:
        """
        Remove nodes from every subscription of ideal_map that monitors them
        and call hook_on_unsubscribe for each (URL, subscription) pair.

        The subscriptions holding the nodes are looked up in the map of the
        first URL only, then the same removal is applied to every URL.
        Nothing is done when no URL is configured.
        """
        async with self._ideal_map_lock:
            if not self.urls:
                return
            # Built once, before the loop: `nodes` can be a one-shot
            # iterable, which would be empty from the second subscription on.
            node_set = {
                n.nodeid.to_string() if isinstance(n, Node) else n for n in nodes
            }
            sub_to_nodes = {}
            first_url = self.urls[0]
            for sub_name, vs in self.ideal_map[first_url].items():
                to_del = node_set & vs.get_nodes()
                if to_del:
                    sub_to_nodes[sub_name] = to_del
            for url in self.urls:
                for sub_name, str_nodes in sub_to_nodes.items():
                    vs = self.ideal_map[url][sub_name]
                    vs.unsubscribe(str_nodes)
                    await self.hook_on_unsubscribe(url=url, nodes=str_nodes)
301
302
    async def failover_warm(
        self, primary: Optional[Client], secondaries: Iterable[Client]
    ) -> None:
        """
        Update ideal_map for a warm failover and record the new active client.

        The subscriptions of `primary`, when there is one, are switched to
        Reporting with publishing enabled. The subscriptions of every
        secondary are switched to Disabled with publishing turned off.
        active_client is set to `primary`, which may be None.
        """
        # iterated twice below, a one-shot iterable would skip the second pass
        secondaries = list(secondaries)
        async with self._ideal_map_lock:
            if primary:
                self._set_monitoring_mode(
                    ua.MonitoringMode.Reporting, clients=[primary]
                )
                self._set_publishing_mode(True, clients=[primary])
            self.active_client = primary
            self._set_monitoring_mode(ua.MonitoringMode.Disabled, clients=secondaries)
            self._set_publishing_mode(False, clients=secondaries)
314
315
    def _set_monitoring_mode(
316
        self, monitoring: ua.MonitoringMode, clients: Iterable[Client]
317
    ) -> None:
318
        for client in clients:
319
            url = client.server_url.geturl()
320
            for sub in self.ideal_map[url]:
321
                vs = self.ideal_map[url][sub]
322
                vs.monitoring = monitoring
323
324
    def _set_publishing_mode(self, publishing: bool, clients: Iterable[Client]) -> None:
325
        for client in clients:
326
            url = client.server_url.geturl()
327
            for sub in self.ideal_map[url]:
328
                vs = self.ideal_map[url][sub]
329
                vs.publishing = publishing
330
331
    async def group_clients_by_health(self) -> Tuple[List[Client], List[Client]]:
        """
        Split the clients in two lists, returned as (healthy, unhealthy).

        A client is healthy when the status recorded for its server is at
        least HEALTHY_STATE. The order of self.clients is preserved.
        """
        groups = {True: [], False: []}
        async with self._client_lock:
            for client, server in self.clients.items():
                is_healthy = bool(server.status >= self.HEALTHY_STATE)
                groups[is_healthy].append(client)
            return groups[True], groups[False]
341
342
    async def get_serving_client(
        self, clients: List[Client], serving_client: Optional[Client]
    ) -> Optional[Client]:
        """
        Returns the client with the higher service level.

        The service level reference is taken from the active_client,
        thus we prevent failing over when multiple clients
        return the same number.

        None is returned when the best level found is below HEALTHY_STATE.
        """
        async with self._client_lock:
            best = serving_client
            if best:
                best_level = self.clients[best].status
            else:
                best_level = ConnectionStates.NO_DATA

            for candidate in clients:
                level = self.clients[candidate].status
                # strictly greater: a tie keeps the current choice
                if level > best_level:
                    best, best_level = candidate, level

            if best_level >= self.HEALTHY_STATE:
                return best
            return None
364
365
    async def debug_status(self):
        """
        Return the class attribute for troubleshooting purposes

        Each member that is neither a dunder nor a bound method is sent
        to the module logger at debug level as a (name, value) tuple.
        """
        for name, value in inspect.getmembers(self):
            if name.startswith("__") or inspect.ismethod(value):
                continue
            _logger.debug((name, value))
372
373
    def get_client_warm_mode(self) -> List[Client]:
        """
        Clients to use in warm mode: every client, in a new list.
        """
        return [*self.clients]
375
376
    def get_clients(self) -> List[Client]:
        """
        Return the clients selected by the `get_client_<ha_mode>_mode`
        method matching the current HA mode, always as a list.
        """
        selector = getattr(self, f"get_client_{self.ha_mode}_mode")
        selected = selector()
        return selected if isinstance(selected, list) else [selected]
384
385
    def get_client_by_url(self, url) -> Client:
        """
        Return the first client whose ServerInfo carries the given URL.

        :raises ClientNotFound: when no client is registered for the URL
        """
        for candidate, info in self.clients.items():
            if info.url != url:
                continue
            return candidate
        raise ClientNotFound(f"{url} not managed by HaClient")
390
391
    @property
    def session_timeout(self) -> int:
        """Session timeout read from the HaConfig."""
        config = self._config
        return config.session_timeout
394
395
    @property
    def ha_mode(self) -> str:
        """Lower-cased name of the configured HaMode, e.g. "warm"."""
        mode = self._config.ha_mode
        return mode.name.lower()
398
399
    @property
    def urls(self) -> List[str]:
        """Server URLs of the HaConfig (the same list object, not a copy)."""
        config = self._config
        return config.urls
402
403
    def generate_sub_name(self) -> Generator[str, None, None]:
        """
        Asyncio unsafe - yield names for subscriptions.

        Names are "sub_<n>" with n counting up from 0. A name already in
        sub_names is skipped and each yielded name is added to sub_names.
        The counter has no upper bound: the former 9999 limit made the
        generator spin forever, without yielding, once every name of the
        range was in use.
        """
        from itertools import count

        for index in count():
            sub_name = f"sub_{index}"
            if sub_name in self.sub_names:
                continue
            self.sub_names.add(sub_name)
            yield sub_name
414
415
    async def hook_on_reconnect(self, **kwargs):
        """
        Extension point awaited by `reconnect` with `client=<Client>`
        before the client connects again. Does nothing by default.
        """
        return None
417
418
    async def hook_on_subscribe(self, **kwargs):
        """
        Extension point awaited by `subscribe_data_change` for each URL
        with `nodes`, `attr`, `queuesize` and `url`. Does nothing by default.
        """
        return None
420
421
    async def hook_on_unsubscribe(self, **kwargs):
        """
        Extension point awaited by `unsubscribe` with `url` and `nodes`
        for each affected subscription. Does nothing by default.
        """
        return None
423
424
425
class KeepAlive:
    """
    Ping the server status regularly to check its health
    """

    def __init__(self, client, server, timer) -> None:
        """
        :param client: Client used to read the server nodes
        :param server: ServerInfo whose status is refreshed by `run`
        :param timer: pause between two checks, in seconds
        """
        self.is_running: bool = False
        self.stop_event: asyncio.locks.Event = asyncio.Event()
        self.timer: int = timer
        self.server: ServerInfo = server
        self.client: Client = client

    async def stop(self) -> None:
        """
        Set the stop event that `run` waits on between two checks.
        """
        self.stop_event.set()

    async def run(self) -> None:
        """
        Read the server state and service level in a loop and store the
        outcome in the ServerInfo status.

        The status becomes the service level when the server state is
        Running, NO_DATA otherwise or when the read fails. The loop ends
        when the wait on the stop event returns a true value.
        """
        client, server_info = self.client, self.server
        status_node = client.nodes.server_state
        slevel_node = client.nodes.service_level
        # wait for HaManager to connect clients
        await asyncio.sleep(3)
        self.is_running = True
        _logger.info(
            f"Starting keepalive loop for {server_info.url}, checking every {self.timer}sec"
        )
        while self.is_running:
            # every path except a successful read of a running server ends as NO_DATA
            observed = ConnectionStates.NO_DATA
            try:
                status, slevel = await client.read_values([status_node, slevel_node])
                if status != ua.ServerState.Running:
                    _logger.info("ServerState is not running")
                else:
                    observed = slevel
            except BadSessionNotActivated:
                _logger.warning("Session is not yet activated.")
            except BadSessionClosed:
                _logger.warning("Session is closed.")
            except (asyncio.TimeoutError, asyncio.CancelledError):
                # NOTE(review): CancelledError is handled like a timeout;
                # presumably a pending read is cancelled when the
                # connection drops - confirm before re-raising it here.
                _logger.warning("Timeout when fetching state")
            except Exception:
                _logger.exception("Unknown exception during keepalive liveness check")
            server_info.status = observed

            _logger.info(f"ServiceLevel for {server_info.url}: {server_info.status}")
            if await event_wait(self.stop_event, self.timer):
                self.is_running = False
                break
476
477
478
class HaManager:
    """
    The manager handles individual client connections
    according to the selected HaMode
    """

    def __init__(self, ha_client: HaClient, timer: Optional[int] = None) -> None:
        """
        :param ha_client: HaClient whose clients are managed
        :param timer: pause between two iterations of `run`, in seconds;
            the session timeout of the HaClient is used when not set
        """
        self.ha_client = ha_client
        self.loop = ha_client.loop
        self.timer = self.set_loop_timer(timer)
        # The `loop` argument of asyncio.Event was removed in Python 3.10
        # (passing it raises TypeError); KeepAlive already omits it.
        self.stop_event = asyncio.Event()
        self.is_running = False

    def set_loop_timer(self, timer: Optional[int]):
        """
        Return `timer` when it is set and non-zero, otherwise the session
        timeout of the HaClient converted to int.
        """
        return timer if timer else int(self.ha_client.session_timeout)

    async def run(self) -> None:
        """
        Manager loop: call `update_state_<ha_mode>` then
        `reconnect_<ha_mode>` and log the HaClient attributes, until the
        wait on the stop event returns a true value.
        """
        ha_mode = self.ha_client.ha_mode
        update_state = getattr(self, f"update_state_{ha_mode}")
        reconnect = getattr(self, f"reconnect_{ha_mode}")
        self.is_running = True

        _logger.info(f"Starting HaManager loop, checking every {self.timer}sec")
        while self.is_running:

            # failover happens here
            await update_state()
            await reconnect()
            await self.ha_client.debug_status()

            if await event_wait(self.stop_event, self.timer):
                self.is_running = False
                break

    async def stop(self) -> None:
        """
        Set the stop event that `run` waits on between two iterations.
        """
        self.stop_event.set()

    async def update_state_warm(self) -> None:
        """
        Ask the HaClient for the client that should be serving and fail
        over to it when it differs from the current active client.
        """
        active_client = self.ha_client.active_client
        clients = self.ha_client.get_clients()
        primary_client = await self.ha_client.get_serving_client(
            list(self.ha_client.clients), active_client
        )
        if primary_client != active_client:
            # disable monitoring and reporting when the service_level goes below 200
            _logger.info(
                f"Failing over active client from {active_client} to {primary_client}"
            )
            secondaries = (
                set(clients) - {primary_client} if primary_client else set(clients)
            )
            await self.ha_client.failover_warm(
                primary=primary_client, secondaries=secondaries
            )

    async def reconnect_warm(self) -> None:
        """
        Reconnect disconnected clients

        Unhealthy clients are always reconnected; healthy ones only when
        they have no protocol or their protocol is closed. Every client
        is handled in its own task and errors are logged, not raised.
        """
        healthy, unhealthy = await self.ha_client.group_clients_by_health()

        async def reco_resub(client: Client, force: bool):
            if (
                force
                or not client.uaclient.protocol
                or client.uaclient.protocol
                # pyre-fixme[16]: `Optional` has no attribute `state`.
                and client.uaclient.protocol.state == UASocketProtocol.CLOSED
            ):
                _logger.info(f"Virtually reconnecting and resubscribing {client}")
                await self.ha_client.reconnect(client=client)

        def log_exception(client: Client, fut: asyncio.Task):
            # Task.exception() raises CancelledError on a cancelled task,
            # which would surface as an error inside this callback.
            if fut.cancelled():
                return
            error = fut.exception()
            if error:
                _logger.warning(f"Error when reconnecting {client}: {error}")

        tasks = []
        for force, group in ((False, healthy), (True, unhealthy)):
            for client in group:
                task = self.loop.create_task(reco_resub(client, force=force))
                task.add_done_callback(partial(log_exception, client))
                tasks.append(task)
        await asyncio.gather(*tasks, return_exceptions=True)
567