Test Failed
Pull Request — master (#84)
by Vinicius
38:36
created

build.db.client   A

Complexity

Total Complexity 7

Size/Duplication

Total Lines 56
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 40
dl 0
loc 56
rs 10
c 0
b 0
f 0
wmc 7

2 Functions

Rating   Name   Duplication   Size   Complexity  
A mongo_client() 0 28 1
B bootstrap_index() 0 16 6
1
"""DB client."""
2
3
import os
4
from typing import Optional
5
6
from pymongo import MongoClient
7
8
9
def mongo_client(
    host_seeds=os.environ.get(
        "MONGO_HOST_SEEDS", "mongo1:27017,mongo2:27018,mongo3:27019"
    ),
    username=os.environ.get("MONGO_USERNAME"),
    password=os.environ.get("MONGO_PASSWORD"),
    database=os.environ.get("MONGO_DBNAME", "napps"),
    connect=False,
    retrywrites=True,
    retryreads=True,
    readpreference='primaryPreferred',
    maxpoolsize=int(os.environ.get("MONGO_MAX_POOLSIZE", 100)),
    minpoolsize=int(os.environ.get("MONGO_MIN_POOLSIZE", 10)),
    **kwargs,
) -> MongoClient:
    """Build a ``MongoClient`` from explicit arguments or environment defaults.

    The environment-derived defaults (``MONGO_HOST_SEEDS``,
    ``MONGO_USERNAME``, ``MONGO_PASSWORD``, ``MONGO_DBNAME``,
    ``MONGO_MAX_POOLSIZE``, ``MONGO_MIN_POOLSIZE``) are evaluated once, when
    this function is defined, not on every call. Pass explicit values if the
    environment may change after import.

    Args:
        host_seeds: Comma-separated ``host:port`` entries; split on ``,``
            and handed to ``MongoClient`` as the seed list.
        username: User name forwarded to ``MongoClient``.
        password: Password forwarded to ``MongoClient``.
        database: Database name, forwarded as ``authsource``.
        connect: Forwarded as ``MongoClient``'s ``connect`` option.
        retrywrites: Forwarded as ``retrywrites``.
        retryreads: Forwarded as ``retryreads``.
        readpreference: Forwarded as ``readpreference``.
        maxpoolsize: Forwarded as ``maxpoolsize``.
        minpoolsize: Forwarded as ``minpoolsize``.
        **kwargs: Any additional ``MongoClient`` keyword options.

    Returns:
        The constructed ``MongoClient`` instance.
    """
    return MongoClient(
        host_seeds.split(","),
        username=username,
        password=password,
        # Honour the caller's choice instead of always forcing False.
        connect=connect,
        authsource=database,
        retrywrites=retrywrites,
        retryreads=retryreads,
        readpreference=readpreference,
        maxpoolsize=maxpoolsize,
        minpoolsize=minpoolsize,
        **kwargs,
    )
38
39
40
def bootstrap_index(
    db: MongoClient, collection: str, index: str, direction: int, **kwargs
) -> Optional[str]:
    """Create a single-field index unless one already leads with that field.

    The existing indexes of ``db[collection]`` are read through
    ``index_information()``. Only the FIRST ``(field, direction)`` pair of
    each existing index is considered, so an index whose leading key equals
    ``(index, direction)`` counts as already present.

    Args:
        db: Object indexable by collection name; ``db[collection]`` must
            provide ``index_information()`` and ``create_index()``.
            NOTE(review): annotated as ``MongoClient`` but used like a
            database handle - confirm against callers.
        collection: Name of the collection to inspect.
        index: Field name to index.
        direction: Index direction, compared against the stored direction.
        **kwargs: Extra options forwarded to ``create_index()``.

    Returns:
        Whatever ``create_index()`` returns when an index is created,
        otherwise ``None``.
    """
    leading_keys = set()

    for info in db[collection].index_information().values():
        keys = info.get("key")
        # Entries with a missing, empty or non-list "key" cannot be compared;
        # skip them rather than failing on the lookup.
        if not keys or not isinstance(keys, list):
            continue
        leading_keys.add(keys[0])

    if (index, direction) not in leading_keys:
        return db[collection].create_index([(index, direction)], **kwargs)

    return None
56