LoadBalancerSSLManager.update_certificate()   F
last analyzed

Complexity

Conditions 10

Size

Total Lines 44

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 10
c 1
b 0
f 0
dl 0
loc 44
rs 3.1304

How to fix   Complexity   

Complexity

Complex units like LoadBalancerSSLManager.update_certificate() often do a lot of different things. To break such a unit down, we need to identify a cohesive component within it. A common approach to finding such a component is to look for fields/methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
#!/usr/bin/env python
2
# -*- encoding: utf-8 -*-
3
4
import ovh
5
import re
6
from pathlib import Path
7
from ovh_interface.InterfaceBase import InterfaceBase
8
9
10
class LoadBalancerSSLManager(InterfaceBase):
    """
    Manage the SSL certificates of one OVH IP load balancer (IPLB).

    All API calls go through ``self.ovh_client`` and all logging through
    ``self.logger``; neither is assigned in this class, so both are expected
    to be provided by ``InterfaceBase``.
    """

    # Extracts the common name from a slash-separated certificate subject such
    # as "/C=FR/O=Example/CN=my-site.example.com". The capture runs up to the
    # next separator so that hyphenated and wildcard ("*.example.com") names
    # are captured whole instead of being truncated or skipped.
    _SUBJECT_CN_PATTERN = re.compile(r"/CN=([^/,\s]+)")

    def __init__(self, ip_lb_name, ovh_client=None):
        """
        Constructor

        :param ip_lb_name: The name of the IP LB you want to work with
        :param ovh_client: The ovh client you want to use. It is forwarded
                           as-is to ``InterfaceBase.__init__``.
        """
        InterfaceBase.__init__(self, ovh_client)
        self.ip_lb_name = ip_lb_name

    def get_certificate_managed_by_ip_lb(self):
        """
        Get the certificates managed by the IP load balancer

        Certificates whose subject contains no ``/CN=`` component are skipped.

        :return: list of dict with the keys ``ip_lb_name``, ``ssl_id`` and
                 ``ssl_cn``. The list is empty when the load balancer name is
                 not returned by the ``/ipLoadbalancing`` listing.
        """
        self.logger.debug('List certificate in IP LB')

        list_of_ssl_ip_lb_entry = []

        # Only query the SSL endpoints when the load balancer is actually
        # listed for this account.
        if self.ip_lb_name in self.ovh_client.get('/ipLoadbalancing'):
            ssl_base_path = '/ipLoadbalancing/{}/ssl'.format(self.ip_lb_name)
            for ssl_id in self.ovh_client.get(ssl_base_path):
                ssl_config = self.ovh_client.get('{}/{}'.format(ssl_base_path, ssl_id))
                # A missing or empty subject is treated as "no common name".
                subject = ssl_config.get('subject') or ''
                match = self._SUBJECT_CN_PATTERN.search(subject)
                if match is None:
                    continue
                list_of_ssl_ip_lb_entry.append({
                    "ip_lb_name": self.ip_lb_name,
                    "ssl_id": ssl_id,
                    "ssl_cn": match.group(1),
                })

        self.logger.debug('SSL certif in IPLB : {}'.format(list_of_ssl_ip_lb_entry))
        return list_of_ssl_ip_lb_entry

    def _add_certificate(self, certif, privatekey, chain=None):
        """
        Add a certificate in the current ip lb

        :param certif: The content of the certificate
        :param privatekey: The content of the private key
        :param chain: The content of the certificate chain, or None when there
                      is no chain to upload
        :return: The certificate entry created or None if an error occurred
        """
        payload = {
            'certificate': certif.strip(),
            'key': privatekey.strip(),
        }
        # The chain is optional: only send it when one was provided, instead
        # of calling .strip() on None.
        if chain is not None:
            payload['chain'] = chain.strip()

        try:
            return self.ovh_client.post('/ipLoadbalancing/{}/ssl'.format(self.ip_lb_name), **payload)
        except (ovh.exceptions.BadParametersError, ovh.exceptions.ResourceConflictError) as err:
            self.logger.error('Impossible to add certificate. err: {}'.format(err))
            return None

    def _get_ssl_id_from_domain(self, domain):
        """
        Get all ssl id of certificate found for a domain

        :param domain: The domain you want to search ssl associated with
        :return: List of ssl id
        """
        # Return a real list, as documented, so callers can iterate it more
        # than once or test it for emptiness.
        return [
            entry['ssl_id']
            for entry in self.get_certificate_managed_by_ip_lb()
            if entry['ssl_cn'] == domain
        ]

    def _delete_certificate(self, list_of_ssl_id):
        """
        Delete SSL certificates from the current ip lb

        :param list_of_ssl_id: iterable of ssl id you want to delete
        """
        for ssl_id in list_of_ssl_id:
            self.logger.debug("Delete SSL certif {}".format(ssl_id))
            self.ovh_client.delete('/ipLoadbalancing/{serviceName}/ssl/{id}'.format(serviceName=self.ip_lb_name, id=ssl_id))

    def _check_certificate_files(self, domain, certif_path, privatekey_path, chain_path):
        """
        Check that every file needed for a certificate update exists

        :return: True if all required files exist, else False (an error is logged)
        """
        if not Path(certif_path).is_file() or not Path(privatekey_path).is_file():
            self.logger.error("Impossible to update the certificate of {} some files doesn't exist".format(domain))
            return False

        if chain_path is not None and not Path(chain_path).is_file():
            self.logger.error("Impossible to update the certificate of {} some chain file doesn't exist {}".format(domain, chain_path))
            return False

        return True

    @staticmethod
    def _read_text_file(path):
        """
        Return the whole content of the text file at ``path``
        """
        with open(path, 'r') as content_file:
            return content_file.read()

    def update_certificate(self, domain, certif_path, privatekey_path, chain_path=None):
        """
        Update a certificate in the current ip lb

        The new certificate is added first; the certificates that were
        previously registered for the domain are deleted only once the
        addition succeeded.

        :param domain: The domain you want to update
        :param certif_path: The path to the certificate you want to add
        :param privatekey_path: The path to the private key you want to add
        :param chain_path: The path to the chain you want to add (optional)
        :return: True if success else False
        """
        self.logger.info("Update the certificate of {}".format(domain))
        self.logger.debug("certif: {}\nprivatekey:{}\nchain: {}".format(certif_path, privatekey_path, chain_path))

        if not self._check_certificate_files(domain, certif_path, privatekey_path, chain_path):
            return False

        certif = self._read_text_file(certif_path)
        privatekey = self._read_text_file(privatekey_path)
        chain = self._read_text_file(chain_path) if chain_path is not None else None

        # Collect the ids to remove BEFORE adding the new certificate so the
        # freshly added one is never part of the deletion list.
        list_of_ssl_id_to_delete = self._get_ssl_id_from_domain(domain)

        self.logger.info('Add certificate for domain : {}'.format(domain))
        if self._add_certificate(certif, privatekey, chain) is None:
            return False

        self.logger.debug('Delete certificate for domain {}'.format(domain))
        self._delete_certificate(list_of_ssl_id_to_delete)
        return True
134