Passed
Pull Request — master (#595)
by
unknown
13:09
created

tabpy.tabpy_server.app.app.TabPyApp.run()   B

Complexity

Conditions 5

Size

Total Lines 49
Code Lines 35

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 1
CRAP Score 27.2222

Importance

Changes 0
Metric Value
eloc 35
dl 0
loc 49
ccs 1
cts 26
cp 0.0385
rs 8.5733
c 0
b 0
f 0
cc 5
nop 1
crap 27.2222
1 1
import concurrent.futures
2 1
import configparser
3 1
import logging
4 1
import multiprocessing
5 1
import os
6 1
import shutil
7 1
import signal
8 1
import sys
9 1
import tabpy
10 1
from tabpy.tabpy import __version__
11 1
from tabpy.tabpy_server.app.app_parameters import ConfigParameters, SettingsParameters
12 1
from tabpy.tabpy_server.app.util import parse_pwd_file
13 1
from tabpy.tabpy_server.management.state import TabPyState
14 1
from tabpy.tabpy_server.management.util import _get_state_from_file
15 1
from tabpy.tabpy_server.psws.callbacks import init_model_evaluator, init_ps_server
16 1
from tabpy.tabpy_server.psws.python_service import PythonService, PythonServiceHandler
17 1
from tabpy.tabpy_server.handlers import (
18
    EndpointHandler,
19
    EndpointsHandler,
20
    EvaluationPlaneHandler,
21
    EvaluationPlaneDisabledHandler,
22
    QueryPlaneHandler,
23
    ServiceInfoHandler,
24
    StatusHandler,
25
    UploadDestinationHandler,
26
)
27 1
import tornado
28 1
import tabpy.tabpy_server.app.arrow_server as pa
29 1
import _thread
30
31 1
logger = logging.getLogger(__name__)
32
33 1
def _init_asyncio_patch():
34
    """
35
    Select compatible event loop for Tornado 5+.
36
    As of Python 3.8, the default event loop on Windows is `proactor`,
37
    however Tornado requires the old default "selector" event loop.
38
    As Tornado has decided to leave this to users to set, MkDocs needs
39
    to set it. See https://github.com/tornadoweb/tornado/issues/2608.
40
    """
41 1
    if sys.platform.startswith("win") and sys.version_info >= (3, 8):
42
        import asyncio
43
        try:
44
            from asyncio import WindowsSelectorEventLoopPolicy
45
        except ImportError:
46
            pass  # Can't assign a policy which doesn't exist.
47
        else:
48
            if not isinstance(asyncio.get_event_loop_policy(), WindowsSelectorEventLoopPolicy):
49
                asyncio.set_event_loop_policy(WindowsSelectorEventLoopPolicy())
50
51
52 1
class TabPyApp:
53
    """
54
    TabPy application class for keeping context like settings, state, etc.
55
    """
56
57 1
    settings = {}
58 1
    subdirectory = ""
59 1
    tabpy_state = None
60 1
    python_service = None
61 1
    credentials = {}
62
63 1
    def __init__(self, config_file):
        """
        Configure logging and load settings from ``config_file``.

        When ``config_file`` is None the ``default.conf`` file located in
        the sibling ``common`` directory is used. If the file exists it is
        also handed to ``logging.config.fileConfig``; a ``KeyError`` raised
        there makes the app fall back to basic DEBUG-level logging.
        """
        if config_file is None:
            app_dir = os.path.dirname(__file__)
            config_file = os.path.join(
                app_dir, os.path.pardir, "common", "default.conf"
            )

        if os.path.isfile(config_file):
            try:
                from logging import config as logging_config

                logging_config.fileConfig(
                    config_file, disable_existing_loggers=False
                )
            except KeyError:
                logging.basicConfig(level=logging.DEBUG)

        self._parse_config(config_file)
77
78 1
    def run(self):
        """
        Start the web service and block until the Tornado I/O loop stops.

        Steps, in order:

        1. build the Tornado application;
        2. initialize the model evaluator;
        3. start listening on the configured port, with ``ssl_options``
           when the transfer protocol is "https";
        4. start ``pa.start`` (the arrow_server module) on a new thread;
        5. run the Tornado I/O loop.

        Raises:
            RuntimeError: if the transfer protocol is neither "http" nor
                "https".
        """
        application = self._create_tornado_web_app()
        max_request_size = (
            int(self.settings[SettingsParameters.MaxRequestSizeInMb]) * 1024 * 1024
        )
        logger.info(f"Setting max request size to {max_request_size} bytes")

        init_model_evaluator(self.settings, self.tabpy_state, self.python_service)

        protocol = self.settings[SettingsParameters.TransferProtocol]
        ssl_options = None
        if protocol == "https":
            ssl_options = {
                "certfile": self.settings[SettingsParameters.CertificateFile],
                "keyfile": self.settings[SettingsParameters.KeyFile],
            }
        elif protocol != "http":
            msg = f"Unsupported transfer protocol {protocol}."
            logger.critical(msg)
            raise RuntimeError(msg)

        # Extra keyword arguments for listen(); only populated when gzip
        # request decompression is switched on.
        settings = {}
        if self.settings[SettingsParameters.GzipEnabled] is True:
            settings["decompress_request"] = True

        application.listen(
            self.settings[SettingsParameters.Port],
            ssl_options=ssl_options,
            max_buffer_size=max_request_size,
            max_body_size=max_request_size,
            **settings,
        )

        logger.info(
            "Web service listening on port "
            f"{str(self.settings[SettingsParameters.Port])}"
        )

        # The arrow server runs on its own thread. Failing to start that
        # thread is reported through the logger (with traceback) instead of
        # print(), and the web service still starts.
        try:
            _thread.start_new_thread(pa.start, ())
        except Exception:
            logger.exception("Error: unable to start pyarrow server")

        tornado.ioloop.IOLoop.instance().start()
127
128 1
    def _create_tornado_web_app(self):
        """
        Build and return the Tornado application with all TabPy routes.

        Runs ``init_ps_server`` synchronously on the I/O loop first, then
        creates the application with every route prefixed by
        ``self.subdirectory``. The ``/evaluate`` route uses
        ``EvaluationPlaneDisabledHandler`` when the EvaluateEnabled setting
        is falsy. A SIGINT handler and a 500 ms periodic check are
        installed once so that Ctrl+C stops the I/O loop.

        Returns:
            The configured ``tornado.web.Application`` subclass instance.
        """

        class TabPyTornadoApp(tornado.web.Application):
            # Set by the signal handler, polled by try_exit().
            is_closing = False

            def signal_handler(self, signum, _):
                # Only flag the shutdown here; the loop is stopped from
                # try_exit(), which runs as a periodic callback.
                logger.critical(f"Exiting on signal {signum}...")
                self.is_closing = True

            def try_exit(self):
                if self.is_closing:
                    tornado.ioloop.IOLoop.instance().stop()
                    logger.info("Shutting down TabPy...")

        logger.info("Initializing TabPy...")
        tornado.ioloop.IOLoop.instance().run_sync(
            lambda: init_ps_server(self.settings, self.tabpy_state)
        )
        logger.info("Done initializing TabPy.")

        # Thread pool handed to the /evaluate handler.
        executor = concurrent.futures.ThreadPoolExecutor(
            max_workers=multiprocessing.cpu_count()
        )

        if self.settings[SettingsParameters.EvaluateEnabled]:
            evaluate_handler = EvaluationPlaneHandler
        else:
            evaluate_handler = EvaluationPlaneDisabledHandler

        # initialize Tornado application
        _init_asyncio_patch()
        application = TabPyTornadoApp(
            [
                (
                    self.subdirectory + r"/query/([^/]+)",
                    QueryPlaneHandler,
                    dict(app=self),
                ),
                (self.subdirectory + r"/status", StatusHandler, dict(app=self)),
                (self.subdirectory + r"/info", ServiceInfoHandler, dict(app=self)),
                (self.subdirectory + r"/endpoints", EndpointsHandler, dict(app=self)),
                (
                    self.subdirectory + r"/endpoints/([^/]+)?",
                    EndpointHandler,
                    dict(app=self),
                ),
                (
                    self.subdirectory + r"/evaluate",
                    evaluate_handler,
                    dict(executor=executor, app=self),
                ),
                (
                    self.subdirectory + r"/configurations/endpoint_upload_destination",
                    UploadDestinationHandler,
                    dict(app=self),
                ),
                (
                    self.subdirectory + r"/(.*)",
                    tornado.web.StaticFileHandler,
                    dict(
                        path=self.settings[SettingsParameters.StaticPath],
                        default_filename="index.html",
                    ),
                ),
            ],
            debug=False,
            **self.settings,
        )

        # Register the SIGINT handler and the shutdown poll exactly once.
        # A second registration would start a second timer and process the
        # shutdown twice.
        signal.signal(signal.SIGINT, application.signal_handler)
        tornado.ioloop.PeriodicCallback(application.try_exit, 500).start()

        return application
199
200 1
    def _set_parameter(self, parser, settings_key, config_key, default_val, parse_function):
        """
        Store one setting in ``self.settings``.

        The value is taken from option ``config_key`` of the "TabPy" section
        of ``parser`` when that option is available, read with
        ``parse_function`` (``parser.get`` when None). Otherwise
        ``default_val`` is used if it is not None. If neither source yields
        a value the key is left out of ``self.settings``.
        """
        available_in_parser = (
            config_key is not None
            and parser.has_section("TabPy")
            and parser.has_option("TabPy", config_key)
        )

        if available_in_parser:
            reader = parser.get if parse_function is None else parse_function
            self.settings[settings_key] = reader("TabPy", config_key)
            logger.debug(
                f"Parameter {settings_key} set to "
                f'"{self.settings[settings_key]}" '
                "from config file or environment variable"
            )
            return

        if default_val is not None:
            self.settings[settings_key] = default_val
            logger.debug(
                f"Parameter {settings_key} set to "
                f'"{self.settings[settings_key]}" '
                "from default value"
            )
            return

        logger.debug(f"Parameter {settings_key} is not set")
229
230 1
    def _parse_config(self, config_file):
        """
        Load all settings for the application from ``config_file``.

        Each setting is resolved in this order:

        1. the option in the "TabPy" section of the config file; the parser
           is created with ``os.environ`` as its defaults, so environment
           variables are visible to it as well;
        2. the built-in default, if there is one.

        Use the same variable name in the config file as in the OS
        environment: all capitals, starting with 'TABPY_'.

        Besides filling ``self.settings`` this method creates the upload
        directory if needed, validates the transfer protocol settings,
        builds the TabPy state, sets ``self.subdirectory`` from the state
        config, loads credentials when a password file is configured and
        records the feature list under the ApiVersions setting.

        Raises:
            RuntimeError: if the transfer protocol settings are invalid or
                the configured password file cannot be read.
        """
        self.settings = {}
        self.subdirectory = ""
        self.tabpy_state = None
        self.python_service = None
        self.credentials = {}

        pkg_path = os.path.dirname(tabpy.__file__)

        parser = configparser.ConfigParser(os.environ)
        logger.info(f"Parsing config file {config_file}")

        file_exists = False
        if os.path.isfile(config_file):
            try:
                with open(config_file, 'r') as f:
                    parser.read_string(f.read())
                file_exists = True
            except Exception as e:
                # Still best-effort (defaults are used below), but keep the
                # reason visible instead of silently discarding it.
                logger.warning(f"Failed to read config file {config_file}: {e}")

        if not file_exists:
            logger.warning(
                f"Unable to open config file {config_file}, "
                "using default settings."
            )

        # (settings key, config option name, default value, parse function)
        settings_parameters = [
            (SettingsParameters.Port, ConfigParameters.TABPY_PORT, 9004, None),
            (SettingsParameters.ServerVersion, None, __version__, None),
            (SettingsParameters.EvaluateEnabled, ConfigParameters.TABPY_EVALUATE_ENABLE,
             True, parser.getboolean),
            (SettingsParameters.EvaluateTimeout, ConfigParameters.TABPY_EVALUATE_TIMEOUT,
             30, parser.getfloat),
            (SettingsParameters.UploadDir, ConfigParameters.TABPY_QUERY_OBJECT_PATH,
             os.path.join(pkg_path, "tmp", "query_objects"), None),
            (SettingsParameters.TransferProtocol, ConfigParameters.TABPY_TRANSFER_PROTOCOL,
             "http", None),
            (SettingsParameters.CertificateFile, ConfigParameters.TABPY_CERTIFICATE_FILE,
             None, None),
            (SettingsParameters.KeyFile, ConfigParameters.TABPY_KEY_FILE, None, None),
            (SettingsParameters.StateFilePath, ConfigParameters.TABPY_STATE_PATH,
             os.path.join(pkg_path, "tabpy_server"), None),
            (SettingsParameters.StaticPath, ConfigParameters.TABPY_STATIC_PATH,
             os.path.join(pkg_path, "tabpy_server", "static"), None),
            (ConfigParameters.TABPY_PWD_FILE, ConfigParameters.TABPY_PWD_FILE, None, None),
            (SettingsParameters.LogRequestContext, ConfigParameters.TABPY_LOG_DETAILS,
             "false", None),
            (SettingsParameters.MaxRequestSizeInMb, ConfigParameters.TABPY_MAX_REQUEST_SIZE_MB,
             100, None),
            (SettingsParameters.GzipEnabled, ConfigParameters.TABPY_GZIP_ENABLE,
             True, parser.getboolean),
        ]

        for setting, parameter, default_val, parse_function in settings_parameters:
            self._set_parameter(parser, setting, parameter, default_val, parse_function)

        upload_dir = self.settings[SettingsParameters.UploadDir]
        if not os.path.exists(upload_dir):
            # exist_ok covers another process creating the directory
            # between the check above and this call.
            os.makedirs(upload_dir, exist_ok=True)

        # set and validate transfer protocol
        self.settings[SettingsParameters.TransferProtocol] = self.settings[
            SettingsParameters.TransferProtocol
        ].lower()

        self._validate_transfer_protocol_settings()

        # if state.ini does not exist try and create it - remove
        # last dependence on batch/shell script
        self.settings[SettingsParameters.StateFilePath] = os.path.realpath(
            os.path.normpath(
                os.path.expanduser(self.settings[SettingsParameters.StateFilePath])
            )
        )
        state_config, self.tabpy_state = self._build_tabpy_state()

        self.python_service = PythonServiceHandler(PythonService())
        self.settings["compress_response"] = True
        self.settings[SettingsParameters.StaticPath] = os.path.abspath(
            self.settings[SettingsParameters.StaticPath]
        )
        logger.debug(
            f"Static pages folder set to "
            f'"{self.settings[SettingsParameters.StaticPath]}"'
        )

        # Set subdirectory from config if applicable
        if state_config.has_option("Service Info", "Subdirectory"):
            self.subdirectory = "/" + state_config.get("Service Info", "Subdirectory")

        # If passwords file specified load credentials
        if ConfigParameters.TABPY_PWD_FILE in self.settings:
            if not self._parse_pwd_file():
                msg = (
                    "Failed to read passwords file "
                    f"{self.settings[ConfigParameters.TABPY_PWD_FILE]}"
                )
                logger.critical(msg)
                raise RuntimeError(msg)
        else:
            logger.info(
                "Password file is not specified: " "Authentication is not enabled"
            )

        features = self._get_features()
        self.settings[SettingsParameters.ApiVersions] = {"v1": {"features": features}}

        # The setting arrives as text; anything other than "false"
        # (case-insensitive) turns request context logging on.
        self.settings[SettingsParameters.LogRequestContext] = (
            self.settings[SettingsParameters.LogRequestContext].lower() != "false"
        )
        call_context_state = (
            "enabled"
            if self.settings[SettingsParameters.LogRequestContext]
            else "disabled"
        )
        logger.info(f"Call context logging is {call_context_state}")
366
367 1
    def _validate_transfer_protocol_settings(self):
        """
        Check that the transfer protocol settings are usable.

        "http" needs nothing further. "https" additionally requires the
        certificate file and key file settings to be present and to point
        to existing files, after which the certificate itself is passed to
        ``validate_cert``.

        Raises:
            RuntimeError: if the protocol setting is missing or is neither
                "http" nor "https", or if the certificate/key settings are
                missing or do not point to existing files.
        """
        settings = self.settings

        if SettingsParameters.TransferProtocol not in settings:
            msg = "Missing transfer protocol information."
            logger.critical(msg)
            raise RuntimeError(msg)

        protocol = settings[SettingsParameters.TransferProtocol]
        if protocol not in ("http", "https"):
            msg = f"Unsupported transfer protocol: {protocol}"
            logger.critical(msg)
            raise RuntimeError(msg)

        if protocol == "https":
            # First make sure both settings are present...
            has_cert_setting = SettingsParameters.CertificateFile in settings
            has_key_setting = SettingsParameters.KeyFile in settings
            self._validate_cert_key_state(
                "The parameter(s) {} must be set.",
                has_cert_setting,
                has_key_setting,
            )

            # ...then that both point to files on disk.
            cert_path = settings[SettingsParameters.CertificateFile]
            key_path = settings[SettingsParameters.KeyFile]
            self._validate_cert_key_state(
                "The parameter(s) {} must point to an existing file.",
                os.path.isfile(cert_path),
                os.path.isfile(key_path),
            )

            tabpy.tabpy_server.app.util.validate_cert(cert_path)
396
397 1
    @staticmethod
    def _validate_cert_key_state(msg, cert_valid, key_valid):
        """
        Raise if the certificate and/or key check failed.

        ``msg`` is a format string with one ``{}`` placeholder; it is filled
        with the name(s) of the offending config parameter(s) and prefixed
        with "Error using HTTPS: ". Returns None when both flags are truthy.

        Raises:
            RuntimeError: if ``cert_valid`` or ``key_valid`` is falsy.
        """
        if cert_valid and key_valid:
            return

        if not cert_valid and not key_valid:
            offending = (
                f"{ConfigParameters.TABPY_CERTIFICATE_FILE} and "
                f"{ConfigParameters.TABPY_KEY_FILE}"
            )
        elif not cert_valid:
            offending = ConfigParameters.TABPY_CERTIFICATE_FILE
        else:
            offending = ConfigParameters.TABPY_KEY_FILE

        err = "Error using HTTPS: " + msg.format(offending)
        logger.critical(err)
        raise RuntimeError(err)
415
416 1
    def _parse_pwd_file(self):
        """
        Load credentials from the configured password file.

        The result of ``parse_pwd_file`` is stored in ``self.credentials``.
        A successful parse that yields no credentials is logged and treated
        as a failure.

        Returns:
            The success flag from ``parse_pwd_file``, or False when the
            parse succeeded but produced no credentials.
        """
        pwd_file = self.settings[ConfigParameters.TABPY_PWD_FILE]
        succeeded, self.credentials = parse_pwd_file(pwd_file)

        if not succeeded:
            return succeeded

        if len(self.credentials) == 0:
            logger.error("No credentials found")
            return False

        return succeeded
426
427 1
    def _get_features(self):
        """
        Describe the enabled server features as a dictionary.

        Contains an "authentication" entry (basic-auth, required) only when
        a password file is configured, followed by the "evaluate_enabled"
        and "gzip_enabled" values taken from the settings.
        """
        features = {}

        # Check for auth
        auth_configured = ConfigParameters.TABPY_PWD_FILE in self.settings
        if auth_configured:
            features["authentication"] = {
                "required": True,
                "methods": {"basic-auth": {}},
            }

        features.update(
            evaluate_enabled=self.settings[SettingsParameters.EvaluateEnabled],
            gzip_enabled=self.settings[SettingsParameters.GzipEnabled],
        )
        return features
440
441 1
    def _build_tabpy_state(self):
        """
        Load the state file, creating it from the template if missing.

        The state file is ``state.ini`` inside the directory named by the
        StateFilePath setting. When it does not exist it is copied from
        ``tabpy_server/state.ini.template`` in the package; the target
        directory is created first so the copy does not fail on a state
        path that does not exist yet.

        Returns:
            A ``(state_config, tabpy_state)`` tuple: the object returned by
            ``_get_state_from_file`` and the ``TabPyState`` built from it.
        """
        pkg_path = os.path.dirname(tabpy.__file__)
        state_file_dir = self.settings[SettingsParameters.StateFilePath]
        state_file_path = os.path.join(state_file_dir, "state.ini")
        if not os.path.isfile(state_file_path):
            state_file_template_path = os.path.join(
                pkg_path, "tabpy_server", "state.ini.template"
            )
            logger.debug(
                f"File {state_file_path} not found, creating from "
                f"template {state_file_template_path}..."
            )
            # shutil.copy() needs the destination directory to exist.
            os.makedirs(state_file_dir, exist_ok=True)
            shutil.copy(state_file_template_path, state_file_path)

        logger.info(f"Loading state from state file {state_file_path}")
        state_config = _get_state_from_file(state_file_dir)
        return state_config, TabPyState(config=state_config, settings=self.settings)
458