Passed
Push — master ( b0b54c...47920a )
by Olivier
02:38
created

CertificateUserManager.add_admin()   A

Complexity

Conditions 1

Size

Total Lines 2
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 2
nop 4
dl 0
loc 2
rs 10
c 0
b 0
f 0
1
import logging
from typing import Optional

from asyncua.crypto import uacrypto
from asyncua.server.users import UserRole, User
4
5
6
class UserManager:
    """
    Base user manager.

    Defines the get_user() signature that the other user managers in this
    module also expose. The implementation here is a placeholder only.
    """

    def get_user(self, iserver, username=None, password=None, certificate=None):
        """
        Resolve the user matching the supplied credentials.

        This base implementation does not look at any argument.

        :raises NotImplementedError: always
        """
        raise NotImplementedError
9
10
11
class PermissiveUserManager:
    """
    Permissive user manager: every caller gets a User, and the only
    distinction made is whether the Admin role is granted.
    """

    def get_user(self, iserver, username=None, password=None, certificate=None):
        """
        Default user_manager, does nothing much but check for admin

        Returns a User with the Admin role when username is "admin" or "Admin"
        and iserver.allow_remote_admin is truthy; in every other case returns
        a User with the plain User role. password and certificate are not
        inspected.
        """
        # The membership test already rejects None and "", so no separate
        # truthiness check on username is needed.
        is_admin_name = username in ("admin", "Admin")
        if is_admin_name and iserver.allow_remote_admin:
            return User(role=UserRole.Admin)
        return User(role=UserRole.User)
20
21
22
class CertificateUserManager:
    """
    Certificate user manager, takes a certificate handler with its associated users and provides those users.

    Trusted certificates are stored in a dict keyed by name. Each entry holds
    the result of uacrypto.der_from_x509() for the loaded certificate together
    with the User that certificate maps to.
    """

    def __init__(self):
        # name -> {'certificate': <der_from_x509 result>, 'user': User}
        self._trusted_certificates = {}

    async def add_role(
        self,
        certificate_path: str,
        user_role: "UserRole",
        name: str,
        format: Optional[str] = None,
    ) -> None:
        """
        Register the certificate at certificate_path under name with user_role.

        If name is already registered a warning is logged and the previous
        entry is overwritten.

        :param certificate_path: path handed to uacrypto.load_certificate()
        :param user_role: role given to the User created for this certificate
        :param name: key of the entry; also used as the User's name
        :param format: forwarded unchanged to uacrypto.load_certificate()
        :raises KeyError: if name is None
        """
        # Validate first so an invalid call fails before the certificate is loaded.
        if name is None:
            raise KeyError("a name is required to register a certificate")

        certificate = await uacrypto.load_certificate(certificate_path, format)
        user = User(role=user_role, name=name)

        if name in self._trusted_certificates:
            logging.warning(
                "certificate with name %s "
                "attempted to be added multiple times, only the last version will be kept.",
                name,
            )
        self._trusted_certificates[name] = {
            'certificate': uacrypto.der_from_x509(certificate),
            'user': user,
        }

    def get_user(self, iserver, username=None, password=None, certificate=None):
        """
        Return the user registered for certificate, or None.

        Only certificate is used for the lookup; iserver, username and password
        are ignored. When several entries hold an equal certificate, the one
        registered first (dict insertion order) wins.
        """
        if certificate is None:
            return None
        matching_users = (
            entry['user']
            for entry in self._trusted_certificates.values()
            if certificate == entry['certificate']
        )
        # Stop at the first match instead of collecting every match.
        return next(matching_users, None)

    async def add_user(self, certificate_path: str, name: str, format: Optional[str] = None) -> None:
        """
        Register a certificate with the plain User role (see add_role()).
        """
        await self.add_role(certificate_path=certificate_path, user_role=UserRole.User, name=name, format=format)

    async def add_admin(self, certificate_path: str, name: str, format: Optional[str] = None) -> None:
        """
        Register a certificate with the Admin role (see add_role()).
        """
        await self.add_role(certificate_path=certificate_path, user_role=UserRole.Admin, name=name, format=format)
56