Passed
Pull Request — master (#298)
by
unknown
04:09
created

build.main.Main.bulk_add_metadata()   A

Complexity

Conditions 4

Size

Total Lines 18
Code Lines 15

Duplication

Lines 18
Ratio 100 %

Code Coverage

Tests 15
CRAP Score 4

Importance

Changes 0
Metric Value
cc 4
eloc 15
nop 2
dl 18
loc 18
ccs 15
cts 15
cp 1
crap 4
rs 9.65
c 0
b 0
f 0
1
# pylint: disable=protected-access, too-many-lines
2 1
"""Main module of kytos/mef_eline Kytos Network Application.
3
4
NApp to provision circuits from user request.
5
"""
6 1
import time
7 1
from threading import Lock
8
9 1
from flask import jsonify, request
10 1
from pydantic import ValidationError
11 1
from werkzeug.exceptions import (BadRequest, Conflict, Forbidden,
12
                                 MethodNotAllowed, NotFound,
13
                                 UnsupportedMediaType)
14
15 1
from kytos.core import KytosNApp, log, rest
16 1
from kytos.core.helpers import listen_to
17 1
from kytos.core.interface import TAG, UNI
18 1
from kytos.core.link import Link
19 1
from napps.kytos.mef_eline import controllers, settings
20 1
from napps.kytos.mef_eline.exceptions import InvalidPath
21 1
from napps.kytos.mef_eline.models import (EVC, DynamicPathManager, EVCDeploy,
22
                                          Path)
23 1
from napps.kytos.mef_eline.scheduler import CircuitSchedule, Scheduler
24 1
from napps.kytos.mef_eline.utils import (emit_event, load_spec,
25
                                         map_evc_event_content, validate)
26
27
28
# pylint: disable=too-many-public-methods
29 1
class Main(KytosNApp):
30
    """Main class of amlight/mef_eline NApp.
31
32
    This class is the entry point for this napp.
33
    """
34
35 1
    spec = load_spec()
36
37 1
    def setup(self):
        """Initialise the NApp state; stands in for ``__init__``.

        Creates the circuit buffer and the routine lock, then the scheduler
        and the storage controller, hands the Kytos controller to
        ``DynamicPathManager``, starts the periodic ``execute`` loop and
        finally loads all EVCs.
        """
        # In-memory buffer of the EVCs created. Every create/update/delete
        # must also be synced to mongodb.
        self.circuits = {}
        self._lock = Lock()

        # Object used to schedule circuit events.
        self.sched = Scheduler()

        # Object used to save and load circuits.
        self.mongo_controller = self.get_eline_controller()
        self.mongo_controller.bootstrap_indexes()

        # Controller that will manage the dynamic paths.
        DynamicPathManager.set_controller(self.controller)

        self.execute_as_loop(settings.DEPLOY_EVCS_INTERVAL)
        self.load_all_evcs()
63
64 1
    def get_evcs_by_svc_level(self) -> list:
        """Return the buffered circuits ordered by priority.

        Circuits with a higher ``service_level`` come first; circuits sharing
        the same level are ordered by ascending ``creation_time``. The data
        comes from the in-memory ``self.circuits`` buffer.
        """
        def priority(evc):
            return (-evc.service_level, evc.creation_time)

        ordered = list(self.circuits.values())
        ordered.sort(key=priority)
        return ordered
71
72 1
    @staticmethod
    def get_eline_controller():
        """Create and return a new ``ELineController`` instance."""
        eline_controller = controllers.ELineController()
        return eline_controller
76
77 1
    def execute(self):
        """Run one round of the consistency routine.

        If another round still holds ``self._lock`` this call returns
        immediately instead of waiting for it.
        """
        # Test and acquire in one atomic step. Checking ``locked()`` first and
        # entering ``with self._lock`` afterwards lets two concurrent callers
        # both pass the check, leaving one of them blocked on the lock.
        if not self._lock.acquire(blocking=False):
            return
        try:
            log.debug("Starting consistency routine")
            self.execute_consistency()
        finally:
            self._lock.release()
        log.debug("Finished consistency routine")
85
86 1
    @staticmethod
    def should_be_checked(circuit):
        """Tell whether the consistency routine should check this circuit.

        A circuit qualifies only when it is enabled but not active, its lock
        is free, it has no recently removed flow, it was not recently updated
        and it is either intra-switch or has a current path.
        """
        if not circuit.is_enabled():
            return False
        if circuit.is_active():
            return False
        if circuit.lock.locked():
            return False
        if circuit.has_recent_removed_flow():
            return False
        if circuit.is_recent_updated():
            return False
        # If an inter-switch EVC does not have current_path, it does not make
        # sense to run sdntrace on it.
        return bool(circuit.is_intra_switch() or circuit.current_path)
102
103 1
    def execute_consistency(self):
        """Run the consistency check over the buffered circuits.

        Circuits accepted by ``should_be_checked`` are passed in bulk to
        ``EVCDeploy.check_list_traces``. A circuit with a truthy result is
        activated and synced; any other one has its ``execution_rounds``
        counter increased and is redeployed once the counter exceeds
        ``settings.WAIT_FOR_OLD_PATH``. Circuits present in mongodb but absent
        from the buffer are loaded at the end.
        """
        pending = []
        unloaded = self.mongo_controller.get_circuits()['circuits']
        for evc in self.get_evcs_by_svc_level():
            # Whatever is left in ``unloaded`` after this loop exists only in
            # mongodb.
            unloaded.pop(evc.id, None)
            if self.should_be_checked(evc):
                pending.append(evc)

        trace_results = EVCDeploy.check_list_traces(pending)
        for evc in pending:
            if trace_results.get(evc.id):
                evc.execution_rounds = 0
                log.info(f"{evc} enabled but inactive - activating")
                with evc.lock:
                    evc.activate()
                    evc.sync()
                continue
            evc.execution_rounds += 1
            if evc.execution_rounds > settings.WAIT_FOR_OLD_PATH:
                log.info(f"{evc} enabled but inactive - redeploy")
                with evc.lock:
                    evc.deploy()

        for circuit_id, stored_evc in unloaded.items():
            log.info(f"EVC found in mongodb but unloaded {circuit_id}")
            self._load_evc(stored_evc)
129
130 1
    def shutdown(self):
        """Hook executed when the NApp is unloaded.

        No cleanup procedure is performed at the moment.
        """
135
136 1
    @rest("/v2/evc/", methods=["GET"])
    def list_circuits(self):
        """Endpoint that lists the stored circuits.

        The ``archived`` query argument (default ``"false"``) is lower-cased
        and forwarded to the storage controller; every other query argument
        is forwarded as a metadata filter.
        """
        log.debug("list_circuits /v2/evc")
        query = request.args.to_dict()
        archived_flag = query.pop("archived", "false").lower()
        stored = self.mongo_controller.get_circuits(
            archived=archived_flag, metadata=query
        )
        return jsonify(stored['circuits']), 200
150
151 1
    @rest("/v2/evc/<circuit_id>", methods=["GET"])
    def get_circuit(self, circuit_id):
        """Endpoint that returns a single stored circuit.

        Raises:
            NotFound: when storage has no circuit for ``circuit_id``.
        """
        log.debug("get_circuit /v2/evc/%s", circuit_id)
        circuit = self.mongo_controller.get_circuit(circuit_id)
        if circuit:
            log.debug("get_circuit result %s %s", circuit, 200)
            return jsonify(circuit), 200
        message = f"circuit_id {circuit_id} not found"
        log.debug("get_circuit result %s %s", message, 404)
        raise NotFound(message)
163
164 1
    @rest("/v2/evc/", methods=["POST"])
    @validate(spec)
    def create_circuit(self, data):
        """Create a new circuit from the request payload.

        Steps, in order:

        1. Build the EVC from ``data``.
        2. Validate ``primary_path`` and ``backup_path`` (when present)
           against the switches of both UNIs.
        3. Reject the request when ``_is_duplicated_evc`` reports a match.
        4. Run the EVC's ``_validate_has_primary_or_dynamic`` check.
        5. Sync the EVC, store it in the circuit buffer and add it to the
           scheduler; an EVC without ``circuit_scheduler`` is deployed now.
        6. Emit the ``created`` event and answer with the circuit id.

        Returns:
            A ``({"circuit_id": <id>}, 201)`` JSON response.

        Raises:
            BadRequest: when the payload, a path or the EVC is not valid.
            Conflict: when the EVC already exists.
        """
        log.debug("create_circuit /v2/evc/")

        # Try to create the circuit object
        try:
            evc = self._evc_from_dict(data)
        except ValueError as exception:
            log.debug("create_circuit result %s %s", exception, 400)
            # Chain from the caught error: ``from BadRequest`` would attach a
            # fresh, empty BadRequest as the cause and hide the real one.
            raise BadRequest(str(exception)) from exception

        # Both static paths go through the very same validation.
        for path_name in ("primary_path", "backup_path"):
            path = getattr(evc, path_name)
            if not path:
                continue
            try:
                path.is_valid(
                    evc.uni_a.interface.switch,
                    evc.uni_z.interface.switch,
                    bool(evc.circuit_scheduler),
                )
            except InvalidPath as exception:
                raise BadRequest(
                    f"{path_name} is not valid: {exception}"
                ) from exception

        # verify duplicated evc
        if self._is_duplicated_evc(evc):
            result = "The EVC already exists."
            log.debug("create_circuit result %s %s", result, 409)
            raise Conflict(result)

        try:
            evc._validate_has_primary_or_dynamic()
        except ValueError as exception:
            raise BadRequest(str(exception)) from exception

        # save circuit
        try:
            evc.sync()
        except ValidationError as exception:
            raise BadRequest(str(exception)) from exception

        # store circuit in dictionary
        self.circuits[evc.id] = evc

        # Schedule the circuit deploy
        self.sched.add(evc)

        # Circuit has no schedule, deploy now
        if not evc.circuit_scheduler:
            with evc.lock:
                evc.deploy()

        # Notify users
        result = {"circuit_id": evc.id}
        status = 201
        log.debug("create_circuit result %s %s", result, status)
        emit_event(self.controller, name="created",
                   content=map_evc_event_content(evc))
        return jsonify(result), status
257
258 1
    @listen_to('kytos/flow_manager.flow.removed')
    def on_flow_delete(self, event):
        """Listen to flow removal events and pass them to the handler."""
        self.handle_flow_delete(event)
262
263 1
    def handle_flow_delete(self, event):
        """Record on the owning EVC that one of its flows was removed.

        The EVC id is derived from the cookie of ``event.content["flow"]``;
        events whose id is not in the circuit buffer are ignored.
        """
        removed_flow = event.content["flow"]
        evc_id = EVC.get_id_from_cookie(removed_flow.cookie)
        evc = self.circuits.get(evc_id)
        if not evc:
            return
        log.debug("Flow removed in EVC %s", evc.id)
        evc.set_flow_removed_at()
270
271 1
    @rest("/v2/evc/<circuit_id>", methods=["PATCH"])
    def update(self, circuit_id):
        """Update a circuit based on the request payload.

        The EVC required attributes (name, uni_a, uni_z) can't be updated.

        After the EVC accepts the update, an active EVC is removed when the
        update disables it, or removed and deployed again when a redeploy is
        requested; an inactive EVC is deployed when the update enables it.

        Returns:
            A ``({<evc id>: <evc dict>}, 200)`` JSON response.

        Raises:
            NotFound: when ``circuit_id`` is not in the circuit buffer.
            MethodNotAllowed: when the EVC is archived.
            BadRequest: when the body is not well-formed JSON or the EVC
                rejects the new values.
            UnsupportedMediaType: when the body is not ``application/json``.
        """
        log.debug("update /v2/evc/%s", circuit_id)
        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            result = f"circuit_id {circuit_id} not found"
            log.debug("update result %s %s", result, 404)
            # Chain from the caught error, not from the exception class.
            raise NotFound(result) from error

        if evc.archived:
            result = "Can't update archived EVC"
            log.debug("update result %s %s", result, 405)
            raise MethodNotAllowed(["GET"], result)

        try:
            data = request.get_json()
        except BadRequest as error:
            result = "The request body is not a well-formed JSON."
            log.debug("update result %s %s", result, 400)
            raise BadRequest(result) from error
        if data is None:
            result = "The request body mimetype is not application/json."
            log.debug("update result %s %s", result, 415)
            # No exception is being handled here, so there is nothing to
            # chain from.
            raise UnsupportedMediaType(result)

        try:
            enable, redeploy = evc.update(
                **self._evc_dict_with_instances(data)
            )
        except ValidationError as exception:
            raise BadRequest(str(exception)) from exception
        except ValueError as exception:
            log.error(exception)
            log.debug("update result %s %s", exception, 400)
            raise BadRequest(str(exception)) from exception

        if evc.is_active():
            if enable is False:  # disable if active
                with evc.lock:
                    evc.remove()
            elif redeploy is not None:  # redeploy if active
                with evc.lock:
                    evc.remove()
                    evc.deploy()
        else:
            if enable is True:  # enable if inactive
                with evc.lock:
                    evc.deploy()
        result = {evc.id: evc.as_dict()}
        status = 200

        log.debug("update result %s %s", result, status)
        emit_event(self.controller, "updated",
                   content=map_evc_event_content(evc, **data))
        return jsonify(result), status
331
332 1
    @rest("/v2/evc/<circuit_id>", methods=["DELETE"])
    def delete_circuit(self, circuit_id):
        """Remove a circuit.

        While holding the EVC lock, the current and failover flows are
        removed, the EVC is deactivated and disabled, taken out of the
        scheduler, archived and synced. A ``deleted`` event is emitted
        afterwards.

        Returns:
            A ``({"response": ...}, 200)`` JSON response.

        Raises:
            NotFound: when ``circuit_id`` is not in the circuit buffer or
                the EVC is already archived.
        """
        log.debug("delete_circuit /v2/evc/%s", circuit_id)
        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            result = f"circuit_id {circuit_id} not found"
            log.debug("delete_circuit result %s %s", result, 404)
            # Chain from the caught error, not from the exception class.
            raise NotFound(result) from error

        if evc.archived:
            result = f"Circuit {circuit_id} already removed"
            log.debug("delete_circuit result %s %s", result, 404)
            # No exception is being handled here, so there is nothing to
            # chain from.
            raise NotFound(result)

        log.info("Removing %s", evc)
        with evc.lock:
            evc.remove_current_flows()
            evc.remove_failover_flows(sync=False)
            evc.deactivate()
            evc.disable()
            self.sched.remove(evc)
            evc.archive()
            evc.sync()
        log.info("EVC removed. %s", evc)
        result = {"response": f"Circuit {circuit_id} removed"}
        status = 200

        log.debug("delete_circuit result %s %s", result, status)
        emit_event(self.controller, "deleted",
                   content=map_evc_event_content(evc))
        return jsonify(result), status
369
370 1
    @rest("v2/evc/<circuit_id>/metadata", methods=["GET"])
    def get_metadata(self, circuit_id):
        """Return the metadata of a buffered EVC.

        Raises:
            NotFound: when a ``KeyError`` is raised while building the
                response, e.g. ``circuit_id`` is not in the circuit buffer.
        """
        try:
            payload = {"metadata": self.circuits[circuit_id].metadata}
            return jsonify(payload), 200
        except KeyError as error:
            raise NotFound(f"circuit_id {circuit_id} not found.") from error
380
381 1 View Code Duplication
    @rest("v2/evc/metadata", methods=["POST"])
    @validate(spec)
    def bulk_add_metadata(self, data):
        """Add the same metadata to several EVCs at once.

        ``data`` carries the target ``circuit_ids``; whatever is left after
        removing that key is the metadata to add. Storage is updated for all
        ids first, then each buffered EVC is extended. Ids for which a
        ``KeyError`` is raised are collected and returned with status 404.
        """
        target_ids = data.pop("circuit_ids")
        self.mongo_controller.update_evcs(target_ids, data, "add")

        missing = []
        for evc_id in target_ids:
            try:
                self.circuits[evc_id].extend_metadata(data)
            except KeyError:
                missing.append(evc_id)

        if not missing:
            return jsonify("Operation successful"), 201
        return jsonify(missing), 404
399
400 1
    @rest("v2/evc/<circuit_id>/metadata", methods=["POST"])
    def add_metadata(self, circuit_id):
        """Add the metadata sent in the request body to a buffered EVC.

        Raises:
            BadRequest: when the body is not well-formed JSON or the request
                has no content type.
            UnsupportedMediaType: when no JSON metadata could be read from
                the request.
            NotFound: when ``circuit_id`` is not in the circuit buffer.
        """
        try:
            metadata = request.get_json()
            content_type = request.content_type
        except BadRequest as error:
            raise BadRequest(
                "The request body is not a well-formed JSON."
            ) from error

        if content_type is None:
            raise BadRequest("The request body is empty.")

        if metadata is None:
            if content_type == "application/json":
                reason = "Metadata is empty."
            else:
                reason = (
                    "The content type must be application/json "
                    f"(received {content_type})."
                )
            raise UnsupportedMediaType(reason)

        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            raise NotFound(f"circuit_id {circuit_id} not found.") from error

        evc.extend_metadata(metadata)
        evc.sync()
        return jsonify("Operation successful"), 201
430
431 1 View Code Duplication
    @rest("v2/evc/metadata/<key>", methods=["DELETE"])
    @validate(spec)
    def bulk_delete_metadata(self, data, key):
        """Delete one metadata key from several EVCs at once.

        ``data`` carries the target ``circuit_ids``. Storage is updated for
        all ids first, then ``key`` is removed from each buffered EVC. Ids
        for which a ``KeyError`` is raised are collected and returned with
        status 404.
        """
        target_ids = data.pop("circuit_ids")
        self.mongo_controller.update_evcs(target_ids, {key: ""}, "del")

        missing = []
        for evc_id in target_ids:
            try:
                self.circuits[evc_id].remove_metadata(key)
            except KeyError:
                missing.append(evc_id)

        if not missing:
            return jsonify("Operation successful"), 200
        return jsonify(missing), 404
449
450 1
    @rest("v2/evc/<circuit_id>/metadata/<key>", methods=["DELETE"])
    def delete_metadata(self, circuit_id, key):
        """Remove one metadata key from a buffered EVC and sync it.

        Raises:
            NotFound: when ``circuit_id`` is not in the circuit buffer.
        """
        try:
            target = self.circuits[circuit_id]
        except KeyError as error:
            raise NotFound(f"circuit_id {circuit_id} not found.") from error

        target.remove_metadata(key)
        target.sync()
        return jsonify("Operation successful"), 200
461
462 1
    @rest("/v2/evc/<circuit_id>/redeploy", methods=["PATCH"])
    def redeploy(self, circuit_id):
        """Endpoint to force the redeployment of an EVC.

        An enabled EVC has its current flows removed and is deployed again
        while holding its lock (status 202). A disabled EVC is left untouched
        (status 409).

        Raises:
            NotFound: when ``circuit_id`` is not in the circuit buffer.
        """
        log.debug("redeploy /v2/evc/%s/redeploy", circuit_id)
        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            result = f"circuit_id {circuit_id} not found"
            # Chain from the caught error, not from the exception class.
            raise NotFound(result) from error
        if evc.is_enabled():
            with evc.lock:
                evc.remove_current_flows()
                evc.deploy()
            result = {"response": f"Circuit {circuit_id} redeploy received."}
            status = 202
        else:
            result = {"response": f"Circuit {circuit_id} is disabled."}
            status = 409

        return jsonify(result), status
482
483 1
    @rest("/v2/evc/schedule", methods=["GET"])
    def list_schedules(self):
        """Endpoint that lists every schedule of every stored circuit.

        The JSON answer is a list with one entry per schedule:
        [{"schedule_id": <schedule_id>,
         "circuit_id": <circuit_id>,
         "schedule": <schedule object>}]

        When there is no stored circuit at all, an empty JSON object is
        returned instead.
        """
        log.debug("list_schedules /v2/evc/schedule")
        stored = self.mongo_controller.get_circuits()['circuits'].values()
        if not stored:
            return jsonify({}), 200

        schedules = []
        for circuit in stored:
            # A missing or empty "circuit_scheduler" contributes nothing.
            for schedule in circuit.get("circuit_scheduler") or ():
                schedules.append({
                    "schedule_id": schedule.get("id"),
                    "circuit_id": circuit.get("id"),
                    "schedule": schedule,
                })

        log.debug("list_schedules result %s %s", schedules, 200)
        return jsonify(schedules), 200
514
515 1
    @rest("/v2/evc/schedule/", methods=["POST"])
    def create_schedule(self):
        """
        Create a new schedule for a given circuit.

        This service does not check if there are conflicts with another
        schedule.
        Payload example:
            {
              "circuit_id":"aa:bb:cc",
              "schedule": {
                "date": "2019-08-07T14:52:10.967Z",
                "interval": "string",
                "frequency": "1 * * * *",
                "action": "create"
              }
            }

        Returns:
            A ``(<schedule dict>, 201)`` JSON response.

        Raises:
            BadRequest: when the payload is not a dictionary or misses
                ``circuit_id`` or ``schedule``.
            NotFound: when the circuit is not found.
            Forbidden: when the circuit is archived.
        """
        log.debug("create_schedule /v2/evc/schedule/")

        json_data = self._json_from_request("create_schedule")
        # In the handlers below, chain from the caught error rather than
        # from the exception class, so the real cause is kept.
        try:
            circuit_id = json_data["circuit_id"]
        except TypeError as error:
            result = "The payload should have a dictionary."
            log.debug("create_schedule result %s %s", result, 400)
            raise BadRequest(result) from error
        except KeyError as error:
            result = "Missing circuit_id."
            log.debug("create_schedule result %s %s", result, 400)
            raise BadRequest(result) from error

        try:
            schedule_data = json_data["schedule"]
        except KeyError as error:
            result = "Missing schedule data."
            log.debug("create_schedule result %s %s", result, 400)
            raise BadRequest(result) from error

        # Get EVC from circuits buffer
        circuits = self._get_circuits_buffer()

        # get the circuit
        evc = circuits.get(circuit_id)

        # No exception is being handled in the two checks below, so the
        # errors are raised without a ``from`` clause.
        if not evc:
            result = f"circuit_id {circuit_id} not found"
            log.debug("create_schedule result %s %s", result, 404)
            raise NotFound(result)
        # Can not modify circuits deleted and archived
        if evc.archived:
            result = f"Circuit {circuit_id} is archived. Update is forbidden."
            log.debug("create_schedule result %s %s", result, 403)
            raise Forbidden(result)

        # new schedule from dict
        new_schedule = CircuitSchedule.from_dict(schedule_data)

        # If there is no schedule, create the list
        if not evc.circuit_scheduler:
            evc.circuit_scheduler = []

        # Add the new schedule
        evc.circuit_scheduler.append(new_schedule)

        # Add schedule job
        self.sched.add_circuit_job(evc, new_schedule)

        # save circuit to mongodb
        evc.sync()

        result = new_schedule.as_dict()
        status = 201

        log.debug("create_schedule result %s %s", result, status)
        return jsonify(result), status
591
592 1
    @rest("/v2/evc/schedule/<schedule_id>", methods=["PATCH"])
    def update_schedule(self, schedule_id):
        """Update a schedule.

        Replace the given schedule of an EVC by a new one built from the
        payload. The schedule ID is preserved.
        Payload example:
            {
              "date": "2019-08-07T14:52:10.967Z",
              "interval": "string",
              "frequency": "1 * * *",
              "action": "create"
            }

        Returns:
            A ``(<schedule dict>, 200)`` JSON response.

        Raises:
            NotFound: when no schedule matches ``schedule_id``.
            Forbidden: when the owning circuit is archived.
        """
        log.debug("update_schedule /v2/evc/schedule/%s", schedule_id)

        # Try to find a circuit schedule
        evc, found_schedule = self._find_evc_by_schedule_id(schedule_id)

        # No exception is being handled in the two checks below, so the
        # errors are raised without a ``from`` clause.
        if not found_schedule:
            result = f"schedule_id {schedule_id} not found"
            log.debug("update_schedule result %s %s", result, 404)
            raise NotFound(result)
        # Can not modify circuits deleted and archived
        if evc.archived:
            result = f"Circuit {evc.id} is archived. Update is forbidden."
            log.debug("update_schedule result %s %s", result, 403)
            raise Forbidden(result)

        data = self._json_from_request("update_schedule")

        new_schedule = CircuitSchedule.from_dict(data)
        new_schedule.id = found_schedule.id
        # Remove the old schedule
        evc.circuit_scheduler.remove(found_schedule)
        # Append the modified schedule
        evc.circuit_scheduler.append(new_schedule)

        # Cancel all schedule jobs
        self.sched.cancel_job(found_schedule.id)
        # Add the new circuit schedule
        self.sched.add_circuit_job(evc, new_schedule)
        # Save EVC to mongodb
        evc.sync()

        result = new_schedule.as_dict()
        status = 200

        log.debug("update_schedule result %s %s", result, status)
        return jsonify(result), status
642
643 1
    @rest("/v2/evc/schedule/<schedule_id>", methods=["DELETE"])
    def delete_schedule(self, schedule_id):
        """Remove a circuit schedule.

        The schedule is removed from its EVC, its job is cancelled in the
        scheduler and the EVC is synced.

        Raises:
            NotFound: when no schedule matches ``schedule_id``.
            Forbidden: when the owning circuit is archived.
        """
        log.debug("delete_schedule /v2/evc/schedule/%s", schedule_id)
        evc, schedule = self._find_evc_by_schedule_id(schedule_id)

        if not schedule:
            message = f"schedule_id {schedule_id} not found"
            log.debug("delete_schedule result %s %s", message, 404)
            raise NotFound(message)

        # Circuits deleted and archived can not be modified.
        if evc.archived:
            message = f"Circuit {evc.id} is archived. Update is forbidden."
            log.debug("delete_schedule result %s %s", message, 403)
            raise Forbidden(message)

        evc.circuit_scheduler.remove(schedule)
        self.sched.cancel_job(schedule.id)
        # Save EVC to mongodb
        evc.sync()

        message = "Schedule removed"
        log.debug("delete_schedule result %s %s", message, 200)
        return jsonify(message), 200
678
679 1
    def _is_duplicated_evc(self, evc):
        """Tell whether ``evc`` duplicates a circuit already in the buffer.

        Args:
            evc (EVC): circuit to be analysed.

        Returns:
            boolean: True if some non-archived buffered circuit shares a UNI
            with ``evc``, otherwise False.

        """
        # Iterate over a snapshot of the buffer values.
        buffered = tuple(self.circuits.values())
        return any(
            not stored.archived and stored.shares_uni(evc)
            for stored in buffered
        )
693
694 1
    @listen_to("kytos/topology.link_up")
    def on_link_up(self, event):
        """Listen to link up events and pass them to the handler."""
        self.handle_link_up(event)
698
699 1
    def handle_link_up(self, event):
        """Let every enabled, non-archived EVC react to a link coming up.

        EVCs are visited in the order given by ``get_evcs_by_svc_level`` and
        each one handles the link while holding its own lock.
        """
        link = event.content["link"]
        log.info("Event handle_link_up %s", link)
        for evc in self.get_evcs_by_svc_level():
            if not evc.is_enabled() or evc.archived:
                continue
            with evc.lock:
                evc.handle_link_up(link)
706
707 1
    @listen_to("kytos/topology.link_down")
    def on_link_down(self, event):
        """Listen to link down events and pass them to the handler."""
        self.handle_link_down(event)
711
712 1
    def handle_link_down(self, event):
        """React to a link going down.

        EVCs affected by the link are split in two groups:

        - those whose failover path exists and is not affected by the same
          link: their failover flows are sent to flow_manager in batches,
          then their current and failover paths are swapped and synced;
        - the remaining ones: an ``evc_affected_by_link_down`` event is
          emitted for each of them.

        EVCs not affected by the link get ``setup_failover_path`` called when
        their failover path is affected by it.
        """
        link = event.content["link"]
        log.info("Event handle_link_down %s", link)
        switch_flows = {}
        evcs_with_failover = []
        evcs_normal = []
        check_failover = []
        for evc in self.get_evcs_by_svc_level():
            if not evc.is_affected_by_link(link):
                check_failover.append(evc)
                continue
            # if there is no failover path, handles link down the
            # traditional way
            if (
                not getattr(evc, 'failover_path', None) or
                evc.is_failover_path_affected_by_link(link)
            ):
                evcs_normal.append(evc)
                continue
            for dpid, flows in evc.get_failover_flows().items():
                switch_flows.setdefault(dpid, []).extend(flows)
            evcs_with_failover.append(evc)

        # The slice bound is constant. Every round already drops the flows it
        # sent, so accumulating the bound across rounds (as done previously)
        # made each batch bigger than the configured size. A falsy size
        # disables batching (``None`` bound sends everything at once); a
        # negative size is treated the same way because it could never
        # drain the pending lists.
        configured_size = settings.BATCH_SIZE
        if configured_size and configured_size > 0:
            batch_size = configured_size
        else:
            batch_size = None
        while switch_flows:
            for dpid in list(switch_flows):
                pending = switch_flows[dpid]
                emit_event(
                    self.controller,
                    context="kytos.flow_manager",
                    name="flows.install",
                    content={
                        "dpid": dpid,
                        "flow_dict": {"flows": pending[:batch_size]},
                    }
                )
                if batch_size is None or batch_size >= len(pending):
                    del switch_flows[dpid]
                    continue
                switch_flows[dpid] = pending[batch_size:]
            time.sleep(settings.BATCH_INTERVAL)

        for evc in evcs_with_failover:
            with evc.lock:
                old_path = evc.current_path
                evc.current_path = evc.failover_path
                evc.failover_path = old_path
                evc.sync()
            emit_event(self.controller, "redeployed_link_down",
                       content=map_evc_event_content(evc))
            log.info(
                f"{evc} redeployed with failover due to link down {link.id}"
            )

        for evc in evcs_normal:
            emit_event(
                self.controller,
                "evc_affected_by_link_down",
                content={"link_id": link.id} | map_evc_event_content(evc)
            )

        # After handling the hot path, check if new failover paths are needed.
        # Note that EVCs affected by link down will generate a KytosEvent for
        # deployed|redeployed, which will trigger the failover path setup.
        # Thus, we just need to further check the check_failover list
        for evc in check_failover:
            if evc.is_failover_path_affected_by_link(link):
                evc.setup_failover_path()
783
784 1
    @listen_to("kytos/mef_eline.evc_affected_by_link_down")
    def on_evc_affected_by_link_down(self, event):
        """Listen for EVCs affected by a link down and delegate handling.

        The event is passed untouched to
        ``handle_evc_affected_by_link_down``.
        """
        self.handle_evc_affected_by_link_down(event)
788
789 1
    def handle_evc_affected_by_link_down(self, event):
        """Redeploy the EVC named in the event after a link went down.

        ``event.content`` must carry ``evc_id`` and ``link_id``. Unknown
        EVC ids are ignored. Otherwise the EVC's ``handle_link_down`` runs
        under the EVC lock and its result selects the emitted event:
        ``redeployed_link_down`` on success, ``error_redeploy_link_down``
        on failure.
        """
        content = event.content
        evc = self.circuits.get(content["evc_id"])
        # Read before the guard so a missing key still fails loudly.
        link_id = content['link_id']
        if not evc:
            return

        with evc.lock:
            redeployed = evc.handle_link_down()

        if redeployed:
            log.info(f"{evc} redeployed due to link down {link_id}")
            event_name = "redeployed_link_down"
        else:
            event_name = "error_redeploy_link_down"
        emit_event(
            self.controller,
            event_name,
            content=map_evc_event_content(evc),
        )
803
804 1
    @listen_to("kytos/mef_eline.(redeployed_link_(up|down)|deployed)")
    def on_evc_deployed(self, event):
        """Listen for EVC deployed, redeployed_link_up or redeployed_link_down.

        The event is passed untouched to ``handle_evc_deployed``.
        """
        self.handle_evc_deployed(event)
808
809 1
    def handle_evc_deployed(self, event):
        """Set up the failover path of the EVC named in the event.

        ``event.content["evc_id"]`` selects the circuit; ids that are not
        in ``self.circuits`` are ignored. The setup runs under the EVC lock.
        """
        circuit = self.circuits.get(event.content["evc_id"])
        if circuit:
            with circuit.lock:
                circuit.setup_failover_path()
816
817 1
    @listen_to("kytos/topology.topology_loaded")
    def on_topology_loaded(self, event):  # pylint: disable=unused-argument
        """Trigger loading of all EVCs when the topology reports loaded.

        The event itself is not inspected.
        """
        self.load_all_evcs()
821
822 1
    def load_all_evcs(self):
        """Load every stored circuit that is not in memory yet.

        Circuits come from ``self.mongo_controller.get_circuits()``; ids
        already present in ``self.circuits`` are left untouched.
        """
        stored = self.mongo_controller.get_circuits()['circuits']
        for circuit_id, circuit_dict in stored.items():
            if circuit_id in self.circuits:
                continue
            self._load_evc(circuit_dict)
828
829 1
    def _load_evc(self, circuit_dict):
830
        """Load one EVC from mongodb to memory."""
831 1
        try:
832 1
            evc = self._evc_from_dict(circuit_dict)
833 1
        except ValueError as exception:
834 1
            log.error(
835
                f"Could not load EVC: dict={circuit_dict} error={exception}"
836
            )
837 1
            return None
838
839 1
        if evc.archived:
840 1
            return None
841 1
        evc.deactivate()
842 1
        evc.sync()
843 1
        self.circuits.setdefault(evc.id, evc)
844 1
        self.sched.add(evc)
845 1
        return evc
846
847 1
    @listen_to("kytos/flow_manager.flow.error")
    def on_flow_mod_error(self, event):
        """Listen for flow manager flow errors and delegate handling.

        The event is passed untouched to ``handle_flow_mod_error``.
        """
        self.handle_flow_mod_error(event)
851
852 1
    def handle_flow_mod_error(self, event):
        """Remove the current flows of the EVC whose flow failed to install.

        Only errors whose ``error_command`` is ``"add"`` are handled. The
        EVC is found through the id derived from the flow cookie; nothing
        happens when no such EVC is in ``self.circuits``.
        """
        content = event.content
        failed_flow = content["flow"]
        if content.get("error_command") != "add":
            return

        circuit_id = EVC.get_id_from_cookie(failed_flow.cookie)
        circuit = self.circuits.get(circuit_id)
        if not circuit:
            return
        circuit.remove_current_flows()
861
862 1
    def _evc_dict_with_instances(self, evc_dict):
863
        """Convert some dict values to instance of EVC classes.
864
865
        This method will convert: [UNI, Link]
866
        """
867 1
        data = evc_dict.copy()  # Do not modify the original dict
868 1
        for attribute, value in data.items():
869
            # Get multiple attributes.
870
            # Ex: uni_a, uni_z
871 1
            if "uni" in attribute:
872 1
                try:
873 1
                    data[attribute] = self._uni_from_dict(value)
874 1
                except ValueError as exception:
875 1
                    result = "Error creating UNI: Invalid value"
876 1
                    raise ValueError(result) from exception
877
878 1
            if attribute == "circuit_scheduler":
879 1
                data[attribute] = []
880 1
                for schedule in value:
881 1
                    data[attribute].append(CircuitSchedule.from_dict(schedule))
882
883
            # Get multiple attributes.
884
            # Ex: primary_links,
885
            #     backup_links,
886
            #     current_links_cache,
887
            #     primary_links_cache,
888
            #     backup_links_cache
889 1
            if "links" in attribute:
890 1
                data[attribute] = [
891
                    self._link_from_dict(link) for link in value
892
                ]
893
894
            # Ex: current_path,
895
            #     primary_path,
896
            #     backup_path
897 1
            if "path" in attribute and attribute != "dynamic_backup_path":
898 1
                data[attribute] = Path(
899
                    [self._link_from_dict(link) for link in value]
900
                )
901
902 1
        return data
903
904 1
    def _evc_from_dict(self, evc_dict):
        """Return an EVC built from a plain dict.

        Nested values are first converted by ``_evc_dict_with_instances``;
        the result is passed as keyword arguments to ``EVC`` together with
        the controller.
        """
        evc_kwargs = self._evc_dict_with_instances(evc_dict)
        return EVC(self.controller, **evc_kwargs)
907
908 1
    def _uni_from_dict(self, uni_dict):
909
        """Return a UNI object from python dict."""
910 1
        if uni_dict is None:
911 1
            return False
912
913 1
        interface_id = uni_dict.get("interface_id")
914 1
        interface = self.controller.get_interface_by_id(interface_id)
915 1
        if interface is None:
916 1
            result = (
917
                "Error creating UNI:"
918
                + f"Could not instantiate interface {interface_id}"
919
            )
920 1
            raise ValueError(result) from ValueError
921
922 1
        tag_dict = uni_dict.get("tag", None)
923 1
        if tag_dict:
924 1
            tag = TAG.from_dict(tag_dict)
925
        else:
926 1
            tag = None
927 1
        uni = UNI(interface, tag)
928
929 1
        return uni
930
931 1
    def _link_from_dict(self, link_dict):
932
        """Return a Link object from python dict."""
933 1
        id_a = link_dict.get("endpoint_a").get("id")
934 1
        id_b = link_dict.get("endpoint_b").get("id")
935
936 1
        endpoint_a = self.controller.get_interface_by_id(id_a)
937 1
        endpoint_b = self.controller.get_interface_by_id(id_b)
938 1
        if not endpoint_a:
939 1
            error_msg = f"Could not get interface endpoint_a id {id_a}"
940 1
            raise ValueError(error_msg)
941 1
        if not endpoint_b:
942
            error_msg = f"Could not get interface endpoint_b id {id_b}"
943
            raise ValueError(error_msg)
944
945 1
        link = Link(endpoint_a, endpoint_b)
946 1
        if "metadata" in link_dict:
947 1
            link.extend_metadata(link_dict.get("metadata"))
948
949 1
        s_vlan = link.get_metadata("s_vlan")
950 1
        if s_vlan:
951 1
            tag = TAG.from_dict(s_vlan)
952 1
            if tag is False:
953
                error_msg = f"Could not instantiate tag from dict {s_vlan}"
954
                raise ValueError(error_msg)
955 1
            link.update_metadata("s_vlan", tag)
956 1
        return link
957
958 1
    def _find_evc_by_schedule_id(self, schedule_id):
959
        """
960
        Find an EVC and CircuitSchedule based on schedule_id.
961
962
        :param schedule_id: Schedule ID
963
        :return: EVC and Schedule
964
        """
965 1
        circuits = self._get_circuits_buffer()
966 1
        found_schedule = None
967 1
        evc = None
968
969
        # pylint: disable=unused-variable
970 1
        for c_id, circuit in circuits.items():
971 1
            for schedule in circuit.circuit_scheduler:
972 1
                if schedule.id == schedule_id:
973 1
                    found_schedule = schedule
974 1
                    evc = circuit
975 1
                    break
976 1
            if found_schedule:
977 1
                break
978 1
        return evc, found_schedule
979
980 1
    def _get_circuits_buffer(self):
981
        """
982
        Return the circuit buffer.
983
984
        If the buffer is empty, try to load data from mongodb.
985
        """
986 1
        if not self.circuits:
987
            # Load circuits from mongodb to buffer
988 1
            circuits = self.mongo_controller.get_circuits()['circuits']
989 1
            for c_id, circuit in circuits.items():
990 1
                evc = self._evc_from_dict(circuit)
991 1
                self.circuits[c_id] = evc
992 1
        return self.circuits
993
994 1
    @staticmethod
    def _json_from_request(caller):
        """Return the JSON body of the current request.

        If it was not possible to get a json from the request, log, for
        debug, who was the caller and the error that occurred, and raise
        the matching HTTP exception.

        :param caller: name of the calling endpoint, used only in log
            messages.
        :return: the value returned by ``request.get_json()``.
        :raises BadRequest: when ``request.get_json()`` raises
            ``ValueError`` or ``BadRequest``.
        :raises UnsupportedMediaType: when ``request.get_json()`` returns
            ``None``.
        """
        try:
            json_data = request.get_json()
        except ValueError as exception:
            log.error(exception)
            log.debug(f"{caller} result {exception} 400")
            # Chain from the caught error so the traceback keeps the real
            # cause rather than an empty BadRequest instance.
            raise BadRequest(str(exception)) from exception
        except BadRequest as exception:
            result = "The request is not a valid JSON."
            log.debug(f"{caller} result {result} 400")
            raise BadRequest(result) from exception
        if json_data is None:
            result = "Content-Type must be application/json"
            log.debug(f"{caller} result {result} 415")
            raise UnsupportedMediaType(result)
        return json_data
1017