Test Failed
Pull Request — master (#375)
by Vinicius
05:01
created

kytos.core.api_server.APIServer._enable_napp()   A

Complexity

Conditions 4

Size

Total Lines 22
Code Lines 13

Duplication

Lines 22
Ratio 100 %

Importance

Changes 0
Metric Value
cc 4
eloc 13
nop 2
dl 22
loc 22
rs 9.75
c 0
b 0
f 0
1
"""Module used to handle a API Server."""
2
import logging
import os
import shutil
import sys
import warnings
import zipfile
from datetime import datetime, timezone
from glob import glob
from http import HTTPStatus
from typing import Optional, Union
from urllib.error import HTTPError, URLError
from urllib.request import urlretrieve

import httpx
from starlette.applications import Starlette
from starlette.exceptions import HTTPException
from starlette.middleware import Middleware
from starlette.middleware.cors import CORSMiddleware
from starlette.responses import FileResponse
from starlette.staticfiles import StaticFiles
from uvicorn import Config as UvicornConfig
from uvicorn import Server

from kytos.core.auth import authenticated
from kytos.core.config import KytosConfig
from kytos.core.rest_api import JSONResponse, Request
28
29
LOG = logging.getLogger(__name__)
30
31
32
class APIServer:
    """Api server used to provide Kytos Controller routes."""

    #: tuple: HTTP methods reported when a route does not specify any.
    DEFAULT_METHODS = ('GET',)
    _NAPP_PREFIX = "/api/{napp.username}/{napp.name}/"
    _CORE_PREFIX = "/api/kytos/core/"

    # pylint: disable=too-many-arguments
    def __init__(self, listen='0.0.0.0', port=8181,
                 napps_manager=None, napps_dir=None):
        """Build the Starlette application and its uvicorn server.

        Args:
            listen (string): host name used by api server instance
            port (int): Port number used by api server instance
            napps_manager: object used by the core endpoints to query,
                enable, disable, install and uninstall NApps.
            napps_dir: directory searched for the NApps' ``ui`` files.
        """
        dirname = os.path.dirname(os.path.abspath(__file__))
        self.napps_manager = napps_manager
        self.napps_dir = napps_dir
        self.listen = listen
        self.port = port
        self.web_ui_dir = os.path.join(dirname, '../web-ui')
        self.app = Starlette(
            exception_handlers={HTTPException: self._http_exc_handler},
            middleware=[
                Middleware(CORSMiddleware, allow_origins=["*"]),
            ],
        )
        self.server = Server(
            UvicornConfig(
                self.app,
                host=self.listen,
                port=self.port,
            )
        )

        # Update web-ui if necessary: with force=False the download only
        # happens when ``web_ui_dir`` does not exist yet.
        self.update_web_ui(None, force=False)

    async def _http_exc_handler(self, _request: Request, exc: HTTPException):
        """HTTPException handler.

        The response format is still partly compatible with how werkzeug used
        to format http exceptions.
        """
        return JSONResponse({"description": exc.detail,
                             "code": exc.status_code},
                            status_code=exc.status_code)

    def run(self):
        """Run API Server.

        Exits the process (``sys.exit``) with a critical log entry when the
        server cannot be started.
        """
        try:
            self._run_uvicorn()
        except (RuntimeError, ValueError, OSError) as exception:
            msg = f"Couldn't start API Server: {exception}"
            LOG.critical(msg)
            sys.exit(msg)

    def _run_uvicorn(self) -> None:
        """Run the uvicorn server."""
        self.server.run()

    def _stop_uvicorn(self) -> None:
        """Ask the uvicorn server to exit."""
        self.server.should_exit = True

    def register_rest_endpoint(self, url, function, methods):
        """Deprecate in favor of @rest decorator.

        Registers ``function`` under ``/kytos/<url>`` for ``methods``.
        """
        warnings.warn("From now on, use @rest decorator.", DeprecationWarning,
                      stacklevel=2)
        if url.startswith('/'):
            url = url[1:]
        self._start_endpoint(self.app, f'/kytos/{url}', function,
                             methods=methods)

    def start_api(self):
        """Start this APIServer instance API."""
        self.register_core_endpoint('status/', self.status_api)
        self.register_core_endpoint('web/update/{version}/',
                                    self.update_web_ui,
                                    methods=['POST'])
        self.register_core_endpoint('web/update/',
                                    self.update_web_ui,
                                    methods=['POST'])

        self.register_core_napp_services()

    def start_web_ui(self) -> None:
        """Start Web UI endpoints."""
        self._start_endpoint(self.app,
                             "/ui/{username}/{napp_name}/{filename:path}",
                             self.static_web_ui)
        self._start_endpoint(self.app,
                             "/ui/{section_name}/",
                             self.get_ui_components)
        self._start_endpoint(self.app, '/', self.web_ui)
        self._start_endpoint(self.app, '/index.html', self.web_ui)
        self.app.router.mount("/", app=StaticFiles(directory=self.web_ui_dir),
                              name="dist")

    def register_core_endpoint(self, route, function, **options):
        """Register an endpoint with the URL /api/kytos/core/<route>.

        Not used by NApps, but controller.
        """
        self._start_endpoint(self.app, f"{self._CORE_PREFIX}{route}",
                             function, **options)

    def status_api(self, _request: Request):
        """Display kytos status using ``/api/kytos/core/status/``."""
        return JSONResponse({"response": "running"})

    def stop_api_server(self):
        """Stop API server."""
        self._stop_uvicorn()

    def _resolve_napp_ui_file(self, username: str, napp_name: str,
                              filename: str) -> Optional[str]:
        """Return the absolute path of a NApp UI file, or None.

        The three arguments come straight from the request URL, so the
        resulting path is only accepted when it is a regular file located
        inside ``<napps_dir>/<username>/<napp_name>/ui``. The check is
        lexical (``..`` segments are collapsed, symlinks are not resolved).
        """
        for part in (username, napp_name):
            if part in ("", ".", "..") or "/" in part or os.sep in part:
                return None
        ui_root = os.path.abspath(
            f"{self.napps_dir}/{username}/{napp_name}/ui"
        )
        target = os.path.abspath(f"{ui_root}/{filename}")
        # The trailing separator avoids accepting siblings such as "ui2".
        if not target.startswith(ui_root + os.sep):
            return None
        return target if os.path.isfile(target) else None

    def static_web_ui(self,
                      request: Request) -> Union[FileResponse, JSONResponse]:
        """Serve static files from installed napps.

        Responds with 404 when the requested file does not exist or when
        the requested path points outside the NApp's ``ui`` directory.
        """
        path = self._resolve_napp_ui_file(
            request.path_params["username"],
            request.path_params["napp_name"],
            request.path_params["filename"],
        )
        if path is not None:
            return FileResponse(path)
        return JSONResponse("", status_code=HTTPStatus.NOT_FOUND.value)

    @staticmethod
    def _ui_component_from_path(path: str) -> dict:
        """Build the component description of one ``.kytos`` file path.

        ``path`` is expected to end with
        ``<username>/<napp>/ui/<section>/<file>.kytos``. The ``ui`` segment
        is dropped by position, so directories named ``ui`` elsewhere in
        the path do not interfere.
        """
        username, napp_name, _ui, section, filename = path.split('/')[-5:]
        relative = [username, napp_name, section, filename]
        name = '-'.join(relative)
        if name.endswith('.kytos'):
            name = name[:-len('.kytos')]
        return {'name': name, 'url': f'ui/{"/".join(relative)}'}

    def get_ui_components(self, request: Request) -> JSONResponse:
        """Return all napps ui components from an specific section.

        The component name generated will have the following structure:
        {username}-{nappname}-{component-section}-{filename}`

        The section is read from the ``section_name`` path parameter; the
        value ``all`` selects every section.

        Returns:
            JSONResponse: list with a ``name``/``url`` dict per component.

        """
        section_name = request.path_params["section_name"]
        section_name = '*' if section_name == "all" else section_name
        path = f"{self.napps_dir}/*/*/ui/{section_name}/*.kytos"
        components = [self._ui_component_from_path(name)
                      for name in glob(path)]
        return JSONResponse(components)

    def web_ui(self, _request: Request) -> Union[JSONResponse, FileResponse]:
        """Serve the index.html page for the admin-ui."""
        index_path = f"{self.web_ui_dir}/index.html"
        if os.path.exists(index_path):
            return FileResponse(index_path)
        return JSONResponse(f"File '{index_path}' not found.",
                            status_code=HTTPStatus.NOT_FOUND.value)

    def _unzip_backup_web_ui(self, package: str, uri: str) -> None:
        """Unzip and backup web ui files.

        backup the old web-ui files and create a new web-ui folder
        if there is no path to backup, zip.extractall will
        create the path.

        Args:
            package: path of the downloaded zip file.
            uri: origin of the package, only used in messages.

        Raises:
            ValueError: when the archive fails its integrity test.
        """
        # The context manager closes the archive; no explicit close needed.
        with zipfile.ZipFile(package, 'r') as zip_ref:
            if zip_ref.testzip() is not None:
                LOG.error("Web update - Zip file from %s "
                          "is corrupted.", uri)
                raise ValueError(f'Zip file from {uri} is corrupted.')
            if os.path.exists(self.web_ui_dir):
                LOG.info("Web update - Performing UI backup.")
                date = datetime.now().strftime("%Y%m%d%H%M%S")
                shutil.move(self.web_ui_dir, f"{self.web_ui_dir}-{date}")
                os.mkdir(self.web_ui_dir)
            # unzip and extract files to web-ui/*
            zip_ref.extractall(self.web_ui_dir)

    def _fetch_latest_ui_tag(self) -> str:
        """Fetch latest ui tag version from GitHub.

        Falls back to a hardcoded version when the request fails or the
        response does not carry a usable ``tag_name``.
        """
        version = '2022.3.0'
        url = 'https://api.github.com/repos/kytos-ng/ui/releases/latest'
        try:
            response = httpx.get(url, timeout=10)
            version = response.json()['tag_name']
        except (httpx.RequestError, KeyError, TypeError, ValueError):
            # ValueError: body is not JSON; TypeError: JSON is not a dict.
            LOG.warning("Web update - Failed to fetch latest tag from "
                        "GitHub, falling back to %s", version)
        return version

    def update_web_ui(self, request: Optional[Request],
                      version='latest',
                      force=True) -> JSONResponse:
        """Update the static files for the Web UI.

        Download the latest files from the UI github repository and update them
        in the ui folder.
        The repository link is currently hardcoded here.

        Args:
            request: incoming request, or None when called internally. When
                given, its ``version`` path parameter (default ``latest``)
                replaces the ``version`` argument.
            version: release tag to download; ``latest`` is resolved with
                ``_fetch_latest_ui_tag``.
            force: update even if the web-ui directory already exists.

        Returns:
            JSONResponse: success message, or a 500 response when the UI
            was not updated or the download/extraction failed.
        """
        server_error = HTTPStatus.INTERNAL_SERVER_ERROR.value
        if os.path.exists(self.web_ui_dir) and not force:
            LOG.error("Web update - Web UI was not updated")
            return JSONResponse("Web ui was not updated",
                                status_code=server_error)

        if request:
            version = request.path_params.get("version", "latest")
        if version == 'latest':
            version = self._fetch_latest_ui_tag()

        repository = "https://github.com/kytos-ng/ui"
        uri = repository + f"/releases/download/{version}/latest.zip"
        package = None
        try:
            LOG.info("Web update - Downloading UI from %s.", uri)
            package = urlretrieve(uri)[0]
            self._unzip_backup_web_ui(package, uri)
        except HTTPError:
            LOG.error("Web update - Uri not found %s.", uri)
            return JSONResponse(f"Uri not found {uri}.",
                                status_code=server_error)
        except URLError:
            LOG.error("Web update - Error accessing URL %s.", uri)
            return JSONResponse(f"Error accessing URL {uri}.",
                                status_code=server_error)
        except zipfile.BadZipFile:
            # The download is not a zip archive at all; without this the
            # exception would propagate (also out of __init__).
            LOG.error("Web update - Zip file from %s "
                      "is corrupted.", uri)
            return JSONResponse(f"Zip file from {uri} is corrupted.",
                                status_code=server_error)
        except ValueError as exc:
            return JSONResponse(str(exc), status_code=server_error)
        finally:
            # urlretrieve stores the download in a temporary file that is
            # not removed automatically; cleanup is best-effort.
            if package is not None:
                try:
                    os.remove(package)
                except OSError:
                    pass
        LOG.info("Web update - Updated")
        return JSONResponse("Web UI was updated")

    # BEGIN decorator methods
    @staticmethod
    def decorate_as_endpoint(rule, **options):
        """Decorate methods as REST endpoints.

        Example for URL ``/api/myusername/mynapp/sayhello/World``:

        .. code-block:: python3


           from kytos.core.napps import rest
           from kytos.core.rest_api import JSONResponse, Request

           @rest("sayhello/{name}")
           def say_hello(request: Request) -> JSONResponse:
               name = request.path_params["name"]
               return JSONResponse({"data": f"Hello, {name}!"})

        ``@rest`` parameters are the same as Starlette's ``add_route``. You can
        also add ``methods=['POST']``, for example.

        As we don't have the NApp instance now, we store the parameters in a
        method attribute in order to add the route later, after we have both
        APIServer and NApp instances.
        """
        def store_route_params(function):
            """Store the route parameters in a method attribute.

            There can be many @rest decorators in a single function.
            """
            # To support any order: @classmethod, @rest or @rest, @classmethod
            # class and static decorators return a descriptor with the function
            # in __func__.
            if isinstance(function, (classmethod, staticmethod)):
                inner = function.__func__
            else:
                inner = function
            # Add route parameters
            if not hasattr(inner, 'route_params'):
                inner.route_params = []
            inner.route_params.append((rule, options))
            # Timezone-aware replacement for the deprecated utcnow(); the
            # value is used to order the decorated functions.
            inner.created_at = datetime.now(timezone.utc)
            # Return the same function, now with "route_params" attribute
            return function
        return store_route_params

    @staticmethod
    def get_authenticate_options():
        """Return configuration options related to authentication."""
        options = KytosConfig().options['daemon']
        return options.authenticate_urls

    def authenticate_endpoints(self, napp):
        """Add authentication to defined REST endpoints.

        If any URL marked for authentication uses a function,
        that function will require authentication.

        A function is flagged (``authenticated = True``) when any configured
        URL is a substring of any of its absolute rules.
        """
        authenticate_urls = self.get_authenticate_options()
        for function in self._get_decorated_functions(napp):
            inner = getattr(function, '__func__', function)
            inner.authenticated = any(
                url in self.get_absolute_rule(rule, napp)
                for rule, _ in function.route_params
                for url in authenticate_urls
            )

    def register_napp_endpoints(self, napp):
        """Add all NApp REST endpoints with @rest decorator.

        URLs will be prefixed with ``/api/{username}/{napp_name}/``.

        Args:
            napp (Napp): Napp instance to register new endpoints.
        """
        # Start all endpoints for this NApp
        for function in self._get_decorated_functions(napp):
            # Wrap at most once per function, however many routes it has.
            handler = function
            if getattr(function, 'authenticated', False):
                handler = authenticated(function)
            for rule, options in function.route_params:
                absolute_rule = self.get_absolute_rule(rule, napp)
                self._start_endpoint(self.app, absolute_rule,
                                     handler, **options)

    @staticmethod
    def _get_decorated_functions(napp):
        """Return ``napp``'s methods having the @rest decorator.

        The callables are yielded based on their decorated order (created_at),
        this ensures deterministic routing matching order.
        """
        callables = []
        for name in dir(napp):
            if not name.startswith('_'):  # discarding private names
                pub_attr = getattr(napp, name)
                if callable(pub_attr) and hasattr(pub_attr, 'route_params'):
                    callables.append(pub_attr)
        try:
            callables.sort(key=lambda f: f.created_at)
        except (TypeError, AttributeError):
            # Deliberate best effort: when ``created_at`` is missing or not
            # comparable, keep the ``dir()`` order instead of failing.
            pass

        yield from callables

    @classmethod
    def get_absolute_rule(cls, rule, napp):
        """Prefix the rule, e.g. "flow" to "/api/user/napp/flow".

        This code is used by kytos-utils when generating an OpenAPI skel.
        """
        # The prefix already ends with a slash, so we remove a starting
        # slash if applicable.
        relative_rule = rule[1:] if rule.startswith('/') else rule
        return cls._NAPP_PREFIX.format(napp=napp) + relative_rule

    # END decorator methods

    def _add_non_strict_slash_route(
        self,
        app: Starlette,
        route: str,
        function,
        **options
    ) -> None:
        """Try to add a non strict slash route.

        Registers the counterpart of ``route`` with or without the trailing
        slash, excluded from the schema. The root route is left alone.
        """
        if route == "/":
            return
        non_strict = route[:-1] if route.endswith("/") else f"{route}/"
        app.router.add_route(non_strict, function, **options,
                             include_in_schema=False)

    def _start_endpoint(
        self,
        app: Starlette,
        route: str,
        function,
        **options
    ):
        """Start ``function``'s endpoint."""
        app.router.add_route(route, function, **options)
        self._add_non_strict_slash_route(app, route, function, **options)
        LOG.info('Started %s - %s', route,
                 ', '.join(options.get('methods', self.DEFAULT_METHODS)))

    def remove_napp_endpoints(self, napp):
        """Remove all decorated endpoints.

        Args:
            napp (Napp): Napp instance to look for rest-decorated methods.
        """
        prefix = self._NAPP_PREFIX.format(napp=napp)
        routes = self.app.routes
        # In-place update so the list held by the application is the one
        # modified; routes without a ``path`` attribute are kept.
        routes[:] = [route for route in routes
                     if not getattr(route, 'path', '').startswith(prefix)]
        LOG.info('The Rest endpoints from %s were disabled.', prefix)

    def register_core_napp_services(self):
        """
        Register /api/kytos/core/ services over NApps.

        It registers enable, disable, install, uninstall NApps that will
        be used by kytos-utils.
        """
        self.register_core_endpoint("napps/{username}/{napp_name}/enable",
                                    self._enable_napp)
        self.register_core_endpoint("napps/{username}/{napp_name}/disable",
                                    self._disable_napp)
        self.register_core_endpoint("napps/{username}/{napp_name}/install",
                                    self._install_napp)
        self.register_core_endpoint("napps/{username}/{napp_name}/uninstall",
                                    self._uninstall_napp)
        self.register_core_endpoint("napps_enabled",
                                    self._list_enabled_napps)
        self.register_core_endpoint("napps_installed",
                                    self._list_installed_napps)
        self.register_core_endpoint(
            "napps/{username}/{napp_name}/metadata/{key}",
            self._get_napp_metadata)

    def _set_napp_enabled(self, request: Request,
                          enable: bool) -> JSONResponse:
        """Enable or disable an installed NApp and report the outcome.

        Shared implementation of ``_enable_napp`` and ``_disable_napp``.
        Responds 400 when the NApp is not installed and 500 when the NApp
        is not in the requested state after the attempt.
        """
        username = request.path_params["username"]
        napp_name = request.path_params["napp_name"]
        manager = self.napps_manager

        # Check if the NApp is installed
        if not manager.is_installed(username, napp_name):
            return JSONResponse({"response": "not installed"},
                                status_code=HTTPStatus.BAD_REQUEST.value)

        # Only act when the current state differs from the requested one
        if bool(manager.is_enabled(username, napp_name)) is not enable:
            action = manager.enable if enable else manager.disable
            action(username, napp_name)

        # Check the resulting state
        if bool(manager.is_enabled(username, napp_name)) is not enable:
            # If it did not change an admin user must check the log file
            status_code = HTTPStatus.INTERNAL_SERVER_ERROR.value
            return JSONResponse({"response": "error"},
                                status_code=status_code)

        return JSONResponse(
            {"response": "enabled" if enable else "disabled"})

    def _enable_napp(self, request: Request) -> JSONResponse:
        """Enable an installed NApp."""
        return self._set_napp_enabled(request, True)

    def _disable_napp(self, request: Request) -> JSONResponse:
        """Disable an installed NApp."""
        return self._set_napp_enabled(request, False)

    def _install_napp(self, request: Request) -> JSONResponse:
        """Install and enable a NApp, unless it is already installed.

        Responds 500 when the manager reports a failed installation, or
        with the HTTP error code raised while installing.
        """
        username = request.path_params["username"]
        napp_name = request.path_params["napp_name"]
        # Check if the NApp is installed
        if self.napps_manager.is_installed(username, napp_name):
            return JSONResponse({"response": "installed"})

        napp = f"{username}/{napp_name}"

        # Try to install and enable the napp
        try:
            if not self.napps_manager.install(napp, enable=True):
                # If it is not installed an admin user must check the log file
                status_code = HTTPStatus.INTERNAL_SERVER_ERROR.value
                return JSONResponse({"response": "error"},
                                    status_code=status_code)
        except HTTPError as exception:
            return JSONResponse({"response": "error"},
                                status_code=exception.code)

        return JSONResponse({"response": "installed"})

    def _uninstall_napp(self, request: Request) -> JSONResponse:
        """Uninstall a NApp; a NApp that is not installed is a success.

        Responds 500 when the manager reports a failed uninstallation.
        """
        username = request.path_params["username"]
        napp_name = request.path_params["napp_name"]
        # Check if the NApp is installed
        if self.napps_manager.is_installed(username, napp_name):
            # Try to unload/uninstall the napp
            if not self.napps_manager.uninstall(username, napp_name):
                # If it is not uninstalled admin user must check the log file
                status_code = HTTPStatus.INTERNAL_SERVER_ERROR.value
                return JSONResponse({"response": "error"},
                                    status_code=status_code)

        return JSONResponse({"response": "uninstalled"})

    def _list_enabled_napps(self, _request: Request) -> JSONResponse:
        """List of [username, napp_name] of enabled napps.

        The order is the one returned by the NApps manager.
        """
        napps = self.napps_manager.get_enabled_napps()
        napps = [[n.username, n.name] for n in napps]
        return JSONResponse({"napps": napps})

    def _list_installed_napps(self, _request: Request) -> JSONResponse:
        """List of [username, napp_name] of installed napps.

        The order is the one returned by the NApps manager.
        """
        napps = self.napps_manager.get_installed_napps()
        napps = [[n.username, n.name] for n in napps]
        return JSONResponse({"napps": napps})

    def _get_napp_metadata(self, request: Request) -> JSONResponse:
        """Get NApp metadata value.

        For safety reasons, only some keys can be retrieved:
        napp_dependencies, description, version.

        Responds 400 when the NApp is not installed or the key is not one
        of the allowed ones.
        """
        username = request.path_params["username"]
        napp_name = request.path_params["napp_name"]
        key = request.path_params["key"]
        valid_keys = ('napp_dependencies', 'description', 'version')

        if not self.napps_manager.is_installed(username, napp_name):
            return JSONResponse("NApp is not installed.",
                                status_code=HTTPStatus.BAD_REQUEST.value)

        if key not in valid_keys:
            return JSONResponse("Invalid key.",
                                status_code=HTTPStatus.BAD_REQUEST.value)

        data = self.napps_manager.get_napp_metadata(username, napp_name, key)
        return JSONResponse({key: data})
579