000_storehouse_to_mongo.load_topology_status()   B
last analyzed

Complexity

Conditions 7

Size

Total Lines 27
Code Lines 21

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 7
eloc 21
nop 0
dl 0
loc 27
rs 7.9759
c 0
b 0
f 0
1
#!/usr/bin/env python3
2
# -*- coding: utf-8 -*-
3
4
import json
5
import glob
6
import pickle
7
import os
8
import sys
9
from typing import Any, List, Tuple
10
from napps.kytos.topology.controllers import TopoController
11
from concurrent.futures import ThreadPoolExecutor, as_completed
12
13
topo_controller = TopoController()
14
15
16
def get_storehouse_dir() -> str:
17
    return os.environ["STOREHOUSE_NAMESPACES_DIR"]
18
19
20
def _list_boxes_files(namespace: str, storehouse_dir=get_storehouse_dir()) -> dict:
21
    """List boxes files given the storehouse dir."""
22
    if storehouse_dir.endswith(os.path.sep):
23
        storehouse_dir = storehouse_dir[:-1]
24
    return {
25
        file_name.split(os.path.sep)[-2]: file_name
26
        for file_name in glob.glob(f"{storehouse_dir}/{namespace}**/*", recursive=True)
27
    }
28
29
30
def _load_from_file(file_name) -> Any:
31
    with open(file_name, "rb") as load_file:
32
        return pickle.load(load_file)
33
34
35
def load_boxes_data(namespace: str) -> dict:
36
    """Load boxes data."""
37
    return {k: _load_from_file(v).data for k, v in _list_boxes_files(namespace).items()}
38
39
40
def load_topology_status() -> Tuple[List[dict], List[dict]]:
41
    """Load topology status."""
42
    namespace = "kytos.topology.status"
43
    content = load_boxes_data(namespace)
44
    if namespace not in content:
45
        return ([], [])
46
47
    content = content[namespace]
48
    if "network_status" not in content:
49
        return ([], [])
50
    if "switches" not in content["network_status"]:
51
        return ([], [])
52
53
    links_status = content["network_status"].get("links", {})
54
55
    switches = []
56
    for switch in content["network_status"]["switches"].values():
57
        switch["_id"] = switch["id"]
58
        switches.append(switch)
59
60
    links = []
61
    for link_values in links_status.values():
62
        if "id" in link_values:
63
            link_values["_id"] = link_values["id"]
64
            links.append(link_values)
65
66
    return (switches, links)
67
68
69
def insert_from_topology_status(
70
    topo_controller=topo_controller,
71
) -> Tuple[List[dict], List[dict]]:
72
    """Insert from topology status."""
73
    loaded_switches, loaded_links = load_topology_status()
74
75
    insert_switches = []
76
    with ThreadPoolExecutor(max_workers=len(loaded_switches)) as executor:
77
        futures = [
78
            executor.submit(topo_controller.upsert_switch, switch["id"], switch)
79
            for switch in loaded_switches
80
        ]
81
        for future in as_completed(futures):
82
            response = future.result()
83
            insert_switches.append(response)
84
85
    insert_links = []
86
    with ThreadPoolExecutor(max_workers=len(loaded_links)) as executor:
87
        futures = [
88
            executor.submit(topo_controller.upsert_link, link["id"], link)
89
            for link in loaded_links
90
        ]
91
        for future in as_completed(futures):
92
            response = future.result()
93
            insert_switches.append(response)
94
95
    return (insert_switches, insert_links)
96
97
98
def load_topology_metadata(entity: str) -> dict:
99
    """Load topology metadata."""
100
    namespace = f"kytos.topology.{entity}.metadata"
101
    content = load_boxes_data(namespace)
102
    if namespace not in content:
103
        return {}
104
    content = content[namespace]
105
    return content
106
107
108
def insert_from_topology_switches_metadata(
109
    topo_controller=topo_controller,
110
) -> List[dict]:
111
    """Insert from topology switches metadata namespace."""
112
    switches = load_topology_metadata("switches")
113
    responses = []
114
    with ThreadPoolExecutor(max_workers=len(switches)) as executor:
115
        futures = [
116
            executor.submit(topo_controller.add_switch_metadata, dpid, metadata)
117
            for dpid, metadata in switches.items()
118
        ]
119
        for future in as_completed(futures):
120
            response = future.result()
121
            responses.append(response)
122
    return responses
123
124
125
def insert_from_topology_interfaces_metadata(
126
    topo_controller=topo_controller,
127
) -> List[dict]:
128
    """Insert from topology interfaces metadata namespace."""
129
    interfaces = load_topology_metadata("interfaces")
130
    responses = []
131
    with ThreadPoolExecutor(max_workers=len(interfaces)) as executor:
132
        futures = [
133
            executor.submit(
134
                topo_controller.add_interface_metadata, interface_id, metadata
135
            )
136
            for interface_id, metadata in interfaces.items()
137
        ]
138
        for future in as_completed(futures):
139
            response = future.result()
140
            responses.append(response)
141
    return responses
142
143
144
def insert_from_topology_links_metadata(topo_controller=topo_controller) -> List[dict]:
145
    """Insert from topology links metadata namespace."""
146
    links = load_topology_metadata("links")
147
    responses = []
148
    with ThreadPoolExecutor(max_workers=len(links)) as executor:
149
        futures = [
150
            executor.submit(topo_controller.add_link_metadata, link_id, metadata)
151
            for link_id, metadata in links.items()
152
        ]
153
        for future in as_completed(futures):
154
            response = future.result()
155
            responses.append(response)
156
    return responses
157
158
159
if __name__ == "__main__":
160
    cmds = {
161
        "insert_links_metadata": insert_from_topology_links_metadata,
162
        "insert_switches_metadata": insert_from_topology_switches_metadata,
163
        "insert_interfaces_metadata": insert_from_topology_interfaces_metadata,
164
        "insert_topology": insert_from_topology_status,
165
        "load_topology": lambda: json.dumps(load_topology_status()),
166
        "load_switches_metadata": lambda: json.dumps(
167
            load_topology_metadata("switches")
168
        ),
169
        "load_interfaces_metadata": lambda: json.dumps(
170
            load_topology_metadata("interfaces")
171
        ),
172
        "load_links_metadata": lambda: json.dumps(load_topology_metadata("links")),
173
    }
174
    try:
175
        cmd = os.environ["CMD"]
176
    except KeyError:
177
        print("Please set the 'CMD' env var.")
178
        sys.exit(1)
179
    try:
180
        for command in cmd.split(","):
181
            print(cmds[command]())
182
    except KeyError as e:
183
        print(
184
            f"Unknown cmd: {str(e)}. 'CMD' env var has to be one of these {list(cmds.keys())}."
185
        )
186
        sys.exit(1)
187