Passed
Push — master ( b0b54c...47920a )
by Olivier
02:38
created

PermissionRuleset.check_validity()   A

Complexity

Conditions 1

Size

Total Lines 2
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 2
nop 4
dl 0
loc 2
rs 10
c 0
b 0
f 0
1
from asyncua import ua
2
from asyncua.server.users import UserRole
3
4
WRITE_TYPES = [
5
    ua.ObjectIds.WriteRequest_Encoding_DefaultBinary,
6
    ua.ObjectIds.RegisterServerRequest_Encoding_DefaultBinary,
7
    ua.ObjectIds.RegisterServer2Request_Encoding_DefaultBinary,
8
    ua.ObjectIds.AddNodesRequest_Encoding_DefaultBinary,
9
    ua.ObjectIds.DeleteNodesRequest_Encoding_DefaultBinary,
10
    ua.ObjectIds.AddReferencesRequest_Encoding_DefaultBinary,
11
    ua.ObjectIds.DeleteReferencesRequest_Encoding_DefaultBinary,
12
    ua.ObjectIds.RegisterNodesRequest_Encoding_DefaultBinary,
13
    ua.ObjectIds.UnregisterNodesRequest_Encoding_DefaultBinary
14
]
15
16
READ_TYPES = [
17
    ua.ObjectIds.CreateSessionRequest_Encoding_DefaultBinary,
18
    ua.ObjectIds.CloseSessionRequest_Encoding_DefaultBinary,
19
    ua.ObjectIds.ActivateSessionRequest_Encoding_DefaultBinary,
20
    ua.ObjectIds.ReadRequest_Encoding_DefaultBinary,
21
    ua.ObjectIds.BrowseRequest_Encoding_DefaultBinary,
22
    ua.ObjectIds.GetEndpointsRequest_Encoding_DefaultBinary,
23
    ua.ObjectIds.FindServersRequest_Encoding_DefaultBinary,
24
    ua.ObjectIds.TranslateBrowsePathsToNodeIdsRequest_Encoding_DefaultBinary,
25
    ua.ObjectIds.CreateSubscriptionRequest_Encoding_DefaultBinary,
26
    ua.ObjectIds.DeleteSubscriptionsRequest_Encoding_DefaultBinary,
27
    ua.ObjectIds.CreateMonitoredItemsRequest_Encoding_DefaultBinary,
28
    ua.ObjectIds.ModifyMonitoredItemsRequest_Encoding_DefaultBinary,
29
    ua.ObjectIds.DeleteMonitoredItemsRequest_Encoding_DefaultBinary,
30
    ua.ObjectIds.HistoryReadRequest_Encoding_DefaultBinary,
31
    ua.ObjectIds.PublishRequest_Encoding_DefaultBinary,
32
    ua.ObjectIds.RepublishRequest_Encoding_DefaultBinary,
33
    ua.ObjectIds.CloseSecureChannelRequest_Encoding_DefaultBinary,
34
    ua.ObjectIds.CallRequest_Encoding_DefaultBinary,
35
    ua.ObjectIds.SetMonitoringModeRequest_Encoding_DefaultBinary,
36
    ua.ObjectIds.SetPublishingModeRequest_Encoding_DefaultBinary
37
]
38
39
40
class PermissionRuleset:
41
    """
42
    Base class for permission ruleset
43
    """
44
45
    def check_validity(self, user, action_type, body):
46
        raise NotImplementedError
47
48
49
class SimpleRoleRuleset(PermissionRuleset):
50
    """
51
    Standard simple role-based ruleset.
52
    Admins alone can write, admins and users can read, and anonymous users can't do anything.
53
    """
54
55
    def __init__(self):
56
        write_ids = list(map(ua.NodeId, WRITE_TYPES))
57
        read_ids = list(map(ua.NodeId, READ_TYPES))
58
        self._permission_dict = {
59
            UserRole.Admin: set().union(write_ids, read_ids),
60
            UserRole.User: set().union(read_ids),
61
            UserRole.Anonymous: set()
62
        }
63
64
    def check_validity(self, user, action_type_id, body):
65
        if action_type_id in self._permission_dict[user.role]:
66
            return True
67
        else:
68
            return False
69
70