Passed
Pull Request — master (#34)
by
unknown
02:48
created

build.main.GenericFlow.do_match()   A

Complexity

Conditions 3

Size

Total Lines 7
Code Lines 6

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 1
CRAP Score 8.2077

Importance

Changes 0
Metric Value
eloc 6
dl 0
loc 7
rs 10
c 0
b 0
f 0
ccs 1
cts 6
cp 0.1666
cc 3
nop 2
crap 8.2077
1
"""Main module of amlight/flow_stats Kytos Network Application.
2
3
This NApp does operations with flows not covered by Kytos itself.
4
"""
5
# pylint: disable=too-many-return-statements,too-many-instance-attributes
6
# pylint: disable=too-many-arguments,too-many-branches,too-many-statements
7
8 1
import hashlib
9 1
import ipaddress
10 1
import json
11 1
from threading import Lock
12
13 1
import pyof.v0x01.controller2switch.common as common01
14 1
import pyof.v0x04.controller2switch.common as common04
15 1
from flask import jsonify, request
16 1
from kytos.core import KytosEvent, KytosNApp, log, rest
17 1
from kytos.core.helpers import listen_to
18 1
from napps.amlight.flow_stats.utils import format_request
19 1
from napps.amlight.sdntrace import constants
20 1
from napps.kytos.of_core.v0x01.flow import Action as Action10
21 1
from napps.kytos.of_core.v0x04.flow import Action as Action13
22 1
from napps.kytos.of_core.v0x04.match_fields import MatchFieldFactory
23 1
from pyof.v0x01.common.flow_match import FlowWildCards
24 1
from pyof.v0x04.common.flow_instructions import InstructionType
25
26
27 1
class GenericFlow():
28
    """Class to represent a flow.
29
30
        This class represents a flow regardless of the OF version."""
31
32 1
    def __init__(self, version='0x01', match=None, idle_timeout=0,
33
                 hard_timeout=0, duration_sec=0, packet_count=0, byte_count=0,
34
                 priority=0, table_id=0xff, cookie=None, buffer_id=None,
35
                 actions=None):
36 1
        self.version = version
37 1
        self.match = match if match else {}
38 1
        self.idle_timeout = idle_timeout
39 1
        self.hard_timeout = hard_timeout
40 1
        self.duration_sec = duration_sec
41 1
        self.packet_count = packet_count
42 1
        self.byte_count = byte_count
43 1
        self.priority = priority
44 1
        self.table_id = table_id
45 1
        self.cookie = cookie
46 1
        self.buffer_id = buffer_id
47 1
        self.actions = actions if actions else []
48
49 1
    def __eq__(self, other):
50
        return self.id == other.id
51
52 1
    @property
53 1
    def id(self):
54
        # pylint: disable=invalid-name
55
        """Return the hash of the object.
56
        Calculates the hash of the object by using the hashlib we use md5 of
57
        strings.
58
        Returns:
59
            string: Hash of object.
60
        """
61 1
        hash_result = hashlib.md5()
62 1
        hash_result.update(str(self.version).encode('utf-8'))
63 1
        for value in self.match.copy().values():
64 1
            if self.version == '0x01':
65 1
                hash_result.update(str(value).encode('utf-8'))
66
            else:
67 1
                hash_result.update(str(value.value).encode('utf-8'))
68 1
        hash_result.update(str(self.idle_timeout).encode('utf-8'))
69 1
        hash_result.update(str(self.hard_timeout).encode('utf-8'))
70 1
        hash_result.update(str(self.priority).encode('utf-8'))
71 1
        hash_result.update(str(self.table_id).encode('utf-8'))
72 1
        hash_result.update(str(self.cookie).encode('utf-8'))
73 1
        hash_result.update(str(self.buffer_id).encode('utf-8'))
74
75 1
        return hash_result.hexdigest()
76
77 1
    def to_dict(self):
78
        """Convert flow to a dictionary."""
79 1
        flow_dict = {}
80 1
        flow_dict['version'] = self.version
81 1
        if self.version == '0x01':
82 1
            flow_dict.update(self.match)
83
        else:
84 1
            flow_dict.update(self.match_to_dict())
85 1
        flow_dict['idle_timeout'] = self.idle_timeout
86 1
        flow_dict['hard_timeout'] = self.hard_timeout
87 1
        flow_dict['priority'] = self.priority
88 1
        flow_dict['table_id'] = self.table_id
89 1
        flow_dict['cookie'] = self.cookie
90 1
        flow_dict['buffer_id'] = self.buffer_id
91 1
        flow_dict['actions'] = []
92 1
        for action in self.actions:
93 1
            flow_dict['actions'].append(action.as_dict())
94
95 1
        return flow_dict
96
97 1
    def match_to_dict(self):
98
        """Convert a match in OF 1.3 to a dictionary."""
99
        # pylint: disable=consider-using-dict-items
100 1
        match = {}
101 1
        for name in self.match.copy():
102 1
            match[name] = self.match[name].value
103 1
        return match
104
105 1
    def to_json(self):
106
        """Return a json version of the flow."""
107
        return json.dumps(self.to_dict())
108
109
    # @staticmethod
110
    # def from_dict(flow_dict):
111
    #     """Create a flow from a dict."""
112
    #     flow = GenericFlow()
113
    #     for attr_name, value in flow_dict.items():
114
    #         if attr_name == 'actions':
115
    #             flow.actions = []
116
    #             for action in value:
117
    #                 new_action = ACTION_TYPES[int(action['action_type'])]()
118
    #                 for action_attr_name,
119
    #                     action_attr_value in action.items():
120
    #
121
    #                     setattr(new_action,
122
    #                             action_attr_name,
123
    #                             action_attr_value)
124
    #                 flow.actions.append(new_action)
125
    #         else:
126
    #             setattr(flow, attr_name, value)
127
    #     return flow
128
129 1
    @classmethod
130 1
    def from_flow_stats(cls, flow_stats, version='0x01'):
131
        """Create a flow from OF flow stats."""
132 1
        flow = GenericFlow(version=version)
133 1
        flow.idle_timeout = flow_stats.idle_timeout.value
134 1
        flow.hard_timeout = flow_stats.hard_timeout.value
135 1
        flow.priority = flow_stats.priority.value
136 1
        flow.table_id = flow_stats.table_id.value
137 1
        flow.duration_sec = flow_stats.duration_sec.value
138 1
        flow.packet_count = flow_stats.packet_count.value
139 1
        flow.byte_count = flow_stats.byte_count.value
140 1
        if version == '0x01':
141 1
            flow.match['wildcards'] = flow_stats.match.wildcards.value
142 1
            flow.match['in_port'] = flow_stats.match.in_port.value
143 1
            flow.match['eth_src'] = flow_stats.match.dl_src.value
144 1
            flow.match['eth_dst'] = flow_stats.match.dl_dst.value
145 1
            flow.match['vlan_vid'] = flow_stats.match.dl_vlan.value
146 1
            flow.match['vlan_pcp'] = flow_stats.match.dl_vlan_pcp.value
147 1
            flow.match['eth_type'] = flow_stats.match.dl_type.value
148 1
            flow.match['ip_tos'] = flow_stats.match.nw_tos.value
149 1
            flow.match['ipv4_src'] = flow_stats.match.nw_src.value
150 1
            flow.match['ipv4_dst'] = flow_stats.match.nw_dst.value
151 1
            flow.match['ip_proto'] = flow_stats.match.nw_proto.value
152 1
            flow.match['tcp_src'] = flow_stats.match.tp_src.value
153 1
            flow.match['tcp_dst'] = flow_stats.match.tp_dst.value
154 1
            flow.actions = []
155 1
            for of_action in flow_stats.actions:
156 1
                action = Action10.from_of_action(of_action)
157 1
                flow.actions.append(action)
158 1
        elif version == '0x04':
159 1
            for match in flow_stats.match.oxm_match_fields:
160 1
                match_field = MatchFieldFactory.from_of_tlv(match)
161 1
                field_name = match_field.name
162 1
                if field_name == 'dl_vlan':
163 1
                    field_name = 'vlan_vid'
164 1
                flow.match[field_name] = match_field
165 1
            flow.actions = []
166 1
            for instruction in flow_stats.instructions:
167 1
                if instruction.instruction_type == \
168
                        InstructionType.OFPIT_APPLY_ACTIONS:
169 1
                    for of_action in instruction.actions:
170 1
                        action = Action13.from_of_action(of_action)
171 1
                        flow.actions.append(action)
172 1
        return flow
173
174 1
    @classmethod
175 1
    def from_replies_flows(cls, flow04):
176
        """Create a flow from a flow passed on
177
        replies_flows in event kytos/of_core.flow_stats.received."""
178
179
        flow = GenericFlow(version='0x04')
180
        flow.idle_timeout = flow04.idle_timeout
181
        flow.hard_timeout = flow04.hard_timeout
182
        flow.priority = flow04.priority
183
        flow.table_id = flow04.table_id
184
        flow.duration_sec = flow04.stats.duration_sec
185
        flow.packet_count = flow04.stats.packet_count
186
        flow.byte_count = flow04.stats.byte_count
187
188
        as_of_match = flow04.match.as_of_match()
189
        for match in as_of_match.oxm_match_fields:
190
            match_field = MatchFieldFactory.from_of_tlv(match)
191
            field_name = match_field.name
192
            if field_name == 'dl_vlan':
193
                field_name = 'vlan_vid'
194
            flow.match[field_name] = match_field
195
        flow.actions = []
196
        for instruction in flow04.instructions:
197
            if instruction.instruction_type == \
198
                    InstructionType.OFPIT_APPLY_ACTIONS:
199
                for of_action in instruction.actions:
200
                    action = Action13.from_of_action(of_action)
201
                    flow.actions.append(action)
202
        return flow
203
204 1
    def do_match(self, args):
205
        """Match a packet against this flow."""
206
        if self.version == '0x01':
207
            return self.match10(args)
208
        if self.version == '0x04':
209
            return self.match13(args)
210
        return None
211
212 1
    def match10(self, args):
213
        """Match a packet against this flow (OF1.0)."""
214
        log.debug('Matching packet')
215
        if not self.match['wildcards'] & FlowWildCards.OFPFW_IN_PORT:
216
            if 'in_port' not in args:
217
                return False
218
            if self.match['in_port'] != int(args['in_port']):
219
                return False
220
        if not self.match['wildcards'] & FlowWildCards.OFPFW_DL_VLAN_PCP:
221
            if 'vlan_pcp' not in args:
222
                return False
223
            if self.match['vlan_pcp'] != int(args['vlan_pcp']):
224
                return False
225
        if not self.match['wildcards'] & FlowWildCards.OFPFW_DL_VLAN:
226
            if 'vlan_vid' not in args:
227
                return False
228
            if self.match['vlan_vid'] != args['vlan_vid'][-1]:
229
                return False
230
        if not self.match['wildcards'] & FlowWildCards.OFPFW_DL_SRC:
231
            if 'eth_src' not in args:
232
                return False
233
            if self.match['eth_src'] != args['eth_src']:
234
                return False
235
        if not self.match['wildcards'] & FlowWildCards.OFPFW_DL_DST:
236
            if 'eth_dst' not in args:
237
                return False
238
            if self.match['eth_dst'] != args['eth_dst']:
239
                return False
240
        if not self.match['wildcards'] & FlowWildCards.OFPFW_DL_TYPE:
241
            if 'eth_type' not in args:
242
                return False
243
            if self.match['eth_type'] != int(args['eth_type']):
244
                return False
245
        if self.match['eth_type'] == constants.IPV4:
246
            flow_ip_int = int(ipaddress.IPv4Address(self.match['ipv4_src']))
247
            if flow_ip_int != 0:
248
                mask = ((self.match['wildcards'] &
249
                         FlowWildCards.OFPFW_NW_SRC_MASK) >>
250
                        FlowWildCards.OFPFW_NW_SRC_SHIFT)
251
                mask = min(mask, 32)
252
                if mask != 32 and 'ipv4_src' not in args:
253
                    return False
254
                mask = (0xffffffff << mask) & 0xffffffff
255
                ip_int = int(ipaddress.IPv4Address(args['ipv4_src']))
256
                if ip_int & mask != flow_ip_int & mask:
257
                    return False
258
259
            flow_ip_int = int(ipaddress.IPv4Address(self.match['ipv4_dst']))
260
            if flow_ip_int != 0:
261
                mask = ((self.match['wildcards'] &
262
                         FlowWildCards.OFPFW_NW_DST_MASK) >>
263
                        FlowWildCards.OFPFW_NW_DST_SHIFT)
264
                mask = min(mask, 32)
265
                if mask != 32 and 'ipv4_dst' not in args:
266
                    return False
267
                mask = (0xffffffff << mask) & 0xffffffff
268
                ip_int = int(ipaddress.IPv4Address(args['ipv4_dst']))
269
                if ip_int & mask != flow_ip_int & mask:
270
                    return False
271
            if not self.match['wildcards'] & FlowWildCards.OFPFW_NW_TOS:
272
                if 'ip_tos' not in args:
273
                    return False
274
                if self.match['ip_tos'] != int(args['ip_tos']):
275
                    return False
276
            if not self.match['wildcards'] & FlowWildCards.OFPFW_NW_PROTO:
277
                if 'ip_proto' not in args:
278
                    return False
279
                if self.match['ip_proto'] != int(args['ip_proto']):
280
                    return False
281
            if not self.match['wildcards'] & FlowWildCards.OFPFW_TP_SRC:
282
                if 'tp_src' not in args:
283
                    return False
284
                if self.match['tcp_src'] != int(args['tp_src']):
285
                    return False
286
            if not self.match['wildcards'] & FlowWildCards.OFPFW_TP_DST:
287
                if 'tp_dst' not in args:
288
                    return False
289
                if self.match['tcp_dst'] != int(args['tp_dst']):
290
                    return False
291
        return self
292
293 1
    def match13(self, args):
294
        """Match a packet against this flow (OF1.3)."""
295
        # pylint: disable=consider-using-dict-items
296
        for name in self.match.copy():
297
            if name not in args:
298
                return False
299
            if name == 'vlan_vid':
300
                field = args[name][-1]
301
            else:
302
                field = args[name]
303
            if name not in ('ipv4_src', 'ipv4_dst', 'ipv6_src', 'ipv6_dst'):
304
                if self.match[name].value != field:
305
                    return False
306
            else:
307
                packet_ip = int(ipaddress.ip_address(field))
308
                ip_addr = self.match[name].value
309
                if packet_ip & ip_addr.netmask != ip_addr.address:
310
                    return False
311
        return self
312
313
314
# pylint: disable=too-many-public-methods
315 1
class Main(KytosNApp):
316
    """Main class of amlight/flow_stats NApp.
317
318
    This class is the entry point for this napp.
319
    """
320
321 1
    def setup(self):
322
        """Replace the '__init__' method for the KytosNApp subclass.
323
324
        The setup method is automatically called by the controller when your
325
        application is loaded.
326
327
        So, if you have any setup routine, insert it here.
328
        """
329 1
        log.info('Starting Kytos/Amlight flow manager')
330 1
        for switch in self.controller.switches.copy().values():
331
            switch.generic_flows = []
332 1
        self.switch_stats_xid = {}
333 1
        self.switch_stats_lock = {}
334
335 1
    def execute(self):
336
        """This method is executed right after the setup method execution.
337
338
        You can also use this method in loop mode if you add to the above setup
339
        method a line like the following example:
340
341
            self.execute_as_loop(30)  # 30-second interval.
342
        """
343
344 1
    def shutdown(self):
345
        """This method is executed when your napp is unloaded.
346
347
        If you have some cleanup procedure, insert it here.
348
        """
349
350 1
    def flow_from_id(self, flow_id):
351
        """Flow from given flow_id."""
352 1
        for switch in self.controller.switches.copy().values():
353 1
            try:
354 1
                for flow in switch.generic_flows:
355 1
                    if flow.id == flow_id:
356 1
                        return flow
357
            except KeyError:
358
                pass
359 1
        return None
360
361 1
    @rest('flow/match/<dpid>')
362 1
    def flow_match(self, dpid):
363
        """Return first flow matching request."""
364 1
        switch = self.controller.get_switch_by_dpid(dpid)
365 1
        flow = self.match_flows(switch, format_request(request.args), False)
366 1
        if flow:
367 1
            return jsonify(flow.to_dict())
368 1
        return "No match", 404
369
370 1
    @rest('flow/stats/<dpid>')
371 1
    def flow_stats(self, dpid):
372
        """Return all flows matching request."""
373 1
        switch = self.controller.get_switch_by_dpid(dpid)
374 1
        if not switch:
375
            return f"switch {dpid} not found", 404
376 1
        flows = self.match_flows(switch, format_request(request.args), True)
377 1
        flows = [flow.to_dict() for flow in flows]
378 1
        return jsonify(flows)
379
380 1
    @staticmethod
381 1
    def match_flows(switch, args, many=True):
382
        # pylint: disable=bad-staticmethod-argument
383
        """
384
        Match the packet in request against the flows installed in the switch.
385
386
        Try the match with each flow, in other. If many is True, tries the
387
        match with all flows, if False, tries until the first match.
388
        :param args: packet data
389
        :param many: Boolean, indicating whether to continue after matching the
390
                first flow or not
391
        :return: If many, the list of matched flows, or the matched flow
392
        """
393
        response = []
394
        try:
395
            for flow in switch.generic_flows:
396
                match = flow.do_match(args)
397
                if match:
398
                    if many:
399
                        response.append(match)
400
                    else:
401
                        response = match
402
                        break
403
        except AttributeError:
404
            return None
405
        if not many and isinstance(response, list):
406
            return None
407
        return response
408
409 1
    @staticmethod
410 1
    def match_and_apply(switch, args):
411
        # pylint: disable=bad-staticmethod-argument
412
        """Match flows and apply actions.
413
414
        Match given packet (in args) against the switch flows and,
415
        if a match flow is found, apply its actions."""
416
        flow = Main.match_flows(switch, args, False)
417
        port = None
418
        actions = None
419
        # pylint: disable=too-many-nested-blocks
420
        if flow:
421
            actions = flow.actions
422
            if switch.ofp_version == '0x01':
423
                for action in actions:
424
                    action_type = action.action_type
425
                    if action_type == 'output':
426
                        port = action.port
427
                    elif action_type == 'set_vlan':
428
                        if 'vlan_vid' in args:
429
                            args['vlan_vid'][-1] = action.vlan_id
430
                        else:
431
                            args['vlan_vid'] = [action.vlan_id]
432
            elif switch.ofp_version == '0x04':
433
                for action in actions:
434
                    action_type = action.action_type
435
                    if action_type == 'output':
436
                        port = action.port
437
                    if action_type == 'push_vlan':
438
                        if 'vlan_vid' not in args:
439
                            args['vlan_vid'] = []
440
                        args['vlan_vid'].append(0)
441
                    if action_type == 'pop_vlan':
442
                        if 'vlan_vid' in args:
443
                            args['vlan_vid'].pop()
444
                            if len(args['vlan_vid']) == 0:
445
                                del args['vlan_vid']
446
                    if action_type == 'set_vlan':
447
                        args['vlan_vid'][-1] = action.vlan_id
448
        return flow, args, port
449
450 1
    @rest('packet_count/<flow_id>')
451 1
    def packet_count(self, flow_id):
452
        """Packet count of an specific flow."""
453 1
        flow = self.flow_from_id(flow_id)
454 1
        if flow is None:
455 1
            return "Flow does not exist", 404
456 1
        packet_stats = {
457
            'flow_id': flow_id,
458
            'packet_counter': flow.packet_count,
459
            'packet_per_second': flow.packet_count / flow.duration_sec
460
            }
461 1
        return jsonify(packet_stats)
462
463 1
    @rest('bytes_count/<flow_id>')
464 1
    def bytes_count(self, flow_id):
465
        """Bytes count of an specific flow."""
466 1
        flow = self.flow_from_id(flow_id)
467 1
        if flow is None:
468 1
            return "Flow does not exist", 404
469 1
        bytes_stats = {
470
            'flow_id': flow_id,
471
            'bytes_counter': flow.byte_count,
472
            'bits_per_second': flow.byte_count * 8 / flow.duration_sec
473
            }
474 1
        return jsonify(bytes_stats)
475
476 1
    @rest('packet_count/per_flow/<dpid>')
477 1
    def packet_count_per_flow(self, dpid):
478
        """Per flow packet count."""
479 1
        return self.flows_counters('packet_count',
480
                                   dpid,
481
                                   counter='packet_counter',
482
                                   rate='packet_per_second')
483
484 1
    @rest('packet_count/sum/<dpid>')
485 1
    def packet_count_sum(self, dpid):
486
        """Sum of packet count flow stats."""
487 1
        return self.flows_counters('packet_count',
488
                                   dpid,
489
                                   total=True)
490
491 1
    @rest('bytes_count/per_flow/<dpid>')
492 1
    def bytes_count_per_flow(self, dpid):
493
        """Per flow bytes count."""
494 1
        return self.flows_counters('byte_count',
495
                                   dpid,
496
                                   counter='bytes_counter',
497
                                   rate='bits_per_second')
498
499 1
    @rest('bytes_count/sum/<dpid>')
500 1
    def bytes_count_sum(self, dpid):
501
        """Sum of bytes count flow stats."""
502 1
        return self.flows_counters('byte_count',
503
                                   dpid,
504
                                   total=True)
505
506 1
    def flows_counters(self, field, dpid, counter=None, rate=None,
507
                       total=False):
508
        """Calculate flows statistics.
509
510
        The returned statistics are both per flow and for the sum of flows
511
        """
512
        # pylint: disable=too-many-arguments
513
        # pylint: disable=unused-variable
514 1
        start_date = request.args.get('start_date', 0)
515 1
        end_date = request.args.get('end_date', 0)
516
        # pylint: enable=unused-variable
517
518 1
        if total:
519 1
            count_flows = 0
520
        else:
521 1
            count_flows = []
522 1
            if not counter:
523
                counter = field
524 1
            if not rate:
525
                rate = field
526
527
        # We don't have statistics persistence yet, so for now this only works
528
        # for start and end equals to zero
529 1
        flows = self.controller.get_switch_by_dpid(dpid).generic_flows
530
531 1
        for flow in flows:
532 1
            count = getattr(flow, field)
533 1
            if total:
534 1
                count_flows += count
535
            else:
536 1
                per_second = count / flow.duration_sec
537 1
                if rate.startswith('bits'):
538 1
                    per_second *= 8
539 1
                count_flows.append({'flow_id': flow.id,
540
                                    counter: count,
541
                                    rate: per_second})
542
543 1
        return jsonify(count_flows)
544
545 1
    @listen_to('kytos/of_core.v0x01.messages.in.ofpt_stats_reply')
546 1
    def on_stats_reply_0x01(self, event):
547
        """Capture flow stats messages for v0x01 switches."""
548
        self.handle_stats_reply_0x01(event)
549
550 1
    def handle_stats_reply_0x01(self, event):
551
        """Handle stats replies for v0x01 switches."""
552 1
        msg = event.content['message']
553 1
        if msg.body_type == common01.StatsType.OFPST_FLOW:
554 1
            switch = event.source.switch
555 1
            self.handle_stats_reply(msg, switch)
556
557 1
    @listen_to('kytos/of_core.v0x04.messages.in.ofpt_multipart_reply')
558 1
    def on_stats_reply_0x04(self, event):
559
        """Capture flow stats messages for OpenFlow 1.3."""
560
        self.handle_stats_reply_0x04(event)
561
562 1
    def handle_stats_reply_0x04(self, event):
563
        """Handle flow stats messages for OpenFlow 1.3."""
564 1
        msg = event.content['message']
565 1
        if msg.multipart_type == common04.MultipartType.OFPMP_FLOW:
566 1
            switch = event.source.switch
567 1
            self.handle_stats_reply(msg, switch)
568
569 1
    def handle_stats_reply(self, msg, switch):
570
        """Insert flows received in the switch list of flows."""
571 1
        try:
572 1
            old_flows = switch.generic_flows
573
        except AttributeError:
574
            old_flows = []
575 1
        is_new_xid = (
576
            int(msg.header.xid) != self.switch_stats_xid.get(switch.id, 0)
577
        )
578 1
        is_last_part = msg.flags.value % 2 == 0
579 1
        self.switch_stats_lock.setdefault(switch.id, Lock())
580 1
        with self.switch_stats_lock[switch.id]:
581 1
            if is_new_xid:
582 1
                switch.generic_new_flows = []
583 1
                self.switch_stats_xid[switch.id] = int(msg.header.xid)
584 1
            for flow_stats in msg.body:
585 1
                flow = GenericFlow.from_flow_stats(flow_stats,
586
                                                   switch.ofp_version)
587 1
                switch.generic_new_flows.append(flow)
588 1
            if is_last_part:
589 1
                switch.generic_flows = switch.generic_new_flows
590 1
                switch.generic_flows.sort(
591
                    key=lambda f: (f.priority, f.duration_sec),
592
                    reverse=True
593
                )
594 1
        if is_new_xid and is_last_part and switch.generic_flows != old_flows:
595
            # Generate an event informing that flows have changed
596 1
            event = KytosEvent('amlight/flow_stats.flows_updated')
597 1
            event.content['switch'] = switch.dpid
598 1
            self.controller.buffers.app.put(event)
599
600 1
    @listen_to('kytos/of_core.flow_stats.received')
601 1
    def on_stats_received(self, event):
602
        """Capture flow stats messages for OpenFlow 1.3."""
603
        self.handle_stats_received(event)
604
605 1
    def handle_stats_received(self, event):
606
        """Handle flow stats messages for OpenFlow 1.3."""
607 1
        switch = event.content['switch']
608 1
        if 'replies_flows' in event.content:
609 1
            replies_flows = event.content['replies_flows']
610 1
            self.handle_stats_reply_received(switch, replies_flows)
611
612
    # pylint: disable=no-self-use
613 1
    def handle_stats_reply_received(self, switch, replies_flows):
614
        """Iterate on the replies and set the generic flows"""
615 1
        switch.generic_flows = [GenericFlow.from_replies_flows(flow)
616
                                for flow in replies_flows]
617 1
        switch.generic_flows.sort(
618
                    key=lambda f: (f.priority, f.duration_sec),
619
                    reverse=True
620
                    )
621