Test Failed
Push — master ( 9ba056...9337aa )
by Antonio
04:22 queued 10s
created

build.main.Main.shutdown()   A

Complexity

Conditions 1

Size

Total Lines 2
Code Lines 1

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 1
CRAP Score 1

Importance

Changes 0
Metric Value
cc 1
eloc 1
nop 1
dl 0
loc 2
rs 10
c 0
b 0
f 0
ccs 1
cts 1
cp 1
crap 1
1
"""Main module of kytos/mef_eline Kytos Network Application.
2
3
NApp to provision circuits from user request.
4
"""
5 2
from flask import jsonify, request
6 2
from werkzeug.exceptions import BadRequest
7
8 2
from kytos.core import KytosNApp, log, rest
9 2
from kytos.core.events import KytosEvent
10 2
from kytos.core.helpers import listen_to
11 2
from kytos.core.interface import TAG, UNI
12 2
from kytos.core.link import Link
13 2
from napps.kytos.mef_eline.models import EVC, DynamicPathManager
14 2
from napps.kytos.mef_eline.scheduler import CircuitSchedule, Scheduler
15 2
from napps.kytos.mef_eline.storehouse import StoreHouse
16
17
18 2
class Main(KytosNApp):
19
    """Main class of amlight/mef_eline NApp.
20
21
    This class is the entry point for this napp.
22
    """
23
24 2
    def setup(self):
        """Initialise the NApp state; stands in for ``__init__``.

        The controller calls this method automatically when the application
        is loaded, so any start-up routine belongs here.
        """
        # Scheduler of circuit events.
        self.sched = Scheduler()

        # Persistence layer used to save and load circuits.
        self.storehouse = StoreHouse(self.controller)

        # The dynamic path manager works on behalf of this controller.
        DynamicPathManager.set_controller(self.controller)

        # Buffer of the EVCs created, keyed by circuit id.
        # Every create/update/delete must be synced to the storehouse.
        self.circuits = dict()

        # Circuit ids grouped by interface id.
        self._circuits_by_interface = dict()
47
48 2
    def execute(self):
49
        """Execute once when the napp is running."""
50
51 2
    def shutdown(self):
52
        """Execute when your napp is unloaded.
53
54
        If you have some cleanup procedure, insert it here.
55
        """
56
57 2
    @rest('/v2/evc/', methods=['GET'])
    def list_circuits(self):
        """Endpoint that returns every circuit kept in the storehouse.

        An empty JSON object is returned when the storehouse has no data.
        The status code is always 200.
        """
        log.debug('list_circuits /v2/evc')
        status = 200
        # Any falsy answer from the storehouse is reported as an empty dict.
        result = self.storehouse.get_data() or {}

        log.debug('list_circuits result %s items, %s', len(result), status)
        return jsonify(result), status
71
72 2
    @rest('/v2/evc/<circuit_id>', methods=['GET'])
    def get_circuit(self, circuit_id):
        """Endpoint that returns one stored circuit, looked up by its id.

        Responds 200 with the stored circuit, or 404 with an explanatory
        message when the storehouse has no entry for ``circuit_id``.
        """
        log.debug('get_circuit /v2/evc/%s', circuit_id)
        stored = self.storehouse.get_data()

        try:
            result, status = stored[circuit_id], 200
        except KeyError:
            result, status = (
                {'response': f'circuit_id {circuit_id} not found'}, 404)

        log.debug('get_circuit result %s %s', result, status)
        return jsonify(result), status
87
88 2
    @rest('/v2/evc/', methods=['POST'])
    def create_circuit(self):
        """Try to create a new circuit from the JSON payload of the request.

        The payload is converted into an EVC; for EVPL, the C-VIDs requested
        for UNI_A and UNI_Z are checked when the UNI objects are created.
        A circuit equal to a buffered, non-archived one is refused.

        An accepted circuit is stored in the buffer, saved to the storehouse
        and handed to the scheduler.  When it has no schedule of its own it
        is deployed right away; the deployment itself is delegated to
        ``evc.deploy()``.  A 'kytos.mef_eline.created' event then notifies
        the other NApps, and finally the user is told the new circuit id.

        Responses: 201 with the circuit id, 400 when the payload is missing
        or rejected with a ValueError, 409 when the circuit already exists.
        """
        def respond(result, status):
            """Log the outcome and build the (body, status) reply."""
            log.debug('create_circuit result %s %s', result, status)
            return jsonify(result), status

        log.debug('create_circuit /v2/evc/')
        payload = request.get_json()

        if not payload:
            return respond(
                "Bad request: The request do not have a json.", 400)

        # Try to create the circuit object.
        try:
            evc = self._evc_from_dict(payload)
        except ValueError as error:
            log.error(error)
            return respond("Bad request: {}".format(error), 400)

        # Refuse a circuit that duplicates an existing one.
        if self._is_duplicated_evc(evc):
            return respond("Not Acceptable: This evc already exists.", 409)

        # Keep the circuit in the buffer and persist it.
        self.circuits[evc.id] = evc
        self.storehouse.save_evc(evc)

        # Schedule the circuit deploy.
        self.sched.add(evc)

        # A circuit without schedule is deployed now.
        if not evc.circuit_scheduler:
            evc.deploy()

        # Notify the other NApps.
        created = KytosEvent(name='kytos.mef_eline.created',
                             content=evc.as_dict())
        self.controller.buffers.app.put(created)

        return respond({"circuit_id": evc.id}, 201)
160
161 2
    @rest('/v2/evc/<circuit_id>', methods=['PATCH'])
    def update(self, circuit_id):
        """Apply the JSON payload of the request to a buffered circuit.

        The EVC required attributes (name, uni_a, uni_z) can't be updated.

        Responses: 200 with the updated circuit, 400 when the payload is
        rejected (ValueError or BadRequest), 404 when a KeyError is raised
        (e.g. unknown ``circuit_id``) and 415 when a TypeError is raised.
        """
        log.debug('update /v2/evc/%s', circuit_id)
        try:
            evc = self.circuits[circuit_id]
            payload = request.get_json()
            evc.update(**payload)
        except ValueError as error:
            log.error(error)
            status = 400
            result = {'response': 'Bad Request: {}'.format(error)}
        except TypeError:
            status = 415
            result = {'response': 'Content-Type must be application/json'}
        except BadRequest:
            status = 400
            message = 'Bad Request: The request is not a valid JSON.'
            result = {'response': message}
        except KeyError:
            status = 404
            result = {'response': f'circuit_id {circuit_id} not found'}
        else:
            # Only a successful update is saved.
            evc.sync()
            status = 200
            result = {evc.id: evc.as_dict()}

        log.debug('update result %s %s', result, status)
        return jsonify(result), status
193
194 2
    @rest('/v2/evc/<circuit_id>', methods=['DELETE'])
    def delete_circuit(self, circuit_id):
        """Remove a circuit.

        The current flows are removed first; then the EVC is deactivated,
        disabled, taken out of the scheduler, archived and synced.

        Responses: 200 when the circuit is removed, 404 when ``circuit_id``
        is not in the buffer or the circuit is already archived.
        """
        log.debug('delete_circuit /v2/evc/%s', circuit_id)

        if circuit_id not in self.circuits:
            result = {'response': f'circuit_id {circuit_id} not found'}
            status = 404
        else:
            evc = self.circuits[circuit_id]
            log.info('Removing %s', evc)
            if evc.archived:
                result = {'response': f'Circuit {circuit_id} already removed'}
                status = 404
            else:
                evc.remove_current_flows()
                evc.deactivate()
                evc.disable()
                self.sched.remove(evc)
                evc.archive()
                evc.sync()
                log.info('EVC removed. %s', evc)
                result = {'response': f'Circuit {circuit_id} removed'}
                status = 200

        log.debug('delete_circuit result %s %s', result, status)
        return jsonify(result), status
225
226 2
    @rest('/v2/evc/schedule', methods=['GET'])
    def list_schedules(self):
        """Endpoint that returns the schedules stored for all circuits.

        The JSON answer follows this template:
        [{"schedule_id": <schedule_id>,
         "circuit_id": <circuit_id>,
         "schedule": <schedule object>}]

        When the storehouse holds no circuit, an empty JSON object is
        returned instead.  The status code is always 200.
        """
        log.debug('list_schedules /v2/evc/schedule')
        stored_circuits = self.storehouse.get_data().values()
        status = 200
        if not stored_circuits:
            return jsonify({}), status

        result = []
        for circuit in stored_circuits:
            # Circuits without schedules add nothing to the listing.
            schedules = circuit.get("circuit_scheduler") or ()
            result.extend({"schedule_id": schedule.get("id"),
                           "circuit_id": circuit.get("id"),
                           "schedule": schedule}
                          for schedule in schedules)

        log.debug('list_schedules result %s %s', result, status)
        return jsonify(result), status
255
256 2
    @rest('/v2/evc/schedule/', methods=['POST'])
    def create_schedule(self):
        """Create a new schedule for a given circuit.

        This service does not check for conflicts with another schedule.
        Payload example:
            {
              "circuit_id":"aa:bb:cc",
              "schedule": {
                "date": "2019-08-07T14:52:10.967Z",
                "interval": "string",
                "frequency": "1 * * * *",
                "action": "create"
              }
            }

        Responses: 201 with the new schedule; 400 when the payload, the
        circuit_id or the schedule data is missing, or when a ValueError or
        BadRequest is raised; 403 when the circuit is archived; 404 when the
        circuit is unknown; 415 when a TypeError is raised.
        """
        def respond(result, status):
            """Log the outcome and build the (body, status) reply."""
            log.debug('create_schedule result %s %s', result, status)
            return jsonify(result), status

        log.debug('create_schedule /v2/evc/schedule/')
        try:
            json_data = request.get_json()

            # Validate the payload before reading from it: a missing payload
            # has no ``get`` method, so checking later would raise instead
            # of answering 400.
            if not json_data:
                return respond(
                    "Bad request: The request does not have a json.", 400)

            circuit_id = json_data.get("circuit_id")
            if not circuit_id:
                return respond("Bad request: Missing circuit_id.", 400)

            schedule_data = json_data.get("schedule")
            if not schedule_data:
                return respond("Bad request: Missing schedule data.", 400)

            # Look the circuit up in the circuits buffer.
            evc = self._get_circuits_buffer().get(circuit_id)
            if not evc:
                return respond(
                    {'response': f'circuit_id {circuit_id} not found'}, 404)

            # Can not modify circuits deleted and archived.
            if evc.archived:
                return respond(
                    {'response': 'Circuit is archived. Update is forbidden.'},
                    403)

            # New schedule from dict.
            new_schedule = CircuitSchedule.from_dict(schedule_data)

            # If there is no schedule yet, create the list.
            if not evc.circuit_scheduler:
                evc.circuit_scheduler = []

            evc.circuit_scheduler.append(new_schedule)

            # Add the schedule job.
            self.sched.add_circuit_job(evc, new_schedule)

            # Save the circuit to the storehouse.
            evc.sync()

            result = new_schedule.as_dict()
            status = 201

        except ValueError as exception:
            log.error(exception)
            result = {'response': 'Bad Request: {}'.format(exception)}
            status = 400
        except TypeError:
            result = {'response': 'Content-Type must be application/json'}
            status = 415
        except BadRequest:
            response = 'Bad Request: The request is not a valid JSON.'
            result = {'response': response}
            status = 400

        return respond(result, status)
350
351 2
    @rest('/v2/evc/schedule/<schedule_id>', methods=['PATCH'])
    def update_schedule(self, schedule_id):
        """Update a schedule.

        Replace all attributes of the given schedule of an EVC circuit.
        The schedule ID is preserved.
        Payload example:
            {
              "date": "2019-08-07T14:52:10.967Z",
              "interval": "string",
              "frequency": "1 * * *",
              "action": "create"
            }

        Responses: 200 with the updated schedule; 400 when a ValueError or
        BadRequest is raised; 403 when the circuit is archived; 404 when the
        schedule is unknown; 415 when a TypeError is raised.
        """
        def respond(result, status):
            """Log the outcome and build the (body, status) reply."""
            log.debug('update_schedule result %s %s', result, status)
            return jsonify(result), status

        log.debug('update_schedule /v2/evc/schedule/%s', schedule_id)
        try:
            # Try to find the circuit schedule.
            evc, found_schedule = self._find_evc_by_schedule_id(schedule_id)

            if not found_schedule:
                return respond(
                    {'response': f'schedule_id {schedule_id} not found'}, 404)

            # Can not modify circuits deleted and archived.
            if evc.archived:
                return respond(
                    {'response': 'Circuit is archived. Update is forbidden.'},
                    403)

            data = request.get_json()

            new_schedule = CircuitSchedule.from_dict(data)
            new_schedule.id = found_schedule.id

            # Swap the old schedule for the modified one.
            evc.circuit_scheduler.remove(found_schedule)
            evc.circuit_scheduler.append(new_schedule)

            # Cancel the jobs of the old schedule, then add the new job.
            self.sched.cancel_job(found_schedule.id)
            self.sched.add_circuit_job(evc, new_schedule)

            # Save the EVC to the storehouse.
            evc.sync()

            result = new_schedule.as_dict()
            status = 200

        except ValueError as exception:
            log.error(exception)
            result = {'response': 'Bad Request: {}'.format(exception)}
            status = 400
        except TypeError:
            result = {'response': 'Content-Type must be application/json'}
            status = 415
        except BadRequest:
            result = {'response':
                      'Bad Request: The request is not a valid JSON.'}
            status = 400

        return respond(result, status)
416
417 2
    @rest('/v2/evc/schedule/<schedule_id>', methods=['DELETE'])
    def delete_schedule(self, schedule_id):
        """Remove a circuit schedule.

        The schedule is removed from its EVC, its job is cancelled in the
        scheduler and the EVC is saved to the storehouse.

        Responses: 200 when removed, 403 when the circuit is archived,
        404 when no schedule has the given id.
        """
        def respond(result, status):
            """Log the outcome and build the (body, status) reply."""
            log.debug('delete_schedule result %s %s', result, status)
            return jsonify(result), status

        log.debug('delete_schedule /v2/evc/schedule/%s', schedule_id)
        evc, found_schedule = self._find_evc_by_schedule_id(schedule_id)

        if not found_schedule:
            return respond(
                {'response': f'schedule_id {schedule_id} not found'}, 404)

        # Can not modify circuits deleted and archived.
        if evc.archived:
            return respond(
                {'response': 'Circuit is archived. Update is forbidden.'},
                403)

        # Drop the schedule from the circuit and cancel its jobs.
        evc.circuit_scheduler.remove(found_schedule)
        self.sched.cancel_job(found_schedule.id)

        # Save the EVC to the storehouse.
        evc.sync()

        return respond("Schedule removed", 200)
454
455 2
    def _is_duplicated_evc(self, evc):
456
        """Verify if the circuit given is duplicated with the stored evcs.
457
458
        Args:
459
            evc (EVC): circuit to be analysed.
460
461
        Returns:
462
            boolean: True if the circuit is duplicated, otherwise False.
463
464
        """
465 2
        for circuit in self.circuits.values():
466 2
            if not circuit.archived and circuit == evc:
467 2
                return True
468 2
        return False
469
470 2
    @listen_to('kytos/topology.link_up')
    def handle_link_up(self, event):
        """Change circuit when link is up or end_maintenance.

        The link carried in ``event.content['link']`` is passed to every
        buffered circuit that is enabled and not archived.
        """
        log.debug("Event handle_link_up %s", event)
        for evc in self.circuits.values():
            if not evc.is_enabled() or evc.archived:
                continue
            evc.handle_link_up(event.content['link'])
477
478 2
    @listen_to('kytos/topology.link_down')
    def handle_link_down(self, event):
        """Change circuit when link is down or under maintenance.

        Every buffered circuit affected by the link carried in
        ``event.content['link']`` is asked to handle the failure.
        """
        log.debug("Event handle_link_down %s", event)
        for evc in self.circuits.values():
            if evc.is_affected_by_link(event.content['link']):
                # Pass the argument to the logger instead of formatting
                # eagerly, like every other log call in this module.
                log.info('handling evc %s', evc)
                evc.handle_link_down()
486
487 2
    def load_circuits_by_interface(self, circuits):
488
        """Load circuits in storehouse for in-memory dictionary."""
489 2
        for circuit_id, circuit in circuits.items():
490 2
            intf_a = circuit['uni_a']['interface_id']
491 2
            self.add_to_dict_of_sets(intf_a, circuit_id)
492 2
            intf_z = circuit['uni_z']['interface_id']
493 2
            self.add_to_dict_of_sets(intf_z, circuit_id)
494 2
            for path in ('current_path', 'primary_path', 'backup_path'):
495 2
                for link in circuit[path]:
496 2
                    intf_a = link['endpoint_a']['id']
497 2
                    self.add_to_dict_of_sets(intf_a, circuit_id)
498 2
                    intf_b = link['endpoint_b']['id']
499 2
                    self.add_to_dict_of_sets(intf_b, circuit_id)
500
501 2
    def add_to_dict_of_sets(self, intf, circuit_id):
502
        """Add a single item to the dictionary of circuits by interface."""
503 2
        if intf not in self._circuits_by_interface:
504 2
            self._circuits_by_interface[intf] = set()
505 2
        self._circuits_by_interface[intf].add(circuit_id)
506
507 2
    @listen_to('kytos/topology.port.created')
    def load_evcs(self, event):
        """Try to load the unloaded EVCs from storehouse.

        The interface id is built from ``event.content['switch']`` and
        ``event.content['port']``.  Every stored circuit registered for that
        interface and not yet buffered is turned into an EVC.  Circuits that
        cannot be built or are archived are skipped.  Enabled circuits are
        deployed before being buffered and handed to the scheduler.
        """
        log.debug("Event load_evcs %s", event)
        stored = self.storehouse.get_data()
        # The index by interface is built on first use.
        if not self._circuits_by_interface:
            self.load_circuits_by_interface(stored)

        interface_id = '{}:{}'.format(event.content['switch'],
                                      event.content['port'])

        for circuit_id in self._circuits_by_interface.get(interface_id, []):
            if circuit_id not in stored or circuit_id in self.circuits:
                continue
            try:
                evc = self._evc_from_dict(stored[circuit_id])
            except ValueError as exception:
                log.info(
                    f'Could not load EVC {circuit_id} because {exception}')
                continue
            log.info(f'Loading EVC {circuit_id}')
            if evc.archived:
                continue
            if evc.is_enabled():
                log.info(f'Trying to deploy EVC {circuit_id}')
                evc.deploy()
            self.circuits[circuit_id] = evc
            self.sched.add(evc)
534
535 2
    def _evc_from_dict(self, evc_dict):
        """Build an EVC from a plain dict, converting nested values first.

        Values are converted according to the attribute name:
          - names containing 'uni' (e.g. uni_a, uni_z) become UNI objects;
          - 'circuit_scheduler' becomes a list of CircuitSchedule objects;
          - names containing 'links' (e.g. primary_links, backup_links,
            current_links_cache, primary_links_cache, backup_links_cache)
            become lists of Link objects;
          - names containing 'path' (e.g. current_path, primary_path,
            backup_path), except 'dynamic_backup_path', become lists of
            Link objects.

        Args:
            evc_dict (dict): attributes of the circuit; it is not modified.

        Returns:
            EVC: circuit created with the converted attributes.

        Raises:
            ValueError: if a UNI cannot be created (the original error is
                chained as the cause) or a link conversion fails.

        """
        data = evc_dict.copy()  # Do not modify the original dict

        # Only values of existing keys are replaced, so iterating over the
        # copy while assigning to it is safe.
        for attribute, value in data.items():
            if 'uni' in attribute:
                try:
                    data[attribute] = self._uni_from_dict(value)
                except ValueError as exc:
                    # Keep the original error as the cause for debugging.
                    raise ValueError(f'Error creating UNI: {exc}') from exc

            if attribute == 'circuit_scheduler':
                data[attribute] = [CircuitSchedule.from_dict(schedule)
                                   for schedule in value]

            if 'links' in attribute:
                data[attribute] = [self._link_from_dict(link)
                                   for link in value]

            if 'path' in attribute and attribute != 'dynamic_backup_path':
                data[attribute] = [self._link_from_dict(link)
                                   for link in value]

        return EVC(self.controller, **data)
575
576 2
    def _uni_from_dict(self, uni_dict):
577
        """Return a UNI object from python dict."""
578
        if uni_dict is None:
579
            return False
580
581
        interface_id = uni_dict.get("interface_id")
582
        interface = self.controller.get_interface_by_id(interface_id)
583
        if interface is None:
584
            raise ValueError(f'Could not instantiate interface {interface_id}')
585
586
        tag_dict = uni_dict.get("tag")
587
        tag = TAG.from_dict(tag_dict)
588
        if tag is False:
589
            raise ValueError(f'Could not instantiate tag from dict {tag_dict}')
590
591
        uni = UNI(interface, tag)
592
593
        return uni
594
595 2
    def _link_from_dict(self, link_dict):
        """Create a Link object out of its dict representation.

        Both endpoints are looked up in the controller by id.  Metadata in
        the dict is copied to the link, and a truthy 's_vlan' metadata is
        replaced by the TAG built from it.

        Raises:
            ValueError: if the 's_vlan' tag cannot be instantiated.
        """
        endpoint_ids = [link_dict.get(side).get('id')
                        for side in ('endpoint_a', 'endpoint_b')]
        endpoints = [self.controller.get_interface_by_id(endpoint_id)
                     for endpoint_id in endpoint_ids]

        link = Link(*endpoints)
        if 'metadata' in link_dict:
            link.extend_metadata(link_dict.get('metadata'))

        s_vlan = link.get_metadata('s_vlan')
        if not s_vlan:
            return link

        tag = TAG.from_dict(s_vlan)
        if tag is False:
            error_msg = f'Could not instantiate tag from dict {s_vlan}'
            raise ValueError(error_msg)
        link.update_metadata('s_vlan', tag)
        return link
615
616 2
    def _find_evc_by_schedule_id(self, schedule_id):
617
        """
618
        Find an EVC and CircuitSchedule based on schedule_id.
619
620
        :param schedule_id: Schedule ID
621
        :return: EVC and Schedule
622
        """
623 2
        circuits = self._get_circuits_buffer()
624 2
        found_schedule = None
625 2
        evc = None
626
627
        # pylint: disable=unused-variable
628 2
        for c_id, circuit in circuits.items():
629 2
            for schedule in circuit.circuit_scheduler:
630 2
                if schedule.id == schedule_id:
631 2
                    found_schedule = schedule
632 2
                    evc = circuit
633 2
                    break
634 2
            if found_schedule:
635 2
                break
636 2
        return evc, found_schedule
637
638 2
    def _get_circuits_buffer(self):
639
        """
640
        Return the circuit buffer.
641
642
        If the buffer is empty, try to load data from storehouse.
643
        """
644 2
        if not self.circuits:
645
            # Load storehouse circuits to buffer
646 2
            circuits = self.storehouse.get_data()
647 2
            for c_id, circuit in circuits.items():
648 2
                evc = self._evc_from_dict(circuit)
649 2
                self.circuits[c_id] = evc
650
        return self.circuits
651