Passed
Pull Request — master (#90)
by Antonio
12:57 queued 10:07
created

build.main.Main._evc_from_dict()   A

Complexity

Conditions 1

Size

Total Lines 3
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 3
CRAP Score 1

Importance

Changes 0
Metric Value
cc 1
eloc 3
nop 2
dl 0
loc 3
ccs 3
cts 3
cp 1
crap 1
rs 10
c 0
b 0
f 0
1
# pylint: disable=protected-access
2 1
"""Main module of kytos/mef_eline Kytos Network Application.
3
4
NApp to provision circuits from user request.
5
"""
6 1
import time
7 1
from threading import Lock
8
9 1
from flask import jsonify, request
10 1
from werkzeug.exceptions import (BadRequest, Conflict, Forbidden,
11
                                 MethodNotAllowed, NotFound,
12
                                 UnsupportedMediaType)
13
14 1
from kytos.core import KytosNApp, log, rest
15 1
from kytos.core.events import KytosEvent
16 1
from kytos.core.helpers import listen_to
17 1
from kytos.core.interface import TAG, UNI
18 1
from kytos.core.link import Link
19 1
from napps.kytos.mef_eline import settings
20 1
from napps.kytos.mef_eline.exceptions import InvalidPath
21 1
from napps.kytos.mef_eline.models import EVC, DynamicPathManager, Path
22 1
from napps.kytos.mef_eline.scheduler import CircuitSchedule, Scheduler
23 1
from napps.kytos.mef_eline.storehouse import StoreHouse
24 1
from napps.kytos.mef_eline.utils import emit_event, load_spec, validate
25
26
27
# pylint: disable=too-many-public-methods
28 1
class Main(KytosNApp):
29
    """Main class of amlight/mef_eline NApp.
30
31
    This class is the entry point for this napp.
32
    """
33
34 1
    spec = load_spec()
35
36 1
    def setup(self):
37
        """Replace the '__init__' method for the KytosNApp subclass.
38
39
        The setup method is automatically called by the controller when your
40
        application is loaded.
41
42
        So, if you have any setup routine, insert it here.
43
        """
44
        # object used to scheduler circuit events
45 1
        self.sched = Scheduler()
46
47
        # object to save and load circuits
48 1
        self.storehouse = StoreHouse(self.controller)
49
50
        # set the controller that will manager the dynamic paths
51 1
        DynamicPathManager.set_controller(self.controller)
52
53
        # dictionary of EVCs created. It acts as a circuit buffer.
54
        # Every create/update/delete must be synced to storehouse.
55 1
        self.circuits = {}
56
57
        # dictionary of EVCs by interface
58 1
        self._circuits_by_interface = {}
59
60 1
        self._lock = Lock()
61
62 1
        self.execute_as_loop(settings.DEPLOY_EVCS_INTERVAL)
63 1
        self.load_time = time.time()
64 1
        self.load_all_evcs()
65
66 1
    def execute(self):
67
        """Execute once when the napp is running."""
68
        for circuit in tuple(self.circuits.values()):
69
            if (
70
                circuit.is_enabled()
71
                and not circuit.is_active()
72
                and not circuit.lock.locked()
73
            ):
74
                if circuit.check_traces():
75
                    with circuit.lock:
76
                        circuit.activate()
77
                        circuit.sync()
78
                else:
79
                    running_for = time.time() - self.load_time
80
                    if running_for > settings.WAIT_FOR_OLD_PATH:
81
                        with circuit.lock:
82
                            circuit.deploy()
83
84 1
    def shutdown(self):
85
        """Execute when your napp is unloaded.
86
87
        If you have some cleanup procedure, insert it here.
88
        """
89
90 1
    @rest("/v2/evc/", methods=["GET"])
91
    def list_circuits(self):
92
        """Endpoint to return circuits stored.
93
94
        If archived is set to True return all circuits, else only the ones
95
        not archived.
96
        """
97 1
        log.debug("list_circuits /v2/evc")
98 1
        archived = request.args.get("archived", False)
99 1
        circuits = self.storehouse.get_data()
100 1
        if not circuits:
101 1
            return jsonify({}), 200
102 1
        if archived:
103 1
            return jsonify(circuits), 200
104 1
        return (
105
            jsonify(
106
                {
107
                    circuit_id: circuit
108
                    for circuit_id, circuit in circuits.items()
109
                    if not circuit.get("archived", False)
110
                }
111
            ),
112
            200,
113
        )
114
115 1
    @rest("/v2/evc/<circuit_id>", methods=["GET"])
116
    def get_circuit(self, circuit_id):
117
        """Endpoint to return a circuit based on id."""
118 1
        log.debug("get_circuit /v2/evc/%s", circuit_id)
119 1
        circuits = self.storehouse.get_data()
120
121 1
        try:
122 1
            result = circuits[circuit_id]
123 1
        except KeyError:
124 1
            result = f"circuit_id {circuit_id} not found"
125 1
            log.debug("get_circuit result %s %s", result, 404)
126 1
            raise BadRequest(result) from KeyError
127 1
        status = 200
128 1
        log.debug("get_circuit result %s %s", result, status)
129 1
        return jsonify(result), status
130
131 1
    @rest("/v2/evc/", methods=["POST"])
132 1
    @validate(spec)
133
    def create_circuit(self, data):
134
        """Try to create a new circuit.
135
136
        Firstly, for EVPL: E-Line NApp verifies if UNI_A's requested C-VID and
137
        UNI_Z's requested C-VID are available from the interfaces' pools. This
138
        is checked when creating the UNI object.
139
140
        Then, E-Line NApp requests a primary and a backup path to the
141
        Pathfinder NApp using the attributes primary_links and backup_links
142
        submitted via REST
143
144
        # For each link composing paths in #3:
145
        #  - E-Line NApp requests a S-VID available from the link VLAN pool.
146
        #  - Using the S-VID obtained, generate abstract flow entries to be
147
        #    sent to FlowManager
148
149
        Push abstract flow entries to FlowManager and FlowManager pushes
150
        OpenFlow entries to datapaths
151
152
        E-Line NApp generates an event to notify all Kytos NApps of a new EVC
153
        creation
154
155
        Finnaly, notify user of the status of its request.
156
        """
157
        # Try to create the circuit object
158 1
        log.debug("create_circuit /v2/evc/")
159
160 1
        try:
161 1
            evc = self._evc_from_dict(data)
162 1
        except ValueError as exception:
163
            log.debug("create_circuit result %s %s", exception, 400)
164
            raise BadRequest(str(exception)) from BadRequest
165
166 1
        if evc.primary_path:
167
            try:
168
                evc.primary_path.is_valid(
169
                    evc.uni_a.interface.switch,
170
                    evc.uni_z.interface.switch,
171
                    bool(evc.circuit_scheduler),
172
                )
173
            except InvalidPath as exception:
174
                raise BadRequest(
175
                    f"primary_path is not valid: {exception}"
176
                ) from exception
177 1
        if evc.backup_path:
178
            try:
179
                evc.backup_path.is_valid(
180
                    evc.uni_a.interface.switch,
181
                    evc.uni_z.interface.switch,
182
                    bool(evc.circuit_scheduler),
183
                )
184
            except InvalidPath as exception:
185
                raise BadRequest(
186
                    f"backup_path is not valid: {exception}"
187
                ) from exception
188
189
        # verify duplicated evc
190 1
        if self._is_duplicated_evc(evc):
191 1
            result = "The EVC already exists."
192 1
            log.debug("create_circuit result %s %s", result, 409)
193 1
            raise Conflict(result)
194
195 1
        if (
196
            not evc.primary_path
197
            and evc.dynamic_backup_path is False
198
            and evc.uni_a.interface.switch != evc.uni_z.interface.switch
199
        ):
200 1
            result = "The EVC must have a primary path or allow dynamic paths."
201 1
            log.debug("create_circuit result %s %s", result, 400)
202 1
            raise BadRequest(result)
203
204
        # store circuit in dictionary
205 1
        self.circuits[evc.id] = evc
206
207
        # save circuit
208 1
        self.storehouse.save_evc(evc)
209
210
        # Schedule the circuit deploy
211 1
        self.sched.add(evc)
212
213
        # Circuit has no schedule, deploy now
214 1
        if not evc.circuit_scheduler:
215 1
            with evc.lock:
216 1
                evc.deploy()
217
218
        # Notify users
219 1
        event = KytosEvent(
220
            name="kytos.mef_eline.created", content=evc.as_dict()
221
        )
222 1
        self.controller.buffers.app.put(event)
223
224 1
        result = {"circuit_id": evc.id}
225 1
        status = 201
226 1
        log.debug("create_circuit result %s %s", result, status)
227 1
        emit_event(self.controller, "created", evc_id=evc.id)
228 1
        return jsonify(result), status
229
230 1
    @rest("/v2/evc/<circuit_id>", methods=["PATCH"])
231
    def update(self, circuit_id):
232
        """Update a circuit based on payload.
233
234
        The EVC required attributes (name, uni_a, uni_z) can't be updated.
235
        """
236 1
        log.debug("update /v2/evc/%s", circuit_id)
237 1
        try:
238 1
            evc = self.circuits[circuit_id]
239 1
        except KeyError:
240 1
            result = f"circuit_id {circuit_id} not found"
241 1
            log.debug("update result %s %s", result, 404)
242 1
            raise NotFound(result) from NotFound
243
244 1
        if evc.archived:
245 1
            result = "Can't update archived EVC"
246 1
            log.debug("update result %s %s", result, 405)
247 1
            raise MethodNotAllowed(["GET"], result)
248
249 1
        try:
250 1
            data = request.get_json()
251 1
        except BadRequest:
252 1
            result = "The request body is not a well-formed JSON."
253 1
            log.debug("update result %s %s", result, 400)
254 1
            raise BadRequest(result) from BadRequest
255 1
        if data is None:
256 1
            result = "The request body mimetype is not application/json."
257 1
            log.debug("update result %s %s", result, 415)
258 1
            raise UnsupportedMediaType(result) from UnsupportedMediaType
259
260 1
        try:
261 1
            enable, redeploy = evc.update(
262
                **self._evc_dict_with_instances(data)
263
            )
264 1
        except ValueError as exception:
265 1
            log.error(exception)
266 1
            log.debug("update result %s %s", exception, 400)
267 1
            raise BadRequest(str(exception)) from BadRequest
268
269 1
        if evc.is_active():
270
            if enable is False:  # disable if active
271
                with evc.lock:
272
                    evc.remove()
273
            elif redeploy is not None:  # redeploy if active
274
                with evc.lock:
275
                    evc.remove()
276
                    evc.deploy()
277
        else:
278 1
            if enable is True:  # enable if inactive
279 1
                with evc.lock:
280 1
                    evc.deploy()
281 1
        result = {evc.id: evc.as_dict()}
282 1
        status = 200
283
284 1
        log.debug("update result %s %s", result, status)
285 1
        emit_event(self.controller, "updated", evc_id=evc.id, data=data)
286 1
        return jsonify(result), status
287
288 1
    @rest("/v2/evc/<circuit_id>", methods=["DELETE"])
289
    def delete_circuit(self, circuit_id):
290
        """Remove a circuit.
291
292
        First, the flows are removed from the switches, and then the EVC is
293
        disabled.
294
        """
295 1
        log.debug("delete_circuit /v2/evc/%s", circuit_id)
296 1
        try:
297 1
            evc = self.circuits[circuit_id]
298 1
        except KeyError:
299 1
            result = f"circuit_id {circuit_id} not found"
300 1
            log.debug("delete_circuit result %s %s", result, 404)
301 1
            raise NotFound(result) from NotFound
302
303 1
        if evc.archived:
304
            result = f"Circuit {circuit_id} already removed"
305
            log.debug("delete_circuit result %s %s", result, 404)
306
            raise NotFound(result) from NotFound
307
308 1
        log.info("Removing %s", evc)
309 1
        evc.remove_current_flows()
310 1
        evc.deactivate()
311 1
        evc.disable()
312 1
        self.sched.remove(evc)
313 1
        evc.archive()
314 1
        evc.sync()
315 1
        log.info("EVC removed. %s", evc)
316 1
        result = {"response": f"Circuit {circuit_id} removed"}
317 1
        status = 200
318
319 1
        log.debug("delete_circuit result %s %s", result, status)
320 1
        emit_event(self.controller, "deleted", evc_id=evc.id)
321 1
        return jsonify(result), status
322
323 1
    @rest("v2/evc/<circuit_id>/metadata", methods=["GET"])
324
    def get_metadata(self, circuit_id):
325
        """Get metadata from an EVC."""
326
        try:
327
            return (
328
                jsonify({"metadata": self.circuits[circuit_id].metadata}),
329
                200,
330
            )
331
        except KeyError as error:
332
            raise NotFound(f"circuit_id {circuit_id} not found.") from error
333
334 1
    @rest("v2/evc/<circuit_id>/metadata", methods=["POST"])
335
    def add_metadata(self, circuit_id):
336
        """Add metadata to an EVC."""
337 1
        try:
338 1
            metadata = request.get_json()
339 1
            content_type = request.content_type
340
        except BadRequest as error:
341
            result = "The request body is not a well-formed JSON."
342
            raise BadRequest(result) from error
343 1
        if content_type is None:
344
            result = "The request body is empty."
345
            raise BadRequest(result)
346 1
        if metadata is None:
347
            if content_type != "application/json":
348
                result = (
349
                    "The content type must be application/json "
350
                    f"(received {content_type})."
351
                )
352
            else:
353
                result = "Metadata is empty."
354
            raise UnsupportedMediaType(result)
355
356 1
        try:
357 1
            evc = self.circuits[circuit_id]
358
        except KeyError as error:
359
            raise NotFound(f"circuit_id {circuit_id} not found.") from error
360
361 1
        evc.extend_metadata(metadata)
362 1
        evc.sync()
363 1
        return jsonify("Operation successful"), 201
364
365 1
    @rest("v2/evc/<circuit_id>/metadata/<key>", methods=["DELETE"])
366
    def delete_metadata(self, circuit_id, key):
367
        """Delete metadata from an EVC."""
368
        try:
369
            evc = self.circuits[circuit_id]
370
        except KeyError as error:
371
            raise NotFound(f"circuit_id {circuit_id} not found.") from error
372
373
        evc.remove_metadata(key)
374
        evc.sync()
375
        return jsonify("Operation successful"), 200
376
377 1
    @rest("/v2/evc/<circuit_id>/redeploy", methods=["PATCH"])
378
    def redeploy(self, circuit_id):
379
        """Endpoint to force the redeployment of an EVC."""
380 1
        log.debug("redeploy /v2/evc/%s/redeploy", circuit_id)
381 1
        try:
382 1
            evc = self.circuits[circuit_id]
383 1
        except KeyError:
384 1
            result = f"circuit_id {circuit_id} not found"
385 1
            raise NotFound(result) from NotFound
386 1
        if evc.is_enabled():
387 1
            with evc.lock:
388 1
                evc.remove_current_flows()
389 1
                evc.deploy()
390 1
            result = {"response": f"Circuit {circuit_id} redeploy received."}
391 1
            status = 202
392
        else:
393 1
            result = {"response": f"Circuit {circuit_id} is disabled."}
394 1
            status = 409
395
396 1
        return jsonify(result), status
397
398 1
    @rest("/v2/evc/schedule", methods=["GET"])
399
    def list_schedules(self):
400
        """Endpoint to return all schedules stored for all circuits.
401
402
        Return a JSON with the following template:
403
        [{"schedule_id": <schedule_id>,
404
         "circuit_id": <circuit_id>,
405
         "schedule": <schedule object>}]
406
        """
407 1
        log.debug("list_schedules /v2/evc/schedule")
408 1
        circuits = self.storehouse.get_data().values()
409 1
        if not circuits:
410 1
            result = {}
411 1
            status = 200
412 1
            return jsonify(result), status
413
414 1
        result = []
415 1
        status = 200
416 1
        for circuit in circuits:
417 1
            circuit_scheduler = circuit.get("circuit_scheduler")
418 1
            if circuit_scheduler:
419 1
                for scheduler in circuit_scheduler:
420 1
                    value = {
421
                        "schedule_id": scheduler.get("id"),
422
                        "circuit_id": circuit.get("id"),
423
                        "schedule": scheduler,
424
                    }
425 1
                    result.append(value)
426
427 1
        log.debug("list_schedules result %s %s", result, status)
428 1
        return jsonify(result), status
429
430 1
    @rest("/v2/evc/schedule/", methods=["POST"])
431
    def create_schedule(self):
432
        """
433
        Create a new schedule for a given circuit.
434
435
        This service do no check if there are conflicts with another schedule.
436
        Payload example:
437
            {
438
              "circuit_id":"aa:bb:cc",
439
              "schedule": {
440
                "date": "2019-08-07T14:52:10.967Z",
441
                "interval": "string",
442
                "frequency": "1 * * * *",
443
                "action": "create"
444
              }
445
            }
446
        """
447 1
        log.debug("create_schedule /v2/evc/schedule/")
448
449 1
        json_data = self._json_from_request("create_schedule")
450 1
        try:
451 1
            circuit_id = json_data["circuit_id"]
452
        except TypeError:
453
            result = "The payload should have a dictionary."
454
            log.debug("create_schedule result %s %s", result, 400)
455
            raise BadRequest(result) from BadRequest
456
        except KeyError:
457
            result = "Missing circuit_id."
458
            log.debug("create_schedule result %s %s", result, 400)
459
            raise BadRequest(result) from BadRequest
460
461 1
        try:
462 1
            schedule_data = json_data["schedule"]
463
        except KeyError:
464
            result = "Missing schedule data."
465
            log.debug("create_schedule result %s %s", result, 400)
466
            raise BadRequest(result) from BadRequest
467
468
        # Get EVC from circuits buffer
469 1
        circuits = self._get_circuits_buffer()
470
471
        # get the circuit
472 1
        evc = circuits.get(circuit_id)
473
474
        # get the circuit
475 1
        if not evc:
476
            result = f"circuit_id {circuit_id} not found"
477
            log.debug("create_schedule result %s %s", result, 404)
478
            raise NotFound(result) from NotFound
479
        # Can not modify circuits deleted and archived
480 1
        if evc.archived:
481
            result = f"Circuit {circuit_id} is archived. Update is forbidden."
482
            log.debug("create_schedule result %s %s", result, 403)
483
            raise Forbidden(result) from Forbidden
484
485
        # new schedule from dict
486 1
        new_schedule = CircuitSchedule.from_dict(schedule_data)
487
488
        # If there is no schedule, create the list
489 1
        if not evc.circuit_scheduler:
490
            evc.circuit_scheduler = []
491
492
        # Add the new schedule
493 1
        evc.circuit_scheduler.append(new_schedule)
494
495
        # Add schedule job
496 1
        self.sched.add_circuit_job(evc, new_schedule)
497
498
        # save circuit to storehouse
499 1
        evc.sync()
500
501 1
        result = new_schedule.as_dict()
502 1
        status = 201
503
504 1
        log.debug("create_schedule result %s %s", result, status)
505 1
        return jsonify(result), status
506
507 1
    @rest("/v2/evc/schedule/<schedule_id>", methods=["PATCH"])
508
    def update_schedule(self, schedule_id):
509
        """Update a schedule.
510
511
        Change all attributes from the given schedule from a EVC circuit.
512
        The schedule ID is preserved as default.
513
        Payload example:
514
            {
515
              "date": "2019-08-07T14:52:10.967Z",
516
              "interval": "string",
517
              "frequency": "1 * * *",
518
              "action": "create"
519
            }
520
        """
521 1
        log.debug("update_schedule /v2/evc/schedule/%s", schedule_id)
522
523
        # Try to find a circuit schedule
524 1
        evc, found_schedule = self._find_evc_by_schedule_id(schedule_id)
525
526
        # Can not modify circuits deleted and archived
527 1
        if not found_schedule:
528
            result = f"schedule_id {schedule_id} not found"
529
            log.debug("update_schedule result %s %s", result, 404)
530
            raise NotFound(result) from NotFound
531 1
        if evc.archived:
532 1
            result = f"Circuit {evc.id} is archived. Update is forbidden."
533 1
            log.debug("update_schedule result %s %s", result, 403)
534 1
            raise Forbidden(result) from Forbidden
535
536 1
        data = self._json_from_request("update_schedule")
537
538 1
        new_schedule = CircuitSchedule.from_dict(data)
539 1
        new_schedule.id = found_schedule.id
540
        # Remove the old schedule
541 1
        evc.circuit_scheduler.remove(found_schedule)
542
        # Append the modified schedule
543 1
        evc.circuit_scheduler.append(new_schedule)
544
545
        # Cancel all schedule jobs
546 1
        self.sched.cancel_job(found_schedule.id)
547
        # Add the new circuit schedule
548 1
        self.sched.add_circuit_job(evc, new_schedule)
549
        # Save EVC to the storehouse
550 1
        evc.sync()
551
552 1
        result = new_schedule.as_dict()
553 1
        status = 200
554
555 1
        log.debug("update_schedule result %s %s", result, status)
556 1
        return jsonify(result), status
557
558 1
    @rest("/v2/evc/schedule/<schedule_id>", methods=["DELETE"])
559
    def delete_schedule(self, schedule_id):
560
        """Remove a circuit schedule.
561
562
        Remove the Schedule from EVC.
563
        Remove the Schedule from cron job.
564
        Save the EVC to the Storehouse.
565
        """
566 1
        log.debug("delete_schedule /v2/evc/schedule/%s", schedule_id)
567 1
        evc, found_schedule = self._find_evc_by_schedule_id(schedule_id)
568
569
        # Can not modify circuits deleted and archived
570 1
        if not found_schedule:
571
            result = f"schedule_id {schedule_id} not found"
572
            log.debug("delete_schedule result %s %s", result, 404)
573
            raise NotFound(result)
574
575 1
        if evc.archived:
576 1
            result = f"Circuit {evc.id} is archived. Update is forbidden."
577 1
            log.debug("delete_schedule result %s %s", result, 403)
578 1
            raise Forbidden(result)
579
580
        # Remove the old schedule
581 1
        evc.circuit_scheduler.remove(found_schedule)
582
583
        # Cancel all schedule jobs
584 1
        self.sched.cancel_job(found_schedule.id)
585
        # Save EVC to the storehouse
586 1
        evc.sync()
587
588 1
        result = "Schedule removed"
589 1
        status = 200
590
591 1
        log.debug("delete_schedule result %s %s", result, status)
592 1
        return jsonify(result), status
593
594 1
    def _is_duplicated_evc(self, evc):
595
        """Verify if the circuit given is duplicated with the stored evcs.
596
597
        Args:
598
            evc (EVC): circuit to be analysed.
599
600
        Returns:
601
            boolean: True if the circuit is duplicated, otherwise False.
602
603
        """
604 1
        for circuit in tuple(self.circuits.values()):
605 1
            if not circuit.archived and circuit.shares_uni(evc):
606 1
                return True
607 1
        return False
608
609 1
    @listen_to("kytos/topology.link_up")
610
    def handle_link_up(self, event):
611
        """Change circuit when link is up or end_maintenance."""
612 1
        log.debug("Event handle_link_up %s", event)
613 1
        for evc in self.circuits.values():
614 1
            if evc.is_enabled() and not evc.archived:
615 1
                with evc.lock:
616 1
                    evc.handle_link_up(event.content["link"])
617
618 1
    @listen_to("kytos/topology.link_down")
619
    def handle_link_down(self, event):
620
        """Change circuit when link is down or under_mantenance."""
621 1
        log.debug("Event handle_link_down %s", event)
622 1
        for evc in self.circuits.values():
623 1
            with evc.lock:
624 1
                if evc.is_affected_by_link(event.content["link"]):
625 1
                    log.debug(f"Handling evc {evc.id} on link down")
626 1
                    if evc.handle_link_down():
627 1
                        emit_event(
628
                            self.controller,
629
                            "redeployed_link_down",
630
                            evc_id=evc.id,
631
                        )
632
                    else:
633
                        emit_event(
634
                            self.controller,
635
                            "error_redeploy_link_down",
636
                            evc_id=evc.id,
637
                        )
638
639 1
    def load_circuits_by_interface(self, circuits):
640
        """Load circuits in storehouse for in-memory dictionary."""
641 1
        for circuit_id, circuit in circuits.items():
642 1
            if circuit["archived"] is True:
643 1
                continue
644 1
            intf_a = circuit["uni_a"]["interface_id"]
645 1
            self.add_to_dict_of_sets(intf_a, circuit_id)
646 1
            intf_z = circuit["uni_z"]["interface_id"]
647 1
            self.add_to_dict_of_sets(intf_z, circuit_id)
648 1
            for path in ("current_path", "primary_path", "backup_path"):
649 1
                for link in circuit[path]:
650 1
                    intf_a = link["endpoint_a"]["id"]
651 1
                    self.add_to_dict_of_sets(intf_a, circuit_id)
652 1
                    intf_b = link["endpoint_b"]["id"]
653 1
                    self.add_to_dict_of_sets(intf_b, circuit_id)
654
655 1
    def add_to_dict_of_sets(self, intf, circuit_id):
656
        """Add a single item to the dictionary of circuits by interface."""
657 1
        if intf not in self._circuits_by_interface:
658 1
            self._circuits_by_interface[intf] = set()
659 1
        self._circuits_by_interface[intf].add(circuit_id)
660
661 1
    @listen_to("kytos/topology.port.created")
662
    def load_evcs(self, event):
663
        """Try to load the unloaded EVCs from storehouse."""
664
        with self._lock:
665
            log.debug("Event load_evcs %s", event)
666
            circuits = self.storehouse.get_data()
667
            if not self._circuits_by_interface:
668
                self.load_circuits_by_interface(circuits)
669
670
            interface_id = (
671
                str(event.content["switch"]) + ":" + str(event.content["port"])
672
            )
673
674
            for circuit_id in self._circuits_by_interface.get(
675
                interface_id, []
676
            ):
677
                if circuit_id in circuits and circuit_id not in self.circuits:
678
                    self._load_evc(circuits[circuit_id])
679
680 1
    def load_all_evcs(self):
681
        """Try to load all EVCs on startup."""
682 1
        for circuit_id, circuit in self.storehouse.get_data().items():
683
            if circuit_id not in self.circuits:
684
                self._load_evc(circuit)
685
686 1
    def _load_evc(self, circuit_dict):
687
        """Load one EVC from storehouse to memory."""
688
        try:
689
            evc = self._evc_from_dict(circuit_dict)
690
        except ValueError as exception:
691
            log.error(
692
                f'Could not load EVC {circuit_dict["id"]} '
693
                f"because {exception}"
694
            )
695
            return None
696
697
        if evc.archived:
698
            return None
699
        evc.deactivate()
700
        evc.sync()
701
        self.circuits.setdefault(evc.id, evc)
702
        self.sched.add(evc)
703
        return evc
704
705 1
    @listen_to("kytos/flow_manager.flow.error")
706
    def handle_flow_mod_error(self, event):
707
        """Handle flow mod errors related to an EVC."""
708
        flow = event.content["flow"]
709
        command = event.content.get("error_command")
710
        if command != "add":
711
            return
712
        evc_id = f"{flow.cookie:x}"
713
        evc = self.circuits.get(evc_id)
714
        if evc:
715
            evc.remove_current_flows()
716
717 1
    def _evc_dict_with_instances(self, evc_dict):
718
        """Convert some dict values to instance of EVC classes.
719
720
        This method will convert: [UNI, Link]
721
        """
722 1
        data = evc_dict.copy()  # Do not modify the original dict
723
724 1
        for attribute, value in data.items():
725
            # Get multiple attributes.
726
            # Ex: uni_a, uni_z
727 1
            if "uni" in attribute:
728 1
                try:
729 1
                    data[attribute] = self._uni_from_dict(value)
730 1
                except ValueError:
731 1
                    result = "Error creating UNI: Invalid value"
732 1
                    raise BadRequest(result) from BadRequest
733
734 1
            if attribute == "circuit_scheduler":
735 1
                data[attribute] = []
736 1
                for schedule in value:
737 1
                    data[attribute].append(CircuitSchedule.from_dict(schedule))
738
739
            # Get multiple attributes.
740
            # Ex: primary_links,
741
            #     backup_links,
742
            #     current_links_cache,
743
            #     primary_links_cache,
744
            #     backup_links_cache
745 1
            if "links" in attribute:
746 1
                data[attribute] = [
747
                    self._link_from_dict(link) for link in value
748
                ]
749
750
            # Get multiple attributes.
751
            # Ex: current_path,
752
            #     primary_path,
753
            #     backup_path
754 1
            if "path" in attribute and attribute != "dynamic_backup_path":
755 1
                data[attribute] = Path(
756
                    [self._link_from_dict(link) for link in value]
757
                )
758
759 1
        return data
760
761 1
    def _evc_from_dict(self, evc_dict):
762 1
        data = self._evc_dict_with_instances(evc_dict)
763 1
        return EVC(self.controller, **data)
764
765 1
    def _uni_from_dict(self, uni_dict):
766
        """Return a UNI object from python dict."""
767
        if uni_dict is None:
768
            return False
769
770
        interface_id = uni_dict.get("interface_id")
771
        interface = self.controller.get_interface_by_id(interface_id)
772
        if interface is None:
773
            result = (
774
                "Error creating UNI:"
775
                + f"Could not instantiate interface {interface_id}"
776
            )
777
            raise ValueError(result) from ValueError
778
779
        tag_dict = uni_dict.get("tag", None)
780
        if tag_dict:
781
            tag = TAG.from_dict(tag_dict)
782
        else:
783
            tag = None
784
        uni = UNI(interface, tag)
785
786
        return uni
787
788 1
    def _link_from_dict(self, link_dict):
789
        """Return a Link object from python dict."""
790 1
        id_a = link_dict.get("endpoint_a").get("id")
791 1
        id_b = link_dict.get("endpoint_b").get("id")
792
793 1
        endpoint_a = self.controller.get_interface_by_id(id_a)
794 1
        endpoint_b = self.controller.get_interface_by_id(id_b)
795
796 1
        link = Link(endpoint_a, endpoint_b)
797 1
        if "metadata" in link_dict:
798
            link.extend_metadata(link_dict.get("metadata"))
799
800 1
        s_vlan = link.get_metadata("s_vlan")
801 1
        if s_vlan:
802
            tag = TAG.from_dict(s_vlan)
803
            if tag is False:
804
                error_msg = f"Could not instantiate tag from dict {s_vlan}"
805
                raise ValueError(error_msg)
806
            link.update_metadata("s_vlan", tag)
807 1
        return link
808
809 1
    def _find_evc_by_schedule_id(self, schedule_id):
810
        """
811
        Find an EVC and CircuitSchedule based on schedule_id.
812
813
        :param schedule_id: Schedule ID
814
        :return: EVC and Schedule
815
        """
816 1
        circuits = self._get_circuits_buffer()
817 1
        found_schedule = None
818 1
        evc = None
819
820
        # pylint: disable=unused-variable
821 1
        for c_id, circuit in circuits.items():
822 1
            for schedule in circuit.circuit_scheduler:
823 1
                if schedule.id == schedule_id:
824 1
                    found_schedule = schedule
825 1
                    evc = circuit
826 1
                    break
827 1
            if found_schedule:
828 1
                break
829 1
        return evc, found_schedule
830
831 1
    def _get_circuits_buffer(self):
832
        """
833
        Return the circuit buffer.
834
835
        If the buffer is empty, try to load data from storehouse.
836
        """
837 1
        if not self.circuits:
838
            # Load storehouse circuits to buffer
839 1
            circuits = self.storehouse.get_data()
840 1
            for c_id, circuit in circuits.items():
841 1
                evc = self._evc_from_dict(circuit)
842 1
                self.circuits[c_id] = evc
843 1
        return self.circuits
844
845 1
    @staticmethod
846
    def _json_from_request(caller):
847
        """Return a json from request.
848
849
        If it was not possible to get a json from the request, log, for debug,
850
        who was the caller and the error that ocurred, and raise an
851
        Exception.
852
        """
853 1
        try:
854 1
            json_data = request.get_json()
855
        except ValueError as exception:
856
            log.error(exception)
857
            log.debug(f"{caller} result {exception} 400")
858
            raise BadRequest(str(exception)) from BadRequest
859
        except BadRequest:
860
            result = "The request is not a valid JSON."
861
            log.debug(f"{caller} result {result} 400")
862
            raise BadRequest(result) from BadRequest
863 1
        if json_data is None:
864
            result = "Content-Type must be application/json"
865
            log.debug(f"{caller} result {result} 415")
866
            raise UnsupportedMediaType(result)
867
        return json_data
868