1
|
|
|
from asyncua.crypto import uacrypto |
2
|
|
|
import logging |
3
|
|
|
from asyncua.server.users import UserRole, User |
4
|
|
|
|
5
|
|
|
|
6
|
|
|
class UserManager:
    """
    Placeholder user manager whose lookup is not implemented.

    Presumably meant to be specialised by concrete managers; nothing in this
    file derives from it, so confirm against callers.
    """

    def get_user(self, iserver, username=None, password=None, certificate=None):
        """
        Resolve a user from the supplied credentials.

        This implementation ignores every argument and always raises.

        :raises NotImplementedError: unconditionally
        """
        raise NotImplementedError()
9
|
|
|
|
10
|
|
|
|
11
|
|
|
class PermissiveUserManager:
    """
    User manager that accepts every caller and only decides between the
    Admin and the User role.
    """

    def get_user(self, iserver, username=None, password=None, certificate=None):
        """
        Default user_manager, does nothing much but check for admin.

        The Admin role is handed out when a non-empty username is given,
        iserver.allow_remote_admin is truthy and the username is exactly
        "admin" or "Admin". Every other call yields the User role.
        password and certificate are never inspected.
        """
        # Keep the short-circuit order: iserver is only touched when a
        # username was actually supplied.
        wants_admin = bool(username) and iserver.allow_remote_admin and username in ("admin", "Admin")
        granted_role = UserRole.Admin if wants_admin else UserRole.User
        return User(role=granted_role)
20
|
|
|
|
21
|
|
|
|
22
|
|
|
class CertificateUserManager:
    """
    Certificate user manager, takes a certificate handler with its associated users and provides those users.

    Certificates are registered by name through add_role / add_user /
    add_admin; get_user then maps a presented certificate to the user that
    was registered with it.
    """

    def __init__(self):
        # Maps a registration name to
        # {'certificate': <result of uacrypto.der_from_x509>, 'user': User}.
        self._trusted_certificates = {}

    async def add_role(self, certificate_path: str, user_role: UserRole, name: str, format: str = None):
        """
        Load a certificate and register it under ``name`` with the given role.

        Registering a name that is already present logs a warning and
        replaces the earlier entry.

        :param certificate_path: handed to uacrypto.load_certificate
        :param user_role: role assigned to the created User
        :param name: registry key and name of the created User; must not be None
        :param format: passed through unchanged to uacrypto.load_certificate
        :raises KeyError: if ``name`` is None
        """
        # Validate before awaiting the loader so an invalid call fails fast
        # instead of performing certificate I/O first.
        if name is None:
            raise KeyError("a name is required to register a certificate")

        certificate = await uacrypto.load_certificate(certificate_path, format)
        user = User(role=user_role, name=name)

        if name in self._trusted_certificates:
            logging.warning(f"certificate with name {name} "
                            f"attempted to be added multiple times, only the last version will be kept.")
        self._trusted_certificates[name] = {'certificate': uacrypto.der_from_x509(certificate), 'user': user}

    def get_user(self, iserver, username=None, password=None, certificate=None):
        """
        Return the user registered for ``certificate``.

        Only ``certificate`` is consulted; iserver, username and password are
        ignored. When several entries hold an equal certificate, the one
        registered first wins.

        :return: the matching user, or None when no certificate was supplied
                 or no registered certificate compares equal to it
        """
        if certificate is None:
            return None
        matching_users = (
            entry['user']
            for entry in self._trusted_certificates.values()
            if certificate == entry['certificate']
        )
        return next(matching_users, None)

    async def add_user(self, certificate_path: str, name: str, format: str = None):
        """Register the certificate at ``certificate_path`` with the User role (see add_role)."""
        await self.add_role(certificate_path=certificate_path, user_role=UserRole.User, name=name, format=format)

    async def add_admin(self, certificate_path: str, name: str, format: str = None):
        """Register the certificate at ``certificate_path`` with the Admin role (see add_role)."""
        await self.add_role(certificate_path=certificate_path, user_role=UserRole.Admin, name=name, format=format)
56
|
|
|
|