Test Failed
Pull Request — master (#209)
by
unknown
05:45
created

asyncua.crypto.certificate_handler   A

Complexity

Total Complexity 8

Size/Duplication

Total Lines 38
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 30
dl 0
loc 38
rs 10
c 0
b 0
f 0
wmc 8

5 Methods

Rating   Name   Duplication   Size   Complexity  
A CertificateHandler.__init__() 0 2 1
A CertificateHandler.check_certificate() 0 2 1
A CertificateHandler.trust_certificate() 0 10 3
A CertificateHandler.get_user() 0 7 2
A CertificateHandler.__contains__() 0 4 1
1
from asyncua.crypto import uacrypto
2
import sys
3
import logging
4
from asyncua.server.users import UserRole, User
5
# Append '..' to the module search path.
# NOTE(review): this runs after the imports above have already executed, so it
# cannot influence how those imports were resolved — confirm it is still needed.
sys.path.append('..')
6
7
8
class CertificateHandler:
    """Registry of trusted certificates, each associated with a user.

    Entries live in a dict keyed by label. Each value is a dict holding the
    certificate (as returned by ``uacrypto.der_from_x509``) under
    ``'certificate'`` and the ``User`` created for it under ``'user'``.
    """

    def __init__(self):
        # label -> {'certificate': <der_from_x509 result>, 'user': User}
        self._trusted_certificates = {}

    async def trust_certificate(self, certificate_path: str, format: str = None, label: str = None,
                                user_role=UserRole.User):
        """Load a certificate and register it as trusted.

        :param certificate_path: forwarded to ``uacrypto.load_certificate``.
        :param format: forwarded to ``uacrypto.load_certificate``.
        :param label: key under which the entry is stored; defaults to
            ``certificate_path`` when ``None``.
        :param user_role: role given to the ``User`` created for this entry.

        If ``label`` is already registered, a warning is logged and the
        previous entry is overwritten.
        """
        certificate = await uacrypto.load_certificate(certificate_path, format)
        if label is None:
            label = certificate_path
        user = User(role=user_role, name=label)
        if label in self._trusted_certificates:
            logging.warning(f"certificate with label {label} "
                            f"attempted to be added multiple times, only the last version will be kept.")
        self._trusted_certificates[label] = {'certificate': uacrypto.der_from_x509(certificate), 'user': user}

    def __contains__(self, certificate):
        """Return True if ``certificate`` equals any stored certificate."""
        return any(certificate == entry['certificate']
                   for entry in self._trusted_certificates.values())

    def check_certificate(self, certificate):
        """Return True if ``certificate`` is trusted (same as ``certificate in self``)."""
        return certificate in self

    def get_user(self, certificate):
        """Return the user of the first stored entry matching ``certificate``.

        Entries are examined in insertion order; ``None`` is returned when no
        stored certificate compares equal to ``certificate``.
        """
        # Stop at the first match instead of collecting every match into a list.
        return next(
            (entry['user'] for entry in self._trusted_certificates.values()
             if certificate == entry['certificate']),
            None,
        )
38