Test Failed
Pull Request — master (#209)
by Vinicius
04:39 queued 01:02
created

kytos.core.apm.ElasticAPM.init_flask_app()   A

Complexity

Conditions 2

Size

Total Lines 7
Code Lines 6

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 2
eloc 6
nop 2
dl 0
loc 7
rs 10
c 0
b 0
f 0
1
"""APM module."""
2
import os
3
from functools import wraps
4
5
from elasticapm import Client
6
from elasticapm.contrib.flask import ElasticAPM as FlaskAPM
7
from elasticapm.traces import execution_context
8
9
from kytos.core.exceptions import KytosAPMInitException
10
11
12
def init_apm(apm_backend="es", **kwargs):
13
    """Init APM backend."""
14
    backends = {"es": ElasticAPM}
15
    try:
16
        return backends[apm_backend].init_client(**kwargs)
17
    except KeyError:
18
        client_names = ",".join(list(backends.keys()))
19
        raise KytosAPMInitException(
20
            f"APM backend '{apm_backend}' isn't supported."
21
            f" Current supported APMs: {client_names}"
22
        )
23
24
25
def begin_span(func, span_type="custom"):
26
    """APM transaction begin_span decorator."""
27
28
    @wraps(func)
29
    def wrapper(*args, **kwds):
30
        transaction = execution_context.get_transaction()
31
        if transaction:
32
            transaction.begin_span(func.__name__, span_type)
33
        result = func(*args, **kwds)
34
        if transaction:
35
            transaction.end_span()
36
        return result
37
38
    return wrapper
39
40
41
class ElasticAPM:
42
    """ElasticAPM Client instance."""
43
44
    _flask_apm: FlaskAPM = None
45
    _client: Client = None
46
47
    @classmethod
48
    def get_client(cls, **kwargs) -> Client:
49
        """Get client."""
50
        if not cls._client:
51
            return cls.init_client(**kwargs)
52
        return cls._client
53
54
    @classmethod
55
    def init_flask_app(cls, app) -> FlaskAPM:
56
        """Init Flask APM Client."""
57
        if not cls._flask_apm:
58
            return None
59
        cls._flask_apm.init_app(app)
60
        return cls._flask_apm
61
62
    @classmethod
63
    def init_client(
64
        cls,
65
        service_name=os.environ.get("ELASTIC_APM_SERVICE_NAME", "kytos"),
66
        server_url=os.environ.get("ELASTIC_APM_URL", "http://localhost:8200"),
67
        secret_token=os.environ.get("ELASTIC_APM_SECRET_TOKEN",
68
                                    "elasticapm_token"),
69
        **kwargs,
70
    ) -> Client:
71
        """Init APM Client."""
72
        app = kwargs.pop("app", None)
73
        if not cls._client:
74
            cls._client = Client(
75
                service_name=service_name,
76
                server_url=server_url,
77
                secret_token=secret_token,
78
                **kwargs,
79
            )
80
        if not cls._flask_apm:
81
            cls._flask_apm = FlaskAPM(client=cls._client, app=app)
82
        return cls._client
83