Completed
Push — master ( 229cac...232edf )
by Thomas
10:56
created

lib/exabgp/application/flow.py (4 issues)

1
#!/usr/bin/python
2
"""
3
flow.py
4
5
Created by Thomas Mangin on 2017-07-06.
6
Copyright (c) 2009-2017 Exa Networks. All rights reserved.
7
License: 3-clause BSD. (See the COPYRIGHT file)
8
"""
9
10
# based on the blog at: http://blog.sflow.com/2017/07/bgp-flowspec-on-white-box-switch.html
11
12
import os
13
import sys
14
import json
15
import re
16
import subprocess
17
import signal
18
19
20
class ACL(object):
21
    dry = os.environ.get('CUMULUS_FLOW_RIB', False)
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable os does not seem to be defined.
Loading history...
22
23
    path = '/etc/cumulus/acl/policy.d/'
24
    priority = '60'
25
    prefix = 'flowspec'
26
    bld = '.bld'
27
    suffix = '.rules'
28
29
    __uid = 0
30
    _known = dict()
31
32
    @classmethod
33
    def _uid(cls):
34
        cls.__uid += 1
35
        return cls.__uid
36
37
    @classmethod
38
    def _file(cls, name):
39
        return cls.path + cls.priority + cls.prefix + str(name) + cls.suffix
40
41
    @classmethod
42
    def _delete(cls, key):
43
        if key not in cls._known:
44
            return
45
        # removing key first so the call to clear never loops forever
46
        uid, acl = cls._known.pop(key)
47
        try:
48
            filename = cls._file(uid)
49
            if os.path.isfile(filename):
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable os does not seem to be defined.
Loading history...
50
                os.unlink(filename)
51
        except Exception:
52
            pass
53
54
    @classmethod
55
    def _commit(cls):
56
        if cls.dry:
57
            cls.show()
58
            return
59
        try:
60
            return subprocess.Popen(
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable subprocess does not seem to be defined.
Loading history...
61
                ['cl-acltool', '-i'], stderr=subprocess.STDOUT, stdout=subprocess.PIPE
62
            ).communicate()[0]
63
        except Exception:
64
            pass
65
66
    @staticmethod
67
    def _build(flow, action):
68
        acl = '[iptables]\n-A FORWARD --in-interface swp+'
69
        if 'protocol' in flow:
70
            acl += ' -p ' + re.sub('[!<>=]', '', flow['protocol'][0])
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable re does not seem to be defined.
Loading history...
71
        if 'source-ipv4' in flow:
72
            acl += ' -s ' + flow['source-ipv4'][0]
73
        if 'destination-ipv4' in flow:
74
            acl += ' -d ' + flow['destination-ipv4'][0]
75
        if 'source-port' in flow:
76
            acl += ' --sport ' + re.sub('[!<>=]', '', flow['source-port'][0])
77
        if 'destination-port' in flow:
78
            acl += ' --dport ' + re.sub('[!<>=]', '', flow['destination-port'][0])
79
        acl = acl + ' -j DROP\n'
80
        return acl
81
82
    @classmethod
83
    def insert(cls, flow, action):
84
        key = flow['string']
85
        if key in cls._known:
86
            return
87
        uid = cls._uid()
88
        acl = cls._build(flow, action)
89
        cls._known[key] = (uid, acl)
90
        try:
91
            with open(cls._file(uid), 'w') as f:
92
                f.write(acl)
93
            cls._commit()
94
        except Exception:
95
            cls.end()
96
97
    @classmethod
98
    def remove(cls, flow):
99
        key = flow['string']
100
        if key not in cls._known:
101
            return
102
        uid, _ = cls._known[key]
103
        cls._delete(key)
104
105
    @classmethod
106
    def clear(cls):
107
        for key in cls._known:
108
            cls._delete(key)
109
        cls._commit()
110
111
    @classmethod
112
    def end(cls):
113
        cls.clear()
114
        sys.exit(1)
115
116
    @classmethod
117
    def show(cls):
118
        for key, (uid, _) in cls._known.items():
119
            sys.stderr.write('%d %s\n' % (uid, key))
120
        for _, acl in cls._known.values():
121
            sys.stderr.write('%s' % acl)
122
        sys.stderr.flush()
123
124
125
signal.signal(signal.SIGTERM, ACL.end)
126
127
128
opened = 0
129
buffered = ''
130
131
while True:
132
    try:
133
        line = sys.stdin.readline()
134
        if not line or 'shutdown' in line:
135
            ACL.end()
136
        buffered += line
137
        opened += line.count('{')
138
        opened -= line.count('}')
139
        if opened:
140
            continue
141
        line, buffered = buffered, ''
142
        message = json.loads(line)
143
144
        if message['type'] == 'state' and message['neighbor']['state'] == 'down':
145
            ACL.clear()
146
            continue
147
148
        if message['type'] != 'update':
149
            continue
150
151
        update = message['neighbor']['message']['update']
152
153
        if 'announce' in update:
154
            flow = update['announce']['ipv4 flow']
155
            # The RFC allows both encoding
156
            flow = flow['no-nexthop'][0] if 'no-nexthop' in flow else flow[0]
157
158
            community = update['attribute']['extended-community'][0]
159
            ACL.insert(flow, community)
160
            continue
161
162
        if 'withdraw' in update:
163
            flow = update['withdraw']['ipv4 flow'][0]
164
            ACL.remove(flow)
165
            continue
166
167
    except KeyboardInterrupt:
168
        ACL.end()
169
    except Exception:
170
        pass
171