Passed
Push — master ( 4d1ed9...27ce7e )
by Vinicius
14:54 queued 08:58
created

kytos.core.db._mongo_conn_wait()   A

Complexity

Conditions 3

Size

Total Lines 16
Code Lines 15

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 13
CRAP Score 3

Importance

Changes 0
Metric Value
cc 3
eloc 15
nop 3
dl 0
loc 16
ccs 13
cts 13
cp 1
crap 3
rs 9.65
c 0
b 0
f 0
1
"""DB client."""
2
# pylint: disable=invalid-name,redefined-outer-name,too-many-arguments
3
# pylint: disable=unsubscriptable-object,inconsistent-return-statements
4
5 2
import logging
6 2
import os
7 2
import sys
8 2
import time
9 2
from typing import List, Optional, Tuple
10
11 2
import pymongo.helpers
12 2
from pymongo import MongoClient
13 2
from pymongo.errors import AutoReconnect, OperationFailure
14
15 2
from kytos.core.exceptions import KytosDBInitException
16
17 2
LOG = logging.getLogger(__name__)
18
19
20 2
def _log_pymongo_thread_traceback() -> None:
21
    """Log pymongo thread traceback.
22
23
    Originally, it printed out to sys.stderr polluting it when handling
24
    certain asynchronous errors that can happen, with this patched function
25
    it logs to LOG.error instead.
26
    """
27 2
    if sys.stderr:
28 2
        einfo = sys.exc_info()
29 2
        try:
30 2
            LOG.error(einfo)
31
        except IOError:
32
            pass
33
        finally:
34 2
            del einfo
35
36
37 2
if hasattr(pymongo.helpers, "_handle_exception"):
38 2
    pymongo.helpers._handle_exception = _log_pymongo_thread_traceback
39
40
41 2
def mongo_client(
42
    host_seeds=os.environ.get(
43
        "MONGO_HOST_SEEDS", "mongo1:27017,mongo2:27018,mongo3:27019"
44
    ),
45
    username=os.environ.get("MONGO_USERNAME") or "invalid_user",
46
    password=os.environ.get("MONGO_PASSWORD") or "invalid_password",
47
    database=os.environ.get("MONGO_DBNAME") or "napps",
48
    connect=False,
49
    retrywrites=True,
50
    retryreads=True,
51
    readpreference="primaryPreferred",
52
    readconcernlevel="majority",
53
    maxpoolsize=int(os.environ.get("MONGO_MAX_POOLSIZE") or 300),
54
    minpoolsize=int(os.environ.get("MONGO_MIN_POOLSIZE") or 30),
55
    serverselectiontimeoutms=int(os.environ.get("MONGO_TIMEOUTMS") or 30000),
56
    **kwargs,
57
) -> MongoClient:
58
    """Instantiate a MongoClient instance.
59
60
    MongoClient is thread-safe and has connection-pooling built in. NApps are
61
    supposed to use the Mongo class that wraps a MongoClient. NApps might
62
    use a new MongoClient instance for exceptional cases where a NApp
63
    needs to parametrize differently.
64
    """
65 2
    return MongoClient(
66
        host_seeds.split(","),
67
        username=username,
68
        password=password,
69
        connect=connect,
70
        authsource=database,
71
        retrywrites=retrywrites,
72
        retryreads=retryreads,
73
        readpreference=readpreference,
74
        maxpoolsize=maxpoolsize,
75
        minpoolsize=minpoolsize,
76
        readconcernlevel=readconcernlevel,
77
        serverselectiontimeoutms=serverselectiontimeoutms,
78
        **kwargs,
79
    )
80
81
82 2
class Mongo:
83
    """MongoClient instance for NApps."""
84
85 2
    client = mongo_client(connect=False)
86 2
    db_name = os.environ.get("MONGO_DBNAME") or "napps"
87
88 2
    @classmethod
89 2
    def bootstrap_index(
90
        cls,
91
        collection: str,
92
        keys: List[Tuple[str, int]],
93
        **kwargs,
94
    ) -> Optional[str]:
95
        """Bootstrap index."""
96 2
        db = cls.client[cls.db_name]
97 2
        indexes = set()
98 2
        if "background" not in kwargs:
99 2
            kwargs["background"] = True
100
101 2
        for value in db[collection].index_information().values():
102
            if "key" in value and isinstance(value["key"], list):
103
                indexes.add(tuple(value["key"]))
104
105 2
        if tuple(keys) not in indexes:
106 2
            return db[collection].create_index(keys, **kwargs)
107
108
        return None
109
110
111 2
def _mongo_conn_wait(mongo_client=mongo_client, retries=12,
112
                     timeout_ms=10000) -> None:
113
    """Try to run 'hello' command on MongoDB and wait for it with retries."""
114 2
    try:
115 2
        client = mongo_client(maxpoolsize=6, minpoolsize=3,
116
                              serverselectiontimeoutms=timeout_ms)
117 2
        LOG.info("Trying to run 'hello' command on MongoDB...")
118 2
        client.db.command("hello")
119 2
        LOG.info("Ran 'hello' command on MongoDB successfully. It's ready!")
120 2
    except (OperationFailure, AutoReconnect) as exc:
121 2
        retries -= 1
122 2
        if retries > 0:
123 2
            time.sleep(max(timeout_ms / 1000, 1))
124 2
            return _mongo_conn_wait(mongo_client, retries, timeout_ms)
125 2
        LOG.error("Maximum retries reached when waiting for MongoDB")
126 2
        raise KytosDBInitException(str(exc), exc)
127
128
129 2
def db_conn_wait(db_backend="mongodb", retries=12, timeout_ms=10000) -> None:
130
    """DB conn wait."""
131 2
    try:
132 2
        LOG.info("Starting DB connection")
133 2
        conn_wait_funcs = {"mongodb": _mongo_conn_wait}
134 2
        return conn_wait_funcs[db_backend](retries=retries,
135
                                           timeout_ms=timeout_ms)
136 2
    except KeyError:
137 2
        client_names = ",".join(list(conn_wait_funcs.keys()))
0 ignored issues
show
introduced by
The variable conn_wait_funcs does not seem to be defined for all execution paths.
Loading history...
138 2
        raise KytosDBInitException(
139
            f"DB backend '{db_backend}' isn't supported."
140
            f" Current supported databases: {client_names}"
141
        )
142