Passed
Pull Request — master (#608)
by
unknown
14:06
created

TabPyApp._create_tornado_web_app()   B

Complexity

Conditions 4

Size

Total Lines 71
Code Lines 52

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 16
CRAP Score 4.2159

Importance

Changes 0
Metric Value
eloc 52
dl 0
loc 71
ccs 16
cts 21
cp 0.7619
rs 8.5709
c 0
b 0
f 0
cc 4
nop 1
crap 4.2159

How to fix   Long Method   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include Extract Method: move a cohesive part of the body into a new, well-named method.

1 1
import concurrent.futures
2 1
import configparser
3 1
import logging
4 1
import multiprocessing
5 1
import os
6 1
import shutil
7 1
import signal
8 1
import sys
9 1
import _thread
10
11 1
import tornado
12 1
from tornado.http1connection import HTTP1Connection
13
14 1
import tabpy
15 1
import tabpy.tabpy_server.app.arrow_server as pa
16 1
from tabpy.tabpy import __version__
17 1
from tabpy.tabpy_server.app.app_parameters import ConfigParameters, SettingsParameters
18 1
from tabpy.tabpy_server.app.util import parse_pwd_file
19 1
from tabpy.tabpy_server.handlers.basic_auth_server_middleware_factory import BasicAuthServerMiddlewareFactory
20 1
from tabpy.tabpy_server.handlers.no_op_auth_handler import NoOpAuthHandler
21 1
from tabpy.tabpy_server.management.state import TabPyState
22 1
from tabpy.tabpy_server.management.util import _get_state_from_file
23 1
from tabpy.tabpy_server.psws.callbacks import init_model_evaluator, init_ps_server
24 1
from tabpy.tabpy_server.psws.python_service import PythonService, PythonServiceHandler
25 1
from tabpy.tabpy_server.handlers import (
26
    EndpointHandler,
27
    EndpointsHandler,
28
    EvaluationPlaneHandler,
29
    EvaluationPlaneDisabledHandler,
30
    QueryPlaneHandler,
31
    ServiceInfoHandler,
32
    StatusHandler,
33
    UploadDestinationHandler,
34
)
35
36 1
# Module-wide logger, named after this module via __name__.
logger = logging.getLogger(__name__)
37
38 1
def _init_asyncio_patch():
39
    """
40
    Select compatible event loop for Tornado 5+.
41
    As of Python 3.8, the default event loop on Windows is `proactor`,
42
    however Tornado requires the old default "selector" event loop.
43
    As Tornado has decided to leave this to users to set, MkDocs needs
44
    to set it. See https://github.com/tornadoweb/tornado/issues/2608.
45
    """
46 1
    if sys.platform.startswith("win") and sys.version_info >= (3, 8):
47
        import asyncio
48
        try:
49
            from asyncio import WindowsSelectorEventLoopPolicy
50
        except ImportError:
51
            pass  # Can't assign a policy which doesn't exist.
52
        else:
53
            if not isinstance(asyncio.get_event_loop_policy(), WindowsSelectorEventLoopPolicy):
54
                asyncio.set_event_loop_policy(WindowsSelectorEventLoopPolicy())
55
56
57 1
class TabPyApp:
    """
    Application object for a TabPy server.

    Keeps the context shared across the service: the parsed settings, the
    endpoint state, the Python evaluation service, loaded credentials and
    the optional Arrow Flight server. The class-level values below are
    placeholders; ``_parse_config`` and ``run`` assign per-instance values.
    """

    # Parsed configuration values, keyed by setting/config parameter name.
    settings: dict = {}
    # URL prefix for every route: "" or "/<Subdirectory from state.ini>".
    subdirectory: str = ""
    # TabPyState built from state.ini.
    tabpy_state: "TabPyState | None" = None
    # Handler wrapping the PythonService used for evaluation.
    python_service: "PythonServiceHandler | None" = None
    # Credentials parsed from the password file (empty when none configured).
    credentials: dict = {}
    # Arrow Flight server, created on a background thread by run() when enabled.
    arrow_server = None
    # Maximum accepted request size in bytes; computed in run().
    max_request_size: "int | None" = None
69
70 1
    def __init__(self, config_file):
        """Create the application and load its configuration.

        :param config_file: path to a TabPy config file, or ``None`` to use
            the bundled ``common/default.conf``.

        If the file exists, it is first passed to ``logging.config.fileConfig``
        so it can configure logging. If it cannot be used for that, logging
        falls back to ``logging.basicConfig(level=logging.DEBUG)``. The same
        file is then parsed for TabPy settings by ``_parse_config``.
        """
        if config_file is None:
            config_file = os.path.join(
                os.path.dirname(__file__), os.path.pardir, "common", "default.conf"
            )

        if os.path.isfile(config_file):
            try:
                from logging import config
                config.fileConfig(config_file, disable_existing_loggers=False)
            except (KeyError, RuntimeError):
                # KeyError: the file lacks the logging sections fileConfig
                # expects. RuntimeError: raised since Python 3.12 for an empty
                # or invalid file. Fall back to basic logging instead of
                # aborting startup.
                logging.basicConfig(level=logging.DEBUG)

        self._parse_config(config_file)
84
85 1
    def _get_tls_certificates(self, config):
        """Load the TLS certificate chain and private key for Arrow Flight.

        :param config: settings dict holding the certificate and key file
            paths under ``SettingsParameters.CertificateFile`` / ``KeyFile``.
        :returns: a one-element list ``[(cert_chain_bytes, private_key_bytes)]``.
        """
        def _read_bytes(path):
            with open(path, "rb") as handle:
                return handle.read()

        cert_path = config[SettingsParameters.CertificateFile]
        key_path = config[SettingsParameters.KeyFile]
        return [(_read_bytes(cert_path), _read_bytes(key_path))]
95
    
96 1
    def _get_arrow_server(self, config):
        """Build (but do not start) the Arrow Flight server.

        :param config: the settings dict.
        :returns: a ``FlightServer`` bound to ``0.0.0.0`` on the configured
            Arrow Flight port. ``grpc+tls`` with the configured certificate
            and key is used when the transfer protocol is ``https``, otherwise
            ``grpc+tcp``. Basic-auth middleware is installed when the v1 API
            advertises the ``authentication`` feature.
        """
        use_tls = config[SettingsParameters.TransferProtocol] == "https"
        scheme = "grpc+tls" if use_tls else "grpc+tcp"
        tls_certificates = self._get_tls_certificates(config) if use_tls else None

        host = "0.0.0.0"
        location = "{}://{}:{}".format(
            scheme, host, config.get(SettingsParameters.ArrowFlightPort)
        )

        features = config[SettingsParameters.ApiVersions]["v1"]["features"]
        if "authentication" in features:
            _, creds = parse_pwd_file(config[ConfigParameters.TABPY_PWD_FILE])
            auth_middleware = {"basic": BasicAuthServerMiddlewareFactory(creds)}
        else:
            auth_middleware = None

        return pa.FlightServer(
            host,
            location,
            tls_certificates=tls_certificates,
            verify_client=None,
            auth_handler=NoOpAuthHandler(),
            middleware=auth_middleware,
        )
120
121 1
    def run(self):
        """Start the TabPy web service and block in the Tornado IO loop.

        Builds the Tornado application, initializes the model evaluator,
        configures TLS when the transfer protocol is ``https``, starts
        listening on the configured port and, if Arrow is enabled, launches
        the Arrow Flight server on a background thread.

        :raises RuntimeError: if the transfer protocol is neither ``http``
            nor ``https``.
        """
        application = self._create_tornado_web_app()
        self.max_request_size = (
            int(self.settings[SettingsParameters.MaxRequestSizeInMb]) * 1024 * 1024
        )
        logger.info(f"Setting max request size to {self.max_request_size} bytes")

        init_model_evaluator(self.settings, self.tabpy_state, self.python_service)

        protocol = self.settings[SettingsParameters.TransferProtocol]
        ssl_options = None
        if protocol == "https":
            ssl_options = {
                "certfile": self.settings[SettingsParameters.CertificateFile],
                "keyfile": self.settings[SettingsParameters.KeyFile],
            }
        elif protocol != "http":
            msg = f"Unsupported transfer protocol {protocol}."
            logger.critical(msg)
            raise RuntimeError(msg)

        settings = {}
        if self.settings[SettingsParameters.GzipEnabled] is True:
            settings["decompress_request"] = True

        application.listen(
            self.settings[SettingsParameters.Port],
            ssl_options=ssl_options,
            max_buffer_size=self.max_request_size,
            max_body_size=self.max_request_size,
            **settings,
        )

        logger.info(
            "Web service listening on port "
            f"{str(self.settings[SettingsParameters.Port])}"
        )

        if self.settings[SettingsParameters.ArrowEnabled]:
            def start_pyarrow():
                # Runs on its own thread: exceptions raised here never reach
                # the try/except around start_new_thread below, so they must
                # be logged here.
                try:
                    self.arrow_server = self._get_arrow_server(self.settings)
                    pa.start(self.arrow_server)
                except Exception as e:
                    logger.critical(
                        f"Failed to start PyArrow server: {e}", exc_info=True
                    )

            try:
                _thread.start_new_thread(start_pyarrow, ())
            except Exception as e:
                # Only covers failure to create the thread itself.
                logger.critical(f"Failed to start PyArrow server: {e}")

        tornado.ioloop.IOLoop.instance().start()
170
171 1
    def _create_tornado_web_app(self):
        """Build the Tornado application that serves the TabPy REST API.

        Runs ``init_ps_server`` to completion on the IO loop, applies the
        Windows event loop patch, creates the application with the routes
        from ``_build_routes`` and installs the SIGINT/shutdown hooks.

        :returns: the Tornado application instance (not yet listening).
        """
        class TabPyTornadoApp(tornado.web.Application):
            is_closing = False

            def signal_handler(self, signal, _):
                logger.critical(f"Exiting on signal {signal}...")
                self.is_closing = True

            def try_exit(self):
                if self.is_closing:
                    tornado.ioloop.IOLoop.instance().stop()
                    logger.info("Shutting down TabPy...")

        logger.info("Initializing TabPy...")
        tornado.ioloop.IOLoop.instance().run_sync(
            lambda: init_ps_server(self.settings, self.tabpy_state)
        )
        logger.info("Done initializing TabPy.")

        # Thread pool handed to the /evaluate handler.
        executor = concurrent.futures.ThreadPoolExecutor(
            max_workers=multiprocessing.cpu_count()
        )

        # initialize Tornado application
        _init_asyncio_patch()
        application = TabPyTornadoApp(
            self._build_routes(executor),
            debug=False,
            **self.settings,
        )

        self._install_shutdown_hooks(application)
        return application

    def _build_routes(self, executor):
        """Return the Tornado URL spec list, each path prefixed by ``self.subdirectory``.

        :param executor: thread pool passed to the ``/evaluate`` handler.
        """
        prefix = self.subdirectory
        evaluate_handler = (
            EvaluationPlaneHandler
            if self.settings[SettingsParameters.EvaluateEnabled]
            else EvaluationPlaneDisabledHandler
        )
        return [
            (prefix + r"/query/([^/]+)", QueryPlaneHandler, dict(app=self)),
            (prefix + r"/status", StatusHandler, dict(app=self)),
            (prefix + r"/info", ServiceInfoHandler, dict(app=self)),
            (prefix + r"/endpoints", EndpointsHandler, dict(app=self)),
            (prefix + r"/endpoints/([^/]+)?", EndpointHandler, dict(app=self)),
            (
                prefix + r"/evaluate",
                evaluate_handler,
                dict(executor=executor, app=self),
            ),
            (
                prefix + r"/configurations/endpoint_upload_destination",
                UploadDestinationHandler,
                dict(app=self),
            ),
            # Catch-all for the static web pages; must stay last.
            (
                prefix + r"/(.*)",
                tornado.web.StaticFileHandler,
                dict(
                    path=self.settings[SettingsParameters.StaticPath],
                    default_filename="index.html",
                ),
            ),
        ]

    @staticmethod
    def _install_shutdown_hooks(application):
        """Route SIGINT to the app and poll every 500 ms for a pending shutdown.

        Registered once only: a second PeriodicCallback would just run
        ``try_exit`` twice per period.
        """
        signal.signal(signal.SIGINT, application.signal_handler)
        tornado.ioloop.PeriodicCallback(application.try_exit, 500).start()
242
243 1
    def _set_parameter(self, parser, settings_key, config_key, default_val, parse_function):
        """Store one setting, preferring the config file over the default.

        :param parser: ConfigParser holding the config file, with environment
            variables as its defaults.
        :param settings_key: key to write in ``self.settings``.
        :param config_key: option name in the ``[TabPy]`` section, or ``None``
            if the setting cannot be configured.
        :param default_val: value used when the option is absent; ``None``
            means the setting is left unset.
        :param parse_function: callable ``(section, option)`` used to read the
            option (e.g. ``parser.getboolean``); ``parser.get`` when ``None``.
        """
        configured = (
            config_key is not None
            and parser.has_section("TabPy")
            and parser.has_option("TabPy", config_key)
        )

        if configured:
            reader = parser.get if parse_function is None else parse_function
            self.settings[settings_key] = reader("TabPy", config_key)
            origin = "from config file or environment variable"
        elif default_val is not None:
            self.settings[settings_key] = default_val
            origin = "from default value"
        else:
            logger.debug(f"Parameter {settings_key} is not set")
            return

        logger.debug(
            f"Parameter {settings_key} set to "
            f'"{self.settings[settings_key]}" '
            f"{origin}"
        )
272
273 1
    def _parse_config(self, config_file):
        """Provide a consistent mechanism for pulling in configuration.

        Resets all per-instance state, then populates ``self.settings``.
        Each setting is taken from:

        1. the ``[TabPy]`` section of the config file, or
        2. an OS environment variable of the same name. Environment variables
           are passed to ``ConfigParser`` as defaults, so they apply only when
           the file does not set the option. They are consulted only when the
           file has a ``[TabPy]`` section (see ``_set_parameter``), or
        3. the built-in default when neither provides a value.

        For consistency the same name is used in the config file and in the
        OS environment: all capitals, starting with ``TABPY_``.

        NOTE(review): earlier docs listed CLI arguments as the highest
        precedence source; no CLI handling is visible in this method.

        After reading settings, this method also:

        - creates the upload directory if needed;
        - validates the transfer protocol and TLS files;
        - loads (or creates) ``state.ini``;
        - creates the Python service;
        - resolves the subdirectory prefix;
        - loads credentials when a password file is configured;
        - records the advertised API features.

        :param config_file: path to the config file. A missing or unreadable
            file is logged and defaults are used.
        :raises RuntimeError: on invalid transfer-protocol/TLS settings or
            when the configured password file cannot be read or is empty.
        """
        self.settings = {}
        self.subdirectory = ""
        self.tabpy_state = None
        self.python_service = None
        self.credentials = {}

        pkg_path = os.path.dirname(tabpy.__file__)

        parser = configparser.ConfigParser(os.environ)
        logger.info(f"Parsing config file {config_file}")

        file_exists = False
        if os.path.isfile(config_file):
            try:
                with open(config_file, 'r') as f:
                    parser.read_string(f.read())
                    file_exists = True
            except Exception as e:
                # Deliberately best-effort: fall back to defaults (warned
                # below), but keep the reason instead of discarding it.
                logger.warning(f"Error reading config file {config_file}: {e}")

        if not file_exists:
            logger.warning(
                f"Unable to open config file {config_file}, "
                "using default settings."
            )

        # (settings key, config option, default value, parser.get* reader)
        settings_parameters = [
            (SettingsParameters.Port, ConfigParameters.TABPY_PORT, 9004, None),
            (SettingsParameters.ServerVersion, None, __version__, None),
            (SettingsParameters.EvaluateEnabled, ConfigParameters.TABPY_EVALUATE_ENABLE,
             True, parser.getboolean),
            (SettingsParameters.EvaluateTimeout, ConfigParameters.TABPY_EVALUATE_TIMEOUT,
             30, parser.getfloat),
            (SettingsParameters.UploadDir, ConfigParameters.TABPY_QUERY_OBJECT_PATH,
             os.path.join(pkg_path, "tmp", "query_objects"), None),
            (SettingsParameters.TransferProtocol, ConfigParameters.TABPY_TRANSFER_PROTOCOL,
             "http", None),
            (SettingsParameters.CertificateFile, ConfigParameters.TABPY_CERTIFICATE_FILE,
             None, None),
            (SettingsParameters.KeyFile, ConfigParameters.TABPY_KEY_FILE, None, None),
            (SettingsParameters.StateFilePath, ConfigParameters.TABPY_STATE_PATH,
             os.path.join(pkg_path, "tabpy_server"), None),
            (SettingsParameters.StaticPath, ConfigParameters.TABPY_STATIC_PATH,
             os.path.join(pkg_path, "tabpy_server", "static"), None),
            (ConfigParameters.TABPY_PWD_FILE, ConfigParameters.TABPY_PWD_FILE, None, None),
            (SettingsParameters.LogRequestContext, ConfigParameters.TABPY_LOG_DETAILS,
             "false", None),
            (SettingsParameters.MaxRequestSizeInMb, ConfigParameters.TABPY_MAX_REQUEST_SIZE_MB,
             100, None),
            (SettingsParameters.GzipEnabled, ConfigParameters.TABPY_GZIP_ENABLE,
             True, parser.getboolean),
            (SettingsParameters.ArrowEnabled, ConfigParameters.TABPY_ARROW_ENABLE,
             False, parser.getboolean),
            (SettingsParameters.ArrowFlightPort, ConfigParameters.TABPY_ARROWFLIGHT_PORT,
             13622, parser.getint),
        ]

        for setting, parameter, default_val, parse_function in settings_parameters:
            self._set_parameter(parser, setting, parameter, default_val, parse_function)

        if not os.path.exists(self.settings[SettingsParameters.UploadDir]):
            os.makedirs(self.settings[SettingsParameters.UploadDir])

        # set and validate transfer protocol
        self.settings[SettingsParameters.TransferProtocol] = self.settings[
            SettingsParameters.TransferProtocol
        ].lower()

        self._validate_transfer_protocol_settings()

        # Normalize the state directory, then load state.ini from it
        # (_build_tabpy_state creates it from the template when missing).
        self.settings[SettingsParameters.StateFilePath] = os.path.realpath(
            os.path.normpath(
                os.path.expanduser(self.settings[SettingsParameters.StateFilePath])
            )
        )
        state_config, self.tabpy_state = self._build_tabpy_state()

        self.python_service = PythonServiceHandler(PythonService())
        self.settings["compress_response"] = True
        self.settings[SettingsParameters.StaticPath] = os.path.abspath(
            self.settings[SettingsParameters.StaticPath]
        )
        logger.debug(
            f"Static pages folder set to "
            f'"{self.settings[SettingsParameters.StaticPath]}"'
        )

        # Set subdirectory from config if applicable
        if state_config.has_option("Service Info", "Subdirectory"):
            self.subdirectory = "/" + state_config.get("Service Info", "Subdirectory")

        # If passwords file specified load credentials
        if ConfigParameters.TABPY_PWD_FILE in self.settings:
            if not self._parse_pwd_file():
                msg = (
                    "Failed to read passwords file "
                    f"{self.settings[ConfigParameters.TABPY_PWD_FILE]}"
                )
                logger.critical(msg)
                raise RuntimeError(msg)
        else:
            logger.info(
                "Password file is not specified: " "Authentication is not enabled"
            )

        features = self._get_features()
        self.settings[SettingsParameters.ApiVersions] = {"v1": {"features": features}}

        # Any value other than "false" (case-insensitive) enables it.
        self.settings[SettingsParameters.LogRequestContext] = (
            self.settings[SettingsParameters.LogRequestContext].lower() != "false"
        )
        call_context_state = (
            "enabled"
            if self.settings[SettingsParameters.LogRequestContext]
            else "disabled"
        )
        logger.info(f"Call context logging is {call_context_state}")
411
412 1
    def _validate_transfer_protocol_settings(self):
        """Check that the configured transfer protocol can be served.

        ``http`` needs nothing else. For ``https``:

        - both the certificate and key file settings must be present;
        - both files must exist;
        - the certificate must pass ``validate_cert``.

        :raises RuntimeError: on a missing or unsupported protocol, or on any
            certificate/key problem (each logged as critical first).
        """
        if SettingsParameters.TransferProtocol not in self.settings:
            msg = "Missing transfer protocol information."
            logger.critical(msg)
            raise RuntimeError(msg)

        protocol = self.settings[SettingsParameters.TransferProtocol]
        if protocol not in ("http", "https"):
            msg = f"Unsupported transfer protocol: {protocol}"
            logger.critical(msg)
            raise RuntimeError(msg)
        if protocol == "http":
            return

        settings = self.settings
        self._validate_cert_key_state(
            "The parameter(s) {} must be set.",
            SettingsParameters.CertificateFile in settings,
            SettingsParameters.KeyFile in settings,
        )

        cert_path = settings[SettingsParameters.CertificateFile]
        key_path = settings[SettingsParameters.KeyFile]
        self._validate_cert_key_state(
            "The parameter(s) {} must point to an existing file.",
            os.path.isfile(cert_path),
            os.path.isfile(key_path),
        )
        tabpy.tabpy_server.app.util.validate_cert(cert_path)
441
442 1
    @staticmethod
    def _validate_cert_key_state(msg, cert_valid, key_valid):
        """Raise if the certificate and/or key check failed.

        :param msg: message template with one ``{}`` placeholder that
            receives the name(s) of the failing parameter(s).
        :param cert_valid: whether the certificate-file check passed.
        :param key_valid: whether the key-file check passed.
        :raises RuntimeError: prefixed with "Error using HTTPS: " and naming
            the failing parameter(s); the same text is logged as critical.
        """
        if cert_valid and key_valid:
            return

        cert_name = ConfigParameters.TABPY_CERTIFICATE_FILE
        key_name = ConfigParameters.TABPY_KEY_FILE
        if not (cert_valid or key_valid):
            subject = f"{cert_name} and {key_name}"
        elif not cert_valid:
            subject = cert_name
        else:
            subject = key_name

        err = "Error using HTTPS: " + msg.format(subject)
        logger.critical(err)
        raise RuntimeError(err)
460
461 1
    def _parse_pwd_file(self):
        """Load credentials from the configured password file.

        Stores the parsed credentials in ``self.credentials``.

        :returns: the success flag from ``parse_pwd_file``; ``False`` when the
            file parsed but yielded no credentials (also logged as an error).
        """
        pwd_path = self.settings[ConfigParameters.TABPY_PWD_FILE]
        succeeded, self.credentials = parse_pwd_file(pwd_path)
        if not succeeded:
            return succeeded

        if len(self.credentials) == 0:
            logger.error("No credentials found")
            return False
        return succeeded
471
472 1
    def _get_features(self):
        """Describe the optional features this server advertises.

        :returns: dict with ``evaluate_enabled``, ``gzip_enabled`` and
            ``arrow_enabled`` flags copied from settings. When a password
            file is configured, it is preceded by an ``authentication`` entry
            (required, basic auth).
        """
        settings = self.settings
        features = {}

        if ConfigParameters.TABPY_PWD_FILE in settings:
            features["authentication"] = {
                "required": True,
                "methods": {"basic-auth": {}},
            }

        features.update(
            evaluate_enabled=settings[SettingsParameters.EvaluateEnabled],
            gzip_enabled=settings[SettingsParameters.GzipEnabled],
            arrow_enabled=settings[SettingsParameters.ArrowEnabled],
        )
        return features
486
487 1
    def _build_tabpy_state(self):
        """Load endpoint state from ``state.ini``, creating it if needed.

        If ``state.ini`` is missing from the configured state directory, it is
        first copied from the packaged ``tabpy_server/state.ini.template``.

        :returns: tuple ``(state_config, TabPyState)``. ``state_config`` is the
            value ``_get_state_from_file`` returns for the state directory.
        """
        state_dir = self.settings[SettingsParameters.StateFilePath]
        state_path = os.path.join(state_dir, "state.ini")

        if not os.path.isfile(state_path):
            template_path = os.path.join(
                os.path.dirname(tabpy.__file__), "tabpy_server", "state.ini.template"
            )
            logger.debug(
                f"File {state_path} not found, creating from "
                f"template {template_path}..."
            )
            shutil.copy(template_path, state_path)

        logger.info(f"Loading state from state file {state_path}")
        state_config = _get_state_from_file(state_dir)
        return state_config, TabPyState(config=state_config, settings=self.settings)
504
505
506
# Override _read_body to allow content with size exceeding max_body_size
507
# This enables proper handling of 413 errors in base_handler
508 1
def _read_body_allow_max_size(self, code, headers, delegate):
509 1
    if "Content-Length" in headers:
510 1
        content_length = int(headers["Content-Length"])
511 1
        if content_length > self._max_body_size:
512
            return
513 1
    return self.original_read_body(code, headers, delegate)
514
515 1
# Keep a handle on Tornado's own _read_body so the override can delegate to
# it. Capture it only once: if this module is imported again (e.g. reloaded),
# HTTP1Connection._read_body is already the override. Saving that as the
# "original" would make the override call itself forever.
if not hasattr(HTTP1Connection, "original_read_body"):
    HTTP1Connection.original_read_body = HTTP1Connection._read_body
HTTP1Connection._read_body = _read_body_allow_max_size
517