Test Failed
Push — master ( 437722...e67027 )
by Antonio
03:54 queued 11s
created

build.main.Main.load_evcs()   C

Complexity

Conditions 9

Size

Total Lines 28
Code Lines 25

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 90

Importance

Changes 0
Metric Value
cc 9
eloc 25
nop 2
dl 0
loc 28
ccs 0
cts 0
cp 0
crap 90
rs 6.6666
c 0
b 0
f 0
1
"""Main module of kytos/mef_eline Kytos Network Application.
2
3
NApp to provision circuits from user request.
4
"""
5 1
from flask import jsonify, request
6
from werkzeug.exceptions import (BadRequest, Conflict, Forbidden,
7 1
                                 MethodNotAllowed, NotFound,
8 1
                                 UnsupportedMediaType)
9 1
10 1
from kytos.core import KytosNApp, log, rest
11 1
from kytos.core.events import KytosEvent
12 1
from kytos.core.helpers import listen_to
13 1
from kytos.core.interface import TAG, UNI
14 1
from kytos.core.link import Link
15
from napps.kytos.mef_eline.models import EVC, DynamicPathManager, Path
16
from napps.kytos.mef_eline.scheduler import CircuitSchedule, Scheduler
17 1
from napps.kytos.mef_eline.storehouse import StoreHouse
18
19
20
class Main(KytosNApp):
21
    """Main class of amlight/mef_eline NApp.
22
23 1
    This class is the entry point for this napp.
24
    """
25
26
    def setup(self):
27
        """Replace the '__init__' method for the KytosNApp subclass.
28
29
        The setup method is automatically called by the controller when your
30
        application is loaded.
31
32 1
        So, if you have any setup routine, insert it here.
33
        """
34
        # object used to scheduler circuit events
35 1
        self.sched = Scheduler()
36
37
        # object to save and load circuits
38 1
        self.storehouse = StoreHouse(self.controller)
39
40 1
        # set the controller that will manager the dynamic paths
41
        DynamicPathManager.set_controller(self.controller)
42
43 1
        # dictionary of EVCs created. It acts as a circuit buffer.
44
        # Every create/update/delete must be synced to storehouse.
45
        self.circuits = {}
46
47
        # dictionary of EVCs by interface
48
        self._circuits_by_interface = {}
49 1
50
    def execute(self):
51
        """Execute once when the napp is running."""
52 1
53 1
    def shutdown(self):
54 1
        """Execute when your napp is unloaded.
55
56 1
        If you have some cleanup procedure, insert it here.
57
        """
58 1
59
    @rest('/v2/evc/', methods=['GET'])
60
    def list_circuits(self):
61 1
        """Endpoint to return circuits stored.
62
63 1
        If archived is set to True return all circuits, else only the ones
64 1
        not archived.
65 1
        """
66
        log.debug('list_circuits /v2/evc')
67 1
        archived = request.args.get('archived', False)
68 1
        circuits = self.storehouse.get_data()
69
        if not circuits:
70 1
            return jsonify({}), 200
71
        if archived:
72 1
            return jsonify(circuits), 200
73
        return jsonify({circuit_id: circuit
74
                        for circuit_id, circuit in circuits.items()
75
                        if not circuit.get('archived', False)}), 200
76
77
    @rest('/v2/evc/<circuit_id>', methods=['GET'])
78
    def get_circuit(self, circuit_id):
79
        """Endpoint to return a circuit based on id."""
80
        log.debug('get_circuit /v2/evc/%s', circuit_id)
81
        circuits = self.storehouse.get_data()
82
83
        try:
84
            result = circuits[circuit_id]
85
        except KeyError:
86
            result = f'circuit_id {circuit_id} not found'
87
            log.debug('get_circuit result %s %s', result, 404)
88
            raise NotFound(result)
89
90
        status = 200
91
        log.debug('get_circuit result %s %s', result, status)
92
        return jsonify(result), status
93
94
    @rest('/v2/evc/', methods=['POST'])
95
    def create_circuit(self):
96
        """Try to create a new circuit.
97
98 1
        Firstly, for EVPL: E-Line NApp verifies if UNI_A's requested C-VID and
99
        UNI_Z's requested C-VID are available from the interfaces' pools. This
100 1
        is checked when creating the UNI object.
101 1
102
        Then, E-Line NApp requests a primary and a backup path to the
103 1
        Pathfinder NApp using the attributes primary_links and backup_links
104 1
        submitted via REST
105
106
        # For each link composing paths in #3:
107
        #  - E-Line NApp requests a S-VID available from the link VLAN pool.
108
        #  - Using the S-VID obtained, generate abstract flow entries to be
109 1
        #    sent to FlowManager
110
111
        Push abstract flow entries to FlowManager and FlowManager pushes
112
        OpenFlow entries to datapaths
113 1
114
        E-Line NApp generates an event to notify all Kytos NApps of a new EVC
115
        creation
116 1
117
        Finnaly, notify user of the status of its request.
118
        """
119 1
        # Try to create the circuit object
120 1
        log.debug('create_circuit /v2/evc/')
121
        try:
122
            data = request.get_json()
123 1
        except BadRequest:
124
            result = 'The request body is not a well-formed JSON.'
125 1
            log.debug('create_circuit result %s %s', result, 400)
126
            raise BadRequest(result)
127 1
128
        if data is None:
129 1
            result = 'The request body mimetype is not application/json.'
130
            log.debug('create_circuit result %s %s', result, 415)
131
            raise UnsupportedMediaType(result)
132
        try:
133
            evc = self._evc_from_dict(data)
134
        except ValueError as exception:
135
            log.debug('create_circuit result %s %s', exception, 400)
136
            raise BadRequest(str(exception))
137
138
        # verify duplicated evc
139
        if self._is_duplicated_evc(evc):
140
            result = "The EVC already exists."
141
            log.debug('create_circuit result %s %s', result, 409)
142
            raise Conflict(result)
143
144
        # store circuit in dictionary
145
        self.circuits[evc.id] = evc
146
147
        # save circuit
148
        self.storehouse.save_evc(evc)
149
150
        # Schedule the circuit deploy
151
        self.sched.add(evc)
152
153
        # Circuit has no schedule, deploy now
154 1
        if not evc.circuit_scheduler:
155
            evc.deploy()
156
157
        # Notify users
158
        event = KytosEvent(name='kytos.mef_eline.created',
159
                           content=evc.as_dict())
160
        self.controller.buffers.app.put(event)
161
162
        result = {"circuit_id": evc.id}
163
        status = 201
164
        log.debug('create_circuit result %s %s', result, status)
165
        return jsonify(result), status
166
167
    @rest('/v2/evc/<circuit_id>', methods=['PATCH'])
168
    def update(self, circuit_id):
169
        """Update a circuit based on payload.
170 1
171
        The EVC required attributes (name, uni_a, uni_z) can't be updated.
172
        """
173
        log.debug('update /v2/evc/%s', circuit_id)
174
        try:
175
            evc = self.circuits[circuit_id]
176
        except KeyError:
177
            result = f'circuit_id {circuit_id} not found'
178
            log.debug('update result %s %s', result, 404)
179
            raise NotFound(result)
180 1
181
        if evc.archived:
182
            result = f'Can\'t update archived EVC'
183
            log.debug('update result %s %s', result, 405)
184
            raise MethodNotAllowed(['GET'], result)
185
186
        try:
187
            data = request.get_json()
188
        except BadRequest:
189 1
            result = 'The request body is not a well-formed JSON.'
190
            log.debug('update result %s %s', result, 400)
191 1
            raise BadRequest(result)
192
        if data is None:
193
            result = 'The request body mimetype is not application/json.'
194
            log.debug('update result %s %s', result, 415)
195
            raise UnsupportedMediaType(result)
196
197
        try:
198
            enable, path = \
199
                evc.update(**self._evc_dict_with_instances(data))
200
        except ValueError as exception:
201
            log.error(exception)
202
            log.debug('update result %s %s', exception, 400)
203
            raise BadRequest(str(exception))
204
205 1
        if evc.is_active():
206
            if enable is False:  # disable if active
207
                evc.remove()
208
            elif path is not None:  # redeploy if active
209
                evc.remove()
210
                evc.deploy()
211
        else:
212
            if enable is True:  # enable if inactive
213
                evc.deploy()
214
        result = {evc.id: evc.as_dict()}
215
        status = 200
216
217
        log.debug('update result %s %s', result, status)
218
        return jsonify(result), status
219
220
    @rest('/v2/evc/<circuit_id>', methods=['DELETE'])
221 1
    def delete_circuit(self, circuit_id):
222
        """Remove a circuit.
223
224
        First, the flows are removed from the switches, and then the EVC is
225
        disabled.
226 1
        """
227
        log.debug('delete_circuit /v2/evc/%s', circuit_id)
228 1
        try:
229
            evc = self.circuits[circuit_id]
230 1
        except KeyError:
231 1
            result = f'circuit_id {circuit_id} not found'
232 1
            log.debug('delete_circuit result %s %s', result, 404)
233
            raise NotFound(result)
234
235
        if evc.archived:
236 1
            result = f'Circuit {circuit_id} already removed'
237
            log.debug('delete_circuit result %s %s', result, 404)
238
            raise NotFound(result)
239
240
        log.info('Removing %s', evc)
241 1
        evc.remove_current_flows()
242
        evc.deactivate()
243
        evc.disable()
244
        self.sched.remove(evc)
245 1
        evc.archive()
246
        evc.sync()
247
        log.info('EVC removed. %s', evc)
248
        result = {'response': f'Circuit {circuit_id} removed'}
249
        status = 200
250 1
251
        log.debug('delete_circuit result %s %s', result, status)
252 1
        return jsonify(result), status
253
254
    @rest('/v2/evc/schedule', methods=['GET'])
255
    def list_schedules(self):
256
        """Endpoint to return all schedules stored for all circuits.
257
258
        Return a JSON with the following template:
259
        [{"schedule_id": <schedule_id>,
260
         "circuit_id": <circuit_id>,
261
         "schedule": <schedule object>}]
262
        """
263
        log.debug('list_schedules /v2/evc/schedule')
264
        circuits = self.storehouse.get_data().values()
265
        if not circuits:
266
            result = {}
267
            status = 200
268
            return jsonify(result), status
269
270
        result = []
271 1
        status = 200
272
        for circuit in circuits:
273
            circuit_scheduler = circuit.get("circuit_scheduler")
274
            if circuit_scheduler:
275
                for scheduler in circuit_scheduler:
276
                    value = {"schedule_id": scheduler.get("id"),
277
                             "circuit_id": circuit.get("id"),
278
                             "schedule": scheduler}
279
                    result.append(value)
280
281
        log.debug('list_schedules result %s %s', result, status)
282
        return jsonify(result), status
283
284
    @rest('/v2/evc/schedule/', methods=['POST'])
285
    def create_schedule(self):
286
        """
287
        Create a new schedule for a given circuit.
288
289
        This service do no check if there are conflicts with another schedule.
290
        Payload example:
291
            {
292
              "circuit_id":"aa:bb:cc",
293
              "schedule": {
294
                "date": "2019-08-07T14:52:10.967Z",
295
                "interval": "string",
296
                "frequency": "1 * * * *",
297
                "action": "create"
298
              }
299
            }
300
        """
301
        log.debug('create_schedule /v2/evc/schedule/')
302
303
        json_data = self.json_from_request('create_schedule')
304
        try:
305
            circuit_id = json_data['circuit_id']
306
        except TypeError:
307
            result = 'The payload should have a dictionary.'
308
            log.debug('create_schedule result %s %s', result, 400)
309
            raise BadRequest(result)
310
        except KeyError:
311
            result = 'Missing circuit_id.'
312
            log.debug('create_schedule result %s %s', result, 400)
313
            raise BadRequest(result)
314
315
        try:
316
            schedule_data = json_data['schedule']
317
        except KeyError:
318
            result = 'Missing schedule data.'
319
            log.debug('create_schedule result %s %s', result, 400)
320
            raise BadRequest(result)
321
322
        # Get EVC from circuits buffer
323
        circuits = self._get_circuits_buffer()
324
325
        # get the circuit
326
        evc = circuits.get(circuit_id)
327
328
        # get the circuit
329
        if not evc:
330
            result = f'circuit_id {circuit_id} not found'
331
            log.debug('create_schedule result %s %s', result, 404)
332
            raise NotFound(result)
333
        # Can not modify circuits deleted and archived
334
        if evc.archived:
335
            result = f'Circuit {circuit_id} is archived. Update is forbidden.'
336
            log.debug('create_schedule result %s %s', result, 403)
337
            raise Forbidden(result)
338
339
        # new schedule from dict
340
        new_schedule = CircuitSchedule.from_dict(schedule_data)
341
342
        # If there is no schedule, create the list
343
        if not evc.circuit_scheduler:
344
            evc.circuit_scheduler = []
345
346
        # Add the new schedule
347
        evc.circuit_scheduler.append(new_schedule)
348
349
        # Add schedule job
350
        self.sched.add_circuit_job(evc, new_schedule)
351
352
        # save circuit to storehouse
353
        evc.sync()
354
355
        result = new_schedule.as_dict()
356
        status = 201
357
358
        log.debug('create_schedule result %s %s', result, status)
359
        return jsonify(result), status
360
361
    @rest('/v2/evc/schedule/<schedule_id>', methods=['PATCH'])
362
    def update_schedule(self, schedule_id):
363
        """Update a schedule.
364
365
        Change all attributes from the given schedule from a EVC circuit.
366
        The schedule ID is preserved as default.
367
        Payload example:
368
            {
369
              "date": "2019-08-07T14:52:10.967Z",
370
              "interval": "string",
371
              "frequency": "1 * * *",
372
              "action": "create"
373
            }
374
        """
375
        log.debug('update_schedule /v2/evc/schedule/%s', schedule_id)
376
377
        # Try to find a circuit schedule
378
        evc, found_schedule = self._find_evc_by_schedule_id(schedule_id)
379
380
        # Can not modify circuits deleted and archived
381
        if not found_schedule:
382
            result = f'schedule_id {schedule_id} not found'
383
            log.debug('update_schedule result %s %s', result, 404)
384
            raise NotFound(result)
385
        if evc.archived:
386
            result = f'Circuit {evc.id} is archived. Update is forbidden.'
387
            log.debug('update_schedule result %s %s', result, 403)
388
            raise Forbidden(result)
389
390
        data = self.json_from_request('update_schedule')
391
392
        new_schedule = CircuitSchedule.from_dict(data)
393
        new_schedule.id = found_schedule.id
394
        # Remove the old schedule
395
        evc.circuit_scheduler.remove(found_schedule)
396
        # Append the modified schedule
397
        evc.circuit_scheduler.append(new_schedule)
398
399
        # Cancel all schedule jobs
400
        self.sched.cancel_job(found_schedule.id)
401
        # Add the new circuit schedule
402
        self.sched.add_circuit_job(evc, new_schedule)
403
        # Save EVC to the storehouse
404
        evc.sync()
405
406
        result = new_schedule.as_dict()
407
        status = 200
408
409
        log.debug('update_schedule result %s %s', result, status)
410
        return jsonify(result), status
411
412
    @rest('/v2/evc/schedule/<schedule_id>', methods=['DELETE'])
413
    def delete_schedule(self, schedule_id):
414
        """Remove a circuit schedule.
415
416
        Remove the Schedule from EVC.
417
        Remove the Schedule from cron job.
418
        Save the EVC to the Storehouse.
419
        """
420
        log.debug('delete_schedule /v2/evc/schedule/%s', schedule_id)
421
        evc, found_schedule = self._find_evc_by_schedule_id(schedule_id)
422
423
        # Can not modify circuits deleted and archived
424
        if not found_schedule:
425
            result = f'schedule_id {schedule_id} not found'
426
            log.debug('delete_schedule result %s %s', result, 404)
427
            raise NotFound(result)
428
429
        if evc.archived:
430
            result = f'Circuit {evc.id} is archived. Update is forbidden.'
431
            log.debug('delete_schedule result %s %s', result, 403)
432
            raise Forbidden(result)
433
434
        # Remove the old schedule
435
        evc.circuit_scheduler.remove(found_schedule)
436
437
        # Cancel all schedule jobs
438
        self.sched.cancel_job(found_schedule.id)
439
        # Save EVC to the storehouse
440
        evc.sync()
441
442
        result = "Schedule removed"
443
        status = 200
444
445
        log.debug('delete_schedule result %s %s', result, status)
446
        return jsonify(result), status
447
448
    def _is_duplicated_evc(self, evc):
449
        """Verify if the circuit given is duplicated with the stored evcs.
450
451
        Args:
452
            evc (EVC): circuit to be analysed.
453
454
        Returns:
455
            boolean: True if the circuit is duplicated, otherwise False.
456
457
        """
458
        for circuit in self.circuits.values():
459
            if not circuit.archived and circuit == evc:
460
                return True
461
        return False
462
463
    @listen_to('kytos/topology.link_up')
464
    def handle_link_up(self, event):
465
        """Change circuit when link is up or end_maintenance."""
466
        log.debug("Event handle_link_up %s", event)
467
        for evc in self.circuits.values():
468
            if evc.is_enabled() and not evc.archived:
469
                evc.handle_link_up(event.content['link'])
470
471
    @listen_to('kytos/topology.link_down')
472
    def handle_link_down(self, event):
473
        """Change circuit when link is down or under_mantenance."""
474
        log.debug("Event handle_link_down %s", event)
475
        for evc in self.circuits.values():
476
            if evc.is_affected_by_link(event.content['link']):
477
                log.info('handling evc %s' % evc)
478
                evc.handle_link_down()
479
480
    def load_circuits_by_interface(self, circuits):
481
        """Load circuits in storehouse for in-memory dictionary."""
482
        for circuit_id, circuit in circuits.items():
483
            intf_a = circuit['uni_a']['interface_id']
484
            self.add_to_dict_of_sets(intf_a, circuit_id)
485
            intf_z = circuit['uni_z']['interface_id']
486
            self.add_to_dict_of_sets(intf_z, circuit_id)
487
            for path in ('current_path', 'primary_path', 'backup_path'):
488
                for link in circuit[path]:
489
                    intf_a = link['endpoint_a']['id']
490
                    self.add_to_dict_of_sets(intf_a, circuit_id)
491
                    intf_b = link['endpoint_b']['id']
492
                    self.add_to_dict_of_sets(intf_b, circuit_id)
493
494
    def add_to_dict_of_sets(self, intf, circuit_id):
495
        """Add a single item to the dictionary of circuits by interface."""
496
        if intf not in self._circuits_by_interface:
497
            self._circuits_by_interface[intf] = set()
498
        self._circuits_by_interface[intf].add(circuit_id)
499
500
    @listen_to('kytos/topology.port.created')
501
    def load_evcs(self, event):
502
        """Try to load the unloaded EVCs from storehouse."""
503
        log.debug("Event load_evcs %s", event)
504
        circuits = self.storehouse.get_data()
505
        if not self._circuits_by_interface:
506
            self.load_circuits_by_interface(circuits)
507
508
        interface_id = '{}:{}'.format(event.content['switch'],
509
                                      event.content['port'])
510
511
        for circuit_id in self._circuits_by_interface.get(interface_id, []):
512
            if circuit_id in circuits and circuit_id not in self.circuits:
513
                try:
514
                    evc = self._evc_from_dict(circuits[circuit_id])
515
                except ValueError as exception:
516
                    log.info(
517
                        f'Could not load EVC {circuit_id} because {exception}')
518
                    continue
519
                log.info(f'Loading EVC {circuit_id}')
520
                if evc.archived:
521
                    continue
522
                new_evc = self.circuits.setdefault(circuit_id, evc)
523
                if new_evc == evc:
524
                    if evc.is_enabled():
525
                        log.info(f'Trying to deploy EVC {circuit_id}')
526
                        evc.deploy()
527
                    self.sched.add(evc)
528
529
    def _evc_dict_with_instances(self, evc_dict):
530
        """Convert some dict values to instance of EVC classes.
531
532
        This method will convert: [UNI, Link]
533
        """
534
        data = evc_dict.copy()  # Do not modify the original dict
535
536
        for attribute, value in data.items():
537
            # Get multiple attributes.
538
            # Ex: uni_a, uni_z
539
            if 'uni' in attribute:
540
                try:
541
                    data[attribute] = self._uni_from_dict(value)
542
                except ValueError as exc:
543
                    raise ValueError(f'Error creating UNI: {exc}')
544
545
            if attribute == 'circuit_scheduler':
546
                data[attribute] = []
547
                for schedule in value:
548
                    data[attribute].append(CircuitSchedule.from_dict(schedule))
549
550
            # Get multiple attributes.
551
            # Ex: primary_links,
552
            #     backup_links,
553
            #     current_links_cache,
554
            #     primary_links_cache,
555
            #     backup_links_cache
556
            if 'links' in attribute:
557
                data[attribute] = [self._link_from_dict(link)
558
                                   for link in value]
559
560
            # Get multiple attributes.
561
            # Ex: current_path,
562
            #     primary_path,
563
            #     backup_path
564
            if 'path' in attribute and attribute != 'dynamic_backup_path':
565
                data[attribute] = Path([self._link_from_dict(link)
566
                                        for link in value])
567
568
        return data
569
570
    def _evc_from_dict(self, evc_dict):
571
        data = self._evc_dict_with_instances(evc_dict)
572
        return EVC(self.controller, **data)
573
574
    def _uni_from_dict(self, uni_dict):
575
        """Return a UNI object from python dict."""
576
        if uni_dict is None:
577
            return False
578
579
        interface_id = uni_dict.get("interface_id")
580
        interface = self.controller.get_interface_by_id(interface_id)
581
        if interface is None:
582
            raise ValueError(f'Could not instantiate interface {interface_id}')
583
584
        try:
585
            tag_dict = uni_dict["tag"]
586
        except KeyError:
587
            tag = None
588
        else:
589
            tag = TAG.from_dict(tag_dict)
590
        uni = UNI(interface, tag)
591
592
        return uni
593
594
    def _link_from_dict(self, link_dict):
595
        """Return a Link object from python dict."""
596
        id_a = link_dict.get('endpoint_a').get('id')
597
        id_b = link_dict.get('endpoint_b').get('id')
598
599
        endpoint_a = self.controller.get_interface_by_id(id_a)
600
        endpoint_b = self.controller.get_interface_by_id(id_b)
601
602
        link = Link(endpoint_a, endpoint_b)
603
        if 'metadata' in link_dict:
604
            link.extend_metadata(link_dict.get('metadata'))
605
606
        s_vlan = link.get_metadata('s_vlan')
607
        if s_vlan:
608
            tag = TAG.from_dict(s_vlan)
609
            if tag is False:
610
                error_msg = f'Could not instantiate tag from dict {s_vlan}'
611
                raise ValueError(error_msg)
612
            link.update_metadata('s_vlan', tag)
613
        return link
614
615
    def _find_evc_by_schedule_id(self, schedule_id):
616
        """
617
        Find an EVC and CircuitSchedule based on schedule_id.
618
619
        :param schedule_id: Schedule ID
620
        :return: EVC and Schedule
621
        """
622
        circuits = self._get_circuits_buffer()
623
        found_schedule = None
624
        evc = None
625
626
        # pylint: disable=unused-variable
627
        for c_id, circuit in circuits.items():
628
            for schedule in circuit.circuit_scheduler:
629
                if schedule.id == schedule_id:
630
                    found_schedule = schedule
631
                    evc = circuit
632
                    break
633
            if found_schedule:
634
                break
635
        return evc, found_schedule
636
637
    def _get_circuits_buffer(self):
638
        """
639
        Return the circuit buffer.
640
641
        If the buffer is empty, try to load data from storehouse.
642
        """
643
        if not self.circuits:
644
            # Load storehouse circuits to buffer
645
            circuits = self.storehouse.get_data()
646
            for c_id, circuit in circuits.items():
647
                evc = self._evc_from_dict(circuit)
648
                self.circuits[c_id] = evc
649
        return self.circuits
650
651
    @staticmethod
652
    def json_from_request(caller):
653
        """Return a json from request.
654
655
        If it was not possible to get a json from the request, log, for debug,
656
        who was the caller and the error that ocurred, and raise an
657
        Exception.
658
        """
659
        try:
660
            json_data = request.get_json()
661
        except ValueError as exception:
662
            log.error(exception)
663
            log.debug(f'{caller} result {exception} 400')
664
            raise BadRequest(str(exception))
665
        except BadRequest:
666
            result = 'The request is not a valid JSON.'
667
            log.debug(f'{caller} result {result} 400')
668
            raise BadRequest(result)
669
        if json_data is None:
670
            result = 'Content-Type must be application/json'
671
            log.debug(f'{caller} result {result} 415')
672
            raise UnsupportedMediaType(result)
673
        return json_data
674