Test Failed
Pull Request — master (#593)
by
unknown
13:45
created

TabPyApp._build_tabpy_state()   A

Complexity

Conditions 2

Size

Total Lines 17
Code Lines 13

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 3
CRAP Score 2.0625

Importance

Changes 0
Metric Value
eloc 13
dl 0
loc 17
ccs 3
cts 4
cp 0.75
rs 9.75
c 0
b 0
f 0
cc 2
nop 1
crap 2.0625
1 1
import concurrent.futures
2 1
import configparser
3 1
import logging
4 1
import multiprocessing
5 1
import os
6 1
import shutil
7 1
import signal
8 1
import sys
9 1
import tabpy
10 1
from tabpy.tabpy import __version__
11 1
from tabpy.tabpy_server.app.app_parameters import ConfigParameters, SettingsParameters
12 1
from tabpy.tabpy_server.app.util import parse_pwd_file
13 1
from tabpy.tabpy_server.management.state import TabPyState
14 1
from tabpy.tabpy_server.management.util import _get_state_from_file
15 1
from tabpy.tabpy_server.psws.callbacks import init_model_evaluator, init_ps_server
16 1
from tabpy.tabpy_server.psws.python_service import PythonService, PythonServiceHandler
17 1
from tabpy.tabpy_server.handlers import (
18
    EndpointHandler,
19
    EndpointsHandler,
20
    EvaluationPlaneHandler,
21
    EvaluationPlaneDisabledHandler,
22
    QueryPlaneHandler,
23
    ServiceInfoHandler,
24
    StatusHandler,
25
    UploadDestinationHandler,
26
)
27 1
import tornado
28
import tabpy.tabpy_server.app.arrow_server as pa
29
import _thread
30 1
31
# Module-level logger named after this module; TabPyApp emits all of its
# log output through it.
logger: logging.Logger = logging.getLogger(__name__)
32
33 1
def _init_asyncio_patch():
34
    """
35
    Select compatible event loop for Tornado 5+.
36
    As of Python 3.8, the default event loop on Windows is `proactor`,
37
    however Tornado requires the old default "selector" event loop.
38
    As Tornado has decided to leave this to users to set, MkDocs needs
39
    to set it. See https://github.com/tornadoweb/tornado/issues/2608.
40
    """
41 1
    if sys.platform.startswith("win") and sys.version_info >= (3, 8):
42
        import asyncio
43
        try:
44
            from asyncio import WindowsSelectorEventLoopPolicy
45
        except ImportError:
46
            pass  # Can't assign a policy which doesn't exist.
47
        else:
48
            if not isinstance(asyncio.get_event_loop_policy(), WindowsSelectorEventLoopPolicy):
49
                asyncio.set_event_loop_policy(WindowsSelectorEventLoopPolicy())
50
51
52 1
class TabPyApp:
    """
    TabPy application class for keeping context like settings, state, etc.

    The constructor configures logging and parses settings; ``run()`` then
    builds the Tornado application, starts listening and blocks in the IOLoop.
    """

    # Class-level defaults. _parse_config rebinds every one of these on the
    # instance, so the mutable dicts below are not shared between instances.
    settings = {}
    subdirectory = ""
    tabpy_state = None
    python_service = None
    credentials = {}

    def __init__(self, config_file):
        """
        Configure logging and load TabPy settings.

        :param config_file: path to the TabPy config file, or None to use
            ``common/default.conf`` located in the parent directory of this
            module's directory.
        """
        if config_file is None:
            config_file = os.path.join(
                os.path.dirname(__file__), os.path.pardir, "common", "default.conf"
            )

        if os.path.isfile(config_file):
            try:
                from logging import config as logging_config

                logging_config.fileConfig(config_file, disable_existing_loggers=False)
            except (KeyError, RuntimeError, configparser.Error):
                # The file has no usable logging sections (KeyError) or cannot
                # be parsed (configparser.Error; RuntimeError on Python 3.12+
                # for empty/invalid files). Fall back to basic logging;
                # _parse_config is likewise tolerant of unreadable files.
                logging.basicConfig(level=logging.DEBUG)

        self._parse_config(config_file)

    def run(self):
        """
        Start the web service and block in the Tornado IOLoop.

        Builds the Tornado application, initializes the model evaluator,
        listens on the configured port (with TLS when the transfer protocol is
        ``https``), starts the pyarrow server on a background thread and then
        runs the IOLoop until it is stopped.

        :raises RuntimeError: if the transfer protocol is neither ``http`` nor
            ``https``.
        """
        application = self._create_tornado_web_app()
        max_request_size = (
            int(self.settings[SettingsParameters.MaxRequestSizeInMb]) * 1024 * 1024
        )
        logger.info(f"Setting max request size to {max_request_size} bytes")

        init_model_evaluator(self.settings, self.tabpy_state, self.python_service)

        protocol = self.settings[SettingsParameters.TransferProtocol]
        ssl_options = None
        if protocol == "https":
            ssl_options = {
                "certfile": self.settings[SettingsParameters.CertificateFile],
                "keyfile": self.settings[SettingsParameters.KeyFile],
            }
        elif protocol != "http":
            msg = f"Unsupported transfer protocol {protocol}."
            logger.critical(msg)
            raise RuntimeError(msg)

        listen_kwargs = {}
        if self.settings[SettingsParameters.GzipEnabled] is True:
            # Have Tornado transparently decompress gzip-encoded request bodies.
            listen_kwargs["decompress_request"] = True

        application.listen(
            self.settings[SettingsParameters.Port],
            ssl_options=ssl_options,
            max_buffer_size=max_request_size,
            max_body_size=max_request_size,
            **listen_kwargs,
        )

        logger.info(
            "Web service listening on port "
            f"{str(self.settings[SettingsParameters.Port])}"
        )

        try:
            # The pyarrow server runs on its own thread, outside the IOLoop.
            _thread.start_new_thread(pa.start, ())
        except Exception as err:
            # Best effort: TabPy keeps serving HTTP even if pyarrow fails.
            logger.error(f"Error: unable to start pyarrow server: {err}")

        tornado.ioloop.IOLoop.current().start()

    def _create_tornado_web_app(self):
        """
        Build the Tornado application that serves the TabPy REST API.

        Also runs ``init_ps_server`` to completion on the current IOLoop and
        installs a SIGINT handler plus a periodic check that stops the IOLoop
        once a signal has been received.

        :returns: the configured ``tornado.web.Application`` subclass instance.
        """

        class TabPyTornadoApp(tornado.web.Application):
            # Set by signal_handler, polled by try_exit.
            is_closing = False

            def signal_handler(self, signum, _):
                # Only flag the shutdown here; the IOLoop is stopped from
                # try_exit, which runs on the IOLoop itself.
                logger.critical(f"Exiting on signal {signum}...")
                self.is_closing = True

            def try_exit(self):
                if self.is_closing:
                    tornado.ioloop.IOLoop.current().stop()
                    logger.info("Shutting down TabPy...")

        # Pick a Tornado-compatible event loop policy before the first IOLoop
        # is obtained below; setting it later would not affect that loop.
        _init_asyncio_patch()

        logger.info("Initializing TabPy...")
        tornado.ioloop.IOLoop.current().run_sync(
            lambda: init_ps_server(self.settings, self.tabpy_state)
        )
        logger.info("Done initializing TabPy.")

        executor = concurrent.futures.ThreadPoolExecutor(
            max_workers=multiprocessing.cpu_count()
        )

        # initialize Tornado application
        application = TabPyTornadoApp(
            [
                (
                    self.subdirectory + r"/query/([^/]+)",
                    QueryPlaneHandler,
                    dict(app=self),
                ),
                (self.subdirectory + r"/status", StatusHandler, dict(app=self)),
                (self.subdirectory + r"/info", ServiceInfoHandler, dict(app=self)),
                (self.subdirectory + r"/endpoints", EndpointsHandler, dict(app=self)),
                (
                    self.subdirectory + r"/endpoints/([^/]+)?",
                    EndpointHandler,
                    dict(app=self),
                ),
                (
                    self.subdirectory + r"/evaluate",
                    EvaluationPlaneHandler if self.settings[SettingsParameters.EvaluateEnabled]
                    else EvaluationPlaneDisabledHandler,
                    dict(executor=executor, app=self),
                ),
                (
                    self.subdirectory + r"/configurations/endpoint_upload_destination",
                    UploadDestinationHandler,
                    dict(app=self),
                ),
                (
                    # Catch-all: anything else is served from the static folder.
                    self.subdirectory + r"/(.*)",
                    tornado.web.StaticFileHandler,
                    dict(
                        path=self.settings[SettingsParameters.StaticPath],
                        default_filename="index.html",
                    ),
                ),
            ],
            debug=False,
            **self.settings,
        )

        # Register the shutdown machinery exactly once: one SIGINT handler and
        # one 500 ms poll that stops the IOLoop after a signal.
        signal.signal(signal.SIGINT, application.signal_handler)
        tornado.ioloop.PeriodicCallback(application.try_exit, 500).start()

        return application

    def _set_parameter(self, parser, settings_key, config_key, default_val, parse_function):
        """
        Store one setting in ``self.settings``.

        Resolution order: the ``config_key`` option of the ``[TabPy]`` section
        of ``parser`` when ``config_key`` is not None and the option exists;
        otherwise ``default_val`` when it is not None; otherwise the key is
        left unset. (The parser built by _parse_config also carries the OS
        environment variables as defaults.)

        :param parser: ``configparser.ConfigParser`` to read from.
        :param settings_key: key to write in ``self.settings``.
        :param config_key: option name in ``[TabPy]``, or None for
            default-only settings.
        :param default_val: fallback value, or None for "no default".
        :param parse_function: getter such as ``parser.getboolean``; None
            means ``parser.get`` (the value stays a string).
        """
        key_is_set = False

        if (
            config_key is not None
            and parser.has_section("TabPy")
            and parser.has_option("TabPy", config_key)
        ):
            if parse_function is None:
                parse_function = parser.get
            self.settings[settings_key] = parse_function("TabPy", config_key)
            key_is_set = True
            logger.debug(
                f"Parameter {settings_key} set to "
                f'"{self.settings[settings_key]}" '
                "from config file or environment variable"
            )

        if not key_is_set and default_val is not None:
            self.settings[settings_key] = default_val
            key_is_set = True
            logger.debug(
                f"Parameter {settings_key} set to "
                f'"{self.settings[settings_key]}" '
                "from default value"
            )

        if not key_is_set:
            logger.debug(f"Parameter {settings_key} is not set")

    def _parse_config(self, config_file):
        """Provide consistent mechanism for pulling in configuration.

        Attempt to retain backward compatibility for
        existing implementations by grabbing port
        setting from CLI first.

        Take settings in the following order:

        1. CLI arguments if present
        2. config file
        3. OS environment variables (for ease of
           setting defaults if not present)
        4. current defaults if a setting is not present in any location

        Additionally provide similar configuration capabilities in between
        config file and environment variables.
        For consistency use the same variable name in the config file as
        in the os environment.
        For naming standards use all capitals and start with 'TABPY_'

        An unreadable or malformed config file is logged and the defaults are
        used instead.

        :raises RuntimeError: on invalid transfer protocol / certificate
            settings, or when a configured passwords file cannot be read.
        """
        self.settings = {}
        self.subdirectory = ""
        self.tabpy_state = None
        self.python_service = None
        self.credentials = {}

        pkg_path = os.path.dirname(tabpy.__file__)

        # Environment variables become parser defaults, so they are visible
        # in every section but are overridden by values in the config file.
        parser = configparser.ConfigParser(os.environ)
        logger.info(f"Parsing config file {config_file}")

        file_exists = False
        if os.path.isfile(config_file):
            try:
                with open(config_file, 'r') as f:
                    parser.read_string(f.read())
                    file_exists = True
            except Exception as err:
                # Best effort: fall back to defaults, but record why.
                logger.warning(f"Failed to read config file {config_file}: {err}")

        if not file_exists:
            logger.warning(
                f"Unable to open config file {config_file}, "
                "using default settings."
            )

        # (settings key, config option, default value, parse function)
        settings_parameters = [
            (SettingsParameters.Port, ConfigParameters.TABPY_PORT, 9004, None),
            (SettingsParameters.ServerVersion, None, __version__, None),
            (SettingsParameters.EvaluateEnabled, ConfigParameters.TABPY_EVALUATE_ENABLE,
             True, parser.getboolean),
            (SettingsParameters.EvaluateTimeout, ConfigParameters.TABPY_EVALUATE_TIMEOUT,
             30, parser.getfloat),
            (SettingsParameters.UploadDir, ConfigParameters.TABPY_QUERY_OBJECT_PATH,
             os.path.join(pkg_path, "tmp", "query_objects"), None),
            (SettingsParameters.TransferProtocol, ConfigParameters.TABPY_TRANSFER_PROTOCOL,
             "http", None),
            (SettingsParameters.CertificateFile, ConfigParameters.TABPY_CERTIFICATE_FILE,
             None, None),
            (SettingsParameters.KeyFile, ConfigParameters.TABPY_KEY_FILE, None, None),
            (SettingsParameters.StateFilePath, ConfigParameters.TABPY_STATE_PATH,
             os.path.join(pkg_path, "tabpy_server"), None),
            (SettingsParameters.StaticPath, ConfigParameters.TABPY_STATIC_PATH,
             os.path.join(pkg_path, "tabpy_server", "static"), None),
            (ConfigParameters.TABPY_PWD_FILE, ConfigParameters.TABPY_PWD_FILE, None, None),
            (SettingsParameters.LogRequestContext, ConfigParameters.TABPY_LOG_DETAILS,
             "false", None),
            (SettingsParameters.MaxRequestSizeInMb, ConfigParameters.TABPY_MAX_REQUEST_SIZE_MB,
             100, None),
            (SettingsParameters.GzipEnabled, ConfigParameters.TABPY_GZIP_ENABLED,
             True, parser.getboolean),
        ]

        for setting, parameter, default_val, parse_function in settings_parameters:
            self._set_parameter(parser, setting, parameter, default_val, parse_function)

        if not os.path.exists(self.settings[SettingsParameters.UploadDir]):
            os.makedirs(self.settings[SettingsParameters.UploadDir])

        # set and validate transfer protocol
        self.settings[SettingsParameters.TransferProtocol] = self.settings[
            SettingsParameters.TransferProtocol
        ].lower()

        self._validate_transfer_protocol_settings()

        # if state.ini does not exist try and create it - remove
        # last dependence on batch/shell script
        self.settings[SettingsParameters.StateFilePath] = os.path.realpath(
            os.path.normpath(
                os.path.expanduser(self.settings[SettingsParameters.StateFilePath])
            )
        )
        state_config, self.tabpy_state = self._build_tabpy_state()

        self.python_service = PythonServiceHandler(PythonService())
        self.settings["compress_response"] = True
        self.settings[SettingsParameters.StaticPath] = os.path.abspath(
            self.settings[SettingsParameters.StaticPath]
        )
        logger.debug(
            f"Static pages folder set to "
            f'"{self.settings[SettingsParameters.StaticPath]}"'
        )

        # Set subdirectory from config if applicable
        if state_config.has_option("Service Info", "Subdirectory"):
            self.subdirectory = "/" + state_config.get("Service Info", "Subdirectory")

        # If passwords file specified load credentials
        if ConfigParameters.TABPY_PWD_FILE in self.settings:
            if not self._parse_pwd_file():
                msg = (
                    "Failed to read passwords file "
                    f"{self.settings[ConfigParameters.TABPY_PWD_FILE]}"
                )
                logger.critical(msg)
                raise RuntimeError(msg)
        else:
            logger.info(
                "Password file is not specified: " "Authentication is not enabled"
            )

        features = self._get_features()
        self.settings[SettingsParameters.ApiVersions] = {"v1": {"features": features}}

        # The option is read as a string; anything other than "false"
        # (case-insensitive) enables request-context logging.
        self.settings[SettingsParameters.LogRequestContext] = (
            self.settings[SettingsParameters.LogRequestContext].lower() != "false"
        )
        call_context_state = (
            "enabled"
            if self.settings[SettingsParameters.LogRequestContext]
            else "disabled"
        )
        logger.info(f"Call context logging is {call_context_state}")

    def _validate_transfer_protocol_settings(self):
        """
        Check that the configured transfer protocol is usable.

        ``http`` needs nothing else. ``https`` requires both a certificate
        and a key file to be configured and to exist; the certificate is then
        passed to ``validate_cert``.

        :raises RuntimeError: if the protocol is missing or unsupported, or
            the HTTPS certificate/key settings are invalid.
        """
        if SettingsParameters.TransferProtocol not in self.settings:
            msg = "Missing transfer protocol information."
            logger.critical(msg)
            raise RuntimeError(msg)

        protocol = self.settings[SettingsParameters.TransferProtocol]

        if protocol == "http":
            return

        if protocol != "https":
            msg = f"Unsupported transfer protocol: {protocol}"
            logger.critical(msg)
            raise RuntimeError(msg)

        # Keys are only present in self.settings when configured (their
        # defaults are None, which _set_parameter does not store).
        self._validate_cert_key_state(
            "The parameter(s) {} must be set.",
            SettingsParameters.CertificateFile in self.settings,
            SettingsParameters.KeyFile in self.settings,
        )
        cert = self.settings[SettingsParameters.CertificateFile]

        self._validate_cert_key_state(
            "The parameter(s) {} must point to " "an existing file.",
            os.path.isfile(cert),
            os.path.isfile(self.settings[SettingsParameters.KeyFile]),
        )
        tabpy.tabpy_server.app.util.validate_cert(cert)

    @staticmethod
    def _validate_cert_key_state(msg, cert_valid, key_valid):
        """
        Raise if either the certificate or the key check failed.

        :param msg: message template with one ``{}`` placeholder that receives
            the name(s) of the failing config parameter(s).
        :param cert_valid: result of the check for the certificate file.
        :param key_valid: result of the check for the key file.
        :raises RuntimeError: if ``cert_valid`` or ``key_valid`` is falsy.
        """
        cert_and_key_param = (
            f"{ConfigParameters.TABPY_CERTIFICATE_FILE} and "
            f"{ConfigParameters.TABPY_KEY_FILE}"
        )
        https_error = "Error using HTTPS: "
        err = None
        if not cert_valid and not key_valid:
            err = https_error + msg.format(cert_and_key_param)
        elif not cert_valid:
            err = https_error + msg.format(ConfigParameters.TABPY_CERTIFICATE_FILE)
        elif not key_valid:
            err = https_error + msg.format(ConfigParameters.TABPY_KEY_FILE)

        if err is not None:
            logger.critical(err)
            raise RuntimeError(err)

    def _parse_pwd_file(self):
        """
        Load ``self.credentials`` from the configured passwords file.

        :returns: True when the file was parsed and contains at least one
            credential, False otherwise.
        """
        succeeded, self.credentials = parse_pwd_file(
            self.settings[ConfigParameters.TABPY_PWD_FILE]
        )

        if succeeded and len(self.credentials) == 0:
            logger.error("No credentials found")
            succeeded = False

        return succeeded

    def _get_features(self):
        """
        Describe the optional features enabled by the current settings.

        :returns: dict with ``evaluate_enabled`` and ``gzip_enabled`` flags,
            plus an ``authentication`` entry (basic auth, required) when a
            passwords file is configured.
        """
        features = {}

        # Check for auth
        if ConfigParameters.TABPY_PWD_FILE in self.settings:
            features["authentication"] = {
                "required": True,
                "methods": {"basic-auth": {}},
            }

        features["evaluate_enabled"] = self.settings[SettingsParameters.EvaluateEnabled]
        features["gzip_enabled"] = self.settings[SettingsParameters.GzipEnabled]
        return features

    def _build_tabpy_state(self):
        """
        Load the service state from ``state.ini`` in the state directory.

        If ``state.ini`` does not exist yet it is first copied from the
        packaged ``tabpy_server/state.ini.template``.

        :returns: tuple ``(state_config, TabPyState)`` where ``state_config``
            is whatever ``_get_state_from_file`` returns (the caller queries it
            with ``has_option``/``get``, i.e. a ConfigParser-like object).
        """
        pkg_path = os.path.dirname(tabpy.__file__)
        state_file_dir = self.settings[SettingsParameters.StateFilePath]
        state_file_path = os.path.join(state_file_dir, "state.ini")
        if not os.path.isfile(state_file_path):
            state_file_template_path = os.path.join(
                pkg_path, "tabpy_server", "state.ini.template"
            )
            logger.debug(
                f"File {state_file_path} not found, creating from "
                f"template {state_file_template_path}..."
            )
            shutil.copy(state_file_template_path, state_file_path)

        logger.info(f"Loading state from state file {state_file_path}")
        state_config = _get_state_from_file(state_file_dir)
        return state_config, TabPyState(config=state_config, settings=self.settings)
458