renew_certificate()   A
last analyzed

Complexity

Conditions 2

Size

Total Lines 10

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 2
c 1
b 0
f 0
dl 0
loc 10
rs 9.4285
1
#!/usr/bin/env python
2
# -*- encoding: utf-8 -*-
3
4
from ovh_interface.DnsManager import DnsManager
5
from ovh_interface.LoadBalancerSSLManager import LoadBalancerSSLManager
6
import os
7
import logging
8
import subprocess
9
from typing import List
10
import argparse
11
12
####################################################
13
# Static init
14
####################################################
15
16
logger = logging.getLogger(__name__)
17
logger.addHandler(logging.StreamHandler())
18
19
if os.getenv('DEBUG'):
20
    logger.setLevel(logging.DEBUG)
21
    logger.debug("DEBUG MODE".center(150, '-'))
22
else:
23
    logger.setLevel(logging.INFO)
24
25
####################################################
26
# End Static init
27
####################################################
28
29
30
def renew_certificate(domain: str) -> bool:
31
    """
32
    Create/Renew the certificate for domain passed in arg
33
    :param domain: The domain you want to get the ssl certificate
34
    :return: True if certificated getted and added in IP LB with success
35
    """
36
37
    logger.info("Start process for {}".format(domain).center(150, '-'))
38
    command_result = subprocess.run(args=['dehydrated', '-c', '-d', domain, '-k', './ovhDnsHook.py', '-t', 'dns-01'], stdout=subprocess.PIPE)
39
    return True if command_result.returncode == 0 else False
40
41
42
def parse_command_line() -> List[str]:
43
    """
44
    Parse the command line and extract list of domain passed in param
45
    :return: list of domain you want to add SSL certif in IPLB
46
    """
47
48
    parser = argparse.ArgumentParser()
49
    parser.add_argument("-d", "--domain", action='append', help="Domain you want to get the certificate and add it in the IP-LB")
50
51
    args = parser.parse_args()
52
    if args.domain:
53
        return args.domain
54
    else:
55
        return []
56
57
58
if __name__ == '__main__':
59
60
    ip_lb_name = os.getenv('iplb_name')
61
    input_domain_list = parse_command_line()
62
63
    if os.getenv('DEBUG'):
64
        with open('/etc/dehydrated/config', 'a') as config:
65
            config.write('CA="https://acme-staging.api.letsencrypt.org/directory"\n')
66
67
    loadBalancerUpdaterSSL = LoadBalancerSSLManager(ip_lb_name=ip_lb_name)
68
69
    list_of_ssl_ip_lb_entry = loadBalancerUpdaterSSL.get_certificate_managed_by_ip_lb()
70
    domains_from_iplb = [ssl_ip_lb_entry['ssl_cn'] for ssl_ip_lb_entry in list_of_ssl_ip_lb_entry]
71
    all_domains_to_get_certif = domains_from_iplb + list(set(input_domain_list) - set(domains_from_iplb))
72
73
    dnsManager = DnsManager()
74
    list_dns_zone = dnsManager.get_dns_zone_manageable()
75
    list_of_updatable_domain = set([domain for domain in all_domains_to_get_certif if '.'.join(domain.split('.')[1:]) in list_dns_zone])
76
77
    logger.info("List of domain will be updated: {}".format(list_of_updatable_domain))
78
79
    for ssl_certif_updatable in list_of_updatable_domain:
80
        renew_certificate(ssl_certif_updatable)
81
82
    for retry_count in range(os.getenv('max_retry', 5)):
83
84
        if not os.path.exists('failedRenew.log'):
85
            break
86
87
        logger.info("Retry process (count: {})".format(retry_count+1).center(150, '-'))
88
89
        with open('failedRenew.log', 'r') as failedDomain:
90
            domains = failedDomain.readlines()
91
        os.remove('failedRenew.log')
92
93
        logger.info("List of domain to retry: {}".format(retry_count+1, domains))
94
        for domain in domains:
95
            renew_certificate(domain)
96