Passed
Pull Request — master (#84)
by Vinicius
02:57
created

build.db.client   A

Complexity

Total Complexity 6

Size/Duplication

Total Lines 54
Duplicated Lines 0 %

Test Coverage

Coverage 100%

Importance

Changes 0
Metric Value
eloc 38
dl 0
loc 54
ccs 13
cts 13
cp 1
rs 10
c 0
b 0
f 0
wmc 6

2 Functions

Rating   Name   Duplication   Size   Complexity  
A mongo_client() 0 28 1
A bootstrap_index() 0 14 5
1
"""DB client."""
2
3 1
import os
4 1
from typing import Optional
5
6 1
from pymongo import MongoClient
7
8
9 1
def mongo_client(
    host_seeds=os.environ.get(
        "MONGO_HOST_SEEDS", "mongo1:27017,mongo2:27018,mongo3:27019"
    ),
    username=os.environ.get("MONGO_USERNAME"),
    password=os.environ.get("MONGO_PASSWORD"),
    database=os.environ.get("MONGO_DBNAME", "napps"),
    connect=False,
    retrywrites=True,
    retryreads=True,
    readpreference='primaryPreferred',
    maxpoolsize=int(os.environ.get("MONGO_MAX_POOLSIZE", 100)),
    minpoolsize=int(os.environ.get("MONGO_MIN_POOLSIZE", 10)),
    **kwargs,
) -> MongoClient:
    """Build a ``MongoClient`` from arguments or environment defaults.

    The environment-derived defaults (``MONGO_HOST_SEEDS``,
    ``MONGO_USERNAME``, ``MONGO_PASSWORD``, ``MONGO_DBNAME``,
    ``MONGO_MAX_POOLSIZE``, ``MONGO_MIN_POOLSIZE``) are default-argument
    expressions, so they are read once when this function is defined,
    not on every call.

    Args:
        host_seeds: Comma-separated ``host:port`` string; it is split on
            ``","`` and the resulting list is passed as the seed list.
        username: Passed through as ``username``.
        password: Passed through as ``password``.
        database: Passed to the client as ``authsource``.
        connect: Passed through as ``connect``. Per the PyMongo docs,
            ``False`` defers connecting until the first operation.
        retrywrites: Passed through as ``retrywrites``.
        retryreads: Passed through as ``retryreads``.
        readpreference: Passed through as ``readpreference``.
        maxpoolsize: Passed through as ``maxpoolsize``.
        minpoolsize: Passed through as ``minpoolsize``.
        **kwargs: Any extra keyword arguments, forwarded unchanged.

    Returns:
        The newly constructed ``MongoClient``.
    """
    return MongoClient(
        host_seeds.split(","),
        username=username,
        password=password,
        # Forward the caller's choice: this used to be hard-coded to False,
        # which silently discarded the ``connect`` argument.
        connect=connect,
        authsource=database,
        retrywrites=retrywrites,
        retryreads=retryreads,
        readpreference=readpreference,
        maxpoolsize=maxpoolsize,
        minpoolsize=minpoolsize,
        **kwargs,
    )
38
39
40 1
def bootstrap_index(
    db: MongoClient, collection: str, index: str, direction: int, **kwargs
) -> Optional[str]:
    """Create an index on ``collection`` unless one already starts with it.

    Gathers the first entry of every existing index's ``"key"`` list (as
    reported by ``index_information()``) and creates the index only when
    ``(index, direction)`` is not among those first entries.

    Args:
        db: Handle that is subscripted with ``collection`` to reach the
            collection object.
            NOTE(review): annotated as ``MongoClient``, but the usage
            looks like a database handle - confirm against callers.
        collection: Name used to look the collection up in ``db``.
        index: Field name of the index.
        direction: Direction paired with ``index`` in the key spec.
        **kwargs: Forwarded unchanged to ``create_index``.

    Returns:
        Whatever ``create_index`` returns when an index is created,
        otherwise ``None``.
    """
    leading_keys = {
        spec["key"][0]
        for spec in db[collection].index_information().values()
        if "key" in spec and isinstance(spec["key"], list)
    }

    # Nothing to do when an existing index already leads with this key.
    if (index, direction) in leading_keys:
        return None

    return db[collection].create_index([(index, direction)], **kwargs)
54