000_storehouse_to_mongo.load_boxes_data()   A
last analyzed

Complexity

Conditions 1

Size

Total Lines 3
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 2
nop 1
dl 0
loc 3
rs 10
c 0
b 0
f 0
1
#!/usr/bin/env python3
2
# -*- coding: utf-8 -*-
3
4
import json
5
import glob
6
import pickle
7
import os
8
import sys
9
from typing import Any, List, Tuple
10
from napps.kytos.mef_eline.controllers import ELineController
11
from concurrent.futures import ThreadPoolExecutor, as_completed
12
13
eline_controller = ELineController()
14
15
16
def get_storehouse_dir() -> str:
17
    return os.environ["STOREHOUSE_NAMESPACES_DIR"]
18
19
20
def _list_boxes_files(namespace: str, storehouse_dir=get_storehouse_dir()) -> dict:
21
    """List boxes files given the storehouse dir."""
22
    if storehouse_dir.endswith(os.path.sep):
23
        storehouse_dir = storehouse_dir[:-1]
24
    return {
25
        file_name.split(os.path.sep)[-2]: file_name
26
        for file_name in glob.glob(f"{storehouse_dir}/{namespace}**/*", recursive=True)
27
    }
28
29
30
def _load_from_file(file_name) -> Any:
31
    with open(file_name, "rb") as load_file:
32
        return pickle.load(load_file)
33
34
35
def load_boxes_data(namespace: str) -> dict:
36
    """Load boxes data."""
37
    return {k: _load_from_file(v).data for k, v in _list_boxes_files(namespace).items()}
38
39
40
def load_mef_eline_evcs() -> List[dict]:
41
    """Load mef_eline evcs."""
42
    namespace = "kytos.mef_eline.circuits"
43
    content = load_boxes_data(namespace)
44
    if namespace not in content:
45
        return ([], [])
46
47
    content = content[namespace]
48
49
    evcs = []
50
    for evc in content.values():
51
        evcs.append(evc)
52
53
    return evcs
54
55
56
def insert_from_mef_eline_evcs(
57
    eline_controller=eline_controller,
58
) -> List[dict]:
59
    """Insert from mef_eline evcs."""
60
    loaded_evcs = load_mef_eline_evcs()
61
62
    insert_evcs = []
63
    with ThreadPoolExecutor(max_workers=len(loaded_evcs)) as executor:
64
        futures = [
65
            executor.submit(eline_controller.upsert_evc, evc)
66
            for evc in loaded_evcs
67
        ]
68
        for future in as_completed(futures):
69
            response = future.result()
70
            insert_evcs.append(response)
71
72
    return insert_evcs
73
74
75
if __name__ == "__main__":
76
    cmds = {
77
        "insert_evcs": insert_from_mef_eline_evcs,
78
        "load_evcs": lambda: json.dumps(load_mef_eline_evcs()),
79
    }
80
    try:
81
        cmd = os.environ["CMD"]
82
    except KeyError:
83
        print("Please set the 'CMD' env var.")
84
        sys.exit(1)
85
    try:
86
        for command in cmd.split(","):
87
            print(cmds[command]())
88
    except KeyError as e:
89
        print(
90
            f"Unknown cmd: {str(e)}. 'CMD' env var has to be one of these {list(cmds.keys())}."
91
        )
92
        sys.exit(1)
93