Passed
Pull Request — master (#257)
by
unknown
03:44
created

build.main   F

Complexity

Total Complexity 164

Size/Duplication

Total Lines 1005
Duplicated Lines 0 %

Test Coverage

Coverage 93.99%

Importance

Changes 0
Metric Value
wmc 164
eloc 653
dl 0
loc 1005
ccs 532
cts 566
cp 0.9399
rs 1.947
c 0
b 0
f 0
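
The coverage figures in this table appear to be related by a plain ratio. A minimal sketch, assuming `ccs` is the count of covered statements, `cts` the count of coverable statements, and `cp` the resulting coverage proportion (these readings of the abbreviations are assumptions, not something the report states):

```python
# Values copied from the metric table above.
ccs = 532  # assumed meaning: covered statements
cts = 566  # assumed meaning: total coverable statements

coverage = ccs / cts

print(f"{coverage:.4f}")  # compare with the cp row
print(f"{coverage:.2%}")  # compare with the Coverage line
```

Under that reading, the ratio matches both the `cp` value and the Coverage percentage reported above.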

42 Methods

Rating   Name   Duplication   Size   Complexity  
A Main.setup() 0 26 1
A Main.list_circuits() 0 14 1
A Main.on_flow_delete() 0 4 1
A Main.get_circuit() 0 12 2
A Main.shutdown() 0 2 1
A Main.execute() 0 8 3
A Main.get_evcs_by_svc_level() 0 7 2
A Main.get_eline_controller() 0 4 1
C Main.create_circuit() 0 97 10
D Main.execute_consistency() 0 32 13
A Main.handle_flow_delete() 0 7 2
B Main.add_metadata() 0 30 6
C Main._evc_dict_with_instances() 0 41 9
A Main.on_link_down() 0 4 1
A Main.handle_evc_deployed() 0 7 3
A Main.update_schedule() 0 50 3
A Main._evc_from_dict() 0 3 1
A Main._find_evc_by_schedule_id() 0 21 5
A Main.load_all_evcs() 0 6 3
A Main.get_metadata() 0 10 2
A Main.delete_metadata() 0 11 2
A Main.handle_link_up() 0 7 5
A Main.on_link_up() 0 4 1
A Main.on_evc_affected_by_link_down() 0 4 1
A Main.delete_schedule() 0 35 3
A Main.handle_evc_affected_by_link_down() 0 20 4
F Main.handle_link_down() 0 87 15
A Main.on_evc_deployed() 0 4 1
A Main._json_from_request() 0 23 4
B Main.list_schedules() 0 31 5
B Main._link_from_dict() 0 26 6
A Main._uni_from_dict() 0 22 4
A Main.redeploy() 0 20 4
A Main.handle_flow_mod_error() 0 9 3
A Main._is_duplicated_evc() 0 14 4
D Main.update() 0 65 13
A Main.on_flow_mod_error() 0 4 1
A Main.on_topology_loaded() 0 4 1
A Main._load_evc() 0 17 3
B Main.create_schedule() 0 76 7
A Main._get_circuits_buffer() 0 13 3
A Main.delete_circuit() 0 44 4
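
The Complexity column above can be cross-checked against the Total Complexity figure. The sketch below only re-adds the numbers from the table; the `hotspots` grouping (methods rated C or worse) is a choice made here for illustration.

```python
# Complexity column of the method table, in table order.
complexities = [
    1, 1, 1, 2, 1, 3, 2, 1, 10, 13, 2, 6, 9, 1,
    3, 3, 1, 5, 3, 2, 2, 5, 1, 1, 3, 4, 15, 1,
    4, 5, 6, 4, 4, 3, 4, 13, 1, 1, 3, 7, 3, 4,
]
total = sum(complexities)

# Methods rated C, D or F in the table.
hotspots = {
    "handle_link_down": 15,
    "execute_consistency": 13,
    "update": 13,
    "create_circuit": 10,
    "_evc_dict_with_instances": 9,
}
share = sum(hotspots.values()) / total

print(len(complexities), total)
print(f"{share:.0%} of the complexity sits in {len(hotspots)} methods")
```

The sum equals the reported total of 164 across 42 methods, and the five lowest-rated methods carry a bit over a third of it, which makes them the natural first targets for refactoring.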

Complexity

Complex classes like build.main often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
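
As a rough illustration of Extract Class for this file, the methods sharing the `schedule` suffix (`list_schedules`, `create_schedule`, `update_schedule`, `delete_schedule`) look like one cohesive component. The sketch below is hypothetical: `ScheduleRegistry`, `SlimMain`, and the in-memory dictionary are assumptions made for the example and are not part of the NApp.

```python
class ScheduleRegistry:
    """Hypothetical extracted class that owns schedule bookkeeping."""

    def __init__(self):
        # schedule_id -> (circuit_id, schedule payload); in-memory stand-in
        self._entries = {}

    def add(self, circuit_id, schedule):
        """Store a schedule and return its id."""
        schedule_id = schedule["id"]
        self._entries[schedule_id] = (circuit_id, schedule)
        return schedule_id

    def remove(self, schedule_id):
        """Drop a schedule; raises KeyError if the id is unknown."""
        del self._entries[schedule_id]

    def as_list(self):
        """Return entries in the shape documented by list_schedules."""
        return [
            {"schedule_id": sid, "circuit_id": cid, "schedule": sched}
            for sid, (cid, sched) in self._entries.items()
        ]


class SlimMain:
    """Stand-in for the entry-point class after extraction."""

    def __init__(self):
        self.schedules = ScheduleRegistry()

    def create_schedule(self, circuit_id, schedule):
        return self.schedules.add(circuit_id, schedule)

    def delete_schedule(self, schedule_id):
        self.schedules.remove(schedule_id)

    def list_schedules(self):
        return self.schedules.as_list()


app = SlimMain()
app.create_schedule("aa:bb:cc", {"id": "s1", "action": "create"})
print(app.list_schedules())
```

The entry-point class keeps its public method names and only delegates, so callers would not need to change while the schedule logic gains a home that can be tested on its own.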

# pylint: disable=protected-access, too-many-lines
"""Main module of kytos/mef_eline Kytos Network Application.

NApp to provision circuits from user request.
"""
import time
from threading import Lock

from flask import jsonify, request
from werkzeug.exceptions import (BadRequest, Conflict, Forbidden,
                                 MethodNotAllowed, NotFound,
                                 UnsupportedMediaType)

from kytos.core import KytosNApp, log, rest
from kytos.core.helpers import listen_to
from kytos.core.interface import TAG, UNI
from kytos.core.link import Link
from napps.kytos.mef_eline import controllers, settings
from napps.kytos.mef_eline.exceptions import InvalidPath
from napps.kytos.mef_eline.models import (EVC, DynamicPathManager, EVCDeploy,
                                          Path)
from napps.kytos.mef_eline.scheduler import CircuitSchedule, Scheduler
from napps.kytos.mef_eline.utils import emit_event, load_spec, validate


# pylint: disable=too-many-public-methods
class Main(KytosNApp):
    """Main class of amlight/mef_eline NApp.

    This class is the entry point for this napp.
    """

    spec = load_spec()

    def setup(self):
        """Replace the '__init__' method for the KytosNApp subclass.

        The setup method is automatically called by the controller when your
        application is loaded.

        So, if you have any setup routine, insert it here.
        """
        # object used to schedule circuit events
        self.sched = Scheduler()

        # object to save and load circuits
        self.mongo_controller = self.get_eline_controller()
        self.mongo_controller.bootstrap_indexes()

        # set the controller that will manage the dynamic paths
        DynamicPathManager.set_controller(self.controller)

        # dictionary of EVCs created. It acts as a circuit buffer.
        # Every create/update/delete must be synced to mongodb.
        self.circuits = {}

        self._lock = Lock()
        self.execute_as_loop(settings.DEPLOY_EVCS_INTERVAL)

        self.load_all_evcs()

    def get_evcs_by_svc_level(self) -> list:
        """Get circuits sorted by desc service level and asc creation_time.

        In the future, as more ops are offloaded, fetch them from the DB.
        """
        return sorted(self.circuits.values(),
                      key=lambda x: (-x.service_level, x.creation_time))

    @staticmethod
    def get_eline_controller():
        """Return the ELineController instance."""
        return controllers.ELineController()

    def execute(self):
        """Run the consistency routine unless one is already in progress."""
        if self._lock.locked():
            return
        log.debug("Starting consistency routine")
        with self._lock:
            self.execute_consistency()
        log.debug("Finished consistency routine")

    def execute_consistency(self):
        """Execute consistency routine."""
        circuits_to_check = {}
        stored_circuits = self.mongo_controller.get_circuits()['circuits']
        for circuit in self.get_evcs_by_svc_level():
            stored_circuits.pop(circuit.id, None)
            if (
                circuit.is_enabled()
                and not circuit.is_active()
                and not circuit.lock.locked()
                and not circuit.has_recent_removed_flow()
                and not circuit.is_recent_updated()
            ):
                circuits_to_check[circuit.id] = circuit
        circuits_checked = EVCDeploy.check_list_traces(circuits_to_check)
        for circuit_id, circuit in circuits_to_check.items():
            is_checked = circuits_checked.get(circuit_id)
            if is_checked:
                circuit.execution_rounds = 0
                log.info(f"{circuit} enabled but inactive - activating")
                with circuit.lock:
                    circuit.activate()
                    circuit.sync()
            else:
                circuit.execution_rounds += 1
                if circuit.execution_rounds > settings.WAIT_FOR_OLD_PATH:
                    log.info(f"{circuit} enabled but inactive - redeploy")
                    with circuit.lock:
                        circuit.deploy()
        for circuit_id in stored_circuits:
            log.info(f"EVC found in mongodb but unloaded {circuit_id}")
            self._load_evc(stored_circuits[circuit_id])

    def shutdown(self):
        """Execute when your napp is unloaded.

        If you have some cleanup procedure, insert it here.
        """

    @rest("/v2/evc/", methods=["GET"])
    def list_circuits(self):
        """Endpoint to return circuits stored.

        If the archived query arg is defined (not null), circuits are
        filtered accordingly; by default only non-archived EVCs are listed.
        """
        log.debug("list_circuits /v2/evc")
        archived = request.args.get("archived", "false").lower()
        archived_to_optional = {"null": None, "true": True, "false": False}
        archived = archived_to_optional.get(archived, False)
        circuits = self.mongo_controller.get_circuits(archived=archived)
        circuits = circuits['circuits']
        return jsonify(circuits), 200

    @rest("/v2/evc/<circuit_id>", methods=["GET"])
    def get_circuit(self, circuit_id):
        """Endpoint to return a circuit based on id."""
        log.debug("get_circuit /v2/evc/%s", circuit_id)
        circuit = self.mongo_controller.get_circuit(circuit_id)
        if not circuit:
            result = f"circuit_id {circuit_id} not found"
            log.debug("get_circuit result %s %s", result, 404)
            raise NotFound(result)
        status = 200
        log.debug("get_circuit result %s %s", circuit, status)
        return jsonify(circuit), status

    @rest("/v2/evc/", methods=["POST"])
    @validate(spec)
    def create_circuit(self, data):
        """Try to create a new circuit.

        Firstly, for EVPL: E-Line NApp verifies if UNI_A's requested C-VID and
        UNI_Z's requested C-VID are available from the interfaces' pools. This
        is checked when creating the UNI object.

        Then, E-Line NApp requests a primary and a backup path to the
        Pathfinder NApp using the attributes primary_links and backup_links
        submitted via REST

        # For each link composing paths in #3:
        #  - E-Line NApp requests a S-VID available from the link VLAN pool.
        #  - Using the S-VID obtained, generate abstract flow entries to be
        #    sent to FlowManager

        Push abstract flow entries to FlowManager and FlowManager pushes
        OpenFlow entries to datapaths

        E-Line NApp generates an event to notify all Kytos NApps of a new EVC
        creation

        Finally, notify the user of the status of the request.
        """
        # Try to create the circuit object
        log.debug("create_circuit /v2/evc/")

        try:
            evc = self._evc_from_dict(data)
        except ValueError as exception:
            log.debug("create_circuit result %s %s", exception, 400)
            raise BadRequest(str(exception)) from exception

        if evc.primary_path:
            try:
                evc.primary_path.is_valid(
                    evc.uni_a.interface.switch,
                    evc.uni_z.interface.switch,
                    bool(evc.circuit_scheduler),
                )
            except InvalidPath as exception:
                raise BadRequest(
                    f"primary_path is not valid: {exception}"
                ) from exception
        if evc.backup_path:
            try:
                evc.backup_path.is_valid(
                    evc.uni_a.interface.switch,
                    evc.uni_z.interface.switch,
                    bool(evc.circuit_scheduler),
                )
            except InvalidPath as exception:
                raise BadRequest(
                    f"backup_path is not valid: {exception}"
                ) from exception

        # verify duplicated evc
        if self._is_duplicated_evc(evc):
            result = "The EVC already exists."
            log.debug("create_circuit result %s %s", result, 409)
            raise Conflict(result)

        try:
            evc._validate_has_primary_or_dynamic()
        except ValueError as exception:
            raise BadRequest(str(exception)) from exception

        # store circuit in dictionary
        self.circuits[evc.id] = evc

        # save circuit
        evc.sync()

        # Schedule the circuit deploy
        self.sched.add(evc)

        # Circuit has no schedule, deploy now
        if not evc.circuit_scheduler:
            with evc.lock:
                evc.deploy()

        # Notify users
        result = {"circuit_id": evc.id}
        status = 201
        log.debug("create_circuit result %s %s", result, status)
        emit_event(self.controller, name="created", content={
            "evc.id": evc.id,
            "name": evc.name,
            "metadata": evc.metadata,
            "active": evc._active,
            "enabled": evc._enabled,
            "uni_a": evc.uni_a,
            "uni_z": evc.uni_z,
        })
        return jsonify(result), status

    @listen_to('kytos/flow_manager.flow.removed')
    def on_flow_delete(self, event):
        """Capture delete messages to keep track when flows got removed."""
        self.handle_flow_delete(event)

    def handle_flow_delete(self, event):
        """Keep track when the EVC got flows removed by deriving its cookie."""
        flow = event.content["flow"]
        evc = self.circuits.get(EVC.get_id_from_cookie(flow.cookie))
        if evc:
            log.debug("Flow removed in EVC %s", evc.id)
            evc.set_flow_removed_at()

    @rest("/v2/evc/<circuit_id>", methods=["PATCH"])
    def update(self, circuit_id):
        """Update a circuit based on payload.

        The EVC required attributes (name, uni_a, uni_z) can't be updated.
        """
        log.debug("update /v2/evc/%s", circuit_id)
        try:
            evc = self.circuits[circuit_id]
        except KeyError as exception:
            result = f"circuit_id {circuit_id} not found"
            log.debug("update result %s %s", result, 404)
            raise NotFound(result) from exception

        if evc.archived:
            result = "Can't update archived EVC"
            log.debug("update result %s %s", result, 405)
            raise MethodNotAllowed(["GET"], result)

        try:
            data = request.get_json()
        except BadRequest as exception:
            result = "The request body is not a well-formed JSON."
            log.debug("update result %s %s", result, 400)
            raise BadRequest(result) from exception
        if data is None:
            result = "The request body mimetype is not application/json."
            log.debug("update result %s %s", result, 415)
            raise UnsupportedMediaType(result)

        try:
            enable, redeploy = evc.update(
                **self._evc_dict_with_instances(data)
            )
        except ValueError as exception:
            log.error(exception)
            log.debug("update result %s %s", exception, 400)
            raise BadRequest(str(exception)) from exception

        if evc.is_active():
            if enable is False:  # disable if active
                with evc.lock:
                    evc.remove()
            elif redeploy is not None:  # redeploy if active
                with evc.lock:
                    evc.remove()
                    evc.deploy()
        else:
            if enable is True:  # enable if inactive
                with evc.lock:
                    evc.deploy()
        result = {evc.id: evc.as_dict()}
        status = 200

        log.debug("update result %s %s", result, status)
        emit_event(self.controller, "updated", content={
            "evc.id": evc.id,
            "name": evc.name,
            "metadata": evc.metadata,
            "active": evc._active,
            "enabled": evc._enabled,
            "uni_a": evc.uni_a,
            "uni_z": evc.uni_z,
        })
        return jsonify(result), status

    @rest("/v2/evc/<circuit_id>", methods=["DELETE"])
    def delete_circuit(self, circuit_id):
        """Remove a circuit.

        First, the flows are removed from the switches, and then the EVC is
        disabled.
        """
        log.debug("delete_circuit /v2/evc/%s", circuit_id)
        try:
            evc = self.circuits[circuit_id]
        except KeyError as exception:
            result = f"circuit_id {circuit_id} not found"
            log.debug("delete_circuit result %s %s", result, 404)
            raise NotFound(result) from exception

        if evc.archived:
            result = f"Circuit {circuit_id} already removed"
            log.debug("delete_circuit result %s %s", result, 404)
            raise NotFound(result)

        log.info("Removing %s", evc)
        with evc.lock:
            evc.remove_current_flows()
            evc.remove_failover_flows(sync=False)
            evc.deactivate()
            evc.disable()
            self.sched.remove(evc)
            evc.archive()
            evc.sync()
        log.info("EVC removed. %s", evc)
        result = {"response": f"Circuit {circuit_id} removed"}
        status = 200

        log.debug("delete_circuit result %s %s", result, status)
        emit_event(self.controller, "deleted", content={
            "evc_id": evc.id,
            "name": evc.name,
            "metadata": evc.metadata,
            "active": evc._active,
            "enabled": evc._enabled,
            "uni_a": evc.uni_a,
            "uni_z": evc.uni_z,
        })
        return jsonify(result), status

    @rest("v2/evc/<circuit_id>/metadata", methods=["GET"])
    def get_metadata(self, circuit_id):
        """Get metadata from an EVC."""
        try:
            return (
                jsonify({"metadata": self.circuits[circuit_id].metadata}),
                200,
            )
        except KeyError as error:
            raise NotFound(f"circuit_id {circuit_id} not found.") from error

    @rest("v2/evc/<circuit_id>/metadata", methods=["POST"])
    def add_metadata(self, circuit_id):
        """Add metadata to an EVC."""
        try:
            metadata = request.get_json()
            content_type = request.content_type
        except BadRequest as error:
            result = "The request body is not a well-formed JSON."
            raise BadRequest(result) from error
        if content_type is None:
            result = "The request body is empty."
            raise BadRequest(result)
        if metadata is None:
            if content_type != "application/json":
                result = (
                    "The content type must be application/json "
                    f"(received {content_type})."
                )
            else:
                result = "Metadata is empty."
            raise UnsupportedMediaType(result)

        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            raise NotFound(f"circuit_id {circuit_id} not found.") from error

        evc.extend_metadata(metadata)
        evc.sync()
        return jsonify("Operation successful"), 201

    @rest("v2/evc/<circuit_id>/metadata/<key>", methods=["DELETE"])
    def delete_metadata(self, circuit_id, key):
        """Delete metadata from an EVC."""
        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            raise NotFound(f"circuit_id {circuit_id} not found.") from error

        evc.remove_metadata(key)
        evc.sync()
        return jsonify("Operation successful"), 200

    @rest("/v2/evc/<circuit_id>/redeploy", methods=["PATCH"])
    def redeploy(self, circuit_id):
        """Endpoint to force the redeployment of an EVC."""
        log.debug("redeploy /v2/evc/%s/redeploy", circuit_id)
        try:
            evc = self.circuits[circuit_id]
        except KeyError as exception:
            result = f"circuit_id {circuit_id} not found"
            raise NotFound(result) from exception
        if evc.is_enabled():
            with evc.lock:
                evc.remove_current_flows()
                evc.deploy()
            result = {"response": f"Circuit {circuit_id} redeploy received."}
            status = 202
        else:
            result = {"response": f"Circuit {circuit_id} is disabled."}
            status = 409

        return jsonify(result), status

    @rest("/v2/evc/schedule", methods=["GET"])
    def list_schedules(self):
        """Endpoint to return all schedules stored for all circuits.

        Return a JSON with the following template:
        [{"schedule_id": <schedule_id>,
         "circuit_id": <circuit_id>,
         "schedule": <schedule object>}]
        """
        log.debug("list_schedules /v2/evc/schedule")
        circuits = self.mongo_controller.get_circuits()['circuits'].values()
        if not circuits:
            result = {}
            status = 200
            return jsonify(result), status

        result = []
        status = 200
        for circuit in circuits:
            circuit_scheduler = circuit.get("circuit_scheduler")
            if circuit_scheduler:
                for scheduler in circuit_scheduler:
                    value = {
                        "schedule_id": scheduler.get("id"),
                        "circuit_id": circuit.get("id"),
                        "schedule": scheduler,
                    }
                    result.append(value)

        log.debug("list_schedules result %s %s", result, status)
        return jsonify(result), status

    @rest("/v2/evc/schedule/", methods=["POST"])
    def create_schedule(self):
        """
        Create a new schedule for a given circuit.

        This service does not check for conflicts with other schedules.
        Payload example:
            {
              "circuit_id":"aa:bb:cc",
              "schedule": {
                "date": "2019-08-07T14:52:10.967Z",
                "interval": "string",
                "frequency": "1 * * * *",
                "action": "create"
              }
            }
        """
        log.debug("create_schedule /v2/evc/schedule/")

        json_data = self._json_from_request("create_schedule")
        try:
            circuit_id = json_data["circuit_id"]
        except TypeError as exception:
            result = "The payload should have a dictionary."
            log.debug("create_schedule result %s %s", result, 400)
            raise BadRequest(result) from exception
        except KeyError as exception:
            result = "Missing circuit_id."
            log.debug("create_schedule result %s %s", result, 400)
            raise BadRequest(result) from exception

        try:
            schedule_data = json_data["schedule"]
        except KeyError as exception:
            result = "Missing schedule data."
            log.debug("create_schedule result %s %s", result, 400)
            raise BadRequest(result) from exception

        # Get EVC from circuits buffer
        circuits = self._get_circuits_buffer()

        # get the circuit
        evc = circuits.get(circuit_id)

        # circuit not found
        if not evc:
            result = f"circuit_id {circuit_id} not found"
            log.debug("create_schedule result %s %s", result, 404)
            raise NotFound(result)
        # Can not modify circuits deleted and archived
        if evc.archived:
            result = f"Circuit {circuit_id} is archived. Update is forbidden."
            log.debug("create_schedule result %s %s", result, 403)
            raise Forbidden(result)

        # new schedule from dict
        new_schedule = CircuitSchedule.from_dict(schedule_data)

        # If there is no schedule, create the list
        if not evc.circuit_scheduler:
            evc.circuit_scheduler = []

        # Add the new schedule
        evc.circuit_scheduler.append(new_schedule)

        # Add schedule job
        self.sched.add_circuit_job(evc, new_schedule)

        # save circuit to mongodb
        evc.sync()

        result = new_schedule.as_dict()
        status = 201

        log.debug("create_schedule result %s %s", result, status)
        return jsonify(result), status

    @rest("/v2/evc/schedule/<schedule_id>", methods=["PATCH"])
    def update_schedule(self, schedule_id):
        """Update a schedule.

        Change all attributes of the given schedule of an EVC.
        The schedule ID is preserved by default.
        Payload example:
            {
              "date": "2019-08-07T14:52:10.967Z",
              "interval": "string",
              "frequency": "1 * * * *",
              "action": "create"
            }
        """
        log.debug("update_schedule /v2/evc/schedule/%s", schedule_id)

        # Try to find a circuit schedule
        evc, found_schedule = self._find_evc_by_schedule_id(schedule_id)

        # Can not modify circuits deleted and archived
        if not found_schedule:
            result = f"schedule_id {schedule_id} not found"
            log.debug("update_schedule result %s %s", result, 404)
            raise NotFound(result)
        if evc.archived:
            result = f"Circuit {evc.id} is archived. Update is forbidden."
            log.debug("update_schedule result %s %s", result, 403)
            raise Forbidden(result)

        data = self._json_from_request("update_schedule")

        new_schedule = CircuitSchedule.from_dict(data)
        new_schedule.id = found_schedule.id
        # Remove the old schedule
        evc.circuit_scheduler.remove(found_schedule)
        # Append the modified schedule
        evc.circuit_scheduler.append(new_schedule)

        # Cancel all schedule jobs
        self.sched.cancel_job(found_schedule.id)
        # Add the new circuit schedule
        self.sched.add_circuit_job(evc, new_schedule)
        # Save EVC to mongodb
        evc.sync()

        result = new_schedule.as_dict()
        status = 200

        log.debug("update_schedule result %s %s", result, status)
        return jsonify(result), status

    @rest("/v2/evc/schedule/<schedule_id>", methods=["DELETE"])
    def delete_schedule(self, schedule_id):
        """Remove a circuit schedule.

        Remove the Schedule from EVC.
        Remove the Schedule from cron job.
        Save the EVC to MongoDB.
        """
        log.debug("delete_schedule /v2/evc/schedule/%s", schedule_id)
        evc, found_schedule = self._find_evc_by_schedule_id(schedule_id)

        # Can not modify circuits deleted and archived
        if not found_schedule:
            result = f"schedule_id {schedule_id} not found"
            log.debug("delete_schedule result %s %s", result, 404)
            raise NotFound(result)

        if evc.archived:
            result = f"Circuit {evc.id} is archived. Update is forbidden."
            log.debug("delete_schedule result %s %s", result, 403)
            raise Forbidden(result)

        # Remove the old schedule
        evc.circuit_scheduler.remove(found_schedule)

        # Cancel all schedule jobs
        self.sched.cancel_job(found_schedule.id)
        # Save EVC to mongodb
        evc.sync()

        result = "Schedule removed"
        status = 200

        log.debug("delete_schedule result %s %s", result, status)
        return jsonify(result), status

    def _is_duplicated_evc(self, evc):
        """Verify if the circuit given is duplicated with the stored evcs.

        Args:
            evc (EVC): circuit to be analysed.

        Returns:
            boolean: True if the circuit is duplicated, otherwise False.

        """
        for circuit in tuple(self.circuits.values()):
            if not circuit.archived and circuit.shares_uni(evc):
                return True
        return False

    @listen_to("kytos/topology.link_up")
    def on_link_up(self, event):
        """Change circuit when link is up or end_maintenance."""
        self.handle_link_up(event)

    def handle_link_up(self, event):
        """Change circuit when link is up or end_maintenance."""
        log.info("Event handle_link_up %s", event.content["link"])
        for evc in self.get_evcs_by_svc_level():
            if evc.is_enabled() and not evc.archived:
                with evc.lock:
                    evc.handle_link_up(event.content["link"])

    @listen_to("kytos/topology.link_down")
    def on_link_down(self, event):
        """Change circuit when link is down or under_maintenance."""
        self.handle_link_down(event)

677 1
    def handle_link_down(self, event):
678
        """Change circuit when link is down or under_mantenance."""
679 1
        link = event.content["link"]
680 1
        log.info("Event handle_link_down %s", link)
681 1
        switch_flows = {}
682 1
        evcs_with_failover = []
683 1
        evcs_normal = []
684 1
        check_failover = []
685 1
        for evc in self.get_evcs_by_svc_level():
686 1
            if evc.is_affected_by_link(link):
687
                # if there is no failover path, handles link down the
688
                # tradditional way
689 1
                if (
690
                    not getattr(evc, 'failover_path', None) or
691
                    evc.is_failover_path_affected_by_link(link)
692
                ):
693 1
                    evcs_normal.append(evc)
694 1
                    continue
695 1
                for dpid, flows in evc.get_failover_flows().items():
696 1
                    switch_flows.setdefault(dpid, [])
697 1
                    switch_flows[dpid].extend(flows)
698 1
                evcs_with_failover.append(evc)
699
            else:
700 1
                check_failover.append(evc)
701
702 1
        offset = 0
        # Install the failover flows in rounds, pausing BATCH_INTERVAL
        # between rounds. A BATCH_SIZE of 0 turns offset into None, so all
        # flows are sent in a single round. Note that offset accumulates,
        # so round k sends up to k * BATCH_SIZE flows per switch.
        while switch_flows:
            offset = (offset + settings.BATCH_SIZE) or None
            switches = list(switch_flows.keys())
            for dpid in switches:
                emit_event(
                    self.controller,
                    context="kytos.flow_manager",
                    name="flows.install",
                    content={
                        "dpid": dpid,
                        "flow_dict": {"flows": switch_flows[dpid][:offset]},
                    }
                )
                if offset is None or offset >= len(switch_flows[dpid]):
                    del switch_flows[dpid]
                    continue
                switch_flows[dpid] = switch_flows[dpid][offset:]
            time.sleep(settings.BATCH_INTERVAL)

        for evc in evcs_with_failover:
            with evc.lock:
                old_path = evc.current_path
                evc.current_path = evc.failover_path
                evc.failover_path = old_path
                evc.sync()
            emit_event(self.controller, "redeployed_link_down", content={
                "evc_id": evc.id,
                "name": evc.name,
                "metadata": evc.metadata,
                "active": evc._active,
                "enabled": evc._enabled,
                "uni_a": evc.uni_a,
                "uni_z": evc.uni_z,
            })
            log.info(
                f"{evc} redeployed with failover due to link down {link.id}"
            )

        for evc in evcs_normal:
            emit_event(
                self.controller,
                "evc_affected_by_link_down",
                content={
                    "link_id": link.id,
                    "evc_id": evc.id,
                    "name": evc.name,
                    "metadata": evc.metadata,
                    "active": evc._active,
                    "enabled": evc._enabled,
                    "uni_a": evc.uni_a,
                    "uni_z": evc.uni_z,
                }
            )

        # After handling the hot path, check if new failover paths are needed.
        # Note that EVCs affected by link down will generate a KytosEvent for
        # deployed|redeployed, which will trigger the failover path setup.
        # Thus, we just need to further check the check_failover list
        for evc in check_failover:
            if evc.is_failover_path_affected_by_link(link):
                evc.setup_failover_path()

    @listen_to("kytos/mef_eline.evc_affected_by_link_down")
    def on_evc_affected_by_link_down(self, event):
        """Change circuit when link is down or under_maintenance."""
        self.handle_evc_affected_by_link_down(event)

    def handle_evc_affected_by_link_down(self, event):
        """Change circuit when link is down or under_maintenance."""
        evc = self.circuits.get(event.content["evc_id"])
        link_id = event.content['link_id']
        if not evc:
            return
        with evc.lock:
            result = evc.handle_link_down()
        event_name = "error_redeploy_link_down"
        if result:
            log.info(f"{evc} redeployed due to link down {link_id}")
            event_name = "redeployed_link_down"
        emit_event(self.controller, event_name, content={
            "evc_id": evc.id,
            "name": evc.name,
            "metadata": evc.metadata,
            "active": evc._active,
            "enabled": evc._enabled,
            "uni_a": evc.uni_a,
            "uni_z": evc.uni_z,
        })

    @listen_to("kytos/mef_eline.(redeployed_link_(up|down)|deployed)")
    def on_evc_deployed(self, event):
        """Handle EVC deployed|redeployed_link_up|redeployed_link_down."""
        self.handle_evc_deployed(event)

    def handle_evc_deployed(self, event):
        """Set up failover path on EVC deployed."""
        evc = self.circuits.get(event.content["evc_id"])
        if not evc:
            return
        with evc.lock:
            evc.setup_failover_path()

    @listen_to("kytos/topology.topology_loaded")
    def on_topology_loaded(self, event):  # pylint: disable=unused-argument
        """Load EVCs once the topology is available."""
        self.load_all_evcs()

    def load_all_evcs(self):
        """Try to load all EVCs on startup."""
        circuits = self.mongo_controller.get_circuits()['circuits'].items()
        for circuit_id, circuit in circuits:
            if circuit_id not in self.circuits:
                self._load_evc(circuit)

    def _load_evc(self, circuit_dict):
        """Load one EVC from mongodb to memory."""
        try:
            evc = self._evc_from_dict(circuit_dict)
        except ValueError as exception:
            log.error(
                f"Could not load EVC: dict={circuit_dict} error={exception}"
            )
            return None

        if evc.archived:
            return None
        evc.deactivate()
        evc.sync()
        self.circuits.setdefault(evc.id, evc)
        self.sched.add(evc)
        return evc

    @listen_to("kytos/flow_manager.flow.error")
    def on_flow_mod_error(self, event):
        """Handle flow mod errors related to an EVC."""
        self.handle_flow_mod_error(event)

    def handle_flow_mod_error(self, event):
        """Handle flow mod errors related to an EVC."""
        flow = event.content["flow"]
        command = event.content.get("error_command")
        if command != "add":
            return
        evc = self.circuits.get(EVC.get_id_from_cookie(flow.cookie))
        if evc:
            evc.remove_current_flows()

    def _evc_dict_with_instances(self, evc_dict):
        """Convert some dict values to instances of EVC classes.

        This method will convert: [UNI, CircuitSchedule, Link, Path]
        """
        data = evc_dict.copy()  # Do not modify the original dict
        for attribute, value in data.items():
            # Get multiple attributes.
            # Ex: uni_a, uni_z
            if "uni" in attribute:
                try:
                    data[attribute] = self._uni_from_dict(value)
                except ValueError as exception:
                    result = "Error creating UNI: Invalid value"
                    raise ValueError(result) from exception

            if attribute == "circuit_scheduler":
                data[attribute] = []
                for schedule in value:
                    data[attribute].append(CircuitSchedule.from_dict(schedule))

            # Get multiple attributes.
            # Ex: primary_links,
            #     backup_links,
            #     current_links_cache,
            #     primary_links_cache,
            #     backup_links_cache
            if "links" in attribute:
                data[attribute] = [
                    self._link_from_dict(link) for link in value
                ]

            # Ex: current_path,
            #     primary_path,
            #     backup_path
            if "path" in attribute and attribute != "dynamic_backup_path":
                data[attribute] = Path(
                    [self._link_from_dict(link) for link in value]
                )

        return data

    def _evc_from_dict(self, evc_dict):
        """Return an EVC instance built from a python dict."""
        data = self._evc_dict_with_instances(evc_dict)
        return EVC(self.controller, **data)

    def _uni_from_dict(self, uni_dict):
        """Return a UNI object from python dict."""
        if uni_dict is None:
            return False

        interface_id = uni_dict.get("interface_id")
        interface = self.controller.get_interface_by_id(interface_id)
        if interface is None:
            result = (
                "Error creating UNI: "
                f"Could not instantiate interface {interface_id}"
            )
            raise ValueError(result)

        tag_dict = uni_dict.get("tag", None)
        if tag_dict:
            tag = TAG.from_dict(tag_dict)
        else:
            tag = None
        uni = UNI(interface, tag)

        return uni

    def _link_from_dict(self, link_dict):
        """Return a Link object from python dict."""
        id_a = link_dict.get("endpoint_a").get("id")
        id_b = link_dict.get("endpoint_b").get("id")

        endpoint_a = self.controller.get_interface_by_id(id_a)
        endpoint_b = self.controller.get_interface_by_id(id_b)
        if not endpoint_a:
            error_msg = f"Could not get interface endpoint_a id {id_a}"
            raise ValueError(error_msg)
        if not endpoint_b:
            error_msg = f"Could not get interface endpoint_b id {id_b}"
            raise ValueError(error_msg)

        link = Link(endpoint_a, endpoint_b)
        if "metadata" in link_dict:
            link.extend_metadata(link_dict.get("metadata"))

        s_vlan = link.get_metadata("s_vlan")
        if s_vlan:
            tag = TAG.from_dict(s_vlan)
            if tag is False:
                error_msg = f"Could not instantiate tag from dict {s_vlan}"
                raise ValueError(error_msg)
            link.update_metadata("s_vlan", tag)
        return link

    def _find_evc_by_schedule_id(self, schedule_id):
        """
        Find an EVC and CircuitSchedule based on schedule_id.

        :param schedule_id: Schedule ID
        :return: EVC and Schedule
        """
        circuits = self._get_circuits_buffer()
        found_schedule = None
        evc = None

        for circuit in circuits.values():
            for schedule in circuit.circuit_scheduler:
                if schedule.id == schedule_id:
                    found_schedule = schedule
                    evc = circuit
                    break
            if found_schedule:
                break
        return evc, found_schedule

    def _get_circuits_buffer(self):
        """
        Return the circuit buffer.

        If the buffer is empty, try to load data from mongodb.
        """
        if not self.circuits:
            # Load circuits from mongodb to buffer
            circuits = self.mongo_controller.get_circuits()['circuits']
            for c_id, circuit in circuits.items():
                evc = self._evc_from_dict(circuit)
                self.circuits[c_id] = evc
        return self.circuits

    @staticmethod
    def _json_from_request(caller):
        """Return a json from request.

        If it was not possible to get a json from the request, log, for debug,
        who was the caller and the error that occurred, and raise an
        Exception.
        """
        try:
            json_data = request.get_json()
        except ValueError as exception:
            log.error(exception)
            log.debug(f"{caller} result {exception} 400")
            raise BadRequest(str(exception)) from exception
        except BadRequest as exception:
            result = "The request is not a valid JSON."
            log.debug(f"{caller} result {result} 400")
            raise BadRequest(result) from exception
        if json_data is None:
            result = "Content-Type must be application/json"
            log.debug(f"{caller} result {result} 415")
            raise UnsupportedMediaType(result)
        return json_data
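The batched flow installation inside `handle_link_down` above can be looked at in isolation. The following is only a minimal sketch under stated assumptions, not the module's implementation: `iter_flow_batches` is a hypothetical helper name, it uses a fixed batch size per round (in the listing the offset accumulates across rounds), it skips switches with no flows, and it leaves event emission and the sleep between rounds to the caller.

```python
from typing import Dict, Iterator, List, Tuple


def iter_flow_batches(
    switch_flows: Dict[str, List[dict]], batch_size: int
) -> Iterator[List[Tuple[str, List[dict]]]]:
    """Yield rounds of (dpid, flows) pairs (hypothetical helper).

    Each round carries at most batch_size flows per switch. A batch_size
    of 0 or less sends every pending flow in a single round.
    """
    # Copy the lists so the caller's mapping is left untouched.
    pending = {
        dpid: list(flows) for dpid, flows in switch_flows.items() if flows
    }
    while pending:
        current_round = []
        for dpid in list(pending):
            flows = pending[dpid]
            size = batch_size if batch_size > 0 else len(flows)
            current_round.append((dpid, flows[:size]))
            remaining = flows[size:]
            if remaining:
                pending[dpid] = remaining
            else:
                del pending[dpid]
        yield current_round
```

A caller could loop over the rounds, emit one install event per `(dpid, flows)` pair, and sleep for the configured interval only while further rounds remain.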