1 | """Main module of kytos/mef_eline Kytos Network Application. |
||
2 | |||
3 | NApp to provision circuits from user request. |
||
4 | """ |
||
5 | 1 | from flask import jsonify, request |
|
6 | 1 | from werkzeug.exceptions import (BadRequest, Conflict, Forbidden, |
|
7 | MethodNotAllowed, NotFound, |
||
8 | UnsupportedMediaType) |
||
9 | |||
10 | 1 | from kytos.core import KytosNApp, log, rest |
|
11 | 1 | from kytos.core.events import KytosEvent |
|
12 | 1 | from kytos.core.helpers import listen_to |
|
13 | 1 | from kytos.core.interface import TAG, UNI |
|
14 | 1 | from kytos.core.link import Link |
|
15 | 1 | from napps.kytos.mef_eline import settings |
|
16 | 1 | from napps.kytos.mef_eline.models import EVC, DynamicPathManager, Path |
|
17 | 1 | from napps.kytos.mef_eline.scheduler import CircuitSchedule, Scheduler |
|
18 | 1 | from napps.kytos.mef_eline.storehouse import StoreHouse |
|
19 | |||
20 | |||
21 | 1 | class Main(KytosNApp): |
|
22 | """Main class of amlight/mef_eline NApp. |
||
23 | |||
24 | This class is the entry point for this napp. |
||
25 | """ |
||
26 | |||
27 | 1 | def setup(self): |
|
28 | """Replace the '__init__' method for the KytosNApp subclass. |
||
29 | |||
30 | The setup method is automatically called by the controller when your |
||
31 | application is loaded. |
||
32 | |||
33 | So, if you have any setup routine, insert it here. |
||
34 | """ |
||
35 | # object used to scheduler circuit events |
||
36 | 1 | self.sched = Scheduler() |
|
37 | |||
38 | # object to save and load circuits |
||
39 | 1 | self.storehouse = StoreHouse(self.controller) |
|
40 | |||
41 | # set the controller that will manager the dynamic paths |
||
42 | 1 | DynamicPathManager.set_controller(self.controller) |
|
43 | |||
44 | # dictionary of EVCs created. It acts as a circuit buffer. |
||
45 | # Every create/update/delete must be synced to storehouse. |
||
46 | 1 | self.circuits = {} |
|
47 | |||
48 | # dictionary of EVCs by interface |
||
49 | 1 | self._circuits_by_interface = {} |
|
50 | |||
51 | 1 | self.execute_as_loop(settings.DEPLOY_EVCS_INTERVAL) |
|
52 | |||
53 | 1 | def execute(self): |
|
54 | """Execute once when the napp is running.""" |
||
55 | for circuit in self.circuits.values(): |
||
56 | if circuit.is_enabled() and not circuit.is_active(): |
||
57 | circuit.deploy() |
||
58 | |||
59 | 1 | def shutdown(self): |
|
60 | """Execute when your napp is unloaded. |
||
61 | |||
62 | If you have some cleanup procedure, insert it here. |
||
63 | """ |
||
64 | |||
65 | 1 | @rest('/v2/evc/', methods=['GET']) |
|
66 | def list_circuits(self): |
||
67 | """Endpoint to return circuits stored. |
||
68 | |||
69 | If archived is set to True return all circuits, else only the ones |
||
70 | not archived. |
||
71 | """ |
||
72 | 1 | log.debug('list_circuits /v2/evc') |
|
73 | 1 | archived = request.args.get('archived', False) |
|
74 | 1 | circuits = self.storehouse.get_data() |
|
75 | 1 | if not circuits: |
|
76 | 1 | return jsonify({}), 200 |
|
77 | 1 | if archived: |
|
78 | 1 | return jsonify(circuits), 200 |
|
79 | 1 | return jsonify({circuit_id: circuit |
|
80 | for circuit_id, circuit in circuits.items() |
||
81 | if not circuit.get('archived', False)}), 200 |
||
82 | |||
83 | 1 | @rest('/v2/evc/<circuit_id>', methods=['GET']) |
|
84 | def get_circuit(self, circuit_id): |
||
85 | """Endpoint to return a circuit based on id.""" |
||
86 | 1 | log.debug('get_circuit /v2/evc/%s', circuit_id) |
|
87 | 1 | circuits = self.storehouse.get_data() |
|
88 | |||
89 | 1 | try: |
|
90 | 1 | result = circuits[circuit_id] |
|
91 | 1 | except KeyError: |
|
92 | 1 | result = f'circuit_id {circuit_id} not found' |
|
93 | 1 | log.debug('get_circuit result %s %s', result, 404) |
|
94 | 1 | raise NotFound(result) |
|
95 | |||
96 | 1 | status = 200 |
|
97 | 1 | log.debug('get_circuit result %s %s', result, status) |
|
98 | 1 | return jsonify(result), status |
|
99 | |||
100 | 1 | @rest('/v2/evc/', methods=['POST']) |
|
101 | def create_circuit(self): |
||
102 | """Try to create a new circuit. |
||
103 | |||
104 | Firstly, for EVPL: E-Line NApp verifies if UNI_A's requested C-VID and |
||
105 | UNI_Z's requested C-VID are available from the interfaces' pools. This |
||
106 | is checked when creating the UNI object. |
||
107 | |||
108 | Then, E-Line NApp requests a primary and a backup path to the |
||
109 | Pathfinder NApp using the attributes primary_links and backup_links |
||
110 | submitted via REST |
||
111 | |||
112 | # For each link composing paths in #3: |
||
113 | # - E-Line NApp requests a S-VID available from the link VLAN pool. |
||
114 | # - Using the S-VID obtained, generate abstract flow entries to be |
||
115 | # sent to FlowManager |
||
116 | |||
117 | Push abstract flow entries to FlowManager and FlowManager pushes |
||
118 | OpenFlow entries to datapaths |
||
119 | |||
120 | E-Line NApp generates an event to notify all Kytos NApps of a new EVC |
||
121 | creation |
||
122 | |||
123 | Finnaly, notify user of the status of its request. |
||
124 | """ |
||
125 | # Try to create the circuit object |
||
126 | 1 | log.debug('create_circuit /v2/evc/') |
|
127 | 1 | try: |
|
128 | 1 | data = request.get_json() |
|
129 | 1 | except BadRequest: |
|
130 | 1 | result = 'The request body is not a well-formed JSON.' |
|
131 | 1 | log.debug('create_circuit result %s %s', result, 400) |
|
132 | 1 | raise BadRequest(result) |
|
133 | |||
134 | 1 | if data is None: |
|
135 | 1 | result = 'The request body mimetype is not application/json.' |
|
136 | 1 | log.debug('create_circuit result %s %s', result, 415) |
|
137 | 1 | raise UnsupportedMediaType(result) |
|
138 | 1 | try: |
|
139 | 1 | evc = self._evc_from_dict(data) |
|
140 | 1 | except ValueError as exception: |
|
141 | 1 | log.debug('create_circuit result %s %s', exception, 400) |
|
142 | 1 | raise BadRequest(str(exception)) |
|
143 | |||
144 | # verify duplicated evc |
||
145 | 1 | if self._is_duplicated_evc(evc): |
|
146 | 1 | result = "The EVC already exists." |
|
147 | 1 | log.debug('create_circuit result %s %s', result, 409) |
|
148 | 1 | raise Conflict(result) |
|
149 | |||
150 | # store circuit in dictionary |
||
151 | 1 | self.circuits[evc.id] = evc |
|
152 | |||
153 | # save circuit |
||
154 | 1 | self.storehouse.save_evc(evc) |
|
155 | |||
156 | # Schedule the circuit deploy |
||
157 | 1 | self.sched.add(evc) |
|
158 | |||
159 | # Circuit has no schedule, deploy now |
||
160 | 1 | if not evc.circuit_scheduler: |
|
161 | 1 | evc.deploy() |
|
162 | |||
163 | # Notify users |
||
164 | 1 | event = KytosEvent(name='kytos.mef_eline.created', |
|
165 | content=evc.as_dict()) |
||
166 | 1 | self.controller.buffers.app.put(event) |
|
167 | |||
168 | 1 | result = {"circuit_id": evc.id} |
|
169 | 1 | status = 201 |
|
170 | 1 | log.debug('create_circuit result %s %s', result, status) |
|
171 | 1 | return jsonify(result), status |
|
172 | |||
173 | 1 | @rest('/v2/evc/<circuit_id>', methods=['PATCH']) |
|
174 | def update(self, circuit_id): |
||
175 | """Update a circuit based on payload. |
||
176 | |||
177 | The EVC required attributes (name, uni_a, uni_z) can't be updated. |
||
178 | """ |
||
179 | 1 | log.debug('update /v2/evc/%s', circuit_id) |
|
180 | 1 | try: |
|
181 | 1 | evc = self.circuits[circuit_id] |
|
182 | 1 | except KeyError: |
|
183 | 1 | result = f'circuit_id {circuit_id} not found' |
|
184 | 1 | log.debug('update result %s %s', result, 404) |
|
185 | 1 | raise NotFound(result) |
|
186 | |||
187 | 1 | if evc.archived: |
|
188 | 1 | result = "Can't update archived EVC" |
|
189 | 1 | log.debug('update result %s %s', result, 405) |
|
190 | 1 | raise MethodNotAllowed(['GET'], result) |
|
191 | |||
192 | 1 | try: |
|
193 | 1 | data = request.get_json() |
|
194 | 1 | except BadRequest: |
|
195 | 1 | result = 'The request body is not a well-formed JSON.' |
|
196 | 1 | log.debug('update result %s %s', result, 400) |
|
197 | 1 | raise BadRequest(result) |
|
198 | 1 | if data is None: |
|
199 | 1 | result = 'The request body mimetype is not application/json.' |
|
200 | 1 | log.debug('update result %s %s', result, 415) |
|
201 | 1 | raise UnsupportedMediaType(result) |
|
202 | |||
203 | 1 | try: |
|
204 | 1 | enable, path = \ |
|
205 | evc.update(**self._evc_dict_with_instances(data)) |
||
206 | except ValueError as exception: |
||
207 | log.error(exception) |
||
208 | log.debug('update result %s %s', exception, 400) |
||
209 | raise BadRequest(str(exception)) |
||
210 | |||
211 | 1 | if evc.is_active(): |
|
212 | 1 | if enable is False: # disable if active |
|
213 | evc.remove() |
||
214 | 1 | elif path is not None: # redeploy if active |
|
215 | 1 | evc.remove() |
|
216 | 1 | evc.deploy() |
|
217 | else: |
||
218 | 1 | if enable is True: # enable if inactive |
|
219 | 1 | evc.deploy() |
|
220 | 1 | result = {evc.id: evc.as_dict()} |
|
221 | 1 | status = 200 |
|
222 | |||
223 | 1 | log.debug('update result %s %s', result, status) |
|
224 | 1 | return jsonify(result), status |
|
225 | |||
226 | 1 | @rest('/v2/evc/<circuit_id>', methods=['DELETE']) |
|
227 | def delete_circuit(self, circuit_id): |
||
228 | """Remove a circuit. |
||
229 | |||
230 | First, the flows are removed from the switches, and then the EVC is |
||
231 | disabled. |
||
232 | """ |
||
233 | 1 | log.debug('delete_circuit /v2/evc/%s', circuit_id) |
|
234 | 1 | try: |
|
235 | 1 | evc = self.circuits[circuit_id] |
|
236 | 1 | except KeyError: |
|
237 | 1 | result = f'circuit_id {circuit_id} not found' |
|
238 | 1 | log.debug('delete_circuit result %s %s', result, 404) |
|
239 | 1 | raise NotFound(result) |
|
240 | |||
241 | 1 | if evc.archived: |
|
242 | result = f'Circuit {circuit_id} already removed' |
||
243 | log.debug('delete_circuit result %s %s', result, 404) |
||
244 | raise NotFound(result) |
||
245 | |||
246 | 1 | log.info('Removing %s', evc) |
|
247 | 1 | evc.remove_current_flows() |
|
248 | 1 | evc.deactivate() |
|
249 | 1 | evc.disable() |
|
250 | 1 | self.sched.remove(evc) |
|
251 | 1 | evc.archive() |
|
252 | 1 | evc.sync() |
|
253 | 1 | log.info('EVC removed. %s', evc) |
|
254 | 1 | result = {'response': f'Circuit {circuit_id} removed'} |
|
255 | 1 | status = 200 |
|
256 | |||
257 | 1 | log.debug('delete_circuit result %s %s', result, status) |
|
258 | 1 | return jsonify(result), status |
|
259 | |||
260 | 1 | @rest('/v2/evc/schedule', methods=['GET']) |
|
261 | def list_schedules(self): |
||
262 | """Endpoint to return all schedules stored for all circuits. |
||
263 | |||
264 | Return a JSON with the following template: |
||
265 | [{"schedule_id": <schedule_id>, |
||
266 | "circuit_id": <circuit_id>, |
||
267 | "schedule": <schedule object>}] |
||
268 | """ |
||
269 | 1 | log.debug('list_schedules /v2/evc/schedule') |
|
270 | 1 | circuits = self.storehouse.get_data().values() |
|
271 | 1 | if not circuits: |
|
272 | 1 | result = {} |
|
273 | 1 | status = 200 |
|
274 | 1 | return jsonify(result), status |
|
275 | |||
276 | 1 | result = [] |
|
277 | 1 | status = 200 |
|
278 | 1 | for circuit in circuits: |
|
279 | 1 | circuit_scheduler = circuit.get("circuit_scheduler") |
|
280 | 1 | if circuit_scheduler: |
|
281 | 1 | for scheduler in circuit_scheduler: |
|
282 | 1 | value = {"schedule_id": scheduler.get("id"), |
|
283 | "circuit_id": circuit.get("id"), |
||
284 | "schedule": scheduler} |
||
285 | 1 | result.append(value) |
|
286 | |||
287 | 1 | log.debug('list_schedules result %s %s', result, status) |
|
288 | 1 | return jsonify(result), status |
|
289 | |||
290 | 1 | @rest('/v2/evc/schedule/', methods=['POST']) |
|
291 | def create_schedule(self): |
||
292 | """ |
||
293 | Create a new schedule for a given circuit. |
||
294 | |||
295 | This service do no check if there are conflicts with another schedule. |
||
296 | Payload example: |
||
297 | { |
||
298 | "circuit_id":"aa:bb:cc", |
||
299 | "schedule": { |
||
300 | "date": "2019-08-07T14:52:10.967Z", |
||
301 | "interval": "string", |
||
302 | "frequency": "1 * * * *", |
||
303 | "action": "create" |
||
304 | } |
||
305 | } |
||
306 | """ |
||
307 | 1 | log.debug('create_schedule /v2/evc/schedule/') |
|
308 | |||
309 | 1 | json_data = self.json_from_request('create_schedule') |
|
310 | 1 | try: |
|
311 | 1 | circuit_id = json_data['circuit_id'] |
|
312 | except TypeError: |
||
313 | result = 'The payload should have a dictionary.' |
||
314 | log.debug('create_schedule result %s %s', result, 400) |
||
315 | raise BadRequest(result) |
||
316 | except KeyError: |
||
317 | result = 'Missing circuit_id.' |
||
318 | log.debug('create_schedule result %s %s', result, 400) |
||
319 | raise BadRequest(result) |
||
320 | |||
321 | 1 | try: |
|
322 | 1 | schedule_data = json_data['schedule'] |
|
323 | except KeyError: |
||
324 | result = 'Missing schedule data.' |
||
325 | log.debug('create_schedule result %s %s', result, 400) |
||
326 | raise BadRequest(result) |
||
327 | |||
328 | # Get EVC from circuits buffer |
||
329 | 1 | circuits = self._get_circuits_buffer() |
|
330 | |||
331 | # get the circuit |
||
332 | 1 | evc = circuits.get(circuit_id) |
|
333 | |||
334 | # get the circuit |
||
335 | 1 | if not evc: |
|
336 | result = f'circuit_id {circuit_id} not found' |
||
337 | log.debug('create_schedule result %s %s', result, 404) |
||
338 | raise NotFound(result) |
||
339 | # Can not modify circuits deleted and archived |
||
340 | 1 | if evc.archived: |
|
341 | result = f'Circuit {circuit_id} is archived. Update is forbidden.' |
||
342 | log.debug('create_schedule result %s %s', result, 403) |
||
343 | raise Forbidden(result) |
||
344 | |||
345 | # new schedule from dict |
||
346 | 1 | new_schedule = CircuitSchedule.from_dict(schedule_data) |
|
347 | |||
348 | # If there is no schedule, create the list |
||
349 | 1 | if not evc.circuit_scheduler: |
|
350 | evc.circuit_scheduler = [] |
||
351 | |||
352 | # Add the new schedule |
||
353 | 1 | evc.circuit_scheduler.append(new_schedule) |
|
354 | |||
355 | # Add schedule job |
||
356 | 1 | self.sched.add_circuit_job(evc, new_schedule) |
|
357 | |||
358 | # save circuit to storehouse |
||
359 | 1 | evc.sync() |
|
360 | |||
361 | 1 | result = new_schedule.as_dict() |
|
362 | 1 | status = 201 |
|
363 | |||
364 | 1 | log.debug('create_schedule result %s %s', result, status) |
|
365 | 1 | return jsonify(result), status |
|
366 | |||
367 | 1 | @rest('/v2/evc/schedule/<schedule_id>', methods=['PATCH']) |
|
368 | def update_schedule(self, schedule_id): |
||
369 | """Update a schedule. |
||
370 | |||
371 | Change all attributes from the given schedule from a EVC circuit. |
||
372 | The schedule ID is preserved as default. |
||
373 | Payload example: |
||
374 | { |
||
375 | "date": "2019-08-07T14:52:10.967Z", |
||
376 | "interval": "string", |
||
377 | "frequency": "1 * * *", |
||
378 | "action": "create" |
||
379 | } |
||
380 | """ |
||
381 | 1 | log.debug('update_schedule /v2/evc/schedule/%s', schedule_id) |
|
382 | |||
383 | # Try to find a circuit schedule |
||
384 | 1 | evc, found_schedule = self._find_evc_by_schedule_id(schedule_id) |
|
385 | |||
386 | # Can not modify circuits deleted and archived |
||
387 | 1 | if not found_schedule: |
|
388 | result = f'schedule_id {schedule_id} not found' |
||
389 | log.debug('update_schedule result %s %s', result, 404) |
||
390 | raise NotFound(result) |
||
391 | 1 | if evc.archived: |
|
392 | 1 | result = f'Circuit {evc.id} is archived. Update is forbidden.' |
|
393 | 1 | log.debug('update_schedule result %s %s', result, 403) |
|
394 | 1 | raise Forbidden(result) |
|
395 | |||
396 | 1 | data = self.json_from_request('update_schedule') |
|
397 | |||
398 | 1 | new_schedule = CircuitSchedule.from_dict(data) |
|
399 | 1 | new_schedule.id = found_schedule.id |
|
400 | # Remove the old schedule |
||
401 | 1 | evc.circuit_scheduler.remove(found_schedule) |
|
402 | # Append the modified schedule |
||
403 | 1 | evc.circuit_scheduler.append(new_schedule) |
|
404 | |||
405 | # Cancel all schedule jobs |
||
406 | 1 | self.sched.cancel_job(found_schedule.id) |
|
407 | # Add the new circuit schedule |
||
408 | 1 | self.sched.add_circuit_job(evc, new_schedule) |
|
409 | # Save EVC to the storehouse |
||
410 | 1 | evc.sync() |
|
411 | |||
412 | 1 | result = new_schedule.as_dict() |
|
413 | 1 | status = 200 |
|
414 | |||
415 | 1 | log.debug('update_schedule result %s %s', result, status) |
|
416 | 1 | return jsonify(result), status |
|
417 | |||
418 | 1 | @rest('/v2/evc/schedule/<schedule_id>', methods=['DELETE']) |
|
419 | def delete_schedule(self, schedule_id): |
||
420 | """Remove a circuit schedule. |
||
421 | |||
422 | Remove the Schedule from EVC. |
||
423 | Remove the Schedule from cron job. |
||
424 | Save the EVC to the Storehouse. |
||
425 | """ |
||
426 | 1 | log.debug('delete_schedule /v2/evc/schedule/%s', schedule_id) |
|
427 | 1 | evc, found_schedule = self._find_evc_by_schedule_id(schedule_id) |
|
428 | |||
429 | # Can not modify circuits deleted and archived |
||
430 | 1 | if not found_schedule: |
|
431 | result = f'schedule_id {schedule_id} not found' |
||
432 | log.debug('delete_schedule result %s %s', result, 404) |
||
433 | raise NotFound(result) |
||
434 | |||
435 | 1 | if evc.archived: |
|
436 | 1 | result = f'Circuit {evc.id} is archived. Update is forbidden.' |
|
437 | 1 | log.debug('delete_schedule result %s %s', result, 403) |
|
438 | 1 | raise Forbidden(result) |
|
439 | |||
440 | # Remove the old schedule |
||
441 | 1 | evc.circuit_scheduler.remove(found_schedule) |
|
442 | |||
443 | # Cancel all schedule jobs |
||
444 | 1 | self.sched.cancel_job(found_schedule.id) |
|
445 | # Save EVC to the storehouse |
||
446 | 1 | evc.sync() |
|
447 | |||
448 | 1 | result = "Schedule removed" |
|
449 | 1 | status = 200 |
|
450 | |||
451 | 1 | log.debug('delete_schedule result %s %s', result, status) |
|
452 | 1 | return jsonify(result), status |
|
453 | |||
454 | 1 | def _is_duplicated_evc(self, evc): |
|
455 | """Verify if the circuit given is duplicated with the stored evcs. |
||
456 | |||
457 | Args: |
||
458 | evc (EVC): circuit to be analysed. |
||
459 | |||
460 | Returns: |
||
461 | boolean: True if the circuit is duplicated, otherwise False. |
||
462 | |||
463 | """ |
||
464 | 1 | for circuit in self.circuits.values(): |
|
465 | 1 | if not circuit.archived and circuit.shares_uni(evc): |
|
466 | 1 | return True |
|
467 | 1 | return False |
|
468 | |||
469 | 1 | @listen_to('kytos/topology.link_up') |
|
470 | def handle_link_up(self, event): |
||
471 | """Change circuit when link is up or end_maintenance.""" |
||
472 | 1 | log.debug("Event handle_link_up %s", event) |
|
473 | 1 | for evc in self.circuits.values(): |
|
474 | 1 | if evc.is_enabled() and not evc.archived: |
|
475 | 1 | with evc.lock: |
|
476 | 1 | evc.handle_link_up(event.content['link']) |
|
477 | |||
478 | 1 | @listen_to('kytos/topology.link_down') |
|
479 | def handle_link_down(self, event): |
||
480 | """Change circuit when link is down or under_mantenance.""" |
||
481 | 1 | log.debug("Event handle_link_down %s", event) |
|
482 | 1 | for evc in self.circuits.values(): |
|
483 | 1 | with evc.lock: |
|
484 | 1 | if evc.is_affected_by_link(event.content['link']): |
|
485 | 1 | log.debug(f'Handling evc {evc.id} on link down') |
|
486 | 1 | evc.handle_link_down() |
|
487 | |||
488 | 1 | def load_circuits_by_interface(self, circuits): |
|
489 | """Load circuits in storehouse for in-memory dictionary.""" |
||
490 | 1 | for circuit_id, circuit in circuits.items(): |
|
491 | 1 | intf_a = circuit['uni_a']['interface_id'] |
|
492 | 1 | self.add_to_dict_of_sets(intf_a, circuit_id) |
|
493 | 1 | intf_z = circuit['uni_z']['interface_id'] |
|
494 | 1 | self.add_to_dict_of_sets(intf_z, circuit_id) |
|
495 | 1 | for path in ('current_path', 'primary_path', 'backup_path'): |
|
496 | 1 | for link in circuit[path]: |
|
497 | 1 | intf_a = link['endpoint_a']['id'] |
|
498 | 1 | self.add_to_dict_of_sets(intf_a, circuit_id) |
|
499 | 1 | intf_b = link['endpoint_b']['id'] |
|
500 | 1 | self.add_to_dict_of_sets(intf_b, circuit_id) |
|
501 | |||
502 | 1 | def add_to_dict_of_sets(self, intf, circuit_id): |
|
503 | """Add a single item to the dictionary of circuits by interface.""" |
||
504 | 1 | if intf not in self._circuits_by_interface: |
|
505 | 1 | self._circuits_by_interface[intf] = set() |
|
506 | 1 | self._circuits_by_interface[intf].add(circuit_id) |
|
507 | |||
508 | 1 | @listen_to('kytos/topology.port.created') |
|
509 | def load_evcs(self, event): |
||
510 | """Try to load the unloaded EVCs from storehouse.""" |
||
511 | log.debug("Event load_evcs %s", event) |
||
512 | circuits = self.storehouse.get_data() |
||
513 | if not self._circuits_by_interface: |
||
514 | self.load_circuits_by_interface(circuits) |
||
515 | |||
516 | interface_id = '{}:{}'.format(event.content['switch'], |
||
517 | event.content['port']) |
||
518 | |||
519 | for circuit_id in self._circuits_by_interface.get(interface_id, []): |
||
520 | if circuit_id in circuits and circuit_id not in self.circuits: |
||
521 | try: |
||
522 | evc = self._evc_from_dict(circuits[circuit_id]) |
||
523 | except ValueError as exception: |
||
524 | log.info( |
||
525 | f'Could not load EVC {circuit_id} because {exception}') |
||
526 | continue |
||
527 | |||
528 | evc.deactivate() |
||
529 | evc.current_path = Path([]) |
||
530 | evc.sync() |
||
531 | self.circuits.setdefault(circuit_id, evc) |
||
532 | self.sched.add(evc) |
||
533 | |||
534 | 1 | @listen_to('kytos/flow_manager.flow.error') |
|
535 | def handle_flow_mod_error(self, event): |
||
536 | """Handle flow mod errors related to an EVC.""" |
||
537 | flow = event.content['flow'] |
||
538 | command = event.content.get('error_command') |
||
539 | if command != 'add': |
||
540 | return |
||
541 | evc_id = f'{flow.cookie:x}' |
||
542 | evc = self.circuits.get(evc_id) |
||
543 | if evc: |
||
544 | evc.remove_current_flows() |
||
545 | |||
546 | 1 | def _evc_dict_with_instances(self, evc_dict): |
|
547 | """Convert some dict values to instance of EVC classes. |
||
548 | |||
549 | This method will convert: [UNI, Link] |
||
550 | """ |
||
551 | 1 | data = evc_dict.copy() # Do not modify the original dict |
|
552 | |||
553 | 1 | for attribute, value in data.items(): |
|
554 | # Get multiple attributes. |
||
555 | # Ex: uni_a, uni_z |
||
556 | 1 | if 'uni' in attribute: |
|
557 | 1 | try: |
|
558 | 1 | data[attribute] = self._uni_from_dict(value) |
|
559 | 1 | except ValueError as exc: |
|
560 | 1 | raise ValueError(f'Error creating UNI: {exc}') |
|
561 | |||
562 | 1 | if attribute == 'circuit_scheduler': |
|
563 | 1 | data[attribute] = [] |
|
564 | 1 | for schedule in value: |
|
565 | 1 | data[attribute].append(CircuitSchedule.from_dict(schedule)) |
|
566 | |||
567 | # Get multiple attributes. |
||
568 | # Ex: primary_links, |
||
569 | # backup_links, |
||
570 | # current_links_cache, |
||
571 | # primary_links_cache, |
||
572 | # backup_links_cache |
||
573 | 1 | if 'links' in attribute: |
|
574 | 1 | data[attribute] = [self._link_from_dict(link) |
|
575 | for link in value] |
||
576 | |||
577 | # Get multiple attributes. |
||
578 | # Ex: current_path, |
||
579 | # primary_path, |
||
580 | # backup_path |
||
581 | 1 | if 'path' in attribute and attribute != 'dynamic_backup_path': |
|
582 | 1 | data[attribute] = Path([self._link_from_dict(link) |
|
583 | for link in value]) |
||
584 | |||
585 | 1 | return data |
|
586 | |||
587 | 1 | def _evc_from_dict(self, evc_dict): |
|
588 | 1 | data = self._evc_dict_with_instances(evc_dict) |
|
589 | 1 | return EVC(self.controller, **data) |
|
590 | |||
591 | 1 | def _uni_from_dict(self, uni_dict): |
|
592 | """Return a UNI object from python dict.""" |
||
593 | if uni_dict is None: |
||
594 | return False |
||
595 | |||
596 | interface_id = uni_dict.get("interface_id") |
||
597 | interface = self.controller.get_interface_by_id(interface_id) |
||
598 | if interface is None: |
||
599 | raise ValueError(f'Could not instantiate interface {interface_id}') |
||
600 | |||
601 | try: |
||
602 | tag_dict = uni_dict["tag"] |
||
603 | except KeyError: |
||
604 | tag = None |
||
605 | else: |
||
606 | tag = TAG.from_dict(tag_dict) |
||
607 | uni = UNI(interface, tag) |
||
608 | |||
609 | return uni |
||
610 | |||
611 | 1 | def _link_from_dict(self, link_dict): |
|
612 | """Return a Link object from python dict.""" |
||
613 | 1 | id_a = link_dict.get('endpoint_a').get('id') |
|
614 | 1 | id_b = link_dict.get('endpoint_b').get('id') |
|
615 | |||
616 | 1 | endpoint_a = self.controller.get_interface_by_id(id_a) |
|
617 | 1 | endpoint_b = self.controller.get_interface_by_id(id_b) |
|
618 | |||
619 | 1 | link = Link(endpoint_a, endpoint_b) |
|
620 | 1 | if 'metadata' in link_dict: |
|
621 | link.extend_metadata(link_dict.get('metadata')) |
||
622 | |||
623 | 1 | s_vlan = link.get_metadata('s_vlan') |
|
624 | 1 | if s_vlan: |
|
625 | tag = TAG.from_dict(s_vlan) |
||
626 | if tag is False: |
||
627 | error_msg = f'Could not instantiate tag from dict {s_vlan}' |
||
628 | raise ValueError(error_msg) |
||
629 | link.update_metadata('s_vlan', tag) |
||
630 | 1 | return link |
|
631 | |||
632 | 1 | def _find_evc_by_schedule_id(self, schedule_id): |
|
633 | """ |
||
634 | Find an EVC and CircuitSchedule based on schedule_id. |
||
635 | |||
636 | :param schedule_id: Schedule ID |
||
637 | :return: EVC and Schedule |
||
638 | """ |
||
639 | 1 | circuits = self._get_circuits_buffer() |
|
640 | 1 | found_schedule = None |
|
641 | 1 | evc = None |
|
642 | |||
643 | # pylint: disable=unused-variable |
||
644 | 1 | for c_id, circuit in circuits.items(): |
|
645 | 1 | for schedule in circuit.circuit_scheduler: |
|
646 | 1 | if schedule.id == schedule_id: |
|
647 | 1 | found_schedule = schedule |
|
648 | 1 | evc = circuit |
|
649 | 1 | break |
|
650 | 1 | if found_schedule: |
|
651 | 1 | break |
|
652 | 1 | return evc, found_schedule |
|
653 | |||
654 | 1 | def _get_circuits_buffer(self): |
|
655 | """ |
||
656 | Return the circuit buffer. |
||
657 | |||
658 | If the buffer is empty, try to load data from storehouse. |
||
659 | """ |
||
660 | 1 | if not self.circuits: |
|
661 | # Load storehouse circuits to buffer |
||
662 | 1 | circuits = self.storehouse.get_data() |
|
663 | 1 | for c_id, circuit in circuits.items(): |
|
664 | 1 | evc = self._evc_from_dict(circuit) |
|
665 | 1 | self.circuits[c_id] = evc |
|
666 | 1 | return self.circuits |
|
667 | |||
668 | 1 | @staticmethod |
|
669 | def json_from_request(caller): |
||
670 | """Return a json from request. |
||
671 | |||
672 | If it was not possible to get a json from the request, log, for debug, |
||
673 | who was the caller and the error that ocurred, and raise an |
||
674 | Exception. |
||
675 | """ |
||
676 | 1 | try: |
|
677 | 1 | json_data = request.get_json() |
|
678 | except ValueError as exception: |
||
679 | log.error(exception) |
||
680 | log.debug(f'{caller} result {exception} 400') |
||
681 | raise BadRequest(str(exception)) |
||
682 | except BadRequest: |
||
683 | result = 'The request is not a valid JSON.' |
||
684 | log.debug(f'{caller} result {result} 400') |
||
685 | raise BadRequest(result) |
||
686 | 1 | if json_data is None: |
|
687 | result = 'Content-Type must be application/json' |
||
688 | log.debug(f'{caller} result {result} 415') |
||
689 | raise UnsupportedMediaType(result) |
||
690 | return json_data |
||
691 |