Completed
Push — master ( 229cac...232edf )
by Thomas
10:56
created

lib/exabgp/application/healthcheck.py (1 issue)

Severity
1
#!/usr/bin/env python3

"""Healthchecker for exabgp.

This program is to be used as a process for exabgp. It will announce
some VIPs depending on the state of a check performed by a third-party
program wrapped by this program.

To use, declare this program as a process in your
:file:`/etc/exabgp/exabgp.conf`::

    neighbor 192.0.2.1 {
       router-id 192.0.2.2;
       local-as 64496;
       peer-as 64497;
    }
    process watch-haproxy {
       run python -m exabgp healthcheck --cmd "curl -sf http://127.0.0.1/healthcheck" --label haproxy;
    }
    process watch-mysql {
       run python -m exabgp healthcheck --cmd "mysql -u check -e 'SELECT 1'" --label mysql;
    }

Use :option:`--help` to get options accepted by this program. A
configuration file is also possible. Such a configuration file looks
like this::

     debug
     name = haproxy
     interval = 10
     fast-interval = 1
     command = curl -sf http://127.0.0.1/healthcheck

The left part of each line is the corresponding long option.

When using ``--label`` for loopback selection, the provided value should
match the beginning of the label without the interface prefix. In the
example above, this means that you should have addresses on lo
labelled ``lo:haproxy1``, ``lo:haproxy2``, etc.

"""
42
43
import sys
44
import os
45
import subprocess
46
import re
47
import logging
48
import logging.handlers
49
import argparse
50
import signal
51
import time
52
import collections
53
54
from ipaddress import ip_network
55
from ipaddress import ip_address
56
57
58
# Single module-wide logger; handlers are attached later by setup_logging().
logger = logging.getLogger(name="healthcheck")
59
60
61
def enum(*sequential):
    """Create a simple enumeration.

    Returns a new class named ``Enum`` whose attributes map each given
    name to the identical string, e.g. ``enum("UP").UP == "UP"``.
    """
    members = {member: member for member in sequential}
    return type("Enum", (), members)
64
65
66
def parse():
    """Parse command-line arguments (and an optional configuration file).

    When ``--config FILE`` is given, every non-empty line of FILE that does
    not start with ``#`` is turned into a long option: ``key = value``
    becomes ``--key value`` and a bare ``key`` becomes ``--key``. The real
    command-line arguments are appended after those, so they are parsed
    last and override values coming from the file.

    :return: the :class:`argparse.Namespace` holding the parsed options.
    """
    formatter = argparse.RawDescriptionHelpFormatter
    parser = argparse.ArgumentParser(description=sys.modules[__name__].__doc__, formatter_class=formatter)

    # fmt: off
    g = parser.add_mutually_exclusive_group()
    g.add_argument("--debug", "-d", action="store_true", default=False, help="enable debugging")
    g.add_argument("--silent", "-s", action="store_true", default=False, help="don't log to console")
    g.add_argument("--syslog-facility", "-sF", metavar="FACILITY", nargs='?', const="daemon", default="daemon", help="log to syslog using FACILITY, default FACILITY is daemon")
    g.add_argument("--no-syslog", action="store_true", help="disable syslog logging")
    # --no-ack and --sudo have nothing to do with logging. Keeping them in the
    # mutually exclusive group above made e.g. "--debug --sudo" an error.
    parser.add_argument("--no-ack", "-a", action="store_true", default=False, help="set for exabgp 3.4 or 4.x when exabgp.api.ack=false")
    parser.add_argument("--sudo", action="store_true", default=False, help="use sudo to setup ip addresses")
    parser.add_argument("--name", "-n", metavar="NAME", help="name for this healthchecker")
    parser.add_argument("--config", "-F", metavar="FILE", type=open, help="read configuration from a file")
    parser.add_argument("--pid", "-p", metavar="FILE", type=argparse.FileType('w'), help="write PID to the provided file")
    parser.add_argument("--user", metavar="USER", help="set user after setting loopback addresses")
    parser.add_argument("--group", metavar="GROUP", help="set group after setting loopback addresses")

    g = parser.add_argument_group("checking healthiness")
    g.add_argument("--interval", "-i", metavar='N', default=5, type=float, help="wait N seconds between each healthcheck (zero to exit after first announcement)")
    g.add_argument("--fast-interval", "-f", metavar='N', default=1, type=float, dest="fast", help="when a state change is about to occur, wait N seconds between each healthcheck")
    g.add_argument("--timeout", "-t", metavar='N', default=5, type=int, help="wait N seconds for the check command to execute")
    g.add_argument("--rise", metavar='N', default=3, type=int, help="check N times before considering the service up")
    g.add_argument("--fall", metavar='N', default=3, type=int, help="check N times before considering the service down")
    g.add_argument("--disable", metavar='FILE', type=str, help="if FILE exists, the service is considered disabled")
    g.add_argument("--command", "--cmd", "-c", metavar='CMD', type=str, help="command to use for healthcheck")

    g = parser.add_argument_group("advertising options")
    g.add_argument("--next-hop", "-N", metavar='IP', type=ip_address, help="self IP address to use as next hop")
    g.add_argument("--ip", metavar='IP', type=ip_network, dest="ips", action="append", help="advertise this IP address or network (CIDR notation)")
    g.add_argument("--local-preference", metavar='P', type=int, default=-1, help="advertise with local preference P")
    g.add_argument("--deaggregate-networks", dest="deaggregate_networks", action="store_true", help="Deaggregate Networks specified in --ip")
    g.add_argument("--no-ip-setup", action="store_false", dest="ip_setup", help="don't setup missing IP addresses")
    g.add_argument("--dynamic-ip-setup", default=False, action="store_true", dest="ip_dynamic", help="delete existing loopback ips on state down and disabled, then restore loopback when up")
    g.add_argument("--label", default=None, help="use the provided label to match loopback addresses")
    g.add_argument("--start-ip", metavar='N', type=int, default=0, help="index of the first IP in the list of IP addresses")
    g.add_argument("--up-metric", metavar='M', type=int, default=100, help="first IP get the metric M when the service is up")
    g.add_argument("--down-metric", metavar='M', type=int, default=1000, help="first IP get the metric M when the service is down")
    g.add_argument("--disabled-metric", metavar='M', type=int, default=500, help="first IP get the metric M when the service is disabled")
    g.add_argument("--increase", metavar='M', type=int, default=1, help="for each additional IP address, increase metric value by M")
    g.add_argument("--community", metavar="COMMUNITY", type=str, default=None, help="announce IPs with the supplied community")
    g.add_argument("--extended-community", metavar="EXTENDEDCOMMUNITY", type=str, default=None, help="announce IPs with the supplied extended community")
    g.add_argument("--large-community", metavar="LARGECOMMUNITY", type=str, default=None, help="announce IPs with the supplied large community")
    g.add_argument("--disabled-community", metavar="DISABLEDCOMMUNITY", type=str, default=None, help="announce IPs with the supplied community when disabled")
    g.add_argument("--as-path", metavar="ASPATH", type=str, default=None, help="announce IPs with the supplied as-path")
    g.add_argument("--withdraw-on-down", action="store_true", help="Instead of increasing the metric on health failure, withdraw the route")

    g = parser.add_argument_group("reporting")
    g.add_argument("--execute", metavar='CMD', type=str, action="append", help="execute CMD on state change")
    g.add_argument("--up-execute", metavar='CMD', type=str, action="append", help="execute CMD when the service becomes available")
    g.add_argument("--down-execute", metavar='CMD', type=str, action="append", help="execute CMD when the service becomes unavailable")
    g.add_argument("--disabled-execute", metavar='CMD', type=str, action="append", help="execute CMD when the service is disabled")
    # fmt: on

    options = parser.parse_args()
    if options.config is not None:
        # A configuration file has been provided. Read each line and
        # build an equivalent command line. The "with" closes the handle
        # that argparse opened through type=open.
        with options.config as config:
            lines = config.readlines()
        args = []
        for line in lines:
            line = line.strip()
            if not line or line.startswith("#"):
                continue
            # "key = value" -> ["--key", "value"]; "key" -> ["--key"]
            args.extend(part.strip() for part in "--{0}".format(line).split("=", 1))
        # Real command-line arguments come last so they take precedence.
        args.extend(sys.argv[1:])
        options = parser.parse_args(args)
        # sys.argv still contains --config, so the second pass reopened the
        # file. Only its contents were needed, so do not leak that handle.
        if options.config is not None:
            options.config.close()
    return options
137
138
139
def setup_logging(debug, silent, name, syslog_facility, syslog):
    """Configure the module logger.

    :param debug: log at DEBUG level instead of INFO; also turns syslog off.
    :param silent: never attach a console handler.
    :param name: optional healthchecker name, used in the syslog tag.
    :param syslog_facility: facility name such as ``daemon`` (looked up as
                            ``SysLogHandler.LOG_<NAME>``).
    :param syslog: whether syslog output is wanted at all.
    :raises EnvironmentError: if syslog is enabled on a platform whose
                              syslog socket path is unknown.
    """

    def guess_address():
        """Return a sensible syslog address"""
        platform = sys.platform
        if platform == "darwin":
            return "/var/run/syslog"
        if platform.startswith(("freebsd", "netbsd")):
            return "/var/run/log"
        if platform.startswith("linux"):
            return "/dev/log"
        raise EnvironmentError("Unable to guess syslog address for your platform, try to disable syslog")

    logger.setLevel(logging.DEBUG if debug else logging.INFO)

    # To syslog (never in debug mode)
    if syslog and not debug:
        # Resolve the facility first so a bad facility name is reported
        # before any platform detection happens.
        facility = getattr(logging.handlers.SysLogHandler, "LOG_{0}".format(syslog_facility.upper()))
        handler = logging.handlers.SysLogHandler(address=str(guess_address()), facility=facility)
        tag = "healthcheck-{0}".format(name) if name else "healthcheck"
        handler.setFormatter(logging.Formatter("{0}[{1}]: %(message)s".format(tag, os.getpid())))
        logger.addHandler(handler)

    # To console, only when stderr is an interactive terminal
    interactive = hasattr(sys.stderr, "isatty") and sys.stderr.isatty()  # pylint: disable=E1101
    if interactive and not silent:
        console = logging.StreamHandler()
        console.setFormatter(logging.Formatter("%(levelname)s[%(name)s] %(message)s"))
        logger.addHandler(console)
172
173
174
def loopback_ips(label, label_only):
    """Retrieve the non-loopback IP networks configured on the loopback interface.

    On Linux the output of ``/sbin/ip -o address show dev lo`` is parsed;
    elsewhere ``/sbin/ifconfig lo0`` is used. Addresses such as 127.0.0.1
    or ::1 (``is_loopback``) are never returned. An address whose
    host bits are set for its prefix (e.g. ``192.0.2.1/24``) makes
    ``ip_network`` raise ``ValueError`` and is skipped.

    :param label: if set, keep only addresses whose ``lo:<label>`` suffix
                  starts with this value. Labels are only parsed from the
                  Linux ``ip`` output, so on other platforms nothing matches
                  when a label is given.
    :param label_only: when no label is given, return nothing if true
                       instead of every address.
    :return: list of :class:`ipaddress` network objects.
    """
    logger.debug("Retrieve loopback IP addresses")
    addresses = []

    if sys.platform.startswith("linux"):
        # Use "ip" (ifconfig is not able to see all addresses)
        ipre = re.compile(r"^(?P<index>\d+):\s+(?P<name>\S+)\s+inet6?\s+" r"(?P<ip>[\da-f.:]+)/(?P<mask>\d+)\s+.*")
        labelre = re.compile(r".*\s+lo:(?P<label>\S+).*")
        argv = "/sbin/ip -o address show dev lo".split()
    else:
        # Try with ifconfig
        ipre = re.compile(
            r"^inet6?\s+(alias\s+)?(?P<ip>[\da-f.:]+)\s+"
            r"(?:netmask 0x(?P<netmask>[0-9a-f]+)|"
            r"prefixlen (?P<mask>\d+)).*"
        )
        labelre = re.compile(r"")
        argv = "/sbin/ifconfig lo0".split()

    # Using Popen as a context manager closes the pipe and waits for the
    # child, so no zombie process or file descriptor is leaked.
    with subprocess.Popen(argv, shell=False, stdout=subprocess.PIPE) as cmd:
        for line in cmd.stdout:
            line = line.decode("ascii", "ignore").strip()
            mo = ipre.match(line)
            if not mo:
                continue
            if mo.group("mask"):
                mask = int(mo.group("mask"))
            else:
                # ifconfig reports IPv4 masks as hex, e.g. 0xffffffff -> /32
                mask = bin(int(mo.group("netmask"), 16)).count("1")
            try:
                ip = ip_network("{0}/{1}".format(mo.group("ip"), mask))
            except ValueError:
                continue
            if ip.is_loopback:
                continue
            if label:
                lmo = labelre.match(line)
                if not lmo:
                    continue
                if lmo.groupdict().get("label", "").startswith(label):
                    addresses.append(ip)
            elif not label_only:
                addresses.append(ip)

    logger.debug("Loopback addresses: %s", addresses)
    return addresses
218
219
220
def loopback():
    """Return the loopback interface name: ``lo`` on Linux, ``lo0`` elsewhere."""
    return "lo" if sys.platform.startswith("linux") else "lo0"
225
226
227
def setup_ips(ips, label, sudo=False):
    """Setup missing IP on loopback interface.

    Every host address contained in the networks of ``ips`` that is not
    already reported by ``loopback_ips(label, False)`` is added with
    ``ip address add <ip> dev <lo>``. When ``label`` is set, the address is
    tagged ``<lo>:<label>``. When ``sudo`` is true, the command is run
    through sudo.

    :raises subprocess.CalledProcessError: if ``ip`` fails with an exit
        status other than 2 (status 2 is treated as "already set up").
    """
    existing = set(loopback_ips(label, False))
    toadd = {ip_network(ip) for net in ips for ip in net} - existing
    for ip in toadd:
        logger.debug("Setup loopback IP address %s", ip)
        cmd = ["ip", "address", "add", str(ip), "dev", loopback()]
        if sudo:
            cmd.insert(0, "sudo")
        if label:
            cmd += ["label", "{0}:{1}".format(loopback(), label)]
        # The command must run whether or not a label is used. Previously
        # the call was nested under "if label:", so without --label
        # nothing was ever added.
        try:
            subprocess.check_call(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
        except subprocess.CalledProcessError as e:
            # The IP address is already set up: ignore. sudo propagates the
            # command's exit status, so this holds with or without sudo.
            # NOTE(review): `ip` uses status 2 for netlink errors in general,
            # not only "File exists" -- confirm this is acceptable.
            if e.returncode == 2:
                continue
            raise
247
248
249
def remove_ips(ips, label, sudo=False):
    """Remove added IP on loopback interface.

    Only addresses that are both part of ``ips`` (expanded to single host
    networks) and currently on the loopback with the given label are
    removed; ``loopback_ips(label, True)`` returns nothing when ``label``
    is empty, so nothing is removed without a label. Failures of the
    ``ip address delete`` command are logged and otherwise ignored.
    """
    existing = set(loopback_ips(label, True))

    # Get intersection of IPs (ips setup, and IPs configured by ExaBGP)
    toremove = {ip_network(ip) for net in ips for ip in net} & existing
    for ip in toremove:
        logger.debug("Remove loopback IP address %s", ip)
        cmd = ["ip", "address", "delete", str(ip), "dev", loopback()]
        if sudo:
            cmd.insert(0, "sudo")
        if label:
            cmd += ["label", "{0}:{1}".format(loopback(), label)]
        try:
            subprocess.check_call(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
        except subprocess.CalledProcessError:
            # Single-line literal: the former backslash continuation embedded
            # the next line's indentation in the message.
            logger.warning(
                "Unable to remove loopback IP address %s - is healthcheck running as root?",
                str(ip),
            )
271
272
273
def drop_privileges(user, group):
    """Drop privileges to specified user and group.

    ``None`` leaves the corresponding id unchanged. Both names are resolved
    before any id is changed, so an unknown name fails without a half-done
    switch. When running as root (euid 0), the supplementary group list is
    replaced as well, with the user's groups via ``os.initgroups``, or just
    ``group`` if no user is given. Otherwise the process would keep root's
    supplementary groups after switching uid/gid.

    :raises KeyError: if ``user`` or ``group`` does not exist.
    """
    gid = None
    if group is not None:
        import grp

        gid = grp.getgrnam(group).gr_gid

    uid = None
    user_gid = None
    if user is not None:
        import pwd

        entry = pwd.getpwnam(user)
        uid = entry.pw_uid
        user_gid = entry.pw_gid

    # Supplementary groups can only be changed while still privileged,
    # so this must happen before the gid/uid switch below.
    if os.geteuid() == 0:
        if uid is not None:
            os.initgroups(user, gid if gid is not None else user_gid)
        elif gid is not None:
            os.setgroups([gid])

    if gid is not None:
        logger.debug("Dropping privileges to group %s/%s", group, gid)
        try:
            os.setresgid(gid, gid, gid)
        except AttributeError:
            # Platforms without setresgid()
            os.setregid(gid, gid)
    if uid is not None:
        logger.debug("Dropping privileges to user %s/%s", user, uid)
        try:
            os.setresuid(uid, uid, uid)
        except AttributeError:
            # Platforms without setresuid()
            os.setreuid(uid, uid)
293
294
295
def check(cmd, timeout):
    """Check the return code of the given command.

    :param cmd: shell command to execute. If :keyword:`None`, no command is
                executed and the check counts as successful.
    :param timeout: how many seconds we should wait for command completion;
                    a falsy value (e.g. ``0``) means wait without limit.
    :return: :keyword:`True` if the command was successful or
             :keyword:`False` if not or if the timeout was triggered.
    """
    if cmd is None:
        return True

    logger.debug("Checking command %s", repr(cmd))
    # The command gets its own process group (os.setpgrp) so that on
    # timeout the shell and everything it spawned can be killed together.
    p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, preexec_fn=os.setpgrp)
    try:
        # communicate(timeout=...) replaces the former SIGALRM timer, which
        # overwrote the process-wide SIGALRM handler and could fire just
        # after the command had already completed.
        stdout, _ = p.communicate(timeout=timeout or None)
    except subprocess.TimeoutExpired:
        logger.warning("Timeout (%s) while running check command %s", timeout, cmd)
        try:
            os.killpg(p.pid, signal.SIGKILL)
        except ProcessLookupError:
            # The process group exited between the timeout and the kill.
            pass
        # Reap the killed shell and close our end of the pipe so neither a
        # zombie nor a file descriptor is leaked. Do not drain the pipe: a
        # descendant that left the process group could hold it open.
        p.stdout.close()
        p.wait()
        return False

    if p.returncode != 0:
        logger.warning("Check command was unsuccessful: %s", p.returncode)
        if stdout.strip():
            logger.info("Output of check command: %s", stdout)
        return False
    logger.debug("Command was executed successfully %s %s", p.returncode, stdout)
    return True
337
338
def loop(options):
    """Main loop.

    Each iteration does the following:
    * runs the check command, unless the ``--disable`` file exists;
    * advances the state machine;
    * runs the ``--execute`` / ``--<state>-execute`` hooks on transitions;
    * re-sends the announcements for the current state on stdout.

    It sleeps ``options.fast`` seconds while RISING/FALLING and
    ``options.interval`` seconds otherwise. With an interval of zero, it
    returns as soon as the state is neither RISING nor FALLING. SIGTERM
    and Ctrl-C send the EXIT announcements (withdraw) and remove managed
    loopback IPs before leaving.
    """
    states = enum(
        "INIT",  # Initial state
        "DISABLED",  # Disabled state
        "RISING",  # Checks are currently succeeding.
        "FALLING",  # Checks are currently failing.
        "UP",  # Service is considered as up.
        "DOWN",  # Service is considered as down.
        "EXIT",  # Exit state
        "END",  # End state, exiting but without removing loopback and/or announced routes
    )

    def exabgp(target):
        """Communicate new state to ExaBGP.

        Only UP, DOWN, DISABLED and EXIT produce output. END and the
        transient states (INIT, RISING, FALLING) are ignored.
        """
        if target not in (states.UP, states.DOWN, states.DISABLED, states.EXIT, states.END):
            return
        if target == states.END:
            return
        # On exit, remove the loopback IPs we manage.
        if target == states.EXIT and (options.ip_dynamic or options.ip_setup):
            logger.info("exiting, deleting loopback ips")
            remove_ips(options.ips, options.label, options.sudo)
        # Dynamic ip management: when the service fails or is disabled,
        # remove the loopback IPs.
        if target in (states.DOWN, states.DISABLED) and options.ip_dynamic:
            logger.info("service down, deleting loopback ips")
            remove_ips(options.ips, options.label, options.sudo)
        # If the IPs were deleted by dynamic ip management, set them up again.
        if target == states.UP and options.ip_dynamic:
            logger.info("service up, restoring loopback ips")
            setup_ips(options.ips, options.label, options.sudo)

        logger.info("send announces for %s state to ExaBGP", target)
        # e.g. options.up_metric for UP; EXIT has no such option and uses 0.
        metric = vars(options).get("{0}_metric".format(str(target).lower()), 0)
        for ip in options.ips:
            if options.withdraw_on_down or target == states.EXIT:
                command = "announce" if target == states.UP else "withdraw"
            else:
                command = "announce"
            announce = "route {0} next-hop {1}".format(str(ip), options.next_hop or "self")

            if command == "announce":
                announce = "{0} med {1}".format(announce, metric)
                if options.local_preference >= 0:
                    announce = "{0} local-preference {1}".format(announce, options.local_preference)
                if options.community or options.disabled_community:
                    community = options.community
                    if target in (states.DOWN, states.DISABLED):
                        if options.disabled_community:
                            community = options.disabled_community
                    if community:
                        announce = "{0} community [ {1} ]".format(announce, community)
                if options.extended_community:
                    announce = "{0} extended-community [ {1} ]".format(announce, options.extended_community)
                if options.large_community:
                    announce = "{0} large-community [ {1} ]".format(announce, options.large_community)
                if options.as_path:
                    announce = "{0} as-path [ {1} ]".format(announce, options.as_path)

            # Each additional IP gets its metric increased by --increase.
            metric += options.increase

            # Send and flush command
            logger.debug("exabgp: {0} {1}".format(command, announce))
            print("{0} {1}".format(command, announce))
            sys.stdout.flush()

            # Wait for confirmation from ExaBGP if expected
            if options.no_ack:
                continue
            # When run by hand (stdout is a terminal), nobody sends an
            # acknowledgement, so do not block reading stdin.
            if hasattr(sys.stdout, "isatty") and sys.stdout.isatty():
                continue
            sys.stdin.readline()

    def trigger(target):
        """Trigger a state change and execute the appropriate commands"""
        # Shortcut RISING->UP and FALLING->DOWN when a single check suffices
        if target == states.RISING and options.rise <= 1:
            target = states.UP
        elif target == states.FALLING and options.fall <= 1:
            target = states.DOWN

        # Log and execute commands
        logger.debug("Transition to %s", str(target))
        cmds = []
        cmds.extend(vars(options).get("{0}_execute".format(str(target).lower()), []) or [])
        cmds.extend(vars(options).get("execute", []) or [])
        for cmd in cmds:
            logger.debug("Transition to %s, execute `%s`", str(target), cmd)
            env = os.environ.copy()
            env.update({"STATE": str(target)})
            with open(os.devnull, "w") as fnull:
                subprocess.call(cmd, shell=True, stdout=fnull, stderr=fnull, env=env)

        return target

    def one(checks, state):
        """Execute one loop iteration; return the updated (checks, state)."""
        disabled = options.disable is not None and os.path.exists(options.disable)
        successful = disabled or check(options.command, options.timeout)
        # FSM
        if state != states.DISABLED and disabled:
            state = trigger(states.DISABLED)
        elif state == states.INIT:
            if successful and options.rise <= 1:
                state = trigger(states.UP)
            elif successful:
                state = trigger(states.RISING)
                checks = 1
            else:
                state = trigger(states.FALLING)
                checks = 1
        elif state == states.DISABLED:
            if not disabled:
                state = trigger(states.INIT)
        elif state == states.RISING:
            if successful:
                checks += 1
                if checks >= options.rise:
                    state = trigger(states.UP)
            else:
                state = trigger(states.FALLING)
                checks = 1
        elif state == states.FALLING:
            if not successful:
                checks += 1
                if checks >= options.fall:
                    state = trigger(states.DOWN)
            else:
                state = trigger(states.RISING)
                checks = 1
        elif state == states.UP:
            if not successful:
                state = trigger(states.FALLING)
                checks = 1
        elif state == states.DOWN:
            if successful:
                state = trigger(states.RISING)
                checks = 1
        else:
            raise ValueError("Unhandled state: {0}".format(str(state)))

        # Send announces. We announce them on a regular basis in case
        # we lose connection with a peer and the adj-rib-out is disabled.
        exabgp(state)
        return checks, state

    checks = 0
    state = states.INIT

    # Do cleanups on SIGTERM
    def sigterm_handler(signum, frame):  # pylint: disable=W0612,W0613
        exabgp(states.EXIT)
        sys.exit(0)

    signal.signal(signal.SIGTERM, sigterm_handler)

    while True:
        # The check itself is inside the try: a Ctrl-C while the check or
        # the announcements run must also withdraw and clean up, not only
        # one received during the sleep.
        try:
            checks, state = one(checks, state)

            # How much we should sleep?
            if state in (states.FALLING, states.RISING):
                time.sleep(options.fast)
            elif options.interval == 0:
                logger.info("interval set to zero, exiting after the announcement")
                exabgp(states.END)
                break
            else:
                time.sleep(options.interval)
        except KeyboardInterrupt:
            exabgp(states.EXIT)
            break
510
511
512
def cmdline(cmdarg):
    """Run the healthchecker when invoked as a sub-command (``cmdarg`` is unused here).

    The program name and the sub-command (``sys.argv[0]`` and
    ``sys.argv[1]``) are merged into a single ``sys.argv[0]``. Presumably
    this is so argparse's usage line shows both, e.g. "exabgp healthcheck".
    """
    program = "{0} {1}".format(sys.argv[0], sys.argv[1])
    sys.argv = [program, *sys.argv[2:]]
    main()
515
516
517
def main():
    """Command-line entry point.

    Parses the options, configures logging and optionally writes the PID
    file. It then picks the IPs to advertise (``--ip``, or the addresses
    already on the loopback), sets them up, drops privileges and runs the
    main loop. Any uncaught exception is logged and exits with status 1.
    """
    options = parse()
    setup_logging(
        options.debug,
        options.silent,
        options.name,
        options.syslog_facility,
        not options.no_syslog,
    )
    pidfile = options.pid
    if pidfile:
        pidfile.write("{0}\n".format(os.getpid()))
        pidfile.close()
    try:
        # Setup IP to use: explicit --ip values, else what is on the loopback
        if not options.ips:
            options.ips = loopback_ips(options.label, False)
        if not options.ips:
            logger.error("No IP found")
            sys.exit(1)
        if options.ip_setup:
            setup_ips(options.ips, options.label, options.sudo)
        drop_privileges(options.user, options.group)

        # Parse defined networks into a list of IPs for advertisement
        if options.deaggregate_networks:
            options.ips = [ip_network(ip) for net in options.ips for ip in net]

        # Rotate the list so the --start-ip'th entry comes first. The modulo
        # wraps any index (negative or too large) like deque.rotate would.
        shift = options.start_ip % len(options.ips)
        options.ips = options.ips[shift:] + options.ips[:shift]

        # Main loop
        loop(options)
    except Exception as e:  # pylint: disable=W0703
        logger.exception("Uncaught exception: %s", e)
        sys.exit(1)
546
547
548
# Executed as a script (not imported): start the healthchecker.
if "__main__" == __name__:
    main()
550