Test Failed
Pull Request — master (#592)
by
unknown
06:58
created

tabpy.tabpy_server.app.app._init_asyncio_patch()   B

Complexity

Conditions 6

Size

Total Lines 17
Code Lines 9

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 22.9396

Importance

Changes 0
Metric Value
eloc 9
dl 0
loc 17
ccs 2
cts 9
cp 0.2222
rs 8.6666
c 0
b 0
f 0
cc 6
nop 0
crap 22.9396
1 1
import concurrent.futures
2 1
import configparser
3 1
import logging
4 1
import multiprocessing
5 1
import os
6 1
import shutil
7 1
import signal
8 1
import sys
9 1
import tabpy
10 1
from tabpy.tabpy import __version__
11 1
from tabpy.tabpy_server.app.app_parameters import ConfigParameters, SettingsParameters
12 1
from tabpy.tabpy_server.app.util import parse_pwd_file
13 1
from tabpy.tabpy_server.management.state import TabPyState
14 1
from tabpy.tabpy_server.management.util import _get_state_from_file
15 1
from tabpy.tabpy_server.psws.callbacks import init_model_evaluator, init_ps_server
16 1
from tabpy.tabpy_server.psws.python_service import PythonService, PythonServiceHandler
17 1
from tabpy.tabpy_server.handlers import (
18
    EndpointHandler,
19
    EndpointsHandler,
20
    EvaluationPlaneHandler,
21
    EvaluationPlaneDisabledHandler,
22
    QueryPlaneHandler,
23
    ServiceInfoHandler,
24
    StatusHandler,
25
    UploadDestinationHandler,
26
)
27 1
import tornado
28
import tabpy.tabpy_server.app.arrow_server as pa
29
import _thread
30 1
31
# Module-level logger, named after this module; it is used by every function
# and method below. Logging itself is configured in TabPyApp.__init__.
logger = logging.getLogger(__name__)
32
33 1
def _init_asyncio_patch():
34
    """
35
    Select compatible event loop for Tornado 5+.
36
    As of Python 3.8, the default event loop on Windows is `proactor`,
37
    however Tornado requires the old default "selector" event loop.
38
    As Tornado has decided to leave this to users to set, MkDocs needs
39
    to set it. See https://github.com/tornadoweb/tornado/issues/2608.
40
    """
41 1
    if sys.platform.startswith("win") and sys.version_info >= (3, 8):
42
        import asyncio
43
        try:
44
            from asyncio import WindowsSelectorEventLoopPolicy
45
        except ImportError:
46
            pass  # Can't assign a policy which doesn't exist.
47
        else:
48
            if not isinstance(asyncio.get_event_loop_policy(), WindowsSelectorEventLoopPolicy):
49
                asyncio.set_event_loop_policy(WindowsSelectorEventLoopPolicy())
50
51
52 1
class TabPyApp:
    """
    TabPy application class for keeping context like settings, state, etc.

    Constructing an instance configures logging and parses the TabPy
    configuration (see ``_parse_config``); ``run()`` builds the Tornado
    application, starts listening and blocks in the IO loop.
    """

    # Class-level defaults. ``_parse_config`` (called from ``__init__``)
    # rebinds every one of these on the instance, so the mutable dicts are
    # not shared between instances after construction.
    settings = {}
    subdirectory = ""
    tabpy_state = None
    python_service = None
    credentials = {}

    def __init__(self, config_file):
        """
        Configure logging and parse TabPy settings from ``config_file``.

        Parameters
        ----------
        config_file : str or None
            Path to the TabPy config file. When ``None``, the bundled
            ``common/default.conf`` next to this package is used.
        """
        if config_file is None:
            config_file = os.path.join(
                os.path.dirname(__file__), os.path.pardir, "common", "default.conf"
            )

        if os.path.isfile(config_file):
            try:
                from logging import config

                config.fileConfig(config_file, disable_existing_loggers=False)
            except (KeyError, RuntimeError, configparser.Error):
                # The file has no usable logging sections (KeyError), is
                # malformed (configparser.Error), or is rejected as empty /
                # invalid (RuntimeError on some Python versions). Fall back
                # to a basic logging setup instead of crashing.
                logging.basicConfig(level=logging.DEBUG)

        self._parse_config(config_file)

    def run(self):
        """
        Start the TabPy web service and block until the IO loop is stopped.

        Also starts the Arrow server (``arrow_server.start``) on a
        background thread.

        Raises
        ------
        RuntimeError
            If the configured transfer protocol is neither ``http`` nor
            ``https``.
        """
        application = self._create_tornado_web_app()
        max_request_size = (
            int(self.settings[SettingsParameters.MaxRequestSizeInMb]) * 1024 * 1024
        )
        logger.info(f"Setting max request size to {max_request_size} bytes")

        init_model_evaluator(self.settings, self.tabpy_state, self.python_service)

        protocol = self.settings[SettingsParameters.TransferProtocol]
        ssl_options = None
        if protocol == "https":
            ssl_options = {
                "certfile": self.settings[SettingsParameters.CertificateFile],
                "keyfile": self.settings[SettingsParameters.KeyFile],
            }
        elif protocol != "http":
            msg = f"Unsupported transfer protocol {protocol}."
            logger.critical(msg)
            raise RuntimeError(msg)

        # Extra HTTP server options (distinct from the application settings).
        server_settings = {}
        if self.settings[SettingsParameters.GzipEnabled] is True:
            server_settings["decompress_request"] = True
        application.listen(
            self.settings[SettingsParameters.Port],
            ssl_options=ssl_options,
            max_buffer_size=max_request_size,
            max_body_size=max_request_size,
            **server_settings,
        )

        logger.info(
            "Web service listening on port "
            f"{str(self.settings[SettingsParameters.Port])}"
        )

        def start_pyarrow():
            # An exception escaping a raw _thread thread is not routed
            # through the TabPy logger; log it explicitly instead.
            try:
                pa.start()
            except Exception:
                logger.exception("Error: pyarrow server stopped unexpectedly")

        try:
            _thread.start_new_thread(start_pyarrow, ())
        except Exception as e:
            logger.error(f"Error: unable to start pyarrow server: {e}")

        tornado.ioloop.IOLoop.current().start()

    def _create_tornado_web_app(self):
        """
        Initialize the TabPy server and build the Tornado application.

        Installs a SIGINT handler plus a 500 ms periodic check that stops
        the IO loop once that handler has fired.

        Returns
        -------
        tornado.web.Application
            Application with all TabPy routes registered under
            ``self.subdirectory``.
        """

        class TabPyTornadoApp(tornado.web.Application):
            is_closing = False

            def signal_handler(self, signum, _frame):
                # Only flag the shutdown here; try_exit(), polled on the IO
                # loop by a PeriodicCallback, performs the actual stop.
                logger.critical(f"Exiting on signal {signum}...")
                self.is_closing = True

            def try_exit(self):
                if self.is_closing:
                    tornado.ioloop.IOLoop.current().stop()
                    logger.info("Shutting down TabPy...")

        # The event loop policy has to be selected before run_sync() below
        # creates the first IOLoop / asyncio event loop; otherwise that loop
        # is created with the platform default policy.
        _init_asyncio_patch()

        logger.info("Initializing TabPy...")
        tornado.ioloop.IOLoop.current().run_sync(
            lambda: init_ps_server(self.settings, self.tabpy_state)
        )
        logger.info("Done initializing TabPy.")

        executor = concurrent.futures.ThreadPoolExecutor(
            max_workers=multiprocessing.cpu_count()
        )

        evaluate_handler = (
            EvaluationPlaneHandler
            if self.settings[SettingsParameters.EvaluateEnabled]
            else EvaluationPlaneDisabledHandler
        )

        # initialize Tornado application
        application = TabPyTornadoApp(
            [
                (
                    self.subdirectory + r"/query/([^/]+)",
                    QueryPlaneHandler,
                    dict(app=self),
                ),
                (self.subdirectory + r"/status", StatusHandler, dict(app=self)),
                (self.subdirectory + r"/info", ServiceInfoHandler, dict(app=self)),
                (self.subdirectory + r"/endpoints", EndpointsHandler, dict(app=self)),
                (
                    self.subdirectory + r"/endpoints/([^/]+)?",
                    EndpointHandler,
                    dict(app=self),
                ),
                (
                    self.subdirectory + r"/evaluate",
                    evaluate_handler,
                    dict(executor=executor, app=self),
                ),
                (
                    self.subdirectory + r"/configurations/endpoint_upload_destination",
                    UploadDestinationHandler,
                    dict(app=self),
                ),
                (
                    self.subdirectory + r"/(.*)",
                    tornado.web.StaticFileHandler,
                    dict(
                        path=self.settings[SettingsParameters.StaticPath],
                        default_filename="index.html",
                    ),
                ),
            ],
            debug=False,
            **self.settings,
        )

        # Register the SIGINT handler and the shutdown poll exactly once; a
        # second registration would only start a redundant periodic timer.
        signal.signal(signal.SIGINT, application.signal_handler)
        tornado.ioloop.PeriodicCallback(application.try_exit, 500).start()

        return application

    def _set_parameter(self, parser, settings_key, config_key, default_val, parse_function):
        """
        Set ``self.settings[settings_key]`` from the config or a default.

        Parameters
        ----------
        parser : configparser.ConfigParser
            Parser holding the config file contents (and its defaults).
        settings_key : str
            Key to set in ``self.settings``.
        config_key : str or None
            Option name in the ``[TabPy]`` section; ``None`` means the
            setting can only come from ``default_val``.
        default_val : object or None
            Value used when the option is not available; ``None`` leaves
            the setting unset.
        parse_function : callable or None
            Called as ``parse_function("TabPy", config_key)`` to read the
            value (e.g. ``parser.getboolean``); defaults to ``parser.get``.
        """
        key_is_set = False

        # has_option() also sees the parser's DEFAULT values (environment
        # variables here), but only when the [TabPy] section exists.
        if (
            config_key is not None
            and parser.has_section("TabPy")
            and parser.has_option("TabPy", config_key)
        ):
            if parse_function is None:
                parse_function = parser.get
            self.settings[settings_key] = parse_function("TabPy", config_key)
            key_is_set = True
            logger.debug(
                f"Parameter {settings_key} set to "
                f'"{self.settings[settings_key]}" '
                "from config file or environment variable"
            )

        if not key_is_set and default_val is not None:
            self.settings[settings_key] = default_val
            key_is_set = True
            logger.debug(
                f"Parameter {settings_key} set to "
                f'"{self.settings[settings_key]}" '
                "from default value"
            )

        if not key_is_set:
            logger.debug(f"Parameter {settings_key} is not set")

    def _parse_config(self, config_file):
        """Provide consistent mechanism for pulling in configuration.

        Each setting is resolved in the following order:

        1. the ``[TabPy]`` section of the config file;
        2. an OS environment variable with the same name. Environment
           variables are passed to ``ConfigParser`` as defaults, so they
           are only consulted when the config file has a ``[TabPy]``
           section;
        3. the built-in default, if the setting is not present in any
           location.

        For consistency use the same variable name in the config file as
        in the os environment. For naming standards use all capitals and
        start with 'TABPY_'.

        Also creates the upload directory and state file if missing,
        loads credentials when a password file is configured, and derives
        the advertised API features.

        Raises
        ------
        RuntimeError
            On invalid transfer protocol / HTTPS settings or when the
            password file cannot be read or holds no credentials.
        """
        self.settings = {}
        self.subdirectory = ""
        self.tabpy_state = None
        self.python_service = None
        self.credentials = {}

        pkg_path = os.path.dirname(tabpy.__file__)

        parser = configparser.ConfigParser(os.environ)
        logger.info(f"Parsing config file {config_file}")

        file_exists = False
        if os.path.isfile(config_file):
            try:
                with open(config_file, 'r') as f:
                    parser.read_string(f.read())
                    file_exists = True
            except Exception as e:
                # Best effort: continue with defaults, but record why.
                logger.warning(f"Failed to read config file {config_file}: {e}")

        if not file_exists:
            logger.warning(
                f"Unable to open config file {config_file}, "
                "using default settings."
            )

        # (settings key, config option, default value, parse function)
        settings_parameters = [
            (SettingsParameters.Port, ConfigParameters.TABPY_PORT, 9004, None),
            (SettingsParameters.ServerVersion, None, __version__, None),
            (SettingsParameters.EvaluateEnabled, ConfigParameters.TABPY_EVALUATE_ENABLE,
             True, parser.getboolean),
            (SettingsParameters.EvaluateTimeout, ConfigParameters.TABPY_EVALUATE_TIMEOUT,
             30, parser.getfloat),
            (SettingsParameters.UploadDir, ConfigParameters.TABPY_QUERY_OBJECT_PATH,
             os.path.join(pkg_path, "tmp", "query_objects"), None),
            (SettingsParameters.TransferProtocol, ConfigParameters.TABPY_TRANSFER_PROTOCOL,
             "http", None),
            (SettingsParameters.CertificateFile, ConfigParameters.TABPY_CERTIFICATE_FILE,
             None, None),
            (SettingsParameters.KeyFile, ConfigParameters.TABPY_KEY_FILE, None, None),
            (SettingsParameters.StateFilePath, ConfigParameters.TABPY_STATE_PATH,
             os.path.join(pkg_path, "tabpy_server"), None),
            (SettingsParameters.StaticPath, ConfigParameters.TABPY_STATIC_PATH,
             os.path.join(pkg_path, "tabpy_server", "static"), None),
            (ConfigParameters.TABPY_PWD_FILE, ConfigParameters.TABPY_PWD_FILE, None, None),
            (SettingsParameters.LogRequestContext, ConfigParameters.TABPY_LOG_DETAILS,
             "false", None),
            (SettingsParameters.MaxRequestSizeInMb, ConfigParameters.TABPY_MAX_REQUEST_SIZE_MB,
             100, None),
            (SettingsParameters.GzipEnabled, ConfigParameters.TABPY_GZIP_ENABLE,
             True, parser.getboolean),
        ]

        for setting, parameter, default_val, parse_function in settings_parameters:
            self._set_parameter(parser, setting, parameter, default_val, parse_function)

        # exist_ok avoids a check-then-create race on the upload directory.
        os.makedirs(self.settings[SettingsParameters.UploadDir], exist_ok=True)

        # set and validate transfer protocol
        self.settings[SettingsParameters.TransferProtocol] = self.settings[
            SettingsParameters.TransferProtocol
        ].lower()

        self._validate_transfer_protocol_settings()

        # if state.ini does not exist try and create it - remove
        # last dependence on batch/shell script
        self.settings[SettingsParameters.StateFilePath] = os.path.realpath(
            os.path.normpath(
                os.path.expanduser(self.settings[SettingsParameters.StateFilePath])
            )
        )
        state_config, self.tabpy_state = self._build_tabpy_state()

        self.python_service = PythonServiceHandler(PythonService())
        self.settings["compress_response"] = True
        self.settings[SettingsParameters.StaticPath] = os.path.abspath(
            self.settings[SettingsParameters.StaticPath]
        )
        logger.debug(
            f"Static pages folder set to "
            f'"{self.settings[SettingsParameters.StaticPath]}"'
        )

        # Set subdirectory from config if applicable
        if state_config.has_option("Service Info", "Subdirectory"):
            self.subdirectory = "/" + state_config.get("Service Info", "Subdirectory")

        # If passwords file specified load credentials
        if ConfigParameters.TABPY_PWD_FILE in self.settings:
            if not self._parse_pwd_file():
                msg = (
                    "Failed to read passwords file "
                    f"{self.settings[ConfigParameters.TABPY_PWD_FILE]}"
                )
                logger.critical(msg)
                raise RuntimeError(msg)
        else:
            logger.info(
                "Password file is not specified: " "Authentication is not enabled"
            )

        features = self._get_features()
        self.settings[SettingsParameters.ApiVersions] = {"v1": {"features": features}}

        # Any value other than "false" (case-insensitive) enables it.
        self.settings[SettingsParameters.LogRequestContext] = (
            self.settings[SettingsParameters.LogRequestContext].lower() != "false"
        )
        call_context_state = (
            "enabled"
            if self.settings[SettingsParameters.LogRequestContext]
            else "disabled"
        )
        logger.info(f"Call context logging is {call_context_state}")

    def _validate_transfer_protocol_settings(self):
        """
        Validate the transfer protocol and, for HTTPS, the cert/key files.

        Raises
        ------
        RuntimeError
            If the protocol is missing or not ``http``/``https``, or (for
            ``https``) if the certificate or key parameter is unset or does
            not point to an existing file. ``util.validate_cert`` is then
            applied to the certificate (presumably raising on an invalid
            certificate — see that helper).
        """
        if SettingsParameters.TransferProtocol not in self.settings:
            msg = "Missing transfer protocol information."
            logger.critical(msg)
            raise RuntimeError(msg)

        protocol = self.settings[SettingsParameters.TransferProtocol]

        if protocol == "http":
            return

        if protocol != "https":
            msg = f"Unsupported transfer protocol: {protocol}"
            logger.critical(msg)
            raise RuntimeError(msg)

        self._validate_cert_key_state(
            "The parameter(s) {} must be set.",
            SettingsParameters.CertificateFile in self.settings,
            SettingsParameters.KeyFile in self.settings,
        )
        cert = self.settings[SettingsParameters.CertificateFile]

        self._validate_cert_key_state(
            "The parameter(s) {} must point to " "an existing file.",
            os.path.isfile(cert),
            os.path.isfile(self.settings[SettingsParameters.KeyFile]),
        )
        tabpy.tabpy_server.app.util.validate_cert(cert)

    @staticmethod
    def _validate_cert_key_state(msg, cert_valid, key_valid):
        """
        Raise if either the certificate or the key check failed.

        Parameters
        ----------
        msg : str
            Message template with one ``{}`` placeholder that receives the
            name(s) of the failing parameter(s).
        cert_valid, key_valid : bool
            Results of the check for the certificate and the key.

        Raises
        ------
        RuntimeError
            If ``cert_valid`` or ``key_valid`` is false.
        """
        cert_and_key_param = (
            f"{ConfigParameters.TABPY_CERTIFICATE_FILE} and "
            f"{ConfigParameters.TABPY_KEY_FILE}"
        )
        https_error = "Error using HTTPS: "
        err = None
        if not cert_valid and not key_valid:
            err = https_error + msg.format(cert_and_key_param)
        elif not cert_valid:
            err = https_error + msg.format(ConfigParameters.TABPY_CERTIFICATE_FILE)
        elif not key_valid:
            err = https_error + msg.format(ConfigParameters.TABPY_KEY_FILE)

        if err is not None:
            logger.critical(err)
            raise RuntimeError(err)

    def _parse_pwd_file(self):
        """
        Load ``self.credentials`` from the configured password file.

        Returns
        -------
        bool
            ``True`` when the file was parsed and contains at least one
            credential, ``False`` otherwise.
        """
        succeeded, self.credentials = parse_pwd_file(
            self.settings[ConfigParameters.TABPY_PWD_FILE]
        )

        if succeeded and len(self.credentials) == 0:
            logger.error("No credentials found")
            succeeded = False

        return succeeded

    def _get_features(self):
        """
        Build the feature description advertised for API version v1.

        Returns
        -------
        dict
            Always contains ``evaluate_enabled`` and ``gzip_enabled``;
            contains ``authentication`` (basic auth, required) only when a
            password file is configured.
        """
        features = {}

        # Check for auth
        if ConfigParameters.TABPY_PWD_FILE in self.settings:
            features["authentication"] = {
                "required": True,
                "methods": {"basic-auth": {}},
            }

        features["evaluate_enabled"] = self.settings[SettingsParameters.EvaluateEnabled]
        features["gzip_enabled"] = self.settings[SettingsParameters.GzipEnabled]
        return features

    def _build_tabpy_state(self):
        """
        Load ``state.ini`` from the state directory, creating it if needed.

        A missing ``state.ini`` is copied from the packaged
        ``tabpy_server/state.ini.template``; the state directory itself is
        created first if it does not exist.

        Returns
        -------
        tuple
            ``(state_config, TabPyState)`` where ``state_config`` is the
            object returned by ``_get_state_from_file`` (used like a
            ``ConfigParser`` by the caller).
        """
        pkg_path = os.path.dirname(tabpy.__file__)
        state_file_dir = self.settings[SettingsParameters.StateFilePath]
        state_file_path = os.path.join(state_file_dir, "state.ini")
        if not os.path.isfile(state_file_path):
            state_file_template_path = os.path.join(
                pkg_path, "tabpy_server", "state.ini.template"
            )
            logger.debug(
                f"File {state_file_path} not found, creating from "
                f"template {state_file_template_path}..."
            )
            # shutil.copy does not create missing parent directories.
            os.makedirs(state_file_dir, exist_ok=True)
            shutil.copy(state_file_template_path, state_file_path)

        logger.info(f"Loading state from state file {state_file_path}")
        state_config = _get_state_from_file(state_file_dir)
        return state_config, TabPyState(config=state_config, settings=self.settings)
457