Test Failed
Push — main ( e1f1ea...3adfc8 )
by Jochen
04:13
created

byceps.application._ensure_required_config_keys()   A

Complexity

Conditions 3

Size

Total Lines 11
Code Lines 9

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 4
CRAP Score 3

Importance

Changes 0
Metric Value
cc 3
eloc 9
nop 1
dl 0
loc 11
ccs 4
cts 4
cp 1
crap 3
rs 9.95
c 0
b 0
f 0
1
"""
2
byceps.application
3
~~~~~~~~~~~~~~~~~~
4
5
:Copyright: 2014-2022 Jochen Kupperschmidt
6
:License: Revised BSD (see `LICENSE` file for details)
7
"""
8
9 1
from __future__ import annotations
10 1
from collections.abc import Iterator
11 1
from importlib import import_module
12 1
import os
13 1
from pathlib import Path
14 1
from typing import Any, Callable, Optional
15
16 1
from flask import abort, current_app, Flask, g
17 1
from flask_babel import Babel
18 1
import jinja2
19 1
from redis import Redis
20
import rtoml
21 1
22 1
from .blueprints.blueprints import register_blueprints
23 1
from . import config, config_defaults
24 1
from .database import db
25 1
from .util.authorization import has_current_user_permission, load_permissions
26 1
from .util.l10n import get_current_user_locale
27 1
from .util import templatefilters, templatefunctions
28
from .util.templating import SiteTemplateOverridesLoader
29
30 1
31
def create_app(
32
    *,
33
    config_filename: Optional[Path | str] = None,
34
    config_overrides: Optional[dict[str, Any]] = None,
35
) -> Flask:
36 1
    """Create the actual Flask application."""
37
    app = Flask('byceps')
38 1
39 1
    _configure(app, config_filename, config_overrides)
40 1
41
    # Throw an exception when an undefined name is referenced in a template.
42
    # NB: Set via `app.jinja_options['undefined'] = ` instead of
43 1
    #     `app.jinja_env.undefined = ` as that would create the Jinja
44 1
    #      environment too early.
45
    app.jinja_options['undefined'] = jinja2.StrictUndefined
46
47 1
    babel = Babel(app)
48
    babel.locale_selector_func = get_current_user_locale
49 1
50
    # Initialize database.
51
    db.init_app(app)
52
53
    # Initialize Redis client.
54
    app.redis_client = Redis.from_url(app.config['REDIS_URL'])
55 1
56
    app_mode = config.get_app_mode(app)
57 1
58 1
    load_permissions()
59
60
    register_blueprints(app, app_mode)
61 1
62
    templatefilters.register(app)
63
    templatefunctions.register(app)
64 1
65
    _add_static_file_url_rules(app)
66 1
67
    if app_mode.is_admin():
68 1
        _init_admin_app(app)
69
    elif app_mode.is_site():
70 1
        _init_site_app(app)
71
72 1
    _load_announce_signal_handlers()
73 1
74
    return app
75 1
76
77 1
def _configure(
78 1
    app: Flask,
79 1
    config_filename: Optional[Path | str] = None,
80 1
    config_overrides: Optional[dict[str, Any]] = None,
81
) -> None:
82 1
    """Configure application from file, environment variables, and defaults."""
83
    app.config.from_object(config_defaults)
84 1
85
    if config_filename is None:
86
        config_filename = os.environ.get('BYCEPS_CONFIG')
87 1
88
    if config_filename is not None:
89 1
        if isinstance(config_filename, str):
90
            config_filename = Path(config_filename)
91
92
        if config_filename.suffix == '.py':
93
            app.config.from_pyfile(str(config_filename))
94
        else:
95
            app.config.from_file(str(config_filename), load=rtoml.load)
96 1
97 1
    if config_overrides is not None:
98 1
        app.config.from_mapping(config_overrides)
99
100
    # Allow configuration values to be overridden by environment variables.
101 1
    app.config.update(_get_config_from_environment())
102
103 1
    _ensure_required_config_keys(app)
104
105
    config.init_app(app)
106
107
108
def _get_config_from_environment() -> Iterator[tuple[str, str]]:
109
    """Obtain selected config values from environment variables."""
110
    for key in (
111 1
        'APP_MODE',
112
        'REDIS_URL',
113 1
        'SECRET_KEY',
114
        'SITE_ID',
115 1
        'SQLALCHEMY_DATABASE_URI',
116 1
    ):
117
        value = os.environ.get(key)
118
        if value:
119
            yield key, value
120 1
121
122
def _ensure_required_config_keys(app: Flask) -> None:
123 1
    """Ensure the required configuration keys have values."""
124
    for key in (
125
        'APP_MODE',
126 1
        'REDIS_URL',
127
        'SECRET_KEY',
128
        'SQLALCHEMY_DATABASE_URI',
129 1
    ):
130 1
        if not app.config.get(key):
131
            raise config.ConfigurationError(
132
                f'Missing value for configuration key "{key}".'
133 1
            )
134
135 1
136
def _add_static_file_url_rules(app: Flask) -> None:
137
    """Add URL rules to for static files."""
138
    app.add_url_rule(
139 1
        '/sites/<site_id>/<path:filename>',
140 1
        endpoint='site_file',
141
        methods=['GET'],
142
        build_only=True,
143
    )
144
145 1
146
def _init_admin_app(app: Flask) -> None:
147
    """Initialize admin application."""
148
    import rq_dashboard
149
150
    @rq_dashboard.blueprint.before_request
151
    def require_permission():
152
        if not has_current_user_permission('jobs.view'):
153
            abort(403)
154
155
    app.register_blueprint(rq_dashboard.blueprint, url_prefix='/admin/rq')
156 1
157 1
158
def _init_site_app(app: Flask) -> None:
159 1
    """Initialize site application."""
160 1
    # Incorporate site-specific template overrides.
161 1
    app.jinja_loader = SiteTemplateOverridesLoader()
162
163
    # Set up site-aware template context processor.
164 1
    app._site_context_processors = {}
165
    app.context_processor(_get_site_template_context)
166
167
168
def _get_site_template_context() -> dict[str, Any]:
169
    """Return the site-specific additions to the template context."""
170
    site_context_processor = _find_site_template_context_processor_cached(
171
        g.site_id
172
    )
173 1
174 1
    if not site_context_processor:
175 1
        return {}
176 1
177
    return site_context_processor()
178 1
179
180
def _find_site_template_context_processor_cached(
181
    site_id: str,
182
) -> Optional[Callable[[], dict[str, Any]]]:
183
    """Return the template context processor for the site.
184
185
    A processor will be cached after it has been obtained for the first
186
    time.
187
    """
188
    # `None` is a valid value for a site that does not specify a
189
    # template context processor.
190
191
    if site_id in current_app._site_context_processors:
192 1
        return current_app._site_context_processors.get(site_id)
193
    else:
194
        context_processor = _find_site_template_context_processor(site_id)
195
        current_app._site_context_processors[site_id] = context_processor
196 1
        return context_processor
197
198
199
def _find_site_template_context_processor(
200
    site_id: str,
201
) -> Optional[Callable[[], dict[str, Any]]]:
202
    """Import a template context processor from the site's package.
203
204
    If a site package contains a module named `extension` and that
205
    contains a top-level callable named `template_context_processor`,
206
    then that callable is imported and returned.
207
    """
208
    module_name = f'sites.{site_id}.extension'
209
    try:
210
        module = import_module(module_name)
211
    except ModuleNotFoundError:
212
        # No extension module found in site package.
213
        return None
214
215
    context_processor = getattr(module, 'template_context_processor', None)
216
    if context_processor is None:
217
        # Context processor not found in module.
218
        return None
219
220
    if not callable(context_processor):
221
        # Context processor object is not callable.
222
        return None
223
224
    return context_processor
225
226
227
def _load_announce_signal_handlers() -> None:
228
    """Import modules containing handlers so they connect to the
229
    corresponding signals.
230
    """
231
    from .announce import connections  # noqa: F401
232