Test Failed
Pull Request — master (#202)
by Vinicius
11:52 queued 04:29
created

kytos.core.db   A

Complexity

Total Complexity 14

Size/Duplication

Total Lines 138
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 92
dl 0
loc 138
rs 10
c 0
b 0
f 0
wmc 14

4 Functions

Rating   Name   Duplication   Size   Complexity  
A _mongo_conn_wait() 0 15 3
A _log_pymongo_thread_traceback() 0 15 3
A mongo_client() 0 38 1
A db_conn_wait() 0 11 2

1 Method

Rating   Name   Duplication   Size   Complexity  
A Mongo.bootstrap_index() 0 20 5
1
"""DB client."""
2
# pylint: disable=invalid-name,redefined-outer-name,too-many-arguments
3
# pylint: disable=unsubscriptable-object,inconsistent-return-statements
4
5
import logging
6
import os
7
import sys
8
import time
9
from typing import Optional
10
11
import pymongo.helpers
12
from pymongo import MongoClient
13
from pymongo.errors import AutoReconnect, OperationFailure
14
15
from kytos.core.exceptions import KytosDBInitException
16
17
LOG = logging.getLogger(__name__)
18
19
20
def _log_pymongo_thread_traceback() -> None:
    """Report the exception currently being handled through LOG.error.

    This function is installed in place of pymongo's own handler, which
    wrote to sys.stderr when certain asynchronous errors happened. Nothing
    is logged when sys.stderr is unset or falsy.
    """
    if not sys.stderr:
        return

    exc_details = sys.exc_info()
    try:
        LOG.error(exc_details)
    except IOError:
        # Best effort only: a failure while logging must not propagate.
        pass
    finally:
        # Release the local reference to the exception info tuple.
        del exc_details
35
36
37
# Route pymongo's internal exception reporting through this module's logger.
# The attribute check leaves pymongo untouched when the hook is not present.
if hasattr(pymongo.helpers, "_handle_exception"):
    setattr(
        pymongo.helpers, "_handle_exception", _log_pymongo_thread_traceback
    )
39
40
41
def mongo_client(
    host_seeds=os.environ.get(
        "MONGO_HOST_SEEDS", "mongo1:27017,mongo2:27018,mongo3:27019"
    ),
    username=os.environ.get("MONGO_USERNAME") or "invalid_user",
    password=os.environ.get("MONGO_PASSWORD") or "invalid_password",
    database=os.environ.get("MONGO_DBNAME") or "napps",
    connect=False,
    retrywrites=True,
    retryreads=True,
    readpreference="primaryPreferred",
    readconcernlevel="majority",
    maxpoolsize=int(os.environ.get("MONGO_MAX_POOLSIZE") or 300),
    minpoolsize=int(os.environ.get("MONGO_MIN_POOLSIZE") or 30),
    serverselectiontimeoutms=30000,
    **kwargs,
) -> MongoClient:
    """Build and return a new MongoClient.

    The environment-based defaults (MONGO_HOST_SEEDS, MONGO_USERNAME,
    MONGO_PASSWORD, MONGO_DBNAME, MONGO_MAX_POOLSIZE, MONGO_MIN_POOLSIZE)
    are read once, when this function is defined. ``host_seeds`` is a
    comma-separated string of hosts and ``database`` is forwarded as the
    client's ``authsource``. Any extra keyword arguments are handed to
    MongoClient unchanged.

    MongoClient is thread-safe and has connection-pooling built in. NApps
    are expected to use the Mongo class wrapping a MongoClient and should
    only call this directly in exceptional cases that need different
    parameters.
    """
    seeds = host_seeds.split(",")
    options = {
        "username": username,
        "password": password,
        "connect": connect,
        "authsource": database,
        "retrywrites": retrywrites,
        "retryreads": retryreads,
        "readpreference": readpreference,
        "maxpoolsize": maxpoolsize,
        "minpoolsize": minpoolsize,
        "readconcernlevel": readconcernlevel,
        "serverselectiontimeoutms": serverselectiontimeoutms,
    }
    return MongoClient(seeds, **options, **kwargs)
80
81
82
class Mongo:
    """Holder of the MongoClient instance used by NApps."""

    # Client built once, when the class body is executed.
    client = mongo_client(connect=False)
    # Database name from MONGO_DBNAME, falling back to "napps" when the
    # variable is unset or empty.
    db_name = os.environ.get("MONGO_DBNAME") or "napps"

    @classmethod
    def bootstrap_index(
        cls,
        collection: str,
        index: str,
        direction: int,
        **kwargs,
    ) -> Optional[str]:
        """Create the ``(index, direction)`` index if it is not there yet.

        Only the first key pair of each index already present on
        ``collection`` is compared against ``(index, direction)``. Extra
        keyword arguments are forwarded to ``create_index``.

        Returns the value returned by ``create_index`` when an index is
        created, otherwise None.
        """
        db = cls.client[cls.db_name]
        wanted = (index, direction)

        leading_keys = {
            info["key"][0]
            for info in db[collection].index_information().values()
            if "key" in info and isinstance(info["key"], list)
        }

        if wanted in leading_keys:
            return None

        return db[collection].create_index([wanted], **kwargs)
108
109
110
def _mongo_conn_wait(mongo_client=mongo_client, retries=10,
                     timeout_ms=10000) -> None:
    """Wait for MongoDB by running the 'hello' command, with retries.

    Every attempt builds a client through ``mongo_client`` (called with
    ``maxpoolsize=6, minpoolsize=3``), runs 'hello' on it and then closes
    it, so that probing clients do not pile up while MongoDB is unavailable.

    Args:
        mongo_client: Callable returning the client used for the probe.
        retries: Number of attempts; at least one attempt is always made.
        timeout_ms: Pause between attempts in milliseconds. The pause is
            never shorter than one second.

    Raises:
        KytosDBInitException: When the last allowed attempt fails with
            OperationFailure or AutoReconnect.
    """
    while True:
        client = None
        try:
            client = mongo_client(maxpoolsize=6, minpoolsize=3)
            LOG.info("Trying to run 'hello' command on MongoDB...")
            client.db.command("hello")
            LOG.info(
                "Ran 'hello' command on MongoDB successfully. It's ready!"
            )
            return
        except (OperationFailure, AutoReconnect) as exc:
            retries -= 1
            if retries <= 0:
                LOG.error("Maximum retries reached when waiting for MongoDB")
                raise KytosDBInitException(str(exc), exc) from exc
        finally:
            # The probing client is only needed for this attempt; release it
            # whether the attempt succeeded or failed.
            if client is not None:
                client.close()
        time.sleep(max(timeout_ms / 1000, 1))
125
126
127
def db_conn_wait(db_backend="mongodb", retries=12, timeout_ms=10000) -> None:
    """Wait for the configured DB backend to accept connections.

    Args:
        db_backend: Name of the backend to wait for; only "mongodb" is
            registered here.
        retries: Forwarded to the backend's wait function.
        timeout_ms: Forwarded to the backend's wait function.

    Raises:
        KytosDBInitException: When ``db_backend`` is not a supported backend.
            Exceptions raised by the backend's wait function propagate
            unchanged.
    """
    # Defined before the lookup so it is always available to the error path.
    conn_wait_funcs = {"mongodb": _mongo_conn_wait}
    LOG.info("Starting DB connection")

    # Only the lookup is guarded: a KeyError raised inside the wait function
    # itself must not be reported as an unsupported backend.
    try:
        conn_wait = conn_wait_funcs[db_backend]
    except KeyError:
        client_names = ",".join(conn_wait_funcs)
        raise KytosDBInitException(
            f"DB backend '{db_backend}' isn't supported."
            f" Current supported databases: {client_names}"
        ) from None

    return conn_wait(retries=retries, timeout_ms=timeout_ms)
140