Completed
Pull Request — master (#2920)
by Anthony
07:24 queued 01:55
created

TokenController._handle_standalone_auth()   D

Complexity

Conditions 10

Size

Total Lines 63

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 10
c 1
b 0
f 0
dl 0
loc 63
rs 4.6153

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

Complexity

Complex methods like TokenController._handle_standalone_auth() often do a lot of different things. To break down the class that contains such a method, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
import base64
17
18
import pecan
19
from pecan import rest
20
from six.moves import http_client
21
from oslo_config import cfg
22
23
from st2common.exceptions.auth import TokenNotFoundError, TokenExpiredError
24
from st2common.exceptions.auth import TTLTooLargeException, UserNotFoundError
25
from st2common.models.api.base import jsexpose
26
from st2common.models.api.auth import TokenAPI
27
from st2common.persistence.auth import User
28
from st2common.services.access import create_token
29
from st2common.util import auth as auth_utils
30
from st2common import log as logging
31
from st2auth.backends import get_backend_instance
32
33
34
LOG = logging.getLogger(__name__)
35
36
37
class TokenValidationController(rest.RestController):
    """REST controller which reports whether a submitted token is valid."""

    @jsexpose(body_cls=TokenAPI, status_code=http_client.OK)
    def post(self, request, **kwargs):
        """
        Validate the token carried on the request body.

        :param request: Request body object; its ``token`` attribute is read.

        :return: ``{'valid': True}`` when ``auth_utils.validate_token`` returns
                 a non-``None`` value, ``{'valid': False}`` when it returns
                 ``None`` or raises TokenNotFoundError / TokenExpiredError.
        :rtype: ``dict``
        """
        supplied_token = getattr(request, 'token', None)

        # A missing or empty token is a client error, not an "invalid" token.
        if not supplied_token:
            pecan.abort(http_client.BAD_REQUEST, 'Token is not provided.')

        try:
            validated = auth_utils.validate_token(supplied_token)
        except (TokenNotFoundError, TokenExpiredError):
            # Unknown and expired tokens are an expected outcome.
            return {'valid': False}
        except Exception:
            # Anything else is unexpected: log the traceback and answer 500.
            msg = 'Unexpected error occurred while verifying token.'
            LOG.exception(msg)
            pecan.abort(http_client.INTERNAL_SERVER_ERROR, msg)
        else:
            return {'valid': validated is not None}
54
55
56
class TokenController(rest.RestController):
    """
    REST controller which issues access tokens.

    How the caller is identified depends on ``cfg.CONF.auth.mode``:

    * ``proxy`` - the user name is taken from ``pecan.request.remote_user``.
    * ``standalone`` - HTTP basic auth credentials are checked against the
      backend named by ``cfg.CONF.auth.backend``.
    """

    validate = TokenValidationController()

    def __init__(self, *args, **kwargs):
        super(TokenController, self).__init__(*args, **kwargs)

        # The backend is only instantiated when it is going to be used.
        if cfg.CONF.auth.mode == 'standalone':
            self._auth_backend = get_backend_instance(name=cfg.CONF.auth.backend)
        else:
            self._auth_backend = None

    @jsexpose(body_cls=TokenAPI, status_code=http_client.CREATED)
    def post(self, request, **kwargs):
        """
        Create a token, dispatching on the configured auth mode.

        Returns ``None`` when the mode is neither ``proxy`` nor ``standalone``.
        """
        if cfg.CONF.auth.mode == 'proxy':
            return self._handle_proxy_auth(request=request, **kwargs)
        elif cfg.CONF.auth.mode == 'standalone':
            return self._handle_standalone_auth(request=request, **kwargs)

    def _handle_proxy_auth(self, request, **kwargs):
        """
        Issue a token for the user reported in ``pecan.request.remote_user``.

        Aborts with 400 when the requested TTL is too large and with the
        default 401 when no remote user is set on the request.
        """
        remote_addr = pecan.request.headers.get('x-forwarded-for', pecan.request.remote_addr)
        extra = {'remote_addr': remote_addr}

        if pecan.request.remote_user:
            ttl = getattr(request, 'ttl', None)
            try:
                token = self._create_token_for_user(username=pecan.request.remote_user, ttl=ttl)
            except TTLTooLargeException as e:
                self._abort_request(status_code=http_client.BAD_REQUEST,
                                    message=e.message)
            return self._process_successful_response(token=token)

        LOG.audit('Access denied to anonymous user.', extra=extra)
        self._abort_request()

    def _handle_standalone_auth(self, request, **kwargs):
        """
        Authenticate basic auth credentials against the backend and issue a token.

        :param request: Request body object; ``ttl``, ``user`` and
                        ``impersonate_user`` attributes are read when present.

        :return: Token API object on success. Every failure path calls
                 ``_abort_request`` (pecan.abort() normally raises, so the
                 explicit ``return`` statements are defensive).
        """
        auth_backend = self._auth_backend.__class__.__name__
        remote_addr = pecan.request.remote_addr
        extra = {'auth_backend': auth_backend, 'remote_addr': remote_addr}

        credentials = self._parse_basic_credentials(extra=extra)
        if credentials is None:
            return

        username, password = credentials
        result = self._auth_backend.authenticate(username=username, password=password)

        # Only an explicit True counts as success; any other value is a denial.
        if result is not True:
            LOG.audit('Invalid credentials provided', extra=extra)
            self._abort_request()
            return

        ttl = getattr(request, 'ttl', None)
        username = self._resolve_token_username(request=request, username=username)
        if username is None:
            return

        try:
            token = self._create_token_for_user(username=username, ttl=ttl)
            return self._process_successful_response(token=token)
        except TTLTooLargeException as e:
            self._abort_request(status_code=http_client.BAD_REQUEST,
                                message=e.message)
            return

    def _parse_basic_credentials(self, extra):
        """
        Extract ``(username, password)`` from the request's Authorization header.

        :param extra: Audit log context. ``auth_type`` is added to it when an
                      unsupported authorization type is encountered.
        :type extra: ``dict``

        :return: ``(username, password)``, or ``None`` after the request has
                 been aborted because the header is missing, not of type
                 ``basic``, not valid base64 or has no ``:`` separator.
        """
        authorization = pecan.request.authorization

        if not authorization:
            LOG.audit('Authorization header not provided', extra=extra)
            self._abort_request()
            return None

        auth_type, auth_value = authorization
        if auth_type.lower() not in ['basic']:
            extra['auth_type'] = auth_type
            LOG.audit('Unsupported authorization type: %s' % (auth_type), extra=extra)
            self._abort_request()
            return None

        try:
            auth_value = base64.b64decode(auth_value)
        except Exception:
            LOG.audit('Invalid authorization header', extra=extra)
            self._abort_request()
            return None

        # Split on the first colon only: the password part may itself contain
        # colons, and an unbounded split would reject such credentials.
        split = auth_value.split(':', 1)
        if len(split) != 2:
            LOG.audit('Invalid authorization header', extra=extra)
            self._abort_request()
            return None

        username, password = split
        return username, password

    def _resolve_token_username(self, request, username):
        """
        Work out which user the token should be issued for.

        ``request.user`` wins when set. Otherwise ``request.impersonate_user``
        is treated as a chatops id and mapped to a user name. When neither is
        set the authenticated ``username`` is returned unchanged.

        :return: User name, or ``None`` after the request has been aborted with
                 400 because no user matches the chatops id.
        """
        requested_user = getattr(request, 'user', None)
        if requested_user is not None:
            # An explicit user name is used as-is; it is not a chatops id and
            # must not go through the chatops lookup below.
            return requested_user

        chatops_id = getattr(request, 'impersonate_user', None)
        if chatops_id is None:
            return username

        try:
            return User.get_by_chatops_id(chatops_id).username
        except UserNotFoundError:
            message = "Could not locate user with chatops_id '%s'" % \
                      chatops_id
            self._abort_request(status_code=http_client.BAD_REQUEST,
                                message=message)
            return None

    def _abort_request(self, status_code=http_client.UNAUTHORIZED,
                       message='Invalid or missing credentials'):
        """Abort the current request via ``pecan.abort`` (401 by default)."""
        pecan.abort(status_code, message)

    def _process_successful_response(self, token):
        """Set the ``X-API-URL`` response header from config and return ``token``."""
        api_url = cfg.CONF.auth.api_url
        pecan.response.headers['X-API-URL'] = api_url
        return token

    def _create_token_for_user(self, username, ttl=None):
        """Create a token via ``create_token`` and return it as a ``TokenAPI`` object."""
        tokendb = create_token(username=username, ttl=ttl)
        return TokenAPI.from_model(tokendb)
166