| 1 |  |  | """ This module was created to be the main interface between the telemetry napp and all | 
            
                                                                                                            
                            
            
                                    
            
            
                | 2 |  |  | other kytos napps' APIs """ | 
            
                                                                                                            
                            
            
                                    
            
            
                | 3 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 4 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 5 | 1 |  | from collections import defaultdict | 
            
                                                                                                            
                            
            
                                    
            
            
                | 6 | 1 |  | from typing import Union | 
            
                                                                                                            
                            
            
                                    
            
            
                | 7 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 8 | 1 |  | import httpx | 
            
                                                                                                            
                            
            
                                    
            
            
                | 9 | 1 |  | from napps.kytos.telemetry_int import settings | 
            
                                                                                                            
                            
            
                                    
            
            
                | 10 | 1 |  | from napps.kytos.telemetry_int.exceptions import UnrecoverableError | 
            
                                                                                                            
                            
            
                                    
            
            
                | 11 | 1 |  | from tenacity import ( | 
            
                                                                                                            
                            
            
                                    
            
            
                | 12 |  |  |     retry, | 
            
                                                                                                            
                            
            
                                    
            
            
                | 13 |  |  |     retry_if_exception_type, | 
            
                                                                                                            
                            
            
                                    
            
            
                | 14 |  |  |     stop_after_attempt, | 
            
                                                                                                            
                            
            
                                    
            
            
                | 15 |  |  |     wait_combine, | 
            
                                                                                                            
                            
            
                                    
            
            
                | 16 |  |  |     wait_fixed, | 
            
                                                                                                            
                            
            
                                    
            
            
                | 17 |  |  |     wait_random, | 
            
                                                                                                            
                            
            
                                    
            
            
                | 18 |  |  | ) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 19 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 20 | 1 |  | from kytos.core.retry import before_sleep | 
            
                                                                                                            
                            
            
                                    
            
            
                | 21 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 22 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 23 | 1 |  | @retry( | 
            
                                                                                                            
                            
            
                                    
            
            
                | 24 |  |  |     stop=stop_after_attempt(5), | 
            
                                                                                                            
                            
            
                                    
            
            
                | 25 |  |  |     wait=wait_combine(wait_fixed(3), wait_random(min=2, max=7)), | 
            
                                                                                                            
                            
            
                                    
            
            
                | 26 |  |  |     before_sleep=before_sleep, | 
            
                                                                                                            
                            
            
                                    
            
            
                | 27 |  |  |     retry=retry_if_exception_type((httpx.RequestError, httpx.HTTPStatusError)), | 
            
                                                                                                            
                            
            
                                    
            
            
                | 28 |  |  | ) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 29 | 1 |  | async def get_evcs(**kwargs) -> dict: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 30 |  |  |     """Get EVCs.""" | 
            
                                                                                                            
                            
            
                                    
            
            
                | 31 | 1 |  |     archived = "false" | 
            
                                                                                                            
                            
            
                                    
            
            
                | 32 | 1 |  |     async with httpx.AsyncClient(base_url=settings.mef_eline_api) as client: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 33 | 1 |  |         endpoint = f"/evc/?archived={archived}" | 
            
                                                                                                            
                            
            
                                    
            
            
                | 34 | 1 |  |         if kwargs: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 35 |  |  |             query_args = [f"{k}={v}" for k, v in kwargs.items()] | 
            
                                                                                                            
                            
            
                                    
            
            
                | 36 |  |  |             endpoint = f"{endpoint}&{'&'.join(query_args)}" | 
            
                                                                                                            
                            
            
                                    
            
            
                | 37 | 1 |  |         response = await client.get(endpoint, timeout=10) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 38 | 1 |  |         if response.is_server_error: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 39 |  |  |             raise httpx.RequestError(response.text) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 40 | 1 |  |         if not response.is_success: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 41 |  |  |             raise UnrecoverableError( | 
            
                                                                                                            
                            
            
                                    
            
            
                | 42 |  |  |                 f"Failed to get_evcs archived {archived}" | 
            
                                                                                                            
                            
            
                                    
            
            
                | 43 |  |  |                 f"status code {response.status_code}, response text: {response.text}" | 
            
                                                                                                            
                            
            
                                    
            
            
                | 44 |  |  |             ) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 45 | 1 |  |         return response.json() | 
            
                                                                                                            
                            
            
                                    
            
            
                | 46 |  |  |  | 
            
                                                                                                            
                                                                
            
                                    
            
            
                | 47 |  |  |  | 
            
                                                                        
                            
            
                                    
            
            
                | 48 | 1 |  | @retry( | 
            
                                                                        
                            
            
                                    
            
            
                | 49 |  |  |     stop=stop_after_attempt(5), | 
            
                                                                        
                            
            
                                    
            
            
                | 50 |  |  |     wait=wait_combine(wait_fixed(3), wait_random(min=2, max=7)), | 
            
                                                                        
                            
            
                                    
            
            
                | 51 |  |  |     before_sleep=before_sleep, | 
            
                                                                        
                            
            
                                    
            
            
                | 52 |  |  |     retry=retry_if_exception_type(httpx.RequestError), | 
            
                                                                        
                            
            
                                    
            
            
                | 53 |  |  | ) | 
            
                                                                        
                            
            
                                    
            
            
                | 54 | 1 |  | async def get_evc(evc_id: str, exclude_archived=True) -> dict: | 
            
                                                                        
                            
            
                                    
            
            
                | 55 |  |  |     """Get EVC.""" | 
            
                                                                        
                            
            
                                    
            
            
                | 56 | 1 |  |     async with httpx.AsyncClient(base_url=settings.mef_eline_api) as client: | 
            
                                                                        
                            
            
                                    
            
            
                | 57 | 1 |  |         response = await client.get(f"/evc/{evc_id}", timeout=10) | 
            
                                                                        
                            
            
                                    
            
            
                | 58 | 1 |  |         if response.status_code == 404: | 
            
                                                                        
                            
            
                                    
            
            
                | 59 |  |  |             return {} | 
            
                                                                        
                            
            
                                    
            
            
                | 60 | 1 |  |         if response.is_server_error: | 
            
                                                                        
                            
            
                                    
            
            
                | 61 |  |  |             raise httpx.RequestError(response.text) | 
            
                                                                        
                            
            
                                    
            
            
                | 62 | 1 |  |         if not response.is_success: | 
            
                                                                        
                            
            
                                    
            
            
                | 63 |  |  |             raise UnrecoverableError( | 
            
                                                                        
                            
            
                                    
            
            
                | 64 |  |  |                 f"Failed to get_evc id {evc_id} " | 
            
                                                                        
                            
            
                                    
            
            
                | 65 |  |  |                 f"status code {response.status_code}, response text: {response.text}" | 
            
                                                                        
                            
            
                                    
            
            
                | 66 |  |  |             ) | 
            
                                                                        
                            
            
                                    
            
            
                | 67 | 1 |  |         data = response.json() | 
            
                                                                        
                            
            
                                    
            
            
                | 68 | 1 |  |         if data["archived"] and exclude_archived: | 
            
                                                                        
                            
            
                                    
            
            
                | 69 |  |  |             return {} | 
            
                                                                        
                            
            
                                    
            
            
                | 70 | 1 |  |         return {data["id"]: data} | 
            
                                                                                                            
                            
            
                                    
            
            
                | 71 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 72 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 73 | 1 |  | @retry( | 
            
                                                                                                            
                            
            
                                    
            
            
                | 74 |  |  |     stop=stop_after_attempt(5), | 
            
                                                                                                            
                            
            
                                    
            
            
                | 75 |  |  |     wait=wait_combine(wait_fixed(3), wait_random(min=2, max=7)), | 
            
                                                                                                            
                            
            
                                    
            
            
                | 76 |  |  |     before_sleep=before_sleep, | 
            
                                                                                                            
                            
            
                                    
            
            
                | 77 |  |  |     retry=retry_if_exception_type(httpx.RequestError), | 
            
                                                                                                            
                            
            
                                    
            
            
                | 78 |  |  | ) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 79 | 1 |  | async def get_stored_flows( | 
            
                                                                                                            
                            
            
                                    
            
            
                | 80 |  |  |     cookies: list[Union[int, tuple[int, int]]] = None, | 
            
                                                                                                            
                            
            
                                    
            
            
                | 81 |  |  | ) -> dict[int, list[dict]]: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 82 |  |  |     """Get flow_manager stored_flows grouped by cookies given a list of cookies.""" | 
            
                                                                                                            
                            
            
                                    
            
            
                | 83 | 1 |  |     cookies = cookies or [] | 
            
                                                                                                            
                            
            
                                    
            
            
                | 84 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 85 | 1 |  |     cookie_range_args = [] | 
            
                                                                                                            
                            
            
                                    
            
            
                | 86 | 1 |  |     for cookie in cookies: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 87 | 1 |  |         if isinstance(cookie, int): | 
            
                                                                                                            
                            
            
                                    
            
            
                | 88 |  |  |             # gte cookie | 
            
                                                                                                            
                            
            
                                    
            
            
                | 89 | 1 |  |             cookie_range_args.append(cookie) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 90 |  |  |             # lte cookie | 
            
                                                                                                            
                            
            
                                    
            
            
                | 91 | 1 |  |             cookie_range_args.append(cookie) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 92 |  |  |         elif isinstance(cookie, tuple) and len(cookie) == 2: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 93 |  |  |             # gte cookie | 
            
                                                                                                            
                            
            
                                    
            
            
                | 94 |  |  |             cookie_range_args.append(cookie[0]) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 95 |  |  |             # lte cookie | 
            
                                                                                                            
                            
            
                                    
            
            
                | 96 |  |  |             cookie_range_args.append(cookie[1]) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 97 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 98 | 1 |  |     endpoint = "stored_flows?state=installed&state=pending" | 
            
                                                                                                            
                            
            
                                    
            
            
                | 99 | 1 |  |     async with httpx.AsyncClient(base_url=settings.flow_manager_api) as client: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 100 | 1 |  |         if cookie_range_args: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 101 | 1 |  |             response = await client.request( | 
            
                                                                                                            
                            
            
                                    
            
            
                | 102 |  |  |                 "GET", | 
            
                                                                                                            
                            
            
                                    
            
            
                | 103 |  |  |                 f"/{endpoint}", | 
            
                                                                                                            
                            
            
                                    
            
            
                | 104 |  |  |                 json={"cookie_range": cookie_range_args}, | 
            
                                                                                                            
                            
            
                                    
            
            
                | 105 |  |  |                 timeout=10, | 
            
                                                                                                            
                            
            
                                    
            
            
                | 106 |  |  |             ) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 107 |  |  |         else: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 108 | 1 |  |             response = await client.get(f"/{endpoint}", timeout=10) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 109 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 110 | 1 |  |         if response.is_server_error: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 111 |  |  |             raise httpx.RequestError(response.text) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 112 | 1 |  |         if not response.is_success: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 113 |  |  |             raise UnrecoverableError( | 
            
                                                                                                            
                            
            
                                    
            
            
                | 114 |  |  |                 f"Failed to get_stored_flows cookies {cookies} " | 
            
                                                                                                            
                            
            
                                    
            
            
                | 115 |  |  |                 f"status code {response.status_code}, response text: {response.text}" | 
            
                                                                                                            
                            
            
                                    
            
            
                | 116 |  |  |             ) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 117 | 1 |  |         return _map_stored_flows_by_cookies(response.json()) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 118 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 119 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 120 | 1 |  | def _map_stored_flows_by_cookies(stored_flows: dict) -> dict[int, list[dict]]: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 121 |  |  |     """Map stored flows by cookies. | 
            
                                                                                                            
                            
            
                                    
            
            
                | 122 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 123 |  |  |     This is for mapping the data by cookies, just to it can be | 
            
                                                                                                            
                            
            
                                    
            
            
                | 124 |  |  |     reused upfront by bulk operations. | 
            
                                                                                                            
                            
            
                                    
            
            
                | 125 |  |  |     """ | 
            
                                                                                                            
                            
            
                                    
            
            
                | 126 | 1 |  |     flows_by_cookies = defaultdict(list) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 127 | 1 |  |     for flows in stored_flows.values(): | 
            
                                                                                                            
                            
            
                                    
            
            
                | 128 | 1 |  |         for flow in flows: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 129 | 1 |  |             flows_by_cookies[flow["flow"]["cookie"]].append(flow) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 130 | 1 |  |     return flows_by_cookies | 
            
                                                                                                            
                            
            
                                    
            
            
                | 131 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 132 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 133 | 1 |  | @retry( | 
            
                                                                                                            
                            
            
                                    
            
            
                | 134 |  |  |     stop=stop_after_attempt(5), | 
            
                                                                                                            
                            
            
                                    
            
            
                | 135 |  |  |     wait=wait_combine(wait_fixed(3), wait_random(min=2, max=7)), | 
            
                                                                                                            
                            
            
                                    
            
            
                | 136 |  |  |     before_sleep=before_sleep, | 
            
                                                                                                            
                            
            
                                    
            
            
                | 137 |  |  |     retry=retry_if_exception_type(httpx.RequestError), | 
            
                                                                                                            
                            
            
                                    
            
            
                | 138 |  |  | ) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 139 | 1 |  | async def add_evcs_metadata( | 
            
                                                                                                            
                            
            
                                    
            
            
                | 140 |  |  |     evcs: dict[str, dict], new_metadata: dict, force=False | 
            
                                                                                                            
                            
            
                                    
            
            
                | 141 |  |  | ) -> dict: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 142 |  |  |     """Add EVC metadata.""" | 
            
                                                                                                            
                            
            
                                    
            
            
                | 143 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 144 |  |  |     circuit_ids = [evc_id for evc_id, evc in evcs.items() if evc] | 
            
                                                                                                            
                            
            
                                    
            
            
                | 145 |  |  |     # return early if there's no circuits to update their metadata | 
            
                                                                                                            
                            
            
                                    
            
            
                | 146 |  |  |     if not circuit_ids: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 147 |  |  |         return {} | 
            
                                                                                                            
                            
            
                                    
            
            
                | 148 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 149 |  |  |     async with httpx.AsyncClient(base_url=settings.mef_eline_api) as client: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 150 |  |  |         response = await client.post( | 
            
                                                                                                            
                            
            
                                    
            
            
                | 151 |  |  |             "/evc/metadata", | 
            
                                                                                                            
                            
            
                                    
            
            
                | 152 |  |  |             timeout=10, | 
            
                                                                                                            
                            
            
                                    
            
            
                | 153 |  |  |             json={ | 
            
                                                                                                            
                            
            
                                    
            
            
                | 154 |  |  |                 **new_metadata, | 
            
                                                                                                            
                            
            
                                    
            
            
                | 155 |  |  |                 **{"circuit_ids": circuit_ids}, | 
            
                                                                                                            
                            
            
                                    
            
            
                | 156 |  |  |             }, | 
            
                                                                                                            
                            
            
                                    
            
            
                | 157 |  |  |         ) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 158 |  |  |         if response.is_success: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 159 |  |  |             return response.json() | 
            
                                                                                                            
                            
            
                                    
            
            
                | 160 |  |  |         # Ignore 404 if force just so it's easier to handle this concurrently | 
            
                                                                                                            
                            
            
                                    
            
            
                | 161 |  |  |         if response.status_code == 404 and force: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 162 |  |  |             return {} | 
            
                                                                                                            
                            
            
                                    
            
            
                | 163 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 164 |  |  |         if response.is_server_error: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 165 |  |  |             raise httpx.RequestError(response.text) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 166 |  |  |         raise UnrecoverableError( | 
            
                                                                                                            
                            
            
                                    
            
            
                | 167 |  |  |             f"Failed to add_evc_metadata for EVC ids {list(evcs.keys())} " | 
            
                                                                                                            
                            
            
                                    
            
            
                | 168 |  |  |             f"status code {response.status_code}, response text: {response.text}" | 
            
                                                                                                            
                                                                
            
                                    
            
            
                | 169 |  |  |         ) | 
            
                                                        
            
                                    
            
            
                | 170 |  |  |  |