Passed
Pull Request — master (#964)
by
unknown
13:09
created

LsGenericFlags.__init__()   A

Complexity

Conditions 1

Size

Total Lines 2
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 2
nop 2
dl 0
loc 2
rs 10
c 0
b 0
f 0
1
# encoding: utf-8
2
"""
3
Copyright (c) 2016 Evelio Vila <[email protected]>
4
Copyright (c) 2009-2017 Exa Networks. All rights reserved.
5
License: 3-clause BSD. (See the COPYRIGHT file)
6
"""
7
8
import json
9
import binascii
10
import itertools
11
from struct import unpack
12
13
from exabgp.util import concat_strs
14
from exabgp.vendoring.bitstring import BitArray
15
from exabgp.bgp.message.notification import Notify
16
from exabgp.bgp.message.update.attribute.attribute import Attribute
17
18
19
@Attribute.register()
20
class LINKSTATE(Attribute):
21
	ID = Attribute.CODE.BGP_LS
22
	FLAG = Attribute.Flag.OPTIONAL
23
	TLV = -1
24
25
	# Registered subclasses we know how to decode
26
	registered_lsids = dict()
27
28
	# what this implementation knows as LS attributes
29
	node_lsids = []
30
	link_lsids = []
31
	prefix_lsids = []
32
33
	def __init__(self, ls_attrs):
34
		self.ls_attrs = ls_attrs
35
36
	@classmethod
37
	def register(cls,lsid=None,flag=None):
38
		def register_lsid (klass):
39
			scode = klass.TLV if lsid is None else lsid
40
			if scode in cls.registered_lsids:
41
				raise RuntimeError('only one class can be registered per BGP link state attribute type')
42
			cls.registered_lsids[scode] = klass
43
			return klass
44
		return register_lsid
45
46
	@classmethod
47
	def registered(cls, lsid, flag=None):
48
		return lsid in cls.registered_lsids
49
50
	@classmethod
51
	def unpack(cls, data, negotiated):
52
		ls_attrs = []
53
		while data:
54
			scode, length = unpack('!HH',data[:4])
55
			if scode in cls.registered_lsids:
56
				klass = cls.registered_lsids[scode].unpack(data[4:length+4],length)
57
			else:
58
				klass = GenericLSID(scode,data[4:length+4])
59
			klass.TLV = scode
60
			ls_attrs.append(klass)
61
			data = data[length+4:]
62
		# Remove all but last instance of each TLV class - the last holds state for all TLVs of that type
63
		ls_attrs = list({ls_attr.TLV: ls_attr for ls_attr in ls_attrs}.values())
64
		for klass in ls_attrs:
65
			if hasattr(klass, 'terids') or hasattr(klass, 'sr_adj_sids') or hasattr(klass, 'sr_adj_lan_sids'):
66
				klass.reset()
67
68
		return cls(ls_attrs=ls_attrs)
69
70
	def json (self,compact=None):
71
		content = ', '.join(d.json() for d in self.ls_attrs)
72
		return '{ %s }' % (content)
73
74
	def __str__(self):
75
		return ', '.join(str(d) for d in self.ls_attrs)
76
77
78 View Code Duplication
class GenericLSID(object):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
79
	TLV = 99999
80
81
	def __init__ (self, code, rep):
82
		self.rep = rep
83
		self.code = code
84
85
	def __repr__ (self):
86
		return "Attribute with code [ %s ] not implemented" % (self.code)
87
88
	@classmethod
89
	def unpack (cls,scode,data):
90
		length = len(data)
91
		info = binascii.b2a_uu(data[:length])
92
		return cls(code=scode,rep=info)
93
94
	def json (self,compact=None):
95
		return '"attribute-not-implemented": "%s"' % (self.code)
96
97
98
class LsGenericFlags(object):
99
100
#	draft-ietf-isis-segment-routing-extensions Prefix-SID Sub-TLV
101
	ISIS_SR_FLAGS = ['R', 'N', 'P', 'E', 'V', 'L', 'RSV', 'RSV']
102
#	RFC 7794 IPv4/IPv6 Extended Reachability Attribute Flags
103
	ISIS_SR_ATTR_FLAGS = ['X', 'R', 'N', 'RSV', 'RSV', 'RSV', 'RSV', 'RSV']
104
#	draft-ietf-isis-segment-routing-extensions - Adj-SID IS-IS Flags
105
	ISIS_SR_ADJ_FLAGS = ['F', 'B', 'V', 'L', 'S', 'P', 'RSV', 'RSV']
106
# 	isis-segment-routing-extensions 3.1. SR-Capabilities Sub-TLV
107
	ISIS_SR_CAP_FLAGS = ['I', 'V', 'RSV', 'RSV', 'RSV', 'RSV', 'RSV', 'RSV']
108
#	RFC 7752 3.3.1.1. Node Flag Bits TLV
109
	LS_NODE_FLAGS = ['O', 'T', 'E', 'B', 'R', 'V', 'RSV', 'RSV']
110
#	RFC 7752 3.3.3.1. IGP Flags TLV
111
	LS_IGP_FLAGS = ['D', 'N', 'L', 'P', 'RSV', 'RSV', 'RSV', 'RSV']
112
#	RFC 7752 3.3.2.2.  MPLS Protocol Mask TLV
113
	LS_MPLS_MASK = ['LDP', 'RSVP-TE', 'RSV', 'RSV', 'RSV', 'RSV', 'RSV', 'RSV']
114
#	RFC 5307 1.2.
115
	LS_PROTECTION_MASK = ['ExtraTrafic', 'Unprotected', 'Shared', 'Dedicated 1:1',
116
							'Dedicated 1+1', 'Enhanced', 'RSV', 'RSV']
117
118
	def __init__(self,flags):
119
		self.flags = flags
120
121
	@classmethod
122
	def unpack(cls,data,pattern):
123
		pad = pattern.count('RSV')
124
		repeat = len(pattern) - pad
125
		flag_array = binascii.b2a_hex(data)
126
		hex_rep = hex(int(flag_array, 16))
127
		bit_array = BitArray(hex_rep)
128
		valid_flags = [concat_strs(''.join(item), ''.join(itertools.repeat('0',pad)))
129
				for item in itertools.product('01', repeat=repeat)]
130
		valid_flags.append('0000')
131
		if bit_array.bin in valid_flags:
132
			flags = dict(zip(pattern, [0, ] * len(pattern)))
133
			flags.update(dict((k, int(v)) for k, v in zip(pattern, bit_array.bin)))
134
		else:
135
			raise Notify(3,5, "Invalid SR flags mask")
136
		return cls(flags=flags)
137
138
	def json (self,compact=None):
139
		return json.dumps(self.flags)
140
141
	def __repr__ (self):
142
		return str(self.flags)
143