001_redeploy_evpls_same_vlans   A
last analyzed

Complexity

Total Complexity 17

Size/Duplication

Total Lines 147
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 100
dl 0
loc 147
rs 10
c 0
b 0
f 0
wmc 17

6 Functions

Rating   Name   Duplication   Size   Complexity  
B main() 0 46 4
A list_symmetric_evpls() 0 12 3
B update_command() 0 25 6
A is_symmetric_evpl() 0 11 1
A list_command() 0 12 1
A redeploy() 0 9 2
1
#!/usr/bin/env python
2
# -*- coding: utf-8 -*-
3
import json
4
import sys
5
import logging
6
7
import argparse
8
import asyncio
9
import httpx
10
11
logging.basicConfig(
12
    format="%(asctime)s - %(levelname)s - %(message)s", level=logging.INFO
13
)
14
logger = logging.getLogger(__name__)
15
16
17
def is_symmetric_evpl(evc: dict) -> bool:
18
    """Check whether it's a symmetric (same UNIs vlans) evpl."""
19
    uni_a, uni_z = evc["uni_a"], evc["uni_z"]
20
    return (
21
        "tag" in uni_a
22
        and "tag" in uni_z
23
        and uni_a["tag"]["tag_type"] == "vlan"
24
        and uni_z["tag"]["tag_type"] == "vlan"
25
        and isinstance(uni_a["tag"]["value"], int)
26
        and isinstance(uni_z["tag"]["value"], int)
27
        and uni_a["tag"]["value"] == uni_z["tag"]["value"]
28
    )
29
30
31
async def redeploy(evc_id: str, base_url: str):
32
    """Redeploy."""
33
    endpoint = "/mef_eline/v2/evc"
34
    async with httpx.AsyncClient(base_url=base_url) as client:
35
        res = await client.patch(f"{endpoint}/{evc_id}/redeploy")
36
        logger.info(f"Redeployed evc_id {evc_id}")
37
        assert (
38
            res.status_code == 202
39
        ), f"failed to redeploy evc_id: {evc_id} {res.status_code} {res.text}"
40
41
42
async def list_symmetric_evpls(base_url: str, included_evcs_filter: str = "") -> dict:
43
    """List symmetric (same UNI vlan) evpls."""
44
    endpoint = "/mef_eline/v2/evc"
45
    async with httpx.AsyncClient(base_url=base_url) as client:
46
        resp = await client.get(endpoint, timeout=20)
47
        evcs = {
48
            evc_id: evc for evc_id, evc in resp.json().items() if is_symmetric_evpl(evc)
49
        }
50
        if included_evcs_filter:
51
            included = set(included_evcs_filter.split(","))
52
            evcs = {evc_id: evc for evc_id, evc in evcs.items() if evc_id in included}
53
        return evcs
54
55
56
async def update_command(args: argparse.Namespace) -> None:
57
    """update command.
58
59
    It'll list all symmetric EVPLs (same UNIs vlans) and redeploy them
60
    concurrently. The concurrency slot and wait time can be controlled with
61
    batch_size and batch_sleep_secs
62
63
    If any coroutine fails its exception will be bubbled up.
64
    """
65
    evcs = await list_symmetric_evpls(args.base_url, args.included_evcs_filter)
66
    coros = [redeploy(evc_id, args.base_url) for evc_id, evc in evcs.items()]
67
    batch_size = args.batch_size if args.batch_size > 0 else len(coros)
68
    batch_sleep = args.batch_sleep_secs if args.batch_sleep_secs >= 0 else 0
69
70
    logger.info(
71
        f"It'll redeploy {len(coros)} EVPL(s) using batch_size {batch_size} "
72
        f"and batch_sleep {batch_sleep}"
73
    )
74
75
    for i in range(0, len(coros), batch_size):
76
        sliced = coros[i : i + batch_size]
77
        if i > 0 and batch_sleep:
78
            logger.info(f"Sleeping for {batch_sleep}...")
79
            await asyncio.sleep(batch_sleep)
80
        await asyncio.gather(*sliced)
81
82
83
async def list_command(args: argparse.Namespace) -> None:
84
    """list command."""
85
    evcs = await list_symmetric_evpls(args.base_url, args.included_evcs_filter)
86
    evcs = {
87
        evc_id: {
88
            "name": evc["name"],
89
            "uni_a": evc["uni_a"],
90
            "uni_z": evc["uni_z"],
91
        }
92
        for evc_id, evc in evcs.items()
93
    }
94
    print(json.dumps(evcs))
95
96
97
async def main() -> None:
98
    """Main function."""
99
    parser = argparse.ArgumentParser()
100
    subparsers = parser.add_subparsers(title="commands", dest="command")
101
102
    update_parser = subparsers.add_parser("update", help="Update command")
103
    update_parser.add_argument(
104
        "--batch_sleep_secs", type=int, help="Batch sleep in seconds", default=5
105
    )
106
    update_parser.add_argument("--batch_size", type=int, help="Batch size", default=10)
107
    update_parser.add_argument(
108
        "--included_evcs_filter",
109
        type=str,
110
        help="Included filtered EVC ids separated by comma",
111
        default="",
112
    )
113
    update_parser.add_argument(
114
        "--base_url",
115
        type=str,
116
        default="http://localhost:8181/api/kytos",
117
        help="Kytos-ng API base url",
118
    )
119
120
    list_parser = subparsers.add_parser("list", help="List command")
121
    list_parser.add_argument(
122
        "--base_url",
123
        type=str,
124
        default="http://localhost:8181/api/kytos",
125
        help="Kytos-ng API base url",
126
    )
127
    list_parser.add_argument(
128
        "--included_evcs_filter",
129
        type=str,
130
        help="Included filtered EVC ids separated by comma",
131
        default="",
132
    )
133
    args = parser.parse_args()
134
135
    try:
136
        if args.command == "update":
137
            await update_command(args)
138
        elif args.command == "list":
139
            await list_command(args)
140
    except (httpx.HTTPError, AssertionError) as exc:
141
        logger.exception(f"Error when running '{args.command}': {exc}")
142
        sys.exit(1)
143
144
145
if __name__ == "__main__":
146
    asyncio.run(main())
147