Passed
Pull Request — master (#28)
by
unknown
05:56
created

build.main   F

Complexity

Total Complexity 146

Size/Duplication

Total Lines 602
Duplicated Lines 0 %

Test Coverage

Coverage 61.71%

Importance

Changes 0
Metric Value
wmc 146
eloc 434
dl 0
loc 602
rs 2
c 0
b 0
f 0
ccs 245
cts 397
cp 0.6171

31 Methods

Rating   Name   Duplication   Size   Complexity  
A GenericFlow.to_json() 0 3 1
C GenericFlow.from_flow_stats() 0 44 9
A GenericFlow.id() 0 24 3
A GenericFlow.__init__() 0 16 3
A GenericFlow.to_dict() 0 19 3
A GenericFlow.match_to_dict() 0 7 2
A GenericFlow.__eq__() 0 2 1
A Main.bytes_count() 0 12 2
A Main.handle_stats_received() 0 6 2
A Main.on_stats_reply_0x01() 0 7 2
A Main.packet_count_sum() 0 6 1
A Main.bytes_count_sum() 0 6 1
A Main.flow_match() 0 8 2
B Main.flows_counters() 0 38 7
A GenericFlow.do_match() 0 7 3
A Main.shutdown() 0 2 1
A Main.bytes_count_per_flow() 0 7 1
A Main.setup() 0 13 2
A Main.execute() 0 2 1
F Main.match_and_apply() 0 40 16
B GenericFlow.from_replies_flows() 0 29 6
A Main.packet_count() 0 12 2
B Main.match_flows() 0 28 7
A Main.handle_stats_reply_received() 0 7 2
B GenericFlow.match13() 0 19 7
F GenericFlow.match10() 0 80 40
A Main.flow_stats() 0 9 2
A Main.on_stats_received() 0 4 1
C Main.handle_stats_reply() 0 30 10
A Main.packet_count_per_flow() 0 7 1
A Main.flow_from_id() 0 10 5

How to fix   Complexity   

Complexity

Complex classes like build.main often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""Main module of amlight/flow_stats Kytos Network Application.
2
3
This NApp does operations with flows not covered by Kytos itself.
4
"""
5
# pylint: disable=too-many-return-statements,too-many-instance-attributes
6
# pylint: disable=too-many-arguments,too-many-branches,too-many-statements
7
8 1
import hashlib
9 1
import ipaddress
10 1
import json
11 1
from threading import Lock
12
13 1
import pyof.v0x01.controller2switch.common as common01
14 1
from flask import jsonify, request
15 1
from kytos.core import KytosEvent, KytosNApp, log, rest
16 1
from kytos.core.helpers import listen_to
17 1
from napps.amlight.flow_stats.utils import format_request
18 1
from napps.amlight.sdntrace import constants
19 1
from napps.kytos.of_core.v0x01.flow import Action as Action10
20 1
from napps.kytos.of_core.v0x04.flow import Action as Action13
21 1
from napps.kytos.of_core.v0x04.match_fields import MatchFieldFactory
22 1
from pyof.v0x01.common.flow_match import FlowWildCards
23 1
from pyof.v0x04.common.flow_instructions import InstructionType
24
25
26 1
class GenericFlow():
27
    """Class to represent a flow.
28
29
        This class represents a flow regardless of the OF version."""
30
31 1
    def __init__(self, version='0x01', match=None, idle_timeout=0,
32
                 hard_timeout=0, duration_sec=0, packet_count=0, byte_count=0,
33
                 priority=0, table_id=0xff, cookie=None, buffer_id=None,
34
                 actions=None):
35 1
        self.version = version
36 1
        self.match = match if match else {}
37 1
        self.idle_timeout = idle_timeout
38 1
        self.hard_timeout = hard_timeout
39 1
        self.duration_sec = duration_sec
40 1
        self.packet_count = packet_count
41 1
        self.byte_count = byte_count
42 1
        self.priority = priority
43 1
        self.table_id = table_id
44 1
        self.cookie = cookie
45 1
        self.buffer_id = buffer_id
46 1
        self.actions = actions if actions else []
47
48 1
    def __eq__(self, other):
49
        return self.id == other.id
50
51 1
    @property
52 1
    def id(self):
53
        # pylint: disable=invalid-name
54
        """Return the hash of the object.
55
        Calculates the hash of the object by using the hashlib we use md5 of
56
        strings.
57
        Returns:
58
            string: Hash of object.
59
        """
60 1
        hash_result = hashlib.md5()
61 1
        hash_result.update(str(self.version).encode('utf-8'))
62 1
        for value in self.match.copy().values():
63 1
            if self.version == '0x01':
64 1
                hash_result.update(str(value).encode('utf-8'))
65
            else:
66 1
                hash_result.update(str(value.value).encode('utf-8'))
67 1
        hash_result.update(str(self.idle_timeout).encode('utf-8'))
68 1
        hash_result.update(str(self.hard_timeout).encode('utf-8'))
69 1
        hash_result.update(str(self.priority).encode('utf-8'))
70 1
        hash_result.update(str(self.table_id).encode('utf-8'))
71 1
        hash_result.update(str(self.cookie).encode('utf-8'))
72 1
        hash_result.update(str(self.buffer_id).encode('utf-8'))
73
74 1
        return hash_result.hexdigest()
75
76 1
    def to_dict(self):
77
        """Convert flow to a dictionary."""
78 1
        flow_dict = {}
79 1
        flow_dict['version'] = self.version
80 1
        if self.version == '0x01':
81 1
            flow_dict.update(self.match)
82
        else:
83 1
            flow_dict.update(self.match_to_dict())
84 1
        flow_dict['idle_timeout'] = self.idle_timeout
85 1
        flow_dict['hard_timeout'] = self.hard_timeout
86 1
        flow_dict['priority'] = self.priority
87 1
        flow_dict['table_id'] = self.table_id
88 1
        flow_dict['cookie'] = self.cookie
89 1
        flow_dict['buffer_id'] = self.buffer_id
90 1
        flow_dict['actions'] = []
91 1
        for action in self.actions:
92 1
            flow_dict['actions'].append(action.as_dict())
93
94 1
        return flow_dict
95
96 1
    def match_to_dict(self):
97
        """Convert a match in OF 1.3 to a dictionary."""
98
        # pylint: disable=consider-using-dict-items
99 1
        match = {}
100 1
        for name in self.match.copy():
101 1
            match[name] = self.match[name].value
102 1
        return match
103
104 1
    def to_json(self):
105
        """Return a json version of the flow."""
106
        return json.dumps(self.to_dict())
107
108
    # @staticmethod
109
    # def from_dict(flow_dict):
110
    #     """Create a flow from a dict."""
111
    #     flow = GenericFlow()
112
    #     for attr_name, value in flow_dict.items():
113
    #         if attr_name == 'actions':
114
    #             flow.actions = []
115
    #             for action in value:
116
    #                 new_action = ACTION_TYPES[int(action['action_type'])]()
117
    #                 for action_attr_name,
118
    #                     action_attr_value in action.items():
119
    #
120
    #                     setattr(new_action,
121
    #                             action_attr_name,
122
    #                             action_attr_value)
123
    #                 flow.actions.append(new_action)
124
    #         else:
125
    #             setattr(flow, attr_name, value)
126
    #     return flow
127
128 1
    @classmethod
129 1
    def from_flow_stats(cls, flow_stats, version='0x01'):
130
        """Create a flow from OF flow stats."""
131 1
        flow = GenericFlow(version=version)
132 1
        flow.idle_timeout = flow_stats.idle_timeout.value
133 1
        flow.hard_timeout = flow_stats.hard_timeout.value
134 1
        flow.priority = flow_stats.priority.value
135 1
        flow.table_id = flow_stats.table_id.value
136 1
        flow.duration_sec = flow_stats.duration_sec.value
137 1
        flow.packet_count = flow_stats.packet_count.value
138 1
        flow.byte_count = flow_stats.byte_count.value
139 1
        if version == '0x01':
140 1
            flow.match['wildcards'] = flow_stats.match.wildcards.value
141 1
            flow.match['in_port'] = flow_stats.match.in_port.value
142 1
            flow.match['eth_src'] = flow_stats.match.dl_src.value
143 1
            flow.match['eth_dst'] = flow_stats.match.dl_dst.value
144 1
            flow.match['vlan_vid'] = flow_stats.match.dl_vlan.value
145 1
            flow.match['vlan_pcp'] = flow_stats.match.dl_vlan_pcp.value
146 1
            flow.match['eth_type'] = flow_stats.match.dl_type.value
147 1
            flow.match['ip_tos'] = flow_stats.match.nw_tos.value
148 1
            flow.match['ipv4_src'] = flow_stats.match.nw_src.value
149 1
            flow.match['ipv4_dst'] = flow_stats.match.nw_dst.value
150 1
            flow.match['ip_proto'] = flow_stats.match.nw_proto.value
151 1
            flow.match['tcp_src'] = flow_stats.match.tp_src.value
152 1
            flow.match['tcp_dst'] = flow_stats.match.tp_dst.value
153 1
            flow.actions = []
154 1
            for of_action in flow_stats.actions:
155 1
                action = Action10.from_of_action(of_action)
156 1
                flow.actions.append(action)
157 1
        elif version == '0x04':
158 1
            for match in flow_stats.match.oxm_match_fields:
159 1
                match_field = MatchFieldFactory.from_of_tlv(match)
160 1
                field_name = match_field.name
161 1
                if field_name == 'dl_vlan':
162 1
                    field_name = 'vlan_vid'
163 1
                flow.match[field_name] = match_field
164 1
            flow.actions = []
165 1
            for instruction in flow_stats.instructions:
166 1
                if instruction.instruction_type == \
167
                        InstructionType.OFPIT_APPLY_ACTIONS:
168 1
                    for of_action in instruction.actions:
169 1
                        action = Action13.from_of_action(of_action)
170 1
                        flow.actions.append(action)
171 1
        return flow
172
173 1
    @classmethod
174 1
    def from_replies_flows(cls, flow04):
175
        """Create a flow from a flow passed on
176
        replies_flows in event kytos/of_core.flow_stats.received."""
177
178 1
        flow = GenericFlow(version='0x04')
179 1
        flow.idle_timeout = flow04.idle_timeout.value
180 1
        flow.hard_timeout = flow04.hard_timeout.value
181 1
        flow.priority = flow04.priority.value
182 1
        flow.table_id = flow04.table_id.value
183 1
        flow.cookie = flow04.cookie.value
184 1
        flow.duration_sec = flow04.duration_sec.value
185 1
        flow.packet_count = flow04.packet_count.value
186 1
        flow.byte_count = flow04.byte_count.value
187
188 1
        for match in flow04.match.oxm_match_fields:
189 1
            match_field = MatchFieldFactory.from_of_tlv(match)
190 1
            field_name = match_field.name
191 1
            if field_name == 'dl_vlan':
192 1
                field_name = 'vlan_vid'
193 1
            flow.match[field_name] = match_field
194 1
        flow.actions = []
195 1
        for instruction in flow04.instructions:
196 1
            if instruction.instruction_type == \
197
                    InstructionType.OFPIT_APPLY_ACTIONS:
198 1
                for of_action in instruction.actions:
199 1
                    action = Action13.from_of_action(of_action)
200 1
                    flow.actions.append(action)
201 1
        return flow
202
203 1
    def do_match(self, args):
204
        """Match a packet against this flow."""
205
        if self.version == '0x01':
206
            return self.match10(args)
207
        if self.version == '0x04':
208
            return self.match13(args)
209
        return None
210
211 1
    def match10(self, args):
212
        """Match a packet against this flow (OF1.0)."""
213
        log.debug('Matching packet')
214
        if not self.match['wildcards'] & FlowWildCards.OFPFW_IN_PORT:
215
            if 'in_port' not in args:
216
                return False
217
            if self.match['in_port'] != int(args['in_port']):
218
                return False
219
        if not self.match['wildcards'] & FlowWildCards.OFPFW_DL_VLAN_PCP:
220
            if 'vlan_pcp' not in args:
221
                return False
222
            if self.match['vlan_pcp'] != int(args['vlan_pcp']):
223
                return False
224
        if not self.match['wildcards'] & FlowWildCards.OFPFW_DL_VLAN:
225
            if 'vlan_vid' not in args:
226
                return False
227
            if self.match['vlan_vid'] != args['vlan_vid'][-1]:
228
                return False
229
        if not self.match['wildcards'] & FlowWildCards.OFPFW_DL_SRC:
230
            if 'eth_src' not in args:
231
                return False
232
            if self.match['eth_src'] != args['eth_src']:
233
                return False
234
        if not self.match['wildcards'] & FlowWildCards.OFPFW_DL_DST:
235
            if 'eth_dst' not in args:
236
                return False
237
            if self.match['eth_dst'] != args['eth_dst']:
238
                return False
239
        if not self.match['wildcards'] & FlowWildCards.OFPFW_DL_TYPE:
240
            if 'eth_type' not in args:
241
                return False
242
            if self.match['eth_type'] != int(args['eth_type']):
243
                return False
244
        if self.match['eth_type'] == constants.IPV4:
245
            flow_ip_int = int(ipaddress.IPv4Address(self.match['ipv4_src']))
246
            if flow_ip_int != 0:
247
                mask = ((self.match['wildcards'] &
248
                         FlowWildCards.OFPFW_NW_SRC_MASK) >>
249
                        FlowWildCards.OFPFW_NW_SRC_SHIFT)
250
                mask = min(mask, 32)
251
                if mask != 32 and 'ipv4_src' not in args:
252
                    return False
253
                mask = (0xffffffff << mask) & 0xffffffff
254
                ip_int = int(ipaddress.IPv4Address(args['ipv4_src']))
255
                if ip_int & mask != flow_ip_int & mask:
256
                    return False
257
258
            flow_ip_int = int(ipaddress.IPv4Address(self.match['ipv4_dst']))
259
            if flow_ip_int != 0:
260
                mask = ((self.match['wildcards'] &
261
                         FlowWildCards.OFPFW_NW_DST_MASK) >>
262
                        FlowWildCards.OFPFW_NW_DST_SHIFT)
263
                mask = min(mask, 32)
264
                if mask != 32 and 'ipv4_dst' not in args:
265
                    return False
266
                mask = (0xffffffff << mask) & 0xffffffff
267
                ip_int = int(ipaddress.IPv4Address(args['ipv4_dst']))
268
                if ip_int & mask != flow_ip_int & mask:
269
                    return False
270
            if not self.match['wildcards'] & FlowWildCards.OFPFW_NW_TOS:
271
                if 'ip_tos' not in args:
272
                    return False
273
                if self.match['ip_tos'] != int(args['ip_tos']):
274
                    return False
275
            if not self.match['wildcards'] & FlowWildCards.OFPFW_NW_PROTO:
276
                if 'ip_proto' not in args:
277
                    return False
278
                if self.match['ip_proto'] != int(args['ip_proto']):
279
                    return False
280
            if not self.match['wildcards'] & FlowWildCards.OFPFW_TP_SRC:
281
                if 'tp_src' not in args:
282
                    return False
283
                if self.match['tcp_src'] != int(args['tp_src']):
284
                    return False
285
            if not self.match['wildcards'] & FlowWildCards.OFPFW_TP_DST:
286
                if 'tp_dst' not in args:
287
                    return False
288
                if self.match['tcp_dst'] != int(args['tp_dst']):
289
                    return False
290
        return self
291
292 1
    def match13(self, args):
293
        """Match a packet against this flow (OF1.3)."""
294
        # pylint: disable=consider-using-dict-items
295
        for name in self.match.copy():
296
            if name not in args:
297
                return False
298
            if name == 'vlan_vid':
299
                field = args[name][-1]
300
            else:
301
                field = args[name]
302
            if name not in ('ipv4_src', 'ipv4_dst', 'ipv6_src', 'ipv6_dst'):
303
                if self.match[name].value != field:
304
                    return False
305
            else:
306
                packet_ip = int(ipaddress.ip_address(field))
307
                ip_addr = self.match[name].value
308
                if packet_ip & ip_addr.netmask != ip_addr.address:
309
                    return False
310
        return self
311
312
313 1
class Main(KytosNApp):
314
    """Main class of amlight/flow_stats NApp.
315
316
    This class is the entry point for this napp.
317
    """
318
319 1
    def setup(self):
320
        """Replace the '__init__' method for the KytosNApp subclass.
321
322
        The setup method is automatically called by the controller when your
323
        application is loaded.
324
325
        So, if you have any setup routine, insert it here.
326
        """
327 1
        log.info('Starting Kytos/Amlight flow manager')
328 1
        for switch in self.controller.switches.copy().values():
329
            switch.generic_flows = []
330 1
        self.switch_stats_xid = {}
331 1
        self.switch_stats_lock = {}
332
333 1
    def execute(self):
334
        """This method is executed right after the setup method execution.
335
336
        You can also use this method in loop mode if you add to the above setup
337
        method a line like the following example:
338
339
            self.execute_as_loop(30)  # 30-second interval.
340
        """
341
342 1
    def shutdown(self):
343
        """This method is executed when your napp is unloaded.
344
345
        If you have some cleanup procedure, insert it here.
346
        """
347
348 1
    def flow_from_id(self, flow_id):
349
        """Flow from given flow_id."""
350 1
        for switch in self.controller.switches.copy().values():
351 1
            try:
352 1
                for flow in switch.generic_flows:
353 1
                    if flow.id == flow_id:
354 1
                        return flow
355
            except KeyError:
356
                pass
357 1
        return None
358
359 1
    @rest('flow/match/<dpid>')
360 1
    def flow_match(self, dpid):
361
        """Return first flow matching request."""
362 1
        switch = self.controller.get_switch_by_dpid(dpid)
363 1
        flow = self.match_flows(switch, format_request(request.args), False)
364 1
        if flow:
365 1
            return jsonify(flow.to_dict())
366 1
        return "No match", 404
367
368 1
    @rest('flow/stats/<dpid>')
369 1
    def flow_stats(self, dpid):
370
        """Return all flows matching request."""
371 1
        switch = self.controller.get_switch_by_dpid(dpid)
372 1
        if not switch:
373
            return f"switch {dpid} not found", 404
374 1
        flows = self.match_flows(switch, format_request(request.args), True)
375 1
        flows = [flow.to_dict() for flow in flows]
376 1
        return jsonify(flows)
377
378 1
    @staticmethod
379 1
    def match_flows(switch, args, many=True):
380
        # pylint: disable=bad-staticmethod-argument
381
        """
382
        Match the packet in request against the flows installed in the switch.
383
384
        Try the match with each flow, in other. If many is True, tries the
385
        match with all flows, if False, tries until the first match.
386
        :param args: packet data
387
        :param many: Boolean, indicating whether to continue after matching the
388
                first flow or not
389
        :return: If many, the list of matched flows, or the matched flow
390
        """
391
        response = []
392
        try:
393
            for flow in switch.generic_flows:
394
                match = flow.do_match(args)
395
                if match:
396
                    if many:
397
                        response.append(match)
398
                    else:
399
                        response = match
400
                        break
401
        except AttributeError:
402
            return None
403
        if not many and isinstance(response, list):
404
            return None
405
        return response
406
407 1
    @staticmethod
408 1
    def match_and_apply(switch, args):
409
        # pylint: disable=bad-staticmethod-argument
410
        """Match flows and apply actions.
411
412
        Match given packet (in args) against the switch flows and,
413
        if a match flow is found, apply its actions."""
414
        flow = Main.match_flows(switch, args, False)
415
        port = None
416
        actions = None
417
        # pylint: disable=too-many-nested-blocks
418
        if flow:
419
            actions = flow.actions
420
            if switch.ofp_version == '0x01':
421
                for action in actions:
422
                    action_type = action.action_type
423
                    if action_type == 'output':
424
                        port = action.port
425
                    elif action_type == 'set_vlan':
426
                        if 'vlan_vid' in args:
427
                            args['vlan_vid'][-1] = action.vlan_id
428
                        else:
429
                            args['vlan_vid'] = [action.vlan_id]
430
            elif switch.ofp_version == '0x04':
431
                for action in actions:
432
                    action_type = action.action_type
433
                    if action_type == 'output':
434
                        port = action.port
435
                    if action_type == 'push_vlan':
436
                        if 'vlan_vid' not in args:
437
                            args['vlan_vid'] = []
438
                        args['vlan_vid'].append(0)
439
                    if action_type == 'pop_vlan':
440
                        if 'vlan_vid' in args:
441
                            args['vlan_vid'].pop()
442
                            if len(args['vlan_vid']) == 0:
443
                                del args['vlan_vid']
444
                    if action_type == 'set_vlan':
445
                        args['vlan_vid'][-1] = action.vlan_id
446
        return flow, args, port
447
448 1
    @rest('packet_count/<flow_id>')
449 1
    def packet_count(self, flow_id):
450
        """Packet count of an specific flow."""
451 1
        flow = self.flow_from_id(flow_id)
452 1
        if flow is None:
453 1
            return "Flow does not exist", 404
454 1
        packet_stats = {
455
            'flow_id': flow_id,
456
            'packet_counter': flow.packet_count,
457
            'packet_per_second': flow.packet_count / flow.duration_sec
458
        }
459 1
        return jsonify(packet_stats)
460
461 1
    @rest('bytes_count/<flow_id>')
462 1
    def bytes_count(self, flow_id):
463
        """Bytes count of an specific flow."""
464 1
        flow = self.flow_from_id(flow_id)
465 1
        if flow is None:
466 1
            return "Flow does not exist", 404
467 1
        bytes_stats = {
468
            'flow_id': flow_id,
469
            'bytes_counter': flow.byte_count,
470
            'bits_per_second': flow.byte_count * 8 / flow.duration_sec
471
        }
472 1
        return jsonify(bytes_stats)
473
474 1
    @rest('packet_count/per_flow/<dpid>')
475 1
    def packet_count_per_flow(self, dpid):
476
        """Per flow packet count."""
477 1
        return self.flows_counters('packet_count',
478
                                   dpid,
479
                                   counter='packet_counter',
480
                                   rate='packet_per_second')
481
482 1
    @rest('packet_count/sum/<dpid>')
483 1
    def packet_count_sum(self, dpid):
484
        """Sum of packet count flow stats."""
485 1
        return self.flows_counters('packet_count',
486
                                   dpid,
487
                                   total=True)
488
489 1
    @rest('bytes_count/per_flow/<dpid>')
490 1
    def bytes_count_per_flow(self, dpid):
491
        """Per flow bytes count."""
492 1
        return self.flows_counters('byte_count',
493
                                   dpid,
494
                                   counter='bytes_counter',
495
                                   rate='bits_per_second')
496
497 1
    @rest('bytes_count/sum/<dpid>')
498 1
    def bytes_count_sum(self, dpid):
499
        """Sum of bytes count flow stats."""
500 1
        return self.flows_counters('byte_count',
501
                                   dpid,
502
                                   total=True)
503
504 1
    def flows_counters(self, field, dpid, counter=None, rate=None,
505
                       total=False):
506
        """Calculate flows statistics.
507
508
        The returned statistics are both per flow and for the sum of flows
509
        """
510
        # pylint: disable=too-many-arguments
511
        # pylint: disable=unused-variable
512 1
        start_date = request.args.get('start_date', 0)
513 1
        end_date = request.args.get('end_date', 0)
514
        # pylint: enable=unused-variable
515
516 1
        if total:
517 1
            count_flows = 0
518
        else:
519 1
            count_flows = []
520 1
            if not counter:
521
                counter = field
522 1
            if not rate:
523
                rate = field
524
525
        # We don't have statistics persistence yet, so for now this only works
526
        # for start and end equals to zero
527 1
        flows = self.controller.get_switch_by_dpid(dpid).generic_flows
528
529 1
        for flow in flows:
530 1
            count = getattr(flow, field)
531 1
            if total:
532 1
                count_flows += count
533
            else:
534 1
                per_second = count / flow.duration_sec
535 1
                if rate.startswith('bits'):
536 1
                    per_second *= 8
537 1
                count_flows.append({'flow_id': flow.id,
538
                                    counter: count,
539
                                    rate: per_second})
540
541 1
        return jsonify(count_flows)
542
543 1
    @listen_to('kytos/of_core.v0x01.messages.in.ofpt_stats_reply')
544 1
    def on_stats_reply_0x01(self, event):
545
        """Capture flow stats messages for v0x01 switches."""
546
        msg = event.content['message']
547
        if msg.body_type == common01.StatsType.OFPST_FLOW:
548
            switch = event.source.switch
549
            self.handle_stats_reply(msg, switch)
550
551 1
    def handle_stats_reply(self, msg, switch):
552
        """Insert flows received in the switch list of flows."""
553 1
        try:
554 1
            old_flows = switch.generic_flows
555
        except AttributeError:
556
            old_flows = []
557 1
        is_new_xid = (
558
            int(msg.header.xid) != self.switch_stats_xid.get(switch.id, 0)
559
        )
560 1
        is_last_part = msg.flags.value % 2 == 0
561 1
        self.switch_stats_lock.setdefault(switch.id, Lock())
562 1
        with self.switch_stats_lock[switch.id]:
563 1
            if is_new_xid:
564 1
                switch.generic_new_flows = []
565 1
                self.switch_stats_xid[switch.id] = int(msg.header.xid)
566 1
            for flow_stats in msg.body:
567 1
                flow = GenericFlow.from_flow_stats(flow_stats,
568
                                                   switch.ofp_version)
569 1
                switch.generic_new_flows.append(flow)
570 1
            if is_last_part:
571 1
                switch.generic_flows = switch.generic_new_flows
572 1
                switch.generic_flows.sort(
573
                    key=lambda f: (f.priority, f.duration_sec),
574
                    reverse=True
575
                )
576 1
        if is_new_xid and is_last_part and switch.generic_flows != old_flows:
577
            # Generate an event informing that flows have changed
578 1
            event = KytosEvent('amlight/flow_stats.flows_updated')
579 1
            event.content['switch'] = switch.dpid
580 1
            self.controller.buffers.app.put(event)
581
582 1
    @listen_to('kytos/of_core.flow_stats.received')
583 1
    def on_stats_received(self, event):
584
        """Capture flow stats messages for OpenFlow 1.3."""
585
        self.handle_stats_received(event)
586
587 1
    def handle_stats_received(self, event):
588
        """Handle flow stats messages for OpenFlow 1.3."""
589 1
        switch = event.content['switch']
590 1
        if 'replies_flows' in event.content:
591 1
            replies_flows = event.content['replies_flows']
592 1
            self.handle_stats_reply_received(switch, replies_flows)
593
594
    # pylint: disable=no-self-use
595 1
    def handle_stats_reply_received(self, switch, replies_flows):
596
        """Iterate on the replies and set the generic flows"""
597 1
        switch.generic_flows = [GenericFlow.from_replies_flows(flow)
598
                                for flow in replies_flows]
599 1
        switch.generic_flows.sort(
600
                    key=lambda f: (f.priority, f.duration_sec),
601
                    reverse=True
602
                )
603