clean_challenge()   A
last analyzed

Complexity

Conditions 2

Size

Total Lines 14

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 2
c 1
b 0
f 0
dl 0
loc 14
rs 9.4285
1
#!/usr/bin/env python
2
# -*- encoding: utf-8 -*-
3
4
import logging
5
import os
6
import sys
7
from time import sleep
8
from ovh_interface import DnsManager, LoadBalancerSSLManager
9
10
####################################################
11
# Static init
12
####################################################
13
14
logger = logging.getLogger(__name__)
15
logger.addHandler(logging.StreamHandler())
16
17
if os.getenv('DEBUG'):
18
    logger.setLevel(logging.DEBUG)
19
else:
20
    logger.setLevel(logging.INFO)
21
22
####################################################
23
# End Static init
24
####################################################
25
26
27
def deploy_challenge(domain, txt_challenge):
28
    """
29
    Deploy the DNS challenge
30
    :param domain:  The domain where DNS challenge will be witten
31
    :param txt_challenge: The token value
32
    """
33
34
    logger.info('Deploy challenge for {}'.format(domain))
35
36
    dns_manager = DnsManager.DnsManager()
37
    dns_records = dns_manager.create_txt_entry(domain, txt_challenge)
38
39
    with open('.idRecord', 'w') as tempfile:
40
        tempfile.write(str(dns_records['id']))
41
42
    logger.info('Waiting period, DNS propagation for {}'.format(domain))
43
    sleep(30)
44
45
46
def clean_challenge(domain):
47
    """
48
    Clean the previous DNS challenge
49
    :param domain:  The domain where DNS challenge will be deleted
50
    """
51
52
    logger.info('Clean challenge for {}'.format(domain))
53
54
    with open('.idRecord', 'r') as tempfile:
55
        id_records = tempfile.readline()
56
    os.remove('.idRecord')
57
58
    dns_manager = DnsManager.DnsManager()
59
    dns_manager.delete_entry(domain, id_records)
60
61
62
def deploy_cert(domain, private_key, certificate, chain=None):
63
    """
64
    Deploy the previously generated certificate IN the IPLB
65
    :param domain: The domain where the certificate will be deploy
66
    :param private_key: The private key of certificate you want to deploy
67
    :param certificate: The certificate you want to deploy
68
    :param chain: The chain of certificate you want to deploy
69
    """
70
71
    iplb_name = os.getenv('iplb_name')
72
    logger.info('Deploy certificate for {} in {}'.format(domain, iplb_name))
73
    lb_ssl_manager = LoadBalancerSSLManager.LoadBalancerSSLManager(ip_lb_name=iplb_name)
74
    lb_ssl_manager.update_certificate(domain=domain, certif_path=certificate, privatekey_path=private_key, chain_path=chain)
75
76
77
def unchanged_cert(args):
78
    logger.debug("Certificate was unchanged. args: {}".format(args))
79
80
81
def invalid_challenge(args):
82
    logger.error("Challenge is invalid! args: {}".format(args))
83
    with open('failedRenew.log', 'a') as failedDomain:
84
        failedDomain.write('{}\n'.format(args[1]))
85
86
87
def main(argv):
88
    """
89
    Entry point of the hook
90
    :param argv: The command line args
91
    """
92
93
    logger.debug("Ovh hook executing: {}".format(argv))
94
95
    if argv[0] == 'deploy_challenge':
96
        deploy_challenge(domain=sys.argv[2], txt_challenge=sys.argv[4])
97
    elif argv[0] == 'clean_challenge':
98
        clean_challenge(domain=sys.argv[2])
99
    elif argv[0] == 'deploy_cert':
100
        deploy_cert(domain=argv[1], private_key=argv[2], certificate=argv[3], chain=argv[4])
101
    elif argv[0] == 'unchanged_cert':
102
        unchanged_cert(argv)
103
    elif argv[0] == 'invalid_challenge':
104
        invalid_challenge(argv)
105
    else:
106
        logger.fatal("Operation not managed: {0}".format(argv[0]))
107
108
109
if __name__ == '__main__':
110
    main(sys.argv[1:])
111