APIServer._get_napp_metadata()   A
last analyzed

Complexity

Conditions 3

Size

Total Lines 22
Code Lines 13

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 3
eloc 13
nop 2
dl 0
loc 22
rs 9.75
c 0
b 0
f 0
1
"""Module used to handle a API Server."""
2
import logging
3
import os
4
import shutil
5
import warnings
6
import zipfile
7
from datetime import datetime
8
from glob import glob
9
from http import HTTPStatus
10
from typing import Optional, Union
11
from urllib.error import HTTPError, URLError
12
from urllib.request import urlretrieve
13
14
import httpx
15
from anyio import CapacityLimiter
16
from anyio.lowlevel import RunVar
17
from starlette.applications import Starlette
18
from starlette.exceptions import HTTPException
19
from starlette.middleware import Middleware
20
from starlette.middleware.cors import CORSMiddleware
21
from starlette.responses import FileResponse
22
from starlette.staticfiles import StaticFiles
23
from uvicorn import Config as UvicornConfig
24
from uvicorn import Server
25
26
from kytos.core.auth import authenticated
27
from kytos.core.config import KytosConfig
28
from kytos.core.helpers import get_thread_pool_max_workers
29
from kytos.core.rest_api import JSONResponse, Request
30
31
LOG = logging.getLogger(__name__)
32
33
34
class APIServer:
35
    """Api server used to provide Kytos Controller routes."""
36
37
    DEFAULT_METHODS = ('GET',)
38
    _NAPP_PREFIX = "/api/{napp.username}/{napp.name}/"
39
    _CORE_PREFIX = "/api/kytos/core/"
40
    _route_index_count = 0
41
42
    # pylint: disable=too-many-arguments
43
    def __init__(self, listen='0.0.0.0', port=8181,
                 napps_manager=None, napps_dir=None,
                 response_traceback_on_500=True):
        """Build the Starlette app and the uvicorn server that hosts it.

        Args:
            listen (str): Address the HTTP server binds to.
            port (int): Port the HTTP server binds to.
            napps_manager: Manager object used by the core NApp endpoints.
            napps_dir (str): Directory searched for NApp UI files.
            response_traceback_on_500 (bool): Forwarded to Starlette as its
                ``debug`` flag.
        """
        module_dir = os.path.dirname(os.path.abspath(__file__))
        self.listen = listen
        self.port = port
        self.napps_manager = napps_manager
        self.napps_dir = napps_dir
        self.web_ui_dir = os.path.join(module_dir, '../web-ui')

        cors = Middleware(CORSMiddleware, allow_origins=["*"])
        self.app = Starlette(
            debug=response_traceback_on_500,
            exception_handlers={HTTPException: self._http_exc_handler},
            middleware=[cors],
        )

        daemon_options = KytosConfig().options["daemon"]
        threadpool_size = get_thread_pool_max_workers().get('api', 512)

        # The literal 'threadpool' means "use the API thread pool size".
        max_concurrency = daemon_options.api_concurrency_limit
        if max_concurrency == 'threadpool':
            max_concurrency = threadpool_size

        def _app_startup_func():
            # NOTE(review): presumably overrides anyio's default worker
            # thread limit for this event loop - confirm against anyio docs.
            limiter = CapacityLimiter(threadpool_size)
            RunVar("_default_thread_limiter").set(limiter)

        self.app.on_event("startup")(_app_startup_func)

        uvicorn_config = UvicornConfig(
            self.app,
            host=self.listen,
            port=self.port,
            limit_concurrency=max_concurrency
        )
        self.server = Server(uvicorn_config)

        # Only downloads the web UI when the directory does not exist yet.
        self.update_web_ui(None, force=False)
88
89
    async def _http_exc_handler(self, _request: Request, exc: HTTPException):
        """Render an HTTPException as a JSON body.

        The body carries ``description`` and ``code`` keys, which keeps it
        partly compatible with how werkzeug used to format http exceptions.
        """
        payload = {"description": exc.detail, "code": exc.status_code}
        return JSONResponse(payload, status_code=exc.status_code)
98
99
    @classmethod
100
    def _get_next_route_index(cls) -> int:
101
        """Get next route index.
102
103
        This classmethod is meant to ensure route ordering when allocating
104
        an index for each route, the @rest decorator will use it. Decorated
105
        routes are imported sequentially so this won't need a threading Lock.
106
        """
107
        index = cls._route_index_count
108
        cls._route_index_count += 1
109
        return index
110
111
    async def serve(self):
        """Run the uvicorn server until it exits."""
        server = self.server
        await server.serve()
114
115
    def stop(self):
116
        """Stop serving."""
117
        self.server.should_exit = True
118
119
    def register_rest_endpoint(self, url, function, methods):
        """Register ``function`` under ``/kytos/<url>`` (deprecated).

        Emits a DeprecationWarning; use the @rest decorator instead.
        """
        warnings.warn("From now on, use @rest decorator.", DeprecationWarning,
                      stacklevel=2)
        # Drop one leading slash so the route does not contain "//".
        relative_url = url[1:] if url.startswith('/') else url
        route = f'/kytos/{relative_url}'
        self._start_endpoint(self.app, route, function, methods=methods)
127
128
    def start_api(self):
        """Register the core status, web update and NApp endpoints."""
        self.register_core_endpoint('status/', self.status_api)
        # The web UI update endpoint exists with and without a version.
        for update_route in ('web/update/{version}/', 'web/update/'):
            self.register_core_endpoint(update_route,
                                        self.update_web_ui,
                                        methods=['POST'])

        self.register_core_napp_services()
139
140
    def start_web_ui(self) -> None:
        """Register the Web UI routes, then mount the static files."""
        ui_routes = (
            ("/ui/{username}/{napp_name}/{filename:path}",
             self.static_web_ui),
            ("/ui/{section_name}/", self.get_ui_components),
            ('/', self.web_ui),
            ('/index.html', self.web_ui),
        )
        for route, handler in ui_routes:
            self._start_endpoint(self.app, route, handler)
        self.start_web_ui_static_files()
151
152
    def start_web_ui_static_files(self) -> None:
        """Mount the web UI directory at ``/`` under the name ``dist``."""
        static_app = StaticFiles(directory=self.web_ui_dir)
        self.app.router.mount("/", app=static_app, name="dist")
156
157
    def register_core_endpoint(self, route, function, **options):
        """Register an endpoint with the URL /api/kytos/core/<route>.

        Used by the controller itself, not by NApps. ``options`` are
        forwarded unchanged to the route registration.
        """
        core_route = f"{self._CORE_PREFIX}{route}"
        self._start_endpoint(self.app, core_route, function, **options)
164
165
    def status_api(self, _request: Request):
        """Report that kytos is running, when it started and its uptime."""
        controller = self.napps_manager._controller
        uptime = controller.uptime()
        # NOTE(review): if ``uptime`` is a ``datetime.timedelta``, ``.seconds``
        # excludes whole days and wraps every 24h - confirm whether
        # ``total_seconds()`` is what is wanted here.
        uptime_seconds = uptime.seconds if uptime else 0
        return JSONResponse({
            "response": "running",
            "started_at": controller.started_at,
            "uptime_seconds": uptime_seconds,
        })
174
175
    def static_web_ui(self,
                      request: Request) -> Union[FileResponse, JSONResponse]:
        """Serve a static file from an installed NApp's ``ui`` directory.

        The ``username``, ``napp_name`` and ``filename`` path parameters are
        joined as ``<napps_dir>/<username>/<napp_name>/ui/<filename>``.

        The path parameters come from the request URL, so the joined path is
        normalized and must stay inside that NApp's ``ui`` directory, which
        in turn must stay inside ``napps_dir``. This rejects ``..`` and
        absolute-path traversal. Normalization is lexical on purpose:
        symbolic links are not resolved, so link-based NApp directories keep
        working.

        Returns:
            FileResponse for an existing regular file, otherwise an empty
            JSONResponse with status 404.
        """
        username = request.path_params["username"]
        napp_name = request.path_params["napp_name"]
        filename = request.path_params["filename"]

        napps_root = os.path.abspath(str(self.napps_dir))
        ui_root = os.path.normpath(
            os.path.join(napps_root, username, napp_name, "ui"))
        path = os.path.normpath(os.path.join(ui_root, filename))

        try:
            contained = (
                os.path.commonpath([napps_root, ui_root]) == napps_root
                and os.path.commonpath([ui_root, path]) == ui_root
            )
        except ValueError:
            # commonpath refuses paths that cannot share a root (for
            # example different drives); treat that as outside the tree.
            contained = False

        # isfile, not exists: a directory cannot be sent as a file.
        if contained and os.path.isfile(path):
            return FileResponse(path)
        return JSONResponse("", status_code=HTTPStatus.NOT_FOUND.value)
185
186
    def get_ui_components(self, request: Request) -> JSONResponse:
        """Return all napps ui components from a specific section.

        The ``section_name`` path parameter selects the section; the value
        ``all`` matches every section. Files are searched with the pattern
        ``<napps_dir>/*/*/ui/<section>/*.kytos``.

        The component name generated will have the following structure:
        ``{username}-{nappname}-{component-section}-{filename}`` with the
        ``.kytos`` text removed.

        Returns:
            JSONResponse: list of ``{"name": ..., "url": ...}`` entries, one
            per component file found.
        """
        section_name = request.path_params["section_name"]
        section_glob = '*' if section_name == "all" else section_name
        pattern = f"{self.napps_dir}/*/*/ui/{section_glob}/*.kytos"
        components = []
        for name in glob(pattern):
            # The pattern fixes the last five path components as
            # username/napp/ui/section/file. Taking them positionally avoids
            # removing an unrelated "ui" directory that is part of napps_dir.
            tail = name.replace(os.sep, '/').split('/')[-5:]
            username, napp_name, _ui_dir, section, filename = tail
            parts = [username, napp_name, section, filename]

            component_name = '-'.join(parts).replace('.kytos', '')
            url = f'ui/{"/".join(parts)}'
            components.append({'name': component_name, 'url': url})
        return JSONResponse(components)
212
213
    def web_ui(self, _request: Request) -> Union[JSONResponse, FileResponse]:
        """Serve the admin-ui ``index.html``, or a 404 when it is missing."""
        index_path = f"{self.web_ui_dir}/index.html"
        if not os.path.exists(index_path):
            return JSONResponse(f"File '{index_path}' not found.",
                                status_code=HTTPStatus.NOT_FOUND.value)
        return FileResponse(index_path)
220
221
    def _unzip_backup_web_ui(self, package: str, uri: str) -> None:
        """Back up the current web UI and extract ``package`` in its place.

        If the web UI directory exists it is moved to
        ``<web_ui_dir>-<timestamp>`` and an empty directory is created
        before extraction. If it does not exist, extraction creates it.

        Args:
            package: Path to the downloaded zip file.
            uri: Download location, used only in log and error messages.

        Raises:
            ValueError: If ``package`` is not a zip file or fails the
                archive integrity test.
        """
        corrupted_msg = f'Zip file from {uri} is corrupted.'
        try:
            zip_ref = zipfile.ZipFile(package, 'r')
        except zipfile.BadZipFile as exc:
            # Report a download that is not a zip archive through the same
            # ValueError the caller already handles for corrupted archives.
            LOG.error("Web update - Zip file from %s "
                      "is corrupted.", uri)
            raise ValueError(corrupted_msg) from exc

        # The context manager closes the archive; no explicit close needed.
        with zip_ref:
            if zip_ref.testzip() is not None:
                LOG.error("Web update - Zip file from %s "
                          "is corrupted.", uri)
                raise ValueError(corrupted_msg)
            if os.path.exists(self.web_ui_dir):
                LOG.info("Web update - Performing UI backup.")
                date = datetime.now().strftime("%Y%m%d%H%M%S")
                shutil.move(self.web_ui_dir, f"{self.web_ui_dir}-{date}")
                os.mkdir(self.web_ui_dir)
            # unzip and extract files to web-ui/*
            zip_ref.extractall(self.web_ui_dir)
241
242
    def _fetch_latest_ui_tag(self) -> str:
        """Fetch latest ui tag version from GitHub.

        Returns:
            The ``tag_name`` of the latest release, or the hard-coded
            fallback version when the request fails, the body is not valid
            JSON, or the body has no ``tag_name`` key.
        """
        version = '2022.3.0'
        url = ('https://api.github.com/repos/kytos-ng/'
               'ui/releases/latest')
        try:
            response = httpx.get(url, timeout=10)
            version = response.json()['tag_name']
        except (httpx.RequestError, KeyError, ValueError):
            # ValueError covers a body that cannot be decoded as JSON.
            LOG.warning("Web update - Failed to fetch latest tag from "
                        "GitHub, falling back to %s", version)
        return version
255
256
    def update_web_ui(self, request: Optional[Request],
                      version='latest',
                      force=True) -> JSONResponse:
        """Update the static files for the Web UI.

        Download the release zip from the UI github repository and extract
        it into the web UI directory. The repository link is currently
        hardcoded here.

        Args:
            request: Incoming request, or None when called directly. When
                given, its optional ``version`` path parameter overrides the
                ``version`` argument.
            version: Release tag to download; ``latest`` resolves the tag
                through ``_fetch_latest_ui_tag``.
            force: Update even when the web UI directory already exists.

        Returns:
            JSONResponse with a message. Status 500 is used when the
            download or extraction fails, and also when nothing was updated
            because the directory exists and ``force`` is false.
        """
        if os.path.exists(self.web_ui_dir) and not force:
            LOG.error("Web update - Web UI was not updated")
            return JSONResponse(
                "Web ui was not updated",
                status_code=HTTPStatus.INTERNAL_SERVER_ERROR.value)

        if request:
            version = request.path_params.get("version", "latest")
        if version == 'latest':
            version = self._fetch_latest_ui_tag()

        repository = "https://github.com/kytos-ng/ui"
        uri = repository + f"/releases/download/{version}/latest.zip"
        package = None
        try:
            LOG.info("Web update - Downloading UI from %s.", uri)
            package = urlretrieve(uri)[0]
            self._unzip_backup_web_ui(package, uri)
        except HTTPError:
            # HTTPError is a URLError subclass, so it must be handled first.
            LOG.error("Web update - Uri not found %s.", uri)
            return JSONResponse(
                    f"Uri not found {uri}.",
                    status_code=HTTPStatus.INTERNAL_SERVER_ERROR.value
                    )
        except URLError:
            LOG.error("Web update - Error accessing URL %s.", uri)
            return JSONResponse(
                    f"Error accessing URL {uri}.",
                    status_code=HTTPStatus.INTERNAL_SERVER_ERROR.value
                   )
        except ValueError as exc:
            return JSONResponse(
                    str(exc),
                    status_code=HTTPStatus.INTERNAL_SERVER_ERROR.value
                   )
        finally:
            # urlretrieve stores the download in a temporary file that is
            # not removed automatically; drop it once it is no longer needed.
            if package is not None:
                try:
                    os.remove(package)
                except OSError:
                    LOG.debug("Web update - Could not remove temporary "
                              "file %s.", package)
        LOG.info("Web update - Updated")
        return JSONResponse("Web UI was updated")
300
301
    # BEGIN decorator methods
302
    @staticmethod
303
    def decorate_as_endpoint(rule, **options):
304
        """Decorate methods as REST endpoints.
305
306
        Example for URL ``/api/myusername/mynapp/sayhello/World``:
307
308
        .. code-block:: python3
309
310
311
           from kytos.core.napps import rest
312
           from kytos.core.rest_api import JSONResponse, Request
313
314
           @rest("sayhello/{name}")
315
           def say_hello(request: Request) -> JSONResponse:
316
               name = request.path_params["name"]
317
               return JSONResponse({"data": f"Hello, {name}!"})
318
319
        ``@rest`` parameters are the same as Starlette's ``add_route``. You can
320
        also add ``methods=['POST']``, for example.
321
322
        As we don't have the NApp instance now, we store the parameters in a
323
        method attribute in order to add the route later, after we have both
324
        APIServer and NApp instances.
325
        """
326
        def store_route_params(function):
327
            """Store ``@route`` parameters in a method attribute.
328
329
            There can be many @route decorators in a single function.
330
            """
331
            # To support any order: @classmethod, @rest or @rest, @classmethod
332
            # class and static decorators return a descriptor with the function
333
            # in __func__.
334
            if isinstance(function, (classmethod, staticmethod)):
335
                inner = function.__func__
336
            else:
337
                inner = function
338
            # Add route parameters
339
            if not hasattr(inner, 'route_params'):
340
                inner.route_params = []
341
            inner.route_params.append((rule, options))
342
            inner.route_index = APIServer._get_next_route_index()
343
            # Return the same function, now with "route_params" attribute
344
            return function
345
        return store_route_params
346
347
    @staticmethod
    def get_authenticate_options():
        """Return the ``authenticate_urls`` option of the daemon config."""
        return KytosConfig().options['daemon'].authenticate_urls
352
353
    def authenticate_endpoints(self, napp):
354
        """Add authentication to defined REST endpoints.
355
356
        If any URL marked for authentication uses a function,
357
        that function will require authentication.
358
        """
359
        authenticate_urls = self.get_authenticate_options()
360
        for function in self._get_decorated_functions(napp):
361
            inner = getattr(function, '__func__', function)
362
            inner.authenticated = False
363
            for rule, _ in function.route_params:
364
                if inner.authenticated:
365
                    break
366
                absolute_rule = self.get_absolute_rule(rule, napp)
367
                for url in authenticate_urls:
368
                    if url in absolute_rule:
369
                        inner.authenticated = True
370
                        break
371
372
    def register_napp_endpoints(self, napp):
        """Add all NApp REST endpoints with @rest decorator.

        URLs will be prefixed with ``/api/{username}/{napp_name}/``.

        Handlers whose ``authenticated`` attribute is true are wrapped with
        ``authenticated`` exactly once, however many routes they declare.

        Args:
            napp (Napp): Napp instance to register new endpoints.
        """
        for function in self._get_decorated_functions(napp):
            # Decide on the wrapper once per handler. Rebinding inside the
            # per-route loop could wrap an already wrapped handler again
            # for every additional route.
            handler = function
            if getattr(function, 'authenticated', False):
                handler = authenticated(function)
            for rule, options in function.route_params:
                absolute_rule = self.get_absolute_rule(rule, napp)
                self._start_endpoint(self.app, absolute_rule,
                                     handler, **options)
388
389
    @staticmethod
390
    def _get_decorated_functions(napp):
391
        """Return ``napp``'s methods having the @rest decorator.
392
393
        The callables are yielded based on their decorated order,
394
        this ensures deterministic routing matching order.
395
        """
396
        callables = []
397
        for name in dir(napp):
398
            if not name.startswith('_'):  # discarding private names
399
                pub_attr = getattr(napp, name)
400
                if (
401
                    callable(pub_attr)
402
                    and hasattr(pub_attr, "route_params")
403
                    and hasattr(pub_attr, "route_index")
404
                    and isinstance(pub_attr.route_index, int)
405
                ):
406
                    callables.append(pub_attr)
407
        yield from sorted(callables, key=lambda f: f.route_index)
408
409
    @classmethod
410
    def get_absolute_rule(cls, rule, napp):
411
        """Prefix the rule, e.g. "flow" to "/api/user/napp/flow".
412
413
        This code is used by kytos-utils when generating an OpenAPI skel.
414
        """
415
        # Flask does require 2 slashes if specified, so we remove a starting
416
        # slash if applicable.
417
        relative_rule = rule[1:] if rule.startswith('/') else rule
418
        return cls._NAPP_PREFIX.format(napp=napp) + relative_rule
419
420
    # END decorator methods
421
422
    def _add_non_strict_slash_route(
423
        self,
424
        app: Starlette,
425
        route: str,
426
        function,
427
        **options
428
    ) -> None:
429
        """Try to add a non strict slash route."""
430
        if route == "/":
431
            return
432
        non_strict = route[:-1] if route.endswith("/") else f"{route}/"
433
        app.router.add_route(non_strict, function, **options,
434
                             include_in_schema=False)
435
436
    def _start_endpoint(
        self,
        app: Starlette,
        route: str,
        function,
        **options
    ):
        """Add ``route`` and its trailing-slash counterpart, then log it."""
        app.router.add_route(route, function, **options)
        self._add_non_strict_slash_route(app, route, function, **options)
        # Fall back to the class default when no methods were given.
        methods = options.get('methods', self.DEFAULT_METHODS)
        LOG.info('Started %s - %s', route, ', '.join(methods))
448
449
    def remove_napp_endpoints(self, napp):
        """Remove every route whose path starts with the NApp's prefix.

        Args:
            napp (Napp): Napp instance whose endpoints are removed.
        """
        prefix = self._NAPP_PREFIX.format(napp=napp)
        matching = [
            position
            for position, route in enumerate(self.app.routes)
            if route.path.startswith(prefix)
        ]
        # Pop from the end so the remaining positions stay valid.
        while matching:
            self.app.routes.pop(matching.pop())
        LOG.info('The Rest endpoints from %s were disabled.', prefix)
463
464
    def register_core_napp_services(self):
        """
        Register /kytos/core/ services over NApps.

        It registers the enable, disable, install, uninstall, listing and
        metadata endpoints that will be used by kytos-utils.
        """
        services = (
            ("napps/{username}/{napp_name}/enable", self._enable_napp),
            ("napps/{username}/{napp_name}/disable", self._disable_napp),
            ("napps/{username}/{napp_name}/install", self._install_napp),
            ("napps/{username}/{napp_name}/uninstall",
             self._uninstall_napp),
            ("napps_enabled", self._list_enabled_napps),
            ("napps_installed", self._list_installed_napps),
            ("napps/{username}/{napp_name}/metadata/{key}",
             self._get_napp_metadata),
        )
        for route, handler in services:
            self.register_core_endpoint(route, handler)
486
487 View Code Duplication
    def _enable_napp(self, request: Request) -> JSONResponse:
        """Enable an installed NApp.

        Responds 400 when the NApp is not installed and 500 when it is
        still not enabled after the attempt.
        """
        username = request.path_params["username"]
        napp_name = request.path_params["napp_name"]
        manager = self.napps_manager

        if not manager.is_installed(username, napp_name):
            return JSONResponse({"response": "not installed"},
                                status_code=HTTPStatus.BAD_REQUEST.value)

        # Only ask the manager to enable it when it is not enabled yet.
        if not manager.is_enabled(username, napp_name):
            manager.enable(username, napp_name)

        if manager.is_enabled(username, napp_name):
            return JSONResponse({"response": "enabled"})

        # Still disabled: an admin user must check the log file.
        return JSONResponse(
            {"response": "error"},
            status_code=HTTPStatus.INTERNAL_SERVER_ERROR.value)
509
510 View Code Duplication
    def _disable_napp(self, request: Request) -> JSONResponse:
        """Disable an installed NApp.

        Responds 400 when the NApp is not installed and 500 when it is
        still enabled after the attempt.
        """
        username = request.path_params["username"]
        napp_name = request.path_params["napp_name"]
        manager = self.napps_manager

        if not manager.is_installed(username, napp_name):
            return JSONResponse({"response": "not installed"},
                                status_code=HTTPStatus.BAD_REQUEST.value)

        # Only ask the manager to disable it when it is currently enabled.
        if manager.is_enabled(username, napp_name):
            manager.disable(username, napp_name)

        if not manager.is_enabled(username, napp_name):
            return JSONResponse({"response": "disabled"})

        # Still enabled: an admin user must check the log file.
        return JSONResponse(
            {"response": "error"},
            status_code=HTTPStatus.INTERNAL_SERVER_ERROR.value)
532
533
    def _install_napp(self, request: Request) -> JSONResponse:
        """Install and enable a NApp unless it is already installed.

        Responds 500 when the manager reports a failed install, and with
        the HTTPError's own code when the install raises one.
        """
        username = request.path_params["username"]
        napp_name = request.path_params["napp_name"]
        manager = self.napps_manager

        if manager.is_installed(username, napp_name):
            return JSONResponse({"response": "installed"})

        napp = f"{username}/{napp_name}"
        try:
            succeeded = manager.install(napp, enable=True)
        except HTTPError as exception:
            return JSONResponse({"response": "error"},
                                status_code=exception.code)

        if not succeeded:
            # Not installed: an admin user must check the log file.
            return JSONResponse(
                {"response": "error"},
                status_code=HTTPStatus.INTERNAL_SERVER_ERROR.value)

        return JSONResponse({"response": "installed"})
554
555
    def _uninstall_napp(self, request: Request) -> JSONResponse:
        """Uninstall a NApp; a NApp that is not installed counts as done.

        Responds 500 when the manager reports a failed uninstall.
        """
        username = request.path_params["username"]
        napp_name = request.path_params["napp_name"]
        manager = self.napps_manager

        installed = manager.is_installed(username, napp_name)
        if installed and not manager.uninstall(username, napp_name):
            # Not uninstalled: an admin user must check the log file.
            return JSONResponse(
                {"response": "error"},
                status_code=HTTPStatus.INTERNAL_SERVER_ERROR.value)

        return JSONResponse({"response": "uninstalled"})
568
569
    def _list_enabled_napps(self, _request: Request) -> JSONResponse:
        """List ``[username, napp_name]`` pairs of the enabled napps.

        The pairs keep the order in which the manager returns the napps.
        """
        pairs = [
            [napp.username, napp.name]
            for napp in self.napps_manager.get_enabled_napps()
        ]
        return JSONResponse({"napps": pairs})
574
575
    def _list_installed_napps(self, _request: Request) -> JSONResponse:
        """List ``[username, napp_name]`` pairs of the installed napps.

        The pairs keep the order in which the manager returns the napps.
        """
        pairs = [
            [napp.username, napp.name]
            for napp in self.napps_manager.get_installed_napps()
        ]
        return JSONResponse({"napps": pairs})
580
581
    def _get_napp_metadata(self, request: Request) -> JSONResponse:
        """Get NApp metadata value.

        For safety reasons, only some keys can be retrieved:
        napp_dependencies, description, version.

        Responds 400 when the NApp is not installed or the key is not one
        of the allowed ones; the installed check runs first.
        """
        params = request.path_params
        username = params["username"]
        napp_name = params["napp_name"]
        key = params["key"]
        valid_keys = ('napp_dependencies', 'description', 'version')
        manager = self.napps_manager

        if not manager.is_installed(username, napp_name):
            error = "NApp is not installed."
        elif key not in valid_keys:
            error = "Invalid key."
        else:
            value = manager.get_napp_metadata(username, napp_name, key)
            return JSONResponse({key: value})

        return JSONResponse(error,
                            status_code=HTTPStatus.BAD_REQUEST.value)
603