Completed
Push — master ( 0f3e04...1c5edf )
by Thomas
30:23 queued 15:11
created

exabgp.application.healthcheck.parse()   A

Complexity

Conditions 2

Size

Total Lines 22
Code Lines 16

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 2
eloc 16
nop 0
dl 0
loc 22
rs 9.6
c 0
b 0
f 0
1
#!/usr/bin/env python3
2
3
"""Healthchecker for exabgp.
4
5
This program is to be used as a process for exabgp. It will announce
6
some VIPs depending on the state of a check performed by a third-party program
7
wrapped by this program.
8
9
To use, declare this program as a process in your
10
:file:`/etc/exabgp/exabgp.conf`::
11
12
    neighbor 192.0.2.1 {
13
       router-id 192.0.2.2;
14
       local-as 64496;
15
       peer-as 64497;
16
    }
17
    process watch-haproxy {
18
       run python -m exabgp healthcheck --cmd "curl -sf http://127.0.0.1/healthcheck" --label haproxy;
19
    }
20
    process watch-mysql {
21
       run python -m exabgp healthcheck --cmd "mysql -u check -e 'SELECT 1'" --label mysql;
22
    }
23
24
Use :option:`--help` to get options accepted by this program. A
25
configuration file is also possible. Such a configuration file looks
26
like this::
27
28
     debug
29
     name = haproxy
30
     interval = 10
31
     fast-interval = 1
32
     command = curl -sf http://127.0.0.1/healthcheck
33
34
The left part of each line is the corresponding long option, without the leading ``--``.
35
36
When using ``--label`` for loopback selection, the provided value should
37
match the beginning of the label without the interface prefix. In the
38
example above, this means that you should have addresses on lo
39
labelled ``lo:haproxy1``, ``lo:haproxy2``, etc.
40
41
"""
42
43
import sys
44
import os
45
import subprocess
46
import re
47
import logging
48
import logging.handlers
49
import argparse
50
import signal
51
import time
52
import collections
53
54
from ipaddress import ip_network
55
from ipaddress import ip_address
56
57
58
# Module logger; its level and handlers are configured by setup_logging().
logger = logging.getLogger("healthcheck")
59
60
61
def enum(*sequential):
    """Create a simple enumeration.

    Returns a new class named ``Enum`` with one attribute per given name,
    each attribute holding its own name as a string (``enum("A").A == "A"``).
    """
    members = {member: member for member in sequential}
    return type("Enum", (), members)
64
65
66
def setargs(parser):
    """Register every healthcheck command-line option on *parser*.

    Options are split into a general section plus the "checking healthiness",
    "advertising options" and "reporting" argument groups. Only the logging
    switches (--debug, --silent, --syslog-facility, --no-syslog) are mutually
    exclusive. --no-ack and --sudo are independent of logging and can be
    combined with any of them.

    :param parser: :class:`argparse.ArgumentParser` to populate (modified in place).
    """
    # fmt: off
    g = parser.add_mutually_exclusive_group()
    g.add_argument("--debug", "-d", action="store_true", default=False, help="enable debugging")
    g.add_argument("--silent", "-s", action="store_true", default=False, help="don't log to console")
    g.add_argument("--syslog-facility", "-sF", metavar="FACILITY", nargs='?', const="daemon", default="daemon", help="log to syslog using FACILITY, default FACILITY is daemon")
    g.add_argument("--no-syslog", action="store_true", help="disable syslog logging")
    # Not logging options: keep them out of the mutually exclusive group so
    # that e.g. "--sudo --debug" or "--no-ack --silent" are accepted.
    parser.add_argument("--no-ack", "-a", action="store_true", default=False, help="set for exabgp 3.4 or 4.x when exabgp.api.ack=false")
    parser.add_argument("--sudo", action="store_true", default=False, help="use sudo to setup ip addresses")
    parser.add_argument("--name", "-n", metavar="NAME", help="name for this healthchecker")
    parser.add_argument("--config", "-F", metavar="FILE", type=open, help="read configuration from a file")
    parser.add_argument("--pid", "-p", metavar="FILE", type=argparse.FileType('w'), help="write PID to the provided file")
    parser.add_argument("--user", metavar="USER", help="set user after setting loopback addresses")
    parser.add_argument("--group", metavar="GROUP", help="set group after setting loopback addresses")

    g = parser.add_argument_group("checking healthiness")
    g.add_argument("--interval", "-i", metavar='N', default=5, type=float, help="wait N seconds between each healthcheck (zero to exit after first announcement)")
    g.add_argument("--fast-interval", "-f", metavar='N', default=1, type=float, dest="fast", help="when a state change is about to occur, wait N seconds between each healthcheck")
    g.add_argument("--timeout", "-t", metavar='N', default=5, type=int, help="wait N seconds for the check command to execute")
    g.add_argument("--rise", metavar='N', default=3, type=int, help="check N times before considering the service up")
    g.add_argument("--fall", metavar='N', default=3, type=int, help="check N times before considering the service down")
    g.add_argument("--disable", metavar='FILE', type=str, help="if FILE exists, the service is considered disabled")
    g.add_argument("--command", "--cmd", "-c", metavar='CMD', type=str, help="command to use for healthcheck")

    g = parser.add_argument_group("advertising options")
    g.add_argument("--next-hop", "-N", metavar='IP', type=ip_address, help="self IP address to use as next hop")
    g.add_argument("--ip", metavar='IP', type=ip_network, dest="ips", action="append", help="advertise this IP address or network (CIDR notation)")
    g.add_argument("--local-preference", metavar='P', type=int, default=-1, help="advertise with local preference P")
    g.add_argument("--deaggregate-networks", dest="deaggregate_networks", action="store_true", help="Deaggregate Networks specified in --ip")
    g.add_argument("--no-ip-setup", action="store_false", dest="ip_setup", help="don't setup missing IP addresses")
    g.add_argument("--dynamic-ip-setup", default=False, action="store_true", dest="ip_dynamic", help="delete existing loopback ips on state down and disabled, then restore loopback when up")
    g.add_argument("--label", default=None, help="use the provided label to match loopback addresses")
    g.add_argument("--start-ip", metavar='N', type=int, default=0, help="index of the first IP in the list of IP addresses")
    g.add_argument("--up-metric", metavar='M', type=int, default=100, help="first IP get the metric M when the service is up")
    g.add_argument("--down-metric", metavar='M', type=int, default=1000, help="first IP get the metric M when the service is down")
    g.add_argument("--disabled-metric", metavar='M', type=int, default=500, help="first IP get the metric M when the service is disabled")
    g.add_argument("--increase", metavar='M', type=int, default=1, help="for each additional IP address, increase metric value by M")
    g.add_argument("--community", metavar="COMMUNITY", type=str, default=None, help="announce IPs with the supplied community")
    g.add_argument("--extended-community", metavar="EXTENDEDCOMMUNITY", type=str, default=None, help="announce IPs with the supplied extended community")
    g.add_argument("--large-community", metavar="LARGECOMMUNITY", type=str, default=None, help="announce IPs with the supplied large community")
    g.add_argument("--disabled-community", metavar="DISABLEDCOMMUNITY", type=str, default=None, help="announce IPs with the supplied community when disabled")
    g.add_argument("--as-path", metavar="ASPATH", type=str, default=None, help="announce IPs with the supplied as-path")
    g.add_argument("--withdraw-on-down", action="store_true", help="Instead of increasing the metric on health failure, withdraw the route")

    g = parser.add_argument_group("reporting")
    g.add_argument("--execute", metavar='CMD', type=str, action="append", help="execute CMD on state change")
    g.add_argument("--up-execute", metavar='CMD', type=str, action="append", help="execute CMD when the service becomes available")
    g.add_argument("--down-execute", metavar='CMD', type=str, action="append", help="execute CMD when the service becomes unavailable")
    g.add_argument("--disabled-execute", metavar='CMD', type=str, action="append", help="execute CMD when the service is disabled")
    # fmt: on
116
117
118
def parse():
    """Parse arguments, merging in the optional ``--config`` file.

    Each non-empty, non-comment line ``key = value`` of the configuration file
    becomes ``--key value``, and a bare ``key`` becomes ``--key``. The
    resulting arguments are placed before the real command-line arguments,
    so for single-valued options the command line wins.

    :return: the :class:`argparse.Namespace` of parsed options.
    """
    formatter = argparse.RawDescriptionHelpFormatter
    parser = argparse.ArgumentParser(description=sys.modules[__name__].__doc__, formatter_class=formatter)
    setargs(parser)

    options = parser.parse_args()
    if options.config is not None:
        # A configuration file has been provided. Read each line and
        # build an equivalent command line. Close the file argparse opened.
        with options.config as config:
            lines = config.readlines()
        args = []
        for line in lines:
            line = line.strip()
            if not line or line.startswith("#"):
                continue
            # Split on the first "=" only: values (e.g. commands) may contain "=".
            args.extend(part.strip() for part in "--{0}".format(line).split("=", 1))
        args.extend(sys.argv[1:])
        options = parser.parse_args(args)
        # sys.argv still contains --config, so argparse reopened the file
        # during the second parse; it is not needed anymore.
        if options.config is not None:
            options.config.close()
    return options
140
141
142
def setup_logging(debug, silent, name, syslog_facility, syslog):
    """Configure the module logger's level and handlers.

    :param debug: log at DEBUG level instead of INFO; also disables syslog.
    :param silent: never attach a console handler.
    :param name: optional healthchecker name, used in the syslog tag.
    :param syslog_facility: syslog facility name (e.g. ``daemon``), looked up
                            as ``SysLogHandler.LOG_<NAME>``.
    :param syslog: whether syslog logging is wanted at all.
    :raises EnvironmentError: if syslog is enabled on a platform whose syslog
        socket path is unknown.
    """

    def syslog_address():
        """Return a sensible syslog address"""
        platform = sys.platform
        if platform == "darwin":
            return "/var/run/syslog"
        if platform.startswith(("freebsd", "netbsd")):
            return "/var/run/log"
        if platform.startswith("linux"):
            return "/dev/log"
        raise EnvironmentError("Unable to guess syslog address for your platform, try to disable syslog")

    logger.setLevel(logging.DEBUG if debug else logging.INFO)

    # Syslog output is only enabled outside debug mode.
    if syslog and not debug:
        facility_attr = "LOG_{0}".format(syslog_facility.upper())
        facility = getattr(logging.handlers.SysLogHandler, facility_attr)
        tag = "healthcheck-{0}".format(name) if name else "healthcheck"
        syslog_handler = logging.handlers.SysLogHandler(address=str(syslog_address()), facility=facility)
        syslog_handler.setFormatter(logging.Formatter("{0}[{1}]: %(message)s".format(tag, os.getpid())))
        logger.addHandler(syslog_handler)

    # Console output only when stderr is an interactive terminal.
    if not silent and hasattr(sys.stderr, "isatty") and sys.stderr.isatty():  # pylint: disable=E1101
        console_handler = logging.StreamHandler()
        console_handler.setFormatter(logging.Formatter("%(levelname)s[%(name)s] %(message)s"))
        logger.addHandler(console_handler)
175
176
177
def loopback_ips(label, label_only):
    """Retrieve IP addresses configured on the loopback interface.

    On Linux the output of ``/sbin/ip -o address show dev lo`` is parsed, and
    on other platforms the output of ``/sbin/ifconfig lo0``. Addresses for
    which ``ip_network(...).is_loopback`` is true (127.0.0.0/8, ::1) are skipped.

    :param label: when set, only addresses whose ``lo:<label>`` label starts
                  with this value are returned. Labels are only parsed on
                  Linux, so elsewhere nothing matches.
    :param label_only: when *label* is not set, return nothing instead of
                       every non-loopback address.
    :return: list of :mod:`ipaddress` network objects.
    """
    logger.debug("Retrieve loopback IP addresses")
    addresses = []

    if sys.platform.startswith("linux"):
        # Use "ip" (ifconfig is not able to see all addresses)
        ipre = re.compile(r"^(?P<index>\d+):\s+(?P<name>\S+)\s+inet6?\s+" r"(?P<ip>[\da-f.:]+)/(?P<mask>\d+)\s+.*")
        labelre = re.compile(r".*\s+lo:(?P<label>\S+).*")
        argv = "/sbin/ip -o address show dev lo".split()
    else:
        # Try with ifconfig
        ipre = re.compile(
            r"^inet6?\s+(alias\s+)?(?P<ip>[\da-f.:]+)\s+"
            r"(?:netmask 0x(?P<netmask>[0-9a-f]+)|"
            r"prefixlen (?P<mask>\d+)).*"
        )
        labelre = re.compile(r"")
        argv = "/sbin/ifconfig lo0".split()

    # The context manager closes the pipe and waits for the child, so no
    # zombie process or file descriptor is left behind by each call.
    with subprocess.Popen(argv, shell=False, stdout=subprocess.PIPE) as cmd:
        for line in cmd.stdout:
            line = line.decode("ascii", "ignore").strip()
            mo = ipre.match(line)
            if not mo:
                continue
            if mo.group("mask"):
                mask = int(mo.group("mask"))
            else:
                # BSD ifconfig reports IPv4 masks in hex: count the set bits.
                mask = bin(int(mo.group("netmask"), 16)).count("1")
            try:
                ip = ip_network("{0}/{1}".format(mo.group("ip"), mask))
            except ValueError:
                continue
            if ip.is_loopback:
                continue
            if label:
                lmo = labelre.match(line)
                if not lmo:
                    continue
                if (lmo.groupdict().get("label") or "").startswith(label):
                    addresses.append(ip)
            elif not label_only:
                addresses.append(ip)

    logger.debug("Loopback addresses: %s", addresses)
    return addresses
221
222
223
def loopback():
    """Return the loopback interface name: ``lo`` on Linux, ``lo0`` elsewhere."""
    return "lo" if sys.platform.startswith("linux") else "lo0"
228
229
230
def setup_ips(ips, label, sudo=False):
    """Setup missing IP addresses on the loopback interface.

    Every host address of every network in *ips* that is not already present
    on the loopback interface is added with ``ip address add``.

    :param ips: iterable of :mod:`ipaddress` networks.
    :param label: optional label. When set, added addresses are labelled
                  ``<loopback>:<label>`` and existing ones are matched by it
                  (see :func:`loopback_ips`).
    :param sudo: prefix the ``ip`` command with ``sudo``.
    :raises subprocess.CalledProcessError: if ``ip address add`` fails with an
        exit code other than 2.
    """
    existing = set(loopback_ips(label, False))
    toadd = {ip_network(ip) for net in ips for ip in net} - existing
    for ip in toadd:
        logger.debug("Setup loopback IP address %s", ip)
        cmd = ["ip", "address", "add", str(ip), "dev", loopback()]
        if sudo:
            cmd.insert(0, "sudo")
        if label:
            cmd += ["label", "{0}:{1}".format(loopback(), label)]
        # The command runs whether or not a label was given; the label only
        # adds extra arguments.
        try:
            subprocess.check_call(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
        except subprocess.CalledProcessError as e:
            # Exit code 2 from `ip address add` is taken to mean the IP
            # address is already setup: ignore it, with or without sudo.
            if e.returncode == 2:
                continue
            raise
250
251
252
def remove_ips(ips, label, sudo=False):
    """Remove added IP addresses from the loopback interface.

    Only addresses that are both in *ips* and currently found by
    ``loopback_ips(label, True)`` are deleted. With ``label_only=True`` that
    call returns nothing when *label* is not set, so without a label nothing
    is removed. Failures are logged as warnings and do not raise.

    :param ips: iterable of :mod:`ipaddress` networks. Each host address of
                each network is considered.
    :param label: optional label used to select the loopback addresses.
    :param sudo: prefix the ``ip`` command with ``sudo``.
    """
    existing = set(loopback_ips(label, True))

    # Get intersection of IPs (ips setup, and IPs configured by ExaBGP)
    toremove = {ip_network(ip) for net in ips for ip in net} & existing
    for ip in toremove:
        logger.debug("Remove loopback IP address %s", ip)
        cmd = ["ip", "address", "delete", str(ip), "dev", loopback()]
        if sudo:
            cmd.insert(0, "sudo")
        if label:
            cmd += ["label", "{0}:{1}".format(loopback(), label)]
        try:
            subprocess.check_call(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
        except subprocess.CalledProcessError:
            # Best effort: keep going so the remaining addresses are handled.
            logger.warning(
                "Unable to remove loopback IP address %s - is healthcheck running as root?",
                str(ip),
            )
274
275
276
def drop_privileges(user, group):
    """Drop privileges to the specified user and group.

    Both names are resolved before any ID is changed. When running with
    effective UID 0, the supplementary group list is reset first: to the
    groups of *user* when one is given, otherwise to *group* alone. This
    keeps root's supplementary groups from surviving the switch. The group
    is then changed before the user, while the privileges to do so remain.

    :param user: user name to switch to, or ``None`` to keep the current user.
    :param group: group name to switch to, or ``None`` to keep the current group.
    :raises KeyError: if *user* or *group* does not exist.
    :raises OSError: if the process is not allowed to change its IDs.
    """
    gid = None
    if group is not None:
        import grp

        gid = grp.getgrnam(group).gr_gid

    pw = None
    if user is not None:
        import pwd

        pw = pwd.getpwnam(user)

    # Supplementary groups can only be changed while still privileged.
    if (pw is not None or gid is not None) and os.geteuid() == 0:
        if pw is not None:
            os.initgroups(user, gid if gid is not None else pw.pw_gid)
        else:
            os.setgroups([gid])

    if gid is not None:
        logger.debug("Dropping privileges to group %s/%s", group, gid)
        try:
            os.setresgid(gid, gid, gid)
        except AttributeError:
            # setresgid() is not available on every platform.
            os.setregid(gid, gid)

    if pw is not None:
        uid = pw.pw_uid
        logger.debug("Dropping privileges to user %s/%s", user, uid)
        try:
            os.setresuid(uid, uid, uid)
        except AttributeError:
            # setresuid() is not available on every platform.
            os.setreuid(uid, uid)
296
297
298
def check(cmd, timeout):
    """Check the return code of the given command.

    The command runs through the shell in its own process group
    (``os.setpgrp``), so a timed-out check is killed together with any
    process the shell spawned.

    :param cmd: command to execute. If :keyword:`None`, no command is executed.
    :param timeout: how much time (seconds, may be fractional) we should wait
                    for command completion. A false value disables the timeout.
    :return: :keyword:`True` if the command was successful or
             :keyword:`False` if not or if the timeout was triggered.
    """
    if cmd is None:
        return True

    logger.debug("Checking command %s", repr(cmd))
    # Leaving the ``with`` block closes the pipe and reaps the child, even
    # after a timeout kill.
    with subprocess.Popen(
        cmd,
        shell=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        preexec_fn=os.setpgrp,
    ) as p:
        try:
            # communicate(timeout=...) avoids installing a process-wide
            # SIGALRM handler.
            stdout, _ = p.communicate(timeout=timeout or None)
        except subprocess.TimeoutExpired:
            logger.warning("Timeout (%s) while running check command %s", timeout, cmd)
            try:
                os.killpg(p.pid, signal.SIGKILL)
            except ProcessLookupError:
                pass  # the process group is already gone
            return False

    output = stdout.decode("utf-8", "replace").strip()
    if p.returncode != 0:
        logger.warning("Check command was unsuccessful: %s", p.returncode)
        if output:
            logger.info("Output of check command: %s", output)
        return False
    logger.debug("Command was executed successfully %s %s", p.returncode, output)
    return True
339
340
341
def loop(options):
    """Main loop.

    Every iteration runs the check command, advances the state machine
    (INIT/DISABLED/RISING/FALLING/UP/DOWN) and re-sends to ExaBGP, on stdout,
    the announces matching the resulting state. The loop ends:

    - on Ctrl-C during the sleep, via EXIT: routes are withdrawn and loopback
      IPs removed when IP setup is enabled;
    - on SIGTERM, via EXIT, followed by ``sys.exit(0)``;
    - when ``--interval`` is 0, as soon as the state is neither RISING nor
      FALLING, via END: nothing is cleaned up.

    :param options: namespace built by :func:`parse` and completed by
                    :func:`main` (``options.ips`` is a list of networks).
    """
    states = enum(
        "INIT",  # Initial state
        "DISABLED",  # Disabled state
        "RISING",  # Checks are currently succeeding.
        "FALLING",  # Checks are currently failing.
        "UP",  # Service is considered as up.
        "DOWN",  # Service is considered as down.
        "EXIT",  # Exit state
        "END",  # End state, exiting but without removing loopback and/or announced routes
    )

    def exabgp(target):
        """Communicate new state to ExaBGP"""
        # Transitional states (INIT, RISING, FALLING) produce no announce.
        if target not in (states.UP, states.DOWN, states.DISABLED, states.EXIT, states.END):
            return
        # END exits without touching routes or loopback addresses.
        if target == states.END:
            return
        # On exit, remove the loopback IPs managed by this program.
        if target == states.EXIT and (options.ip_dynamic or options.ip_setup):
            logger.info("exiting, deleting loopback ips")
            remove_ips(options.ips, options.label, options.sudo)
        # dynamic ip management. When the service fail, remove the loopback
        if target in (states.DOWN, states.DISABLED) and options.ip_dynamic:
            logger.info("service down, deleting loopback ips")
            remove_ips(options.ips, options.label, options.sudo)
        # if ips was deleted with dyn ip, re-setup them
        if target == states.UP and options.ip_dynamic:
            logger.info("service up, restoring loopback ips")
            setup_ips(options.ips, options.label, options.sudo)

        logger.info("send announces for %s state to ExaBGP", target)
        # e.g. options.up_metric for UP. States without a *_metric option
        # (EXIT) fall back to 0.
        metric = vars(options).get("{0}_metric".format(str(target).lower()), 0)
        for ip in options.ips:
            if options.withdraw_on_down or target == states.EXIT:
                command = "announce" if target == states.UP else "withdraw"
            else:
                command = "announce"
            announce = "route {0} next-hop {1}".format(str(ip), options.next_hop or "self")

            if command == "announce":
                announce = "{0} med {1}".format(announce, metric)
                if options.local_preference >= 0:
                    announce = "{0} local-preference {1}".format(announce, options.local_preference)
                if options.community or options.disabled_community:
                    community = options.community
                    if target in (states.DOWN, states.DISABLED):
                        if options.disabled_community:
                            community = options.disabled_community
                    if community:
                        announce = "{0} community [ {1} ]".format(announce, community)
                if options.extended_community:
                    announce = "{0} extended-community [ {1} ]".format(announce, options.extended_community)
                if options.large_community:
                    announce = "{0} large-community [ {1} ]".format(announce, options.large_community)
                if options.as_path:
                    announce = "{0} as-path [ {1} ]".format(announce, options.as_path)

            # Each additional IP gets a higher metric.
            metric += options.increase

            # Send and flush command
            logger.debug("exabgp: %s %s", command, announce)
            print("{0} {1}".format(command, announce))
            sys.stdout.flush()

            # Wait for confirmation from ExaBGP if expected
            if options.no_ack:
                continue
            # When run manually (stdout is a terminal), do not wait for an
            # acknowledgement on stdin.
            if hasattr(sys.stdout, "isatty") and sys.stdout.isatty():
                continue
            sys.stdin.readline()

    def trigger(target):
        """Trigger a state change and execute the appropriate commands"""
        # Shortcut for RISING->UP and FALLING->DOWN
        if target == states.RISING and options.rise <= 1:
            target = states.UP
        elif target == states.FALLING and options.fall <= 1:
            target = states.DOWN

        # Log and execute commands
        logger.debug("Transition to %s", str(target))
        cmds = []
        # State-specific commands (e.g. --up-execute) run before --execute ones.
        cmds.extend(vars(options).get("{0}_execute".format(str(target).lower()), []) or [])
        cmds.extend(vars(options).get("execute", []) or [])
        for cmd in cmds:
            logger.debug("Transition to %s, execute `%s`", str(target), cmd)
            env = os.environ.copy()
            env.update({"STATE": str(target)})
            with open(os.devnull, "w") as fnull:
                subprocess.call(cmd, shell=True, stdout=fnull, stderr=fnull, env=env)

        return target

    def one(checks, state):
        """Execute one loop iteration."""
        disabled = options.disable is not None and os.path.exists(options.disable)
        # A disabled service skips the check command entirely.
        successful = disabled or check(options.command, options.timeout)
        # FSM
        if state != states.DISABLED and disabled:
            state = trigger(states.DISABLED)
        elif state == states.INIT:
            if successful and options.rise <= 1:
                state = trigger(states.UP)
            elif successful:
                state = trigger(states.RISING)
                checks = 1
            else:
                state = trigger(states.FALLING)
                checks = 1
        elif state == states.DISABLED:
            if not disabled:
                state = trigger(states.INIT)
        elif state == states.RISING:
            if successful:
                checks += 1
                if checks >= options.rise:
                    state = trigger(states.UP)
            else:
                state = trigger(states.FALLING)
                checks = 1
        elif state == states.FALLING:
            if not successful:
                checks += 1
                if checks >= options.fall:
                    state = trigger(states.DOWN)
            else:
                state = trigger(states.RISING)
                checks = 1
        elif state == states.UP:
            if not successful:
                state = trigger(states.FALLING)
                checks = 1
        elif state == states.DOWN:
            if successful:
                state = trigger(states.RISING)
                checks = 1
        else:
            raise ValueError("Unhandled state: {0}".format(str(state)))

        # Send announces. We announce them on a regular basis in case
        # we lose connection with a peer and the adj-rib-out is disabled.
        exabgp(state)
        return checks, state

    checks = 0
    state = states.INIT

    # Do cleanups on SIGTERM
    def sigterm_handler(signum, frame):  # pylint: disable=W0612,W0613
        exabgp(states.EXIT)
        sys.exit(0)

    signal.signal(signal.SIGTERM, sigterm_handler)

    while True:
        checks, state = one(checks, state)

        try:
            # How much we should sleep?
            if state in (states.FALLING, states.RISING):
                time.sleep(options.fast)
            elif options.interval == 0:
                logger.info("interval set to zero, exiting after the announcement")
                exabgp(states.END)
                break
            else:
                time.sleep(options.interval)
        except KeyboardInterrupt:
            exabgp(states.EXIT)
            break
513
514
515
def cmdline(cmdarg):
    """Run the healthchecker as a sub-command.

    ``sys.argv[0]`` and ``sys.argv[1]`` (program and sub-command) are merged
    into a single ``argv[0]``, so that :func:`parse` only sees the remaining
    arguments as healthcheck options. Presumably this also makes usage
    messages show the combined name.

    :param cmdarg: unused here. It is presumably required by the sub-command
                   dispatcher's calling convention; TODO confirm.
    """
    program = "{0} {1}".format(sys.argv[0], sys.argv[1])
    sys.argv = [program, *sys.argv[2:]]
    main()
518
519
520
def main():
    """Entry point.

    Parses options, configures logging, writes the PID file, prepares the
    list of IPs (and the loopback addresses), drops privileges and runs the
    main loop. Any uncaught exception is logged and exits with status 1.
    """
    options = parse()
    setup_logging(
        options.debug,
        options.silent,
        options.name,
        options.syslog_facility,
        not options.no_syslog,
    )

    pid_file = options.pid
    if pid_file:
        pid_file.write("{0}\n".format(os.getpid()))
        pid_file.close()

    try:
        # Without --ip, advertise the addresses found on the loopback.
        if not options.ips:
            options.ips = loopback_ips(options.label, False)
        if not options.ips:
            logger.error("No IP found")
            sys.exit(1)
        if options.ip_setup:
            setup_ips(options.ips, options.label, options.sudo)
        # Loopback setup may need privileges; drop them only afterwards.
        drop_privileges(options.user, options.group)

        # Parse defined networks into a list of IPs for advertisement
        if options.deaggregate_networks:
            options.ips = [ip_network(host) for network in options.ips for host in network]

        # Start the list at index --start-ip, wrapping around.
        rotated = collections.deque(options.ips)
        rotated.rotate(-options.start_ip)
        options.ips = list(rotated)

        loop(options)
    except Exception as e:  # pylint: disable=W0703
        logger.exception("Uncaught exception: %s", e)
        sys.exit(1)
549
550
551
if __name__ == "__main__":
    # Allow running this file directly as a script.
    main()
553