Test Failed
Push — master ( e380d0...f5671d )
by W
02:58
created

st2common/st2common/persistence/auth.py (4 issues)

1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
from __future__ import absolute_import
17
from st2common.exceptions.auth import (TokenNotFoundError, ApiKeyNotFoundError,
18
                                       UserNotFoundError, AmbiguousUserError,
19
                                       NoNicknameOriginProvidedError)
20
from st2common.models.db import MongoDBAccess
21
from st2common.models.db.auth import UserDB, TokenDB, ApiKeyDB
22
from st2common.persistence.base import Access
23
from st2common.util import hash as hash_utils
24
25
26
class User(Access):
    """Persistence accessor for ``UserDB`` documents."""

    impl = MongoDBAccess(UserDB)

    @classmethod
    def get(cls, username):
        """Return the user named ``username`` by delegating to ``get_by_name``."""
        return cls.get_by_name(username)

    @classmethod
    def get_by_nickname(cls, nickname, origin):
        """
        Return the single user whose nickname for ``origin`` equals ``nickname``.

        :param nickname: Nickname value to look up.
        :param origin: Key under ``nicknames`` the nickname belongs to.

        :raises NoNicknameOriginProvidedError: If ``origin`` is empty.
        :raises UserNotFoundError: If the query matches no user.
        :raises AmbiguousUserError: If the query matches more than one user.
        """
        if not origin:
            raise NoNicknameOriginProvidedError()

        result = cls.query(**{('nicknames__%s' % origin): nickname})

        # Fetch the first match once and reuse it: calling first() again for the
        # return value would issue another query and could hand back a different
        # document than the one validated below.
        user_db = result.first()

        if not user_db:
            raise UserNotFoundError()
        if result.count() > 1:
            raise AmbiguousUserError()

        return user_db

    @classmethod
    def _get_impl(cls):
        """Return the DB access implementation bound to this class."""
        return cls.impl

    @classmethod
    def _get_by_object(cls, object):
        """Look up the stored user that has the same ``name`` as ``object``."""
        # NOTE(review): the parameter name shadows the built-in ``object`` (flagged
        # by static analysis as bad practice). It is kept as-is so the method
        # signature stays unchanged for existing callers.
        # For User name is unique.
        name = getattr(object, 'name', '')
        return cls.get_by_name(name)
56
57
58
class Token(Access):
    """Persistence accessor for ``TokenDB`` documents."""

    impl = MongoDBAccess(TokenDB)

    @classmethod
    def _get_impl(cls):
        """Return the DB access implementation bound to this class."""
        return cls.impl

    @classmethod
    def add_or_update(cls, model_object, publish=True):
        """
        Validate ``model_object`` and then persist it through the parent class.

        :raises ValueError: If ``user``, ``token`` or ``expiry`` is missing or falsy.
        """
        # NOTE(review): static analysis reports that the number of arguments
        # differs from the overridden 'add_or_update' method - confirm against
        # the base class.
        required_attributes = (
            ('user', 'User is not provided in the token.'),
            ('token', 'Token value is not set.'),
            ('expiry', 'Token expiry is not provided in the token.'),
        )

        # Checked in declaration order; the first missing attribute wins.
        for attribute, error_message in required_attributes:
            if not getattr(model_object, attribute, None):
                raise ValueError(error_message)

        return super(Token, cls).add_or_update(model_object, publish=publish)

    @classmethod
    def get(cls, value):
        """
        Return the token document whose ``token`` field equals ``value``.

        :raises TokenNotFoundError: If no such document exists.
        """
        # NOTE(review): static analysis reports that the number of arguments
        # differs from the overridden 'get' method - confirm against the base
        # class.
        token_db = cls.query(token=value).first()

        if token_db:
            return token_db

        raise TokenNotFoundError()
83
84
85
class ApiKey(Access):
    """Persistence accessor for ``ApiKeyDB`` documents."""

    impl = MongoDBAccess(ApiKeyDB)

    @classmethod
    def _get_impl(cls):
        """Return the DB access implementation bound to this class."""
        return cls.impl

    @classmethod
    def get(cls, value):
        """
        Return the API key document matching the plain key ``value``.

        :raises ApiKeyNotFoundError: If no document has the hash of ``value``.
        """
        # NOTE(review): static analysis reports that the number of arguments
        # differs from the overridden 'get' method - confirm against the base
        # class.
        # DB does not contain key but the key_hash.
        value_hash = hash_utils.hash(value)
        result = cls.query(key_hash=value_hash).first()

        if not result:
            raise ApiKeyNotFoundError('ApiKey with key_hash=%s not found.' % value_hash)

        return result

    @classmethod
    def get_by_key_or_id(cls, value):
        """
        Return the API key document identified by ``value``.

        ``value`` is first treated as the plain key; if that lookup fails it is
        retried as a document id.

        :raises ApiKeyNotFoundError: If neither lookup finds a document.
        """
        try:
            return cls.get(value)
        except ApiKeyNotFoundError:
            # Not a known key; fall through and try it as an id.
            pass

        try:
            return cls.get_by_id(value)
        except Exception:
            # Any lookup failure is reported as "not found". ``Exception`` is used
            # rather than a bare ``except`` so KeyboardInterrupt / SystemExit are
            # not swallowed and converted into ApiKeyNotFoundError.
            raise ApiKeyNotFoundError('ApiKey with key or id=%s not found.' % value)
113