Passed
Pull Request — master (#162)
by Antonio
04:46
created

build.main.Main.update()   B

Complexity

Conditions 8

Size

Total Lines 39
Code Lines 30

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 1
CRAP Score 65.6018

Importance

Changes 0
Metric Value
cc 8
eloc 30
nop 2
dl 0
loc 39
ccs 1
cts 29
cp 0.0345
crap 65.6018
rs 7.2933
c 0
b 0
f 0
1
"""Main module of kytos/mef_eline Kytos Network Application.
2
3
NApp to provision circuits from user request.
4
"""
5 2
from flask import jsonify, request
6 2
from werkzeug.exceptions import BadRequest
7
8 2
from kytos.core import KytosNApp, log, rest
9 2
from kytos.core.events import KytosEvent
10 2
from kytos.core.helpers import listen_to
11 2
from kytos.core.interface import TAG, UNI
12 2
from kytos.core.link import Link
13 2
from napps.kytos.mef_eline.models import EVC, DynamicPathManager
14 2
from napps.kytos.mef_eline.scheduler import CircuitSchedule, Scheduler
15 2
from napps.kytos.mef_eline.storehouse import StoreHouse
16
17
18 2
class Main(KytosNApp):
19
    """Main class of amlight/mef_eline NApp.
20
21
    This class is the entry point for this napp.
22
    """
23
24 2
    def setup(self):
25
        """Replace the '__init__' method for the KytosNApp subclass.
26
27
        The setup method is automatically called by the controller when your
28
        application is loaded.
29
30
        So, if you have any setup routine, insert it here.
31
        """
32
        # object used to scheduler circuit events
33 2
        self.sched = Scheduler()
34
35
        # object to save and load circuits
36 2
        self.storehouse = StoreHouse(self.controller)
37
38
        # set the controller that will manager the dynamic paths
39 2
        DynamicPathManager.set_controller(self.controller)
40
41
        # dictionary of EVCs created. It acts as a circuit buffer.
42
        # Every create/update/delete must be synced to storehouse.
43 2
        self.circuits = {}
44
45
        # dictionary of EVCs by interface
46 2
        self._circuits_by_interface = {}
47
48 2
    def execute(self):
49
        """Execute once when the napp is running."""
50
51 2
    def shutdown(self):
52
        """Execute when your napp is unloaded.
53
54
        If you have some cleanup procedure, insert it here.
55
        """
56
57 2
    @rest('/v2/evc/', methods=['GET'])
58
    def list_circuits(self):
59
        """Endpoint to return all circuits stored."""
60 2
        log.debug('list_circuits /v2/evc')
61 2
        circuits = self.storehouse.get_data()
62 2
        if not circuits:
63 2
            result = {}
64 2
            status = 200
65
        else:
66 2
            result = circuits
67 2
            status = 200
68
69 2
        log.debug('list_circuits result %s items, %s', len(result), status)
70 2
        return jsonify(result), status
71
72 2
    @rest('/v2/evc/<circuit_id>', methods=['GET'])
73
    def get_circuit(self, circuit_id):
74
        """Endpoint to return a circuit based on id."""
75 2
        log.debug('get_circuit /v2/evc/%s', circuit_id)
76 2
        circuits = self.storehouse.get_data()
77
78 2
        try:
79 2
            result = circuits[circuit_id]
80 2
            status = 200
81 2
        except KeyError:
82 2
            result = {'response': f'circuit_id {circuit_id} not found'}
83 2
            status = 404
84
85 2
        log.debug('get_circuit result %s %s', result, status)
86 2
        return jsonify(result), status
87
88 2
    @rest('/v2/evc/', methods=['POST'])
89
    def create_circuit(self):
90
        """Try to create a new circuit.
91
92
        Firstly, for EVPL: E-Line NApp verifies if UNI_A's requested C-VID and
93
        UNI_Z's requested C-VID are available from the interfaces' pools. This
94
        is checked when creating the UNI object.
95
96
        Then, E-Line NApp requests a primary and a backup path to the
97
        Pathfinder NApp using the attributes primary_links and backup_links
98
        submitted via REST
99
100
        # For each link composing paths in #3:
101
        #  - E-Line NApp requests a S-VID available from the link VLAN pool.
102
        #  - Using the S-VID obtained, generate abstract flow entries to be
103
        #    sent to FlowManager
104
105
        Push abstract flow entries to FlowManager and FlowManager pushes
106
        OpenFlow entries to datapaths
107
108
        E-Line NApp generates an event to notify all Kytos NApps of a new EVC
109
        creation
110
111
        Finnaly, notify user of the status of its request.
112
        """
113
        # Try to create the circuit object
114 2
        log.debug('create_circuit /v2/evc/')
115 2
        data = request.get_json()
116
117 2
        if not data:
118 2
            result = "Bad request: The request do not have a json."
119 2
            status = 400
120 2
            log.debug('create_circuit result %s %s', result, status)
121 2
            return jsonify(result), status
122 2
        try:
123 2
            evc = self._evc_from_dict(data)
124
        except ValueError as exception:
125
            log.error(exception)
126
            result = "Bad request: {}".format(exception)
127
            status = 400
128
            log.debug('create_circuit result %s %s', result, status)
129
            return jsonify(result), status
130
131
        # verify duplicated evc
132 2
        if self._is_duplicated_evc(evc):
133 2
            result = "Not Acceptable: This evc already exists."
134 2
            status = 409
135 2
            log.debug('create_circuit result %s %s', result, status)
136 2
            return jsonify(result), status
137
138
        # store circuit in dictionary
139 2
        self.circuits[evc.id] = evc
140
141
        # save circuit
142 2
        self.storehouse.save_evc(evc)
143
144
        # Schedule the circuit deploy
145 2
        self.sched.add(evc)
146
147
        # Circuit has no schedule, deploy now
148 2
        if not evc.circuit_scheduler:
149 2
            evc.deploy()
150
151
        # Notify users
152 2
        event = KytosEvent(name='kytos.mef_eline.created',
153
                           content=evc.as_dict())
154 2
        self.controller.buffers.app.put(event)
155
156 2
        result = {"circuit_id": evc.id}
157 2
        status = 201
158 2
        log.debug('create_circuit result %s %s', result, status)
159 2
        return jsonify(result), status
160
161 2
    @rest('/v2/evc/<circuit_id>', methods=['PATCH'])
162
    def update(self, circuit_id):
163
        """Update a circuit based on payload.
164
165
        The EVC required attributes (name, uni_a, uni_z) can't be updated.
166
        """
167
        log.debug('update /v2/evc/%s', circuit_id)
168
        try:
169
            evc = self.circuits[circuit_id]
170
        except KeyError:
171
            result = {'response': f'circuit_id {circuit_id} not found'}
172
            status = 404
173
        else:
174
            if evc.archived:
175
                result = {'response': f'Can\'t update archived EVC'}
176
                status = 405
177
            else:
178
                try:
179
                    data = request.get_json()
180
                    evc.update(**data)
181
                except ValueError as exception:
182
                    log.error(exception)
183
                    result = {'response': 'Bad Request: {}'.format(exception)}
184
                    status = 400
185
                except TypeError:
186
                    result = {'response': 'Content-Type must be '
187
                                          'application/json'}
188
                    status = 415
189
                except BadRequest:
190
                    response = 'Bad Request: The request is not a valid JSON.'
191
                    result = {'response': response}
192
                    status = 400
193
                else:
194
                    evc.sync()
195
                    result = {evc.id: evc.as_dict()}
196
                    status = 200
197
198
        log.debug('update result %s %s', result, status)
199
        return jsonify(result), status
200
201 2
    @rest('/v2/evc/<circuit_id>', methods=['DELETE'])
202
    def delete_circuit(self, circuit_id):
203
        """Remove a circuit.
204
205
        First, the flows are removed from the switches, and then the EVC is
206
        disabled.
207
        """
208
        log.debug('delete_circuit /v2/evc/%s', circuit_id)
209
        try:
210
            evc = self.circuits[circuit_id]
211
        except KeyError:
212
            result = {'response': f'circuit_id {circuit_id} not found'}
213
            status = 404
214
        else:
215
            log.info('Removing %s', evc)
216
            if evc.archived:
217
                result = {'response': f'Circuit {circuit_id} already removed'}
218
                status = 404
219
            else:
220
                evc.remove_current_flows()
221
                evc.deactivate()
222
                evc.disable()
223
                self.sched.remove(evc)
224
                evc.archive()
225
                evc.sync()
226
                log.info('EVC removed. %s', evc)
227
                result = {'response': f'Circuit {circuit_id} removed'}
228
                status = 200
229
230
        log.debug('delete_circuit result %s %s', result, status)
231
        return jsonify(result), status
232
233 2
    @rest('/v2/evc/schedule', methods=['GET'])
234
    def list_schedules(self):
235
        """Endpoint to return all schedules stored for all circuits.
236
237
        Return a JSON with the following template:
238
        [{"schedule_id": <schedule_id>,
239
         "circuit_id": <circuit_id>,
240
         "schedule": <schedule object>}]
241
        """
242 2
        log.debug('list_schedules /v2/evc/schedule')
243 2
        circuits = self.storehouse.get_data().values()
244 2
        if not circuits:
245 2
            result = {}
246 2
            status = 200
247 2
            return jsonify(result), status
248
249 2
        result = []
250 2
        status = 200
251 2
        for circuit in circuits:
252 2
            circuit_scheduler = circuit.get("circuit_scheduler")
253 2
            if circuit_scheduler:
254 2
                for scheduler in circuit_scheduler:
255 2
                    value = {"schedule_id": scheduler.get("id"),
256
                             "circuit_id": circuit.get("id"),
257
                             "schedule": scheduler}
258 2
                    result.append(value)
259
260 2
        log.debug('list_schedules result %s %s', result, status)
261 2
        return jsonify(result), status
262
263 2
    @rest('/v2/evc/schedule/', methods=['POST'])
264
    def create_schedule(self):
265
        """
266
        Create a new schedule for a given circuit.
267
268
        This service do no check if there are conflicts with another schedule.
269
        Payload example:
270
            {
271
              "circuit_id":"aa:bb:cc",
272
              "schedule": {
273
                "date": "2019-08-07T14:52:10.967Z",
274
                "interval": "string",
275
                "frequency": "1 * * * *",
276
                "action": "create"
277
              }
278
            }
279
        """
280 2
        log.debug('create_schedule /v2/evc/schedule/')
281 2
        try:
282
            # Try to create the circuit object
283 2
            json_data = request.get_json()
284
285 2
            circuit_id = json_data.get("circuit_id")
286 2
            schedule_data = json_data.get("schedule")
287
288 2
            if not json_data:
289
                result = "Bad request: The request does not have a json."
290
                status = 400
291
                log.debug('create_schedule result %s %s', result, status)
292
                return jsonify(result), status
293 2
            if not circuit_id:
294
                result = "Bad request: Missing circuit_id."
295
                status = 400
296
                log.debug('create_schedule result %s %s', result, status)
297
                return jsonify(result), status
298 2
            if not schedule_data:
299
                result = "Bad request: Missing schedule data."
300
                status = 400
301
                log.debug('create_schedule result %s %s', result, status)
302
                return jsonify(result), status
303
304
            # Get EVC from circuits buffer
305 2
            circuits = self._get_circuits_buffer()
306
307
            # get the circuit
308 2
            evc = circuits.get(circuit_id)
309
310
            # get the circuit
311 2
            if not evc:
312
                result = {'response': f'circuit_id {circuit_id} not found'}
313
                status = 404
314
                log.debug('create_schedule result %s %s', result, status)
315
                return jsonify(result), status
316
            # Can not modify circuits deleted and archived
317 2
            if evc.archived:
318
                result = {'response': f'Circuit is archived.'
319
                                      f'Update is forbidden.'}
320
                status = 403
321
                log.debug('create_schedule result %s %s', result, status)
322
                return jsonify(result), status
323
324
            # new schedule from dict
325 2
            new_schedule = CircuitSchedule.from_dict(schedule_data)
326
327
            # If there is no schedule, create the list
328 2
            if not evc.circuit_scheduler:
329
                evc.circuit_scheduler = []
330
331
            # Add the new schedule
332 2
            evc.circuit_scheduler.append(new_schedule)
333
334
            # Add schedule job
335 2
            self.sched.add_circuit_job(evc, new_schedule)
336
337
            # save circuit to storehouse
338 2
            evc.sync()
339
340 2
            result = new_schedule.as_dict()
341 2
            status = 201
342
343
        except ValueError as exception:
344
            log.error(exception)
345
            result = {'response': 'Bad Request: {}'.format(exception)}
346
            status = 400
347
        except TypeError:
348
            result = {'response': 'Content-Type must be application/json'}
349
            status = 415
350
        except BadRequest:
351
            response = 'Bad Request: The request is not a valid JSON.'
352
            result = {'response': response}
353
            status = 400
354
355 2
        log.debug('create_schedule result %s %s', result, status)
356 2
        return jsonify(result), status
357
358 2
    @rest('/v2/evc/schedule/<schedule_id>', methods=['PATCH'])
359
    def update_schedule(self, schedule_id):
360
        """Update a schedule.
361
362
        Change all attributes from the given schedule from a EVC circuit.
363
        The schedule ID is preserved as default.
364
        Payload example:
365
            {
366
              "date": "2019-08-07T14:52:10.967Z",
367
              "interval": "string",
368
              "frequency": "1 * * *",
369
              "action": "create"
370
            }
371
        """
372 2
        log.debug('update_schedule /v2/evc/schedule/%s', schedule_id)
373 2
        try:
374
            # Try to find a circuit schedule
375 2
            evc, found_schedule = self._find_evc_by_schedule_id(schedule_id)
376
377
            # Can not modify circuits deleted and archived
378 2
            if not found_schedule:
379
                result = {'response': f'schedule_id {schedule_id} not found'}
380
                status = 404
381
                log.debug('update_schedule result %s %s', result, status)
382
                return jsonify(result), status
383 2
            if evc.archived:
384 2
                result = {'response': f'Circuit is archived.'
385
                                      f'Update is forbidden.'}
386 2
                status = 403
387 2
                log.debug('update_schedule result %s %s', result, status)
388 2
                return jsonify(result), status
389
390 2
            data = request.get_json()
391
392 2
            new_schedule = CircuitSchedule.from_dict(data)
393 2
            new_schedule.id = found_schedule.id
394
            # Remove the old schedule
395 2
            evc.circuit_scheduler.remove(found_schedule)
396
            # Append the modified schedule
397 2
            evc.circuit_scheduler.append(new_schedule)
398
399
            # Cancel all schedule jobs
400 2
            self.sched.cancel_job(found_schedule.id)
401
            # Add the new circuit schedule
402 2
            self.sched.add_circuit_job(evc, new_schedule)
403
            # Save EVC to the storehouse
404 2
            evc.sync()
405
406 2
            result = new_schedule.as_dict()
407 2
            status = 200
408
409
        except ValueError as exception:
410
            log.error(exception)
411
            result = {'response': 'Bad Request: {}'.format(exception)}
412
            status = 400
413
        except TypeError:
414
            result = {'response': 'Content-Type must be application/json'}
415
            status = 415
416
        except BadRequest:
417
            result = {'response':
418
                      'Bad Request: The request is not a valid JSON.'}
419
            status = 400
420
421 2
        log.debug('update_schedule result %s %s', result, status)
422 2
        return jsonify(result), status
423
424 2
    @rest('/v2/evc/schedule/<schedule_id>', methods=['DELETE'])
425
    def delete_schedule(self, schedule_id):
426
        """Remove a circuit schedule.
427
428
        Remove the Schedule from EVC.
429
        Remove the Schedule from cron job.
430
        Save the EVC to the Storehouse.
431
        """
432 2
        log.debug('delete_schedule /v2/evc/schedule/%s', schedule_id)
433 2
        evc, found_schedule = self._find_evc_by_schedule_id(schedule_id)
434
435
        # Can not modify circuits deleted and archived
436 2
        if not found_schedule:
437
            result = {'response': f'schedule_id {schedule_id} not found'}
438
            status = 404
439
            log.debug('delete_schedule result %s %s', result, status)
440
            return jsonify(result), status
441
442 2
        if evc.archived:
443 2
            result = {'response': f'Circuit is archived. Update is forbidden.'}
444 2
            status = 403
445 2
            log.debug('delete_schedule result %s %s', result, status)
446 2
            return jsonify(result), status
447
448
        # Remove the old schedule
449 2
        evc.circuit_scheduler.remove(found_schedule)
450
451
        # Cancel all schedule jobs
452 2
        self.sched.cancel_job(found_schedule.id)
453
        # Save EVC to the storehouse
454 2
        evc.sync()
455
456 2
        result = "Schedule removed"
457 2
        status = 200
458
459 2
        log.debug('delete_schedule result %s %s', result, status)
460 2
        return jsonify(result), status
461
462 2
    def _is_duplicated_evc(self, evc):
463
        """Verify if the circuit given is duplicated with the stored evcs.
464
465
        Args:
466
            evc (EVC): circuit to be analysed.
467
468
        Returns:
469
            boolean: True if the circuit is duplicated, otherwise False.
470
471
        """
472 2
        for circuit in self.circuits.values():
473 2
            if not circuit.archived and circuit == evc:
474 2
                return True
475 2
        return False
476
477 2
    @listen_to('kytos/topology.link_up')
478
    def handle_link_up(self, event):
479
        """Change circuit when link is up or end_maintenance."""
480
        log.debug("Event handle_link_up %s", event)
481
        for evc in self.circuits.values():
482
            if evc.is_enabled() and not evc.archived:
483
                evc.handle_link_up(event.content['link'])
484
485 2
    @listen_to('kytos/topology.link_down')
486
    def handle_link_down(self, event):
487
        """Change circuit when link is down or under_mantenance."""
488
        log.debug("Event handle_link_down %s", event)
489
        for evc in self.circuits.values():
490
            if evc.is_affected_by_link(event.content['link']):
491
                log.info('handling evc %s' % evc)
492
                evc.handle_link_down()
493
494 2
    def load_circuits_by_interface(self, circuits):
495
        """Load circuits in storehouse for in-memory dictionary."""
496 2
        for circuit_id, circuit in circuits.items():
497 2
            intf_a = circuit['uni_a']['interface_id']
498 2
            self.add_to_dict_of_sets(intf_a, circuit_id)
499 2
            intf_z = circuit['uni_z']['interface_id']
500 2
            self.add_to_dict_of_sets(intf_z, circuit_id)
501 2
            for path in ('current_path', 'primary_path', 'backup_path'):
502 2
                for link in circuit[path]:
503 2
                    intf_a = link['endpoint_a']['id']
504 2
                    self.add_to_dict_of_sets(intf_a, circuit_id)
505 2
                    intf_b = link['endpoint_b']['id']
506 2
                    self.add_to_dict_of_sets(intf_b, circuit_id)
507
508 2
    def add_to_dict_of_sets(self, intf, circuit_id):
509
        """Add a single item to the dictionary of circuits by interface."""
510 2
        if intf not in self._circuits_by_interface:
511 2
            self._circuits_by_interface[intf] = set()
512 2
        self._circuits_by_interface[intf].add(circuit_id)
513
514 2
    @listen_to('kytos/topology.port.created')
515
    def load_evcs(self, event):
516
        """Try to load the unloaded EVCs from storehouse."""
517
        log.debug("Event load_evcs %s", event)
518
        circuits = self.storehouse.get_data()
519
        if not self._circuits_by_interface:
520
            self.load_circuits_by_interface(circuits)
521
522
        interface_id = '{}:{}'.format(event.content['switch'],
523
                                      event.content['port'])
524
525
        for circuit_id in self._circuits_by_interface.get(interface_id, []):
526
            if circuit_id in circuits and circuit_id not in self.circuits:
527
                try:
528
                    evc = self._evc_from_dict(circuits[circuit_id])
529
                except ValueError as exception:
530
                    log.info(
531
                        f'Could not load EVC {circuit_id} because {exception}')
532
                    continue
533
                log.info(f'Loading EVC {circuit_id}')
534
                if evc.archived:
535
                    continue
536
                if evc.is_enabled():
537
                    log.info(f'Trying to deploy EVC {circuit_id}')
538
                    evc.deploy()
539
                self.circuits[circuit_id] = evc
540
                self.sched.add(evc)
541
542 2
    def _evc_from_dict(self, evc_dict):
543
        """Convert some dict values to instance of EVC classes.
544
545
        This method will convert: [UNI, Link]
546
        """
547 2
        data = evc_dict.copy()  # Do not modify the original dict
548
549 2
        for attribute, value in data.items():
550
            # Get multiple attributes.
551
            # Ex: uni_a, uni_z
552 2
            if 'uni' in attribute:
553 2
                try:
554 2
                    data[attribute] = self._uni_from_dict(value)
555
                except ValueError as exc:
556
                    raise ValueError(f'Error creating UNI: {exc}')
557
558 2
            if attribute == 'circuit_scheduler':
559 2
                data[attribute] = []
560 2
                for schedule in value:
561 2
                    data[attribute].append(CircuitSchedule.from_dict(schedule))
562
563
            # Get multiple attributes.
564
            # Ex: primary_links,
565
            #     backup_links,
566
            #     current_links_cache,
567
            #     primary_links_cache,
568
            #     backup_links_cache
569 2
            if 'links' in attribute:
570 2
                data[attribute] = [self._link_from_dict(link)
571
                                   for link in value]
572
573
            # Get multiple attributes.
574
            # Ex: current_path,
575
            #     primary_path,
576
            #     backup_path
577 2
            if 'path' in attribute and attribute != 'dynamic_backup_path':
578 2
                data[attribute] = [self._link_from_dict(link)
579
                                   for link in value]
580
581 2
        return EVC(self.controller, **data)
582
583 2
    def _uni_from_dict(self, uni_dict):
584
        """Return a UNI object from python dict."""
585
        if uni_dict is None:
586
            return False
587
588
        interface_id = uni_dict.get("interface_id")
589
        interface = self.controller.get_interface_by_id(interface_id)
590
        if interface is None:
591
            raise ValueError(f'Could not instantiate interface {interface_id}')
592
593
        tag_dict = uni_dict.get("tag")
594
        tag = TAG.from_dict(tag_dict)
595
        if tag is False:
596
            raise ValueError(f'Could not instantiate tag from dict {tag_dict}')
597
598
        uni = UNI(interface, tag)
599
600
        return uni
601
602 2
    def _link_from_dict(self, link_dict):
603
        """Return a Link object from python dict."""
604 2
        id_a = link_dict.get('endpoint_a').get('id')
605 2
        id_b = link_dict.get('endpoint_b').get('id')
606
607 2
        endpoint_a = self.controller.get_interface_by_id(id_a)
608 2
        endpoint_b = self.controller.get_interface_by_id(id_b)
609
610 2
        link = Link(endpoint_a, endpoint_b)
611 2
        if 'metadata' in link_dict:
612
            link.extend_metadata(link_dict.get('metadata'))
613
614 2
        s_vlan = link.get_metadata('s_vlan')
615 2
        if s_vlan:
616
            tag = TAG.from_dict(s_vlan)
617
            if tag is False:
618
                error_msg = f'Could not instantiate tag from dict {s_vlan}'
619
                raise ValueError(error_msg)
620
            link.update_metadata('s_vlan', tag)
621 2
        return link
622
623 2
    def _find_evc_by_schedule_id(self, schedule_id):
624
        """
625
        Find an EVC and CircuitSchedule based on schedule_id.
626
627
        :param schedule_id: Schedule ID
628
        :return: EVC and Schedule
629
        """
630 2
        circuits = self._get_circuits_buffer()
631 2
        found_schedule = None
632 2
        evc = None
633
634
        # pylint: disable=unused-variable
635 2
        for c_id, circuit in circuits.items():
636 2
            for schedule in circuit.circuit_scheduler:
637 2
                if schedule.id == schedule_id:
638 2
                    found_schedule = schedule
639 2
                    evc = circuit
640 2
                    break
641 2
            if found_schedule:
642 2
                break
643 2
        return evc, found_schedule
644
645 2
    def _get_circuits_buffer(self):
646
        """
647
        Return the circuit buffer.
648
649
        If the buffer is empty, try to load data from storehouse.
650
        """
651 2
        if not self.circuits:
652
            # Load storehouse circuits to buffer
653 2
            circuits = self.storehouse.get_data()
654 2
            for c_id, circuit in circuits.items():
655 2
                evc = self._evc_from_dict(circuit)
656 2
                self.circuits[c_id] = evc
657
        return self.circuits
658