ObjectIdentifierEncoder.encodeValue()   C
last analyzed

Complexity

Conditions 11

Size

Total Lines 40
Code Lines 28

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 11
eloc 28
nop 5
dl 0
loc 40
rs 5.4
c 0
b 0
f 0

How to fix   Complexity   

Complexity

Complex classes like pyasn1.codec.ber.encoder.ObjectIdentifierEncoder.encodeValue() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# BER encoder
2
from pyasn1.type import base, tag, univ, char, useful
3
from pyasn1.codec.ber import eoo
4
from pyasn1.compat.octets import int2oct, ints2octs, null, str2octs
5
from pyasn1 import error
6
7
class Error(Exception): pass
8
9
class AbstractItemEncoder:
10
    supportIndefLenMode = 1
11
    def encodeTag(self, t, isConstructed):
12
        tagClass, tagFormat, tagId = t.asTuple()  # this is a hotspot
13
        v = tagClass | tagFormat
14
        if isConstructed:
15
            v = v|tag.tagFormatConstructed
16
        if tagId < 31:
17
            return int2oct(v|tagId)
18
        else:
19
            s = int2oct(tagId&0x7f)
20
            tagId = tagId >> 7
21
            while tagId:
22
                s = int2oct(0x80|(tagId&0x7f)) + s
23
                tagId = tagId >> 7
24
            return int2oct(v|0x1F) + s
25
26
    def encodeLength(self, length, defMode):
27
        if not defMode and self.supportIndefLenMode:
28
            return int2oct(0x80)
29
        if length < 0x80:
30
            return int2oct(length)
31
        else:
32
            substrate = null
33
            while length:
34
                substrate = int2oct(length&0xff) + substrate
35
                length = length >> 8
36
            substrateLen = len(substrate)
37
            if substrateLen > 126:
38
                raise Error('Length octets overflow (%d)' % substrateLen)
39
            return int2oct(0x80 | substrateLen) + substrate
40
41
    def encodeValue(self, encodeFun, value, defMode, maxChunkSize):
42
        raise Error('Not implemented')
43
44
    def _encodeEndOfOctets(self, encodeFun, defMode):
45
        if defMode or not self.supportIndefLenMode:
46
            return null
47
        else:
48
            return encodeFun(eoo.endOfOctets, defMode)
49
50
    def encode(self, encodeFun, value, defMode, maxChunkSize):
51
        substrate, isConstructed = self.encodeValue(
52
            encodeFun, value, defMode, maxChunkSize
53
            )
54
        tagSet = value.getTagSet()
55
        if tagSet:
56
            if not isConstructed:  # primitive form implies definite mode
57
                defMode = 1
58
            return self.encodeTag(
59
                tagSet[-1], isConstructed
60
                ) + self.encodeLength(
61
                len(substrate), defMode
62
                ) + substrate + self._encodeEndOfOctets(encodeFun, defMode)
63
        else:
64
            return substrate  # untagged value
65
66
class EndOfOctetsEncoder(AbstractItemEncoder):
67
    def encodeValue(self, encodeFun, value, defMode, maxChunkSize):
68
        return null, 0
69
70
class ExplicitlyTaggedItemEncoder(AbstractItemEncoder):
71
    def encodeValue(self, encodeFun, value, defMode, maxChunkSize):
72
        if isinstance(value, base.AbstractConstructedAsn1Item):
73
            value = value.clone(tagSet=value.getTagSet()[:-1],
74
                                cloneValueFlag=1)
75
        else:
76
            value = value.clone(tagSet=value.getTagSet()[:-1])
77
        return encodeFun(value, defMode, maxChunkSize), 1
78
79
explicitlyTaggedItemEncoder = ExplicitlyTaggedItemEncoder()
80
81
class IntegerEncoder(AbstractItemEncoder):
82
    supportIndefLenMode = 0
83
    def encodeValue(self, encodeFun, value, defMode, maxChunkSize):
84
        octets = []
85
        value = int(value) # to save on ops on asn1 type
86
        while 1:
87
            octets.insert(0, value & 0xff)
88
            if value == 0 or value == -1:
89
                break
90
            value = value >> 8
91
        if value == 0 and octets[0] & 0x80:
92
            octets.insert(0, 0)
93
        while len(octets) > 1 and \
94
                  (octets[0] == 0 and octets[1] & 0x80 == 0 or \
95
                   octets[0] == 0xff and octets[1] & 0x80 != 0):
96
            del octets[0]
97
        return ints2octs(octets), 0
98
99
class BitStringEncoder(AbstractItemEncoder):
100
    def encodeValue(self, encodeFun, value, defMode, maxChunkSize):
101
        if not maxChunkSize or len(value) <= maxChunkSize*8:
102
            r = {}; l = len(value); p = 0; j = 7
103
            while p < l:
104
                i, j = divmod(p, 8)
105
                r[i] = r.get(i,0) | value[p]<<(7-j)
106
                p = p + 1
107
            keys = list(r); keys.sort()
108
            return int2oct(7-j) + ints2octs([r[k] for k in keys]), 0
109
        else:
110
            pos = 0; substrate = null
111
            while 1:
112
                # count in octets
113
                v = value.clone(value[pos*8:pos*8+maxChunkSize*8])
114
                if not v:
115
                    break
116
                substrate = substrate + encodeFun(v, defMode, maxChunkSize)
117
                pos = pos + maxChunkSize
118
            return substrate, 1
119
120
class OctetStringEncoder(AbstractItemEncoder):
121
    def encodeValue(self, encodeFun, value, defMode, maxChunkSize):
122
        if not maxChunkSize or len(value) <= maxChunkSize:
123
            return value.asOctets(), 0
124
        else:
125
            pos = 0; substrate = null
126
            while 1:
127
                v = value.clone(value[pos:pos+maxChunkSize])
128
                if not v:
129
                    break
130
                substrate = substrate + encodeFun(v, defMode, maxChunkSize)
131
                pos = pos + maxChunkSize
132
            return substrate, 1
133
134
class NullEncoder(AbstractItemEncoder):
135
    supportIndefLenMode = 0
136
    def encodeValue(self, encodeFun, value, defMode, maxChunkSize):
137
        return null, 0
138
139
class ObjectIdentifierEncoder(AbstractItemEncoder):
140
    supportIndefLenMode = 0
141
    precomputedValues = {
142
        (1, 3, 6, 1, 2): (43, 6, 1, 2),
143
        (1, 3, 6, 1, 4): (43, 6, 1, 4)
144
        }
145
    def encodeValue(self, encodeFun, value, defMode, maxChunkSize):
146
        oid = value.asTuple()
147
        if oid[:5] in self.precomputedValues:
148
            octets = self.precomputedValues[oid[:5]]
149
            index = 5
150
        else:
151
            if len(oid) < 2:
152
                raise error.PyAsn1Error('Short OID %s' % value)
153
154
            # Build the first twos
155
            index = 0
156
            subid = oid[index] * 40
157
            subid = subid + oid[index+1]
158
            if subid < 0 or subid > 0xff:
159
                raise error.PyAsn1Error(
160
                    'Initial sub-ID overflow %s in OID %s' % (oid[index:], value)
161
                    )
162
            octets = (subid,)
163
            index = index + 2
164
165
        # Cycle through subids
166
        for subid in oid[index:]:
167
            if subid > -1 and subid < 128:
168
                # Optimize for the common case
169
                octets = octets + (subid & 0x7f,)
170
            elif subid < 0 or subid > 0xFFFFFFFF:
171
                raise error.PyAsn1Error(
172
                    'SubId overflow %s in %s' % (subid, value)
173
                    )
174
            else:
175
                # Pack large Sub-Object IDs
176
                res = (subid & 0x7f,)
177
                subid = subid >> 7
178
                while subid > 0:
179
                    res = (0x80 | (subid & 0x7f),) + res
180
                    subid = subid >> 7
181
                # Add packed Sub-Object ID to resulted Object ID
182
                octets += res
183
184
        return ints2octs(octets), 0
185
186
class RealEncoder(AbstractItemEncoder):
187
    def encodeValue(self, encodeFun, value, defMode, maxChunkSize):
188
        if value.isPlusInfinity():
189
            return int2oct(0x40), 0
190
        if value.isMinusInfinity():
191
            return int2oct(0x41), 0
192
        m, b, e = value
193
        if not m:
194
            return null, 0
195
        if b == 10:
196
            return str2octs('\x03%dE%s%d' % (m, e == 0 and '+' or '', e)), 0
197
        elif b == 2:
198
            fo = 0x80                 # binary enoding
199
            if m < 0:
200
                fo = fo | 0x40  # sign bit
201
                m = -m
202
            while int(m) != m: # drop floating point
203
                m *= 2
204
                e -= 1
205
            while m & 0x1 == 0: # mantissa normalization
206
                m >>= 1
207
                e += 1
208
            eo = null
209
            while e:
210
                eo = int2oct(e&0xff) + eo
211
                e >>= 8
212
            n = len(eo)
213
            if n > 0xff:
214
                raise error.PyAsn1Error('Real exponent overflow')
215
            if n == 1:
216
                pass
217
            elif n == 2:
218
                fo |= 1
219
            elif n == 3:
220
                fo |= 2
221
            else:
222
                fo |= 3
223
                eo = int2oct(n//0xff+1) + eo
224
            po = null
225
            while m:
226
                po = int2oct(m&0xff) + po
227
                m >>= 8
228
            substrate = int2oct(fo) + eo + po
229
            return substrate, 0
230
        else:
231
            raise error.PyAsn1Error('Prohibited Real base %s' % b)
232
233
class SequenceEncoder(AbstractItemEncoder):
234
    def encodeValue(self, encodeFun, value, defMode, maxChunkSize):
235
        value.setDefaultComponents()
236
        value.verifySizeSpec()
237
        substrate = null; idx = len(value)
238
        while idx > 0:
239
            idx = idx - 1
240
            if value[idx] is None:  # Optional component
241
                continue
242
            component = value.getDefaultComponentByPosition(idx)
243
            if component is not None and component == value[idx]:
244
                continue
245
            substrate = encodeFun(
246
                value[idx], defMode, maxChunkSize
247
                ) + substrate
248
        return substrate, 1
249
250
class SequenceOfEncoder(AbstractItemEncoder):
251
    def encodeValue(self, encodeFun, value, defMode, maxChunkSize):
252
        value.verifySizeSpec()
253
        substrate = null; idx = len(value)
254
        while idx > 0:
255
            idx = idx - 1
256
            substrate = encodeFun(
257
                value[idx], defMode, maxChunkSize
258
                ) + substrate
259
        return substrate, 1
260
261
class ChoiceEncoder(AbstractItemEncoder):
262
    def encodeValue(self, encodeFun, value, defMode, maxChunkSize):
263
        return encodeFun(value.getComponent(), defMode, maxChunkSize), 1
264
265
class AnyEncoder(OctetStringEncoder):
266
    def encodeValue(self, encodeFun, value, defMode, maxChunkSize):
267
        return value.asOctets(), defMode == 0
268
269
tagMap = {
270
    eoo.endOfOctets.tagSet: EndOfOctetsEncoder(),
271
    univ.Boolean.tagSet: IntegerEncoder(),
272
    univ.Integer.tagSet: IntegerEncoder(),
273
    univ.BitString.tagSet: BitStringEncoder(),
274
    univ.OctetString.tagSet: OctetStringEncoder(),
275
    univ.Null.tagSet: NullEncoder(),
276
    univ.ObjectIdentifier.tagSet: ObjectIdentifierEncoder(),
277
    univ.Enumerated.tagSet: IntegerEncoder(),
278
    univ.Real.tagSet: RealEncoder(),
279
    # Sequence & Set have same tags as SequenceOf & SetOf
280
    univ.SequenceOf.tagSet: SequenceOfEncoder(),
281
    univ.SetOf.tagSet: SequenceOfEncoder(),
282
    univ.Choice.tagSet: ChoiceEncoder(),
283
    # character string types
284
    char.UTF8String.tagSet: OctetStringEncoder(),
285
    char.NumericString.tagSet: OctetStringEncoder(),
286
    char.PrintableString.tagSet: OctetStringEncoder(),
287
    char.TeletexString.tagSet: OctetStringEncoder(),
288
    char.VideotexString.tagSet: OctetStringEncoder(),
289
    char.IA5String.tagSet: OctetStringEncoder(),
290
    char.GraphicString.tagSet: OctetStringEncoder(),
291
    char.VisibleString.tagSet: OctetStringEncoder(),
292
    char.GeneralString.tagSet: OctetStringEncoder(),
293
    char.UniversalString.tagSet: OctetStringEncoder(),
294
    char.BMPString.tagSet: OctetStringEncoder(),
295
    # useful types
296
    useful.GeneralizedTime.tagSet: OctetStringEncoder(),
297
    useful.UTCTime.tagSet: OctetStringEncoder()
298
    }
299
300
# Type-to-codec map for ambiguous ASN.1 types
301
typeMap = {
302
    univ.Set.typeId: SequenceEncoder(),
303
    univ.SetOf.typeId: SequenceOfEncoder(),
304
    univ.Sequence.typeId: SequenceEncoder(),
305
    univ.SequenceOf.typeId: SequenceOfEncoder(),
306
    univ.Choice.typeId: ChoiceEncoder(),
307
    univ.Any.typeId: AnyEncoder()
308
    }
309
310
class Encoder:
311
    def __init__(self, tagMap, typeMap={}):
312
        self.__tagMap = tagMap
313
        self.__typeMap = typeMap
314
315
    def __call__(self, value, defMode=1, maxChunkSize=0):
316
        tagSet = value.getTagSet()
317
        if len(tagSet) > 1:
318
            concreteEncoder = explicitlyTaggedItemEncoder
319
        else:
320
            if value.typeId is not None and value.typeId in self.__typeMap:
321
                concreteEncoder = self.__typeMap[value.typeId]
322
            elif tagSet in self.__tagMap:
323
                concreteEncoder = self.__tagMap[tagSet]
324
            else:
325
                baseTagSet = value.baseTagSet
326
                if baseTagSet in self.__tagMap:
327
                    concreteEncoder = self.__tagMap[baseTagSet]
328
                else:
329
                    raise Error('No encoder for %s' % value)
330
        return concreteEncoder.encode(
331
            self, value, defMode, maxChunkSize
332
            )
333
334
encode = Encoder(tagMap, typeMap)
335