Total Complexity | 157 |
Total Lines | 744 |
Duplicated Lines | 4.03 % |
Changes | 0 |
Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.
Common duplication problems, and corresponding solutions are:
Complex classes like pyasn1.codec.ber.decoder often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
1 | # BER decoder |
||
2 | from pyasn1.type import tag, base, univ, char, useful, tagmap |
||
3 | from pyasn1.codec.ber import eoo |
||
4 | from pyasn1.compat.octets import oct2int, octs2ints |
||
5 | from pyasn1 import error |
||
6 | |||
7 | class AbstractDecoder: |
||
8 | protoComponent = None |
||
9 | def valueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet, |
||
10 | length, state, decodeFun): |
||
11 | raise error.PyAsn1Error('Decoder not implemented for %s' % tagSet) |
||
12 | |||
13 | def indefLenValueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet, |
||
14 | length, state, decodeFun): |
||
15 | raise error.PyAsn1Error('Indefinite length mode decoder not implemented for %s' % tagSet) |
||
16 | |||
17 | class AbstractSimpleDecoder(AbstractDecoder): |
||
18 | def _createComponent(self, asn1Spec, tagSet, value=None): |
||
19 | if asn1Spec is None: |
||
20 | return self.protoComponent.clone(value, tagSet) |
||
21 | elif value is None: |
||
22 | return asn1Spec |
||
23 | else: |
||
24 | return asn1Spec.clone(value) |
||
25 | |||
26 | class AbstractConstructedDecoder(AbstractDecoder): |
||
27 | def _createComponent(self, asn1Spec, tagSet, value=None): |
||
28 | if asn1Spec is None: |
||
29 | return self.protoComponent.clone(tagSet) |
||
30 | else: |
||
31 | return asn1Spec.clone() |
||
32 | |||
33 | class EndOfOctetsDecoder(AbstractSimpleDecoder): |
||
34 | def valueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet, |
||
35 | length, state, decodeFun): |
||
36 | return eoo.endOfOctets, substrate[:length] |
||
37 | |||
38 | class ExplicitTagDecoder(AbstractSimpleDecoder): |
||
39 | def valueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet, |
||
40 | length, state, decodeFun): |
||
41 | return decodeFun(substrate[:length], asn1Spec, tagSet, length) |
||
42 | |||
43 | def indefLenValueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet, |
||
44 | length, state, decodeFun): |
||
45 | value, substrate = decodeFun(substrate, asn1Spec, tagSet, length) |
||
46 | terminator, substrate = decodeFun(substrate) |
||
47 | if terminator == eoo.endOfOctets: |
||
48 | return value, substrate |
||
49 | else: |
||
50 | raise error.PyAsn1Error('Missing end-of-octets terminator') |
||
51 | |||
52 | explicitTagDecoder = ExplicitTagDecoder() |
||
53 | |||
54 | class IntegerDecoder(AbstractSimpleDecoder): |
||
55 | protoComponent = univ.Integer(0) |
||
56 | precomputedValues = { |
||
57 | '\x00': 0, |
||
58 | '\x01': 1, |
||
59 | '\x02': 2, |
||
60 | '\x03': 3, |
||
61 | '\x04': 4, |
||
62 | '\x05': 5, |
||
63 | '\x06': 6, |
||
64 | '\x07': 7, |
||
65 | '\x08': 8, |
||
66 | '\x09': 9, |
||
67 | '\xff': -1, |
||
68 | '\xfe': -2, |
||
69 | '\xfd': -3, |
||
70 | '\xfc': -4, |
||
71 | '\xfb': -5 |
||
72 | } |
||
73 | |||
74 | def _valueFilter(self, value): |
||
75 | try: |
||
76 | return int(value) |
||
77 | except OverflowError: |
||
78 | return value |
||
79 | |||
80 | def valueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet, length, |
||
81 | state, decodeFun): |
||
82 | substrate = substrate[:length] |
||
83 | if not substrate: |
||
84 | raise error.PyAsn1Error('Empty substrate') |
||
85 | if substrate in self.precomputedValues: |
||
86 | value = self.precomputedValues[substrate] |
||
87 | else: |
||
88 | firstOctet = oct2int(substrate[0]) |
||
89 | if firstOctet & 0x80: |
||
90 | value = -1 |
||
91 | else: |
||
92 | value = 0 |
||
93 | for octet in substrate: |
||
94 | value = value << 8 | oct2int(octet) |
||
95 | value = self._valueFilter(value) |
||
96 | return self._createComponent(asn1Spec, tagSet, value), substrate |
||
97 | |||
98 | class BooleanDecoder(IntegerDecoder): |
||
99 | protoComponent = univ.Boolean(0) |
||
100 | def _valueFilter(self, value): |
||
101 | if value: |
||
102 | return 1 |
||
103 | else: |
||
104 | return 0 |
||
105 | |||
106 | class BitStringDecoder(AbstractSimpleDecoder): |
||
107 | protoComponent = univ.BitString(()) |
||
108 | def valueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet, length, |
||
109 | state, decodeFun): |
||
110 | substrate = substrate[:length] |
||
111 | if tagSet[0][1] == tag.tagFormatSimple: # XXX what tag to check? |
||
112 | if not substrate: |
||
113 | raise error.PyAsn1Error('Missing initial octet') |
||
114 | trailingBits = oct2int(substrate[0]) |
||
115 | if trailingBits > 7: |
||
116 | raise error.PyAsn1Error( |
||
117 | 'Trailing bits overflow %s' % trailingBits |
||
118 | ) |
||
119 | substrate = substrate[1:] |
||
120 | lsb = p = 0; l = len(substrate)-1; b = () |
||
121 | while p <= l: |
||
122 | if p == l: |
||
123 | lsb = trailingBits |
||
124 | j = 7 |
||
125 | o = oct2int(substrate[p]) |
||
126 | while j >= lsb: |
||
127 | b = b + ((o>>j)&0x01,) |
||
128 | j = j - 1 |
||
129 | p = p + 1 |
||
130 | return self._createComponent(asn1Spec, tagSet, b), '' |
||
131 | r = self._createComponent(asn1Spec, tagSet, ()) |
||
132 | if not decodeFun: |
||
133 | return r, substrate |
||
134 | while substrate: |
||
135 | component, substrate = decodeFun(substrate) |
||
136 | r = r + component |
||
137 | return r, substrate |
||
138 | |||
139 | View Code Duplication | def indefLenValueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet, |
|
|
|||
140 | length, state, decodeFun): |
||
141 | r = self._createComponent(asn1Spec, tagSet, '') |
||
142 | if not decodeFun: |
||
143 | return r, substrate |
||
144 | while substrate: |
||
145 | component, substrate = decodeFun(substrate) |
||
146 | if component == eoo.endOfOctets: |
||
147 | break |
||
148 | r = r + component |
||
149 | else: |
||
150 | raise error.SubstrateUnderrunError( |
||
151 | 'No EOO seen before substrate ends' |
||
152 | ) |
||
153 | return r, substrate |
||
154 | |||
155 | class OctetStringDecoder(AbstractSimpleDecoder): |
||
156 | protoComponent = univ.OctetString('') |
||
157 | def valueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet, length, |
||
158 | state, decodeFun): |
||
159 | substrate = substrate[:length] |
||
160 | if tagSet[0][1] == tag.tagFormatSimple: # XXX what tag to check? |
||
161 | return self._createComponent(asn1Spec, tagSet, substrate), '' |
||
162 | r = self._createComponent(asn1Spec, tagSet, '') |
||
163 | if not decodeFun: |
||
164 | return r, substrate |
||
165 | while substrate: |
||
166 | component, substrate = decodeFun(substrate) |
||
167 | r = r + component |
||
168 | return r, substrate |
||
169 | |||
170 | View Code Duplication | def indefLenValueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet, |
|
171 | length, state, decodeFun): |
||
172 | r = self._createComponent(asn1Spec, tagSet, '') |
||
173 | if not decodeFun: |
||
174 | return r, substrate |
||
175 | while substrate: |
||
176 | component, substrate = decodeFun(substrate) |
||
177 | if component == eoo.endOfOctets: |
||
178 | break |
||
179 | r = r + component |
||
180 | else: |
||
181 | raise error.SubstrateUnderrunError( |
||
182 | 'No EOO seen before substrate ends' |
||
183 | ) |
||
184 | return r, substrate |
||
185 | |||
186 | class NullDecoder(AbstractSimpleDecoder): |
||
187 | protoComponent = univ.Null('') |
||
188 | def valueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet, |
||
189 | length, state, decodeFun): |
||
190 | substrate = substrate[:length] |
||
191 | r = self._createComponent(asn1Spec, tagSet) |
||
192 | if substrate: |
||
193 | raise error.PyAsn1Error('Unexpected substrate for Null') |
||
194 | return r, substrate |
||
195 | |||
196 | class ObjectIdentifierDecoder(AbstractSimpleDecoder): |
||
197 | protoComponent = univ.ObjectIdentifier(()) |
||
198 | def valueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet, length, |
||
199 | state, decodeFun): |
||
200 | substrate = substrate[:length] |
||
201 | if not substrate: |
||
202 | raise error.PyAsn1Error('Empty substrate') |
||
203 | oid = (); index = 0 |
||
204 | # Get the first subid |
||
205 | subId = oct2int(substrate[index]) |
||
206 | oid = oid + divmod(subId, 40) |
||
207 | |||
208 | index = index + 1 |
||
209 | substrateLen = len(substrate) |
||
210 | |||
211 | while index < substrateLen: |
||
212 | subId = oct2int(substrate[index]) |
||
213 | if subId < 128: |
||
214 | oid = oid + (subId,) |
||
215 | index = index + 1 |
||
216 | else: |
||
217 | # Construct subid from a number of octets |
||
218 | nextSubId = subId |
||
219 | subId = 0 |
||
220 | while nextSubId >= 128 and index < substrateLen: |
||
221 | subId = (subId << 7) + (nextSubId & 0x7F) |
||
222 | index = index + 1 |
||
223 | nextSubId = oct2int(substrate[index]) |
||
224 | if index == substrateLen: |
||
225 | raise error.SubstrateUnderrunError( |
||
226 | 'Short substrate for OID %s' % oid |
||
227 | ) |
||
228 | subId = (subId << 7) + nextSubId |
||
229 | oid = oid + (subId,) |
||
230 | index = index + 1 |
||
231 | return self._createComponent(asn1Spec, tagSet, oid), substrate[index:] |
||
232 | |||
233 | class RealDecoder(AbstractSimpleDecoder): |
||
234 | protoComponent = univ.Real() |
||
235 | def valueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet, |
||
236 | length, state, decodeFun): |
||
237 | substrate = substrate[:length] |
||
238 | if not length: |
||
239 | raise error.SubstrateUnderrunError('Short substrate for Real') |
||
240 | fo = oct2int(substrate[0]); substrate = substrate[1:] |
||
241 | if fo & 0x40: # infinite value |
||
242 | value = fo & 0x01 and '-inf' or 'inf' |
||
243 | elif fo & 0x80: # binary enoding |
||
244 | if fo & 0x11 == 0: |
||
245 | n = 1 |
||
246 | elif fo & 0x01: |
||
247 | n = 2 |
||
248 | elif fo & 0x02: |
||
249 | n = 3 |
||
250 | else: |
||
251 | n = oct2int(substrate[0]) |
||
252 | eo, substrate = substrate[:n], substrate[n:] |
||
253 | if not eo or not substrate: |
||
254 | raise error.PyAsn1Error('Real exponent screwed') |
||
255 | e = 0 |
||
256 | while eo: # exponent |
||
257 | e <<= 8 |
||
258 | e |= oct2int(eo[0]) |
||
259 | eo = eo[1:] |
||
260 | p = 0 |
||
261 | while substrate: # value |
||
262 | p <<= 8 |
||
263 | p |= oct2int(substrate[0]) |
||
264 | substrate = substrate[1:] |
||
265 | if fo & 0x40: # sign bit |
||
266 | p = -p |
||
267 | value = (p, 2, e) |
||
268 | elif fo & 0xc0 == 0: # character encoding |
||
269 | try: |
||
270 | if fo & 0x3 == 0x1: # NR1 |
||
271 | value = (int(substrate), 10, 0) |
||
272 | elif fo & 0x3 == 0x2: # NR2 |
||
273 | value = float(substrate) |
||
274 | elif fo & 0x3 == 0x3: # NR3 |
||
275 | value = float(substrate) |
||
276 | else: |
||
277 | raise error.SubstrateUnderrunError( |
||
278 | 'Unknown NR (tag %s)' % fo |
||
279 | ) |
||
280 | except ValueError: |
||
281 | raise error.SubstrateUnderrunError( |
||
282 | 'Bad character Real syntax' |
||
283 | ) |
||
284 | elif fo & 0xc0 == 0x40: # special real value |
||
285 | pass |
||
286 | else: |
||
287 | raise error.SubstrateUnderrunError( |
||
288 | 'Unknown encoding (tag %s)' % fo |
||
289 | ) |
||
290 | return self._createComponent(asn1Spec, tagSet, value), substrate |
||
291 | |||
292 | class SequenceDecoder(AbstractConstructedDecoder): |
||
293 | protoComponent = univ.Sequence() |
||
294 | def _getComponentTagMap(self, r, idx): |
||
295 | try: |
||
296 | return r.getComponentTagMapNearPosition(idx) |
||
297 | except error.PyAsn1Error: |
||
298 | return |
||
299 | |||
300 | def _getComponentPositionByType(self, r, t, idx): |
||
301 | return r.getComponentPositionNearType(t, idx) |
||
302 | |||
303 | def valueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet, |
||
304 | length, state, decodeFun): |
||
305 | substrate = substrate[:length] |
||
306 | r = self._createComponent(asn1Spec, tagSet) |
||
307 | idx = 0 |
||
308 | if not decodeFun: |
||
309 | return r, substrate |
||
310 | while substrate: |
||
311 | asn1Spec = self._getComponentTagMap(r, idx) |
||
312 | component, substrate = decodeFun( |
||
313 | substrate, asn1Spec |
||
314 | ) |
||
315 | idx = self._getComponentPositionByType( |
||
316 | r, component.getEffectiveTagSet(), idx |
||
317 | ) |
||
318 | r.setComponentByPosition(idx, component, asn1Spec is None) |
||
319 | idx = idx + 1 |
||
320 | r.setDefaultComponents() |
||
321 | r.verifySizeSpec() |
||
322 | return r, substrate |
||
323 | |||
324 | def indefLenValueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet, |
||
325 | length, state, decodeFun): |
||
326 | r = self._createComponent(asn1Spec, tagSet) |
||
327 | idx = 0 |
||
328 | while substrate: |
||
329 | asn1Spec = self._getComponentTagMap(r, idx) |
||
330 | if not decodeFun: |
||
331 | return r, substrate |
||
332 | component, substrate = decodeFun(substrate, asn1Spec) |
||
333 | if component == eoo.endOfOctets: |
||
334 | break |
||
335 | idx = self._getComponentPositionByType( |
||
336 | r, component.getEffectiveTagSet(), idx |
||
337 | ) |
||
338 | r.setComponentByPosition(idx, component, asn1Spec is None) |
||
339 | idx = idx + 1 |
||
340 | else: |
||
341 | raise error.SubstrateUnderrunError( |
||
342 | 'No EOO seen before substrate ends' |
||
343 | ) |
||
344 | r.setDefaultComponents() |
||
345 | r.verifySizeSpec() |
||
346 | return r, substrate |
||
347 | |||
348 | class SequenceOfDecoder(AbstractConstructedDecoder): |
||
349 | protoComponent = univ.SequenceOf() |
||
350 | def valueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet, |
||
351 | length, state, decodeFun): |
||
352 | substrate = substrate[:length] |
||
353 | r = self._createComponent(asn1Spec, tagSet) |
||
354 | asn1Spec = r.getComponentType() |
||
355 | idx = 0 |
||
356 | if not decodeFun: |
||
357 | return r, substrate |
||
358 | while substrate: |
||
359 | component, substrate = decodeFun( |
||
360 | substrate, asn1Spec |
||
361 | ) |
||
362 | r.setComponentByPosition(idx, component, asn1Spec is None) |
||
363 | idx = idx + 1 |
||
364 | r.verifySizeSpec() |
||
365 | return r, substrate |
||
366 | |||
367 | def indefLenValueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet, |
||
368 | length, state, decodeFun): |
||
369 | r = self._createComponent(asn1Spec, tagSet) |
||
370 | asn1Spec = r.getComponentType() |
||
371 | idx = 0 |
||
372 | if not decodeFun: |
||
373 | return r, substrate |
||
374 | while substrate: |
||
375 | component, substrate = decodeFun(substrate, asn1Spec) |
||
376 | if component == eoo.endOfOctets: |
||
377 | break |
||
378 | r.setComponentByPosition(idx, component, asn1Spec is None) |
||
379 | idx = idx + 1 |
||
380 | else: |
||
381 | raise error.SubstrateUnderrunError( |
||
382 | 'No EOO seen before substrate ends' |
||
383 | ) |
||
384 | r.verifySizeSpec() |
||
385 | return r, substrate |
||
386 | |||
387 | class SetDecoder(SequenceDecoder): |
||
388 | protoComponent = univ.Set() |
||
389 | def _getComponentTagMap(self, r, idx): |
||
390 | return r.getComponentTagMap() |
||
391 | |||
392 | def _getComponentPositionByType(self, r, t, idx): |
||
393 | nextIdx = r.getComponentPositionByType(t) |
||
394 | if nextIdx is None: |
||
395 | return idx |
||
396 | else: |
||
397 | return nextIdx |
||
398 | |||
399 | class SetOfDecoder(SequenceOfDecoder): |
||
400 | protoComponent = univ.SetOf() |
||
401 | |||
402 | class ChoiceDecoder(AbstractConstructedDecoder): |
||
403 | protoComponent = univ.Choice() |
||
404 | def valueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet, |
||
405 | length, state, decodeFun): |
||
406 | substrate = substrate[:length] |
||
407 | r = self._createComponent(asn1Spec, tagSet) |
||
408 | if not decodeFun: |
||
409 | return r, substrate |
||
410 | if r.getTagSet() == tagSet: # explicitly tagged Choice |
||
411 | component, substrate = decodeFun( |
||
412 | substrate, r.getComponentTagMap() |
||
413 | ) |
||
414 | else: |
||
415 | component, substrate = decodeFun( |
||
416 | substrate, r.getComponentTagMap(), tagSet, length, state |
||
417 | ) |
||
418 | if isinstance(component, univ.Choice): |
||
419 | effectiveTagSet = component.getEffectiveTagSet() |
||
420 | else: |
||
421 | effectiveTagSet = component.getTagSet() |
||
422 | r.setComponentByType(effectiveTagSet, component, 0, asn1Spec is None) |
||
423 | return r, substrate |
||
424 | |||
425 | indefLenValueDecoder = valueDecoder |
||
426 | |||
427 | class AnyDecoder(AbstractSimpleDecoder): |
||
428 | protoComponent = univ.Any() |
||
429 | def valueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet, |
||
430 | length, state, decodeFun): |
||
431 | if asn1Spec is None or \ |
||
432 | asn1Spec is not None and tagSet != asn1Spec.getTagSet(): |
||
433 | # untagged Any container, recover inner header substrate |
||
434 | length = length + len(fullSubstrate) - len(substrate) |
||
435 | substrate = fullSubstrate |
||
436 | substrate = substrate[:length] |
||
437 | return self._createComponent(asn1Spec, tagSet, value=substrate), '' |
||
438 | |||
439 | def indefLenValueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet, |
||
440 | length, state, decodeFun): |
||
441 | if asn1Spec is not None and tagSet == asn1Spec.getTagSet(): |
||
442 | # tagged Any type -- consume header substrate |
||
443 | header = '' |
||
444 | else: |
||
445 | # untagged Any, recover header substrate |
||
446 | header = fullSubstrate[:-len(substrate)] |
||
447 | |||
448 | r = self._createComponent(asn1Spec, tagSet, header) |
||
449 | |||
450 | # Any components do not inherit initial tag |
||
451 | asn1Spec = self.protoComponent |
||
452 | |||
453 | if not decodeFun: |
||
454 | return r, substrate |
||
455 | while substrate: |
||
456 | component, substrate = decodeFun(substrate, asn1Spec) |
||
457 | if component == eoo.endOfOctets: |
||
458 | break |
||
459 | r = r + component |
||
460 | else: |
||
461 | raise error.SubstrateUnderrunError( |
||
462 | 'No EOO seen before substrate ends' |
||
463 | ) |
||
464 | return r, substrate |
||
465 | |||
466 | # character string types |
||
467 | class UTF8StringDecoder(OctetStringDecoder): |
||
468 | protoComponent = char.UTF8String() |
||
469 | class NumericStringDecoder(OctetStringDecoder): |
||
470 | protoComponent = char.NumericString() |
||
471 | class PrintableStringDecoder(OctetStringDecoder): |
||
472 | protoComponent = char.PrintableString() |
||
473 | class TeletexStringDecoder(OctetStringDecoder): |
||
474 | protoComponent = char.TeletexString() |
||
475 | class VideotexStringDecoder(OctetStringDecoder): |
||
476 | protoComponent = char.VideotexString() |
||
477 | class IA5StringDecoder(OctetStringDecoder): |
||
478 | protoComponent = char.IA5String() |
||
479 | class GraphicStringDecoder(OctetStringDecoder): |
||
480 | protoComponent = char.GraphicString() |
||
481 | class VisibleStringDecoder(OctetStringDecoder): |
||
482 | protoComponent = char.VisibleString() |
||
483 | class GeneralStringDecoder(OctetStringDecoder): |
||
484 | protoComponent = char.GeneralString() |
||
485 | class UniversalStringDecoder(OctetStringDecoder): |
||
486 | protoComponent = char.UniversalString() |
||
487 | class BMPStringDecoder(OctetStringDecoder): |
||
488 | protoComponent = char.BMPString() |
||
489 | |||
490 | # "useful" types |
||
491 | class GeneralizedTimeDecoder(OctetStringDecoder): |
||
492 | protoComponent = useful.GeneralizedTime() |
||
493 | class UTCTimeDecoder(OctetStringDecoder): |
||
494 | protoComponent = useful.UTCTime() |
||
495 | |||
496 | tagMap = { |
||
497 | eoo.endOfOctets.tagSet: EndOfOctetsDecoder(), |
||
498 | univ.Integer.tagSet: IntegerDecoder(), |
||
499 | univ.Boolean.tagSet: BooleanDecoder(), |
||
500 | univ.BitString.tagSet: BitStringDecoder(), |
||
501 | univ.OctetString.tagSet: OctetStringDecoder(), |
||
502 | univ.Null.tagSet: NullDecoder(), |
||
503 | univ.ObjectIdentifier.tagSet: ObjectIdentifierDecoder(), |
||
504 | univ.Enumerated.tagSet: IntegerDecoder(), |
||
505 | univ.Real.tagSet: RealDecoder(), |
||
506 | univ.Sequence.tagSet: SequenceDecoder(), # conflicts with SequenceOf |
||
507 | univ.Set.tagSet: SetDecoder(), # conflicts with SetOf |
||
508 | univ.Choice.tagSet: ChoiceDecoder(), # conflicts with Any |
||
509 | # character string types |
||
510 | char.UTF8String.tagSet: UTF8StringDecoder(), |
||
511 | char.NumericString.tagSet: NumericStringDecoder(), |
||
512 | char.PrintableString.tagSet: PrintableStringDecoder(), |
||
513 | char.TeletexString.tagSet: TeletexStringDecoder(), |
||
514 | char.VideotexString.tagSet: VideotexStringDecoder(), |
||
515 | char.IA5String.tagSet: IA5StringDecoder(), |
||
516 | char.GraphicString.tagSet: GraphicStringDecoder(), |
||
517 | char.VisibleString.tagSet: VisibleStringDecoder(), |
||
518 | char.GeneralString.tagSet: GeneralStringDecoder(), |
||
519 | char.UniversalString.tagSet: UniversalStringDecoder(), |
||
520 | char.BMPString.tagSet: BMPStringDecoder(), |
||
521 | # useful types |
||
522 | useful.GeneralizedTime.tagSet: GeneralizedTimeDecoder(), |
||
523 | useful.UTCTime.tagSet: UTCTimeDecoder() |
||
524 | } |
||
525 | |||
526 | # Type-to-codec map for ambiguous ASN.1 types |
||
527 | typeMap = { |
||
528 | univ.Set.typeId: SetDecoder(), |
||
529 | univ.SetOf.typeId: SetOfDecoder(), |
||
530 | univ.Sequence.typeId: SequenceDecoder(), |
||
531 | univ.SequenceOf.typeId: SequenceOfDecoder(), |
||
532 | univ.Choice.typeId: ChoiceDecoder(), |
||
533 | univ.Any.typeId: AnyDecoder() |
||
534 | } |
||
535 | |||
536 | ( stDecodeTag, stDecodeLength, stGetValueDecoder, stGetValueDecoderByAsn1Spec, |
||
537 | stGetValueDecoderByTag, stTryAsExplicitTag, stDecodeValue, |
||
538 | stDumpRawValue, stErrorCondition, stStop ) = [x for x in range(10)] |
||
539 | |||
540 | class Decoder: |
||
541 | defaultErrorState = stErrorCondition |
||
542 | # defaultErrorState = stDumpRawValue |
||
543 | defaultRawDecoder = AnyDecoder() |
||
544 | def __init__(self, tagMap, typeMap={}): |
||
545 | self.__tagMap = tagMap |
||
546 | self.__typeMap = typeMap |
||
547 | self.__endOfOctetsTagSet = eoo.endOfOctets.getTagSet() |
||
548 | # Tag & TagSet objects caches |
||
549 | self.__tagCache = {} |
||
550 | self.__tagSetCache = {} |
||
551 | |||
552 | def __call__(self, substrate, asn1Spec=None, tagSet=None, |
||
553 | length=None, state=stDecodeTag, recursiveFlag=1): |
||
554 | fullSubstrate = substrate |
||
555 | while state != stStop: |
||
556 | if state == stDecodeTag: |
||
557 | # Decode tag |
||
558 | if not substrate: |
||
559 | raise error.SubstrateUnderrunError( |
||
560 | 'Short octet stream on tag decoding' |
||
561 | ) |
||
562 | |||
563 | firstOctet = substrate[0] |
||
564 | substrate = substrate[1:] |
||
565 | if firstOctet in self.__tagCache: |
||
566 | lastTag = self.__tagCache[firstOctet] |
||
567 | else: |
||
568 | t = oct2int(firstOctet) |
||
569 | tagClass = t&0xC0 |
||
570 | tagFormat = t&0x20 |
||
571 | tagId = t&0x1F |
||
572 | if tagId == 0x1F: |
||
573 | tagId = 0 |
||
574 | while 1: |
||
575 | if not substrate: |
||
576 | raise error.SubstrateUnderrunError( |
||
577 | 'Short octet stream on long tag decoding' |
||
578 | ) |
||
579 | t = oct2int(substrate[0]) |
||
580 | tagId = tagId << 7 | (t&0x7F) |
||
581 | substrate = substrate[1:] |
||
582 | if not t&0x80: |
||
583 | break |
||
584 | lastTag = tag.Tag( |
||
585 | tagClass=tagClass, tagFormat=tagFormat, tagId=tagId |
||
586 | ) |
||
587 | if tagId < 31: |
||
588 | # cache short tags |
||
589 | self.__tagCache[firstOctet] = lastTag |
||
590 | if tagSet is None: |
||
591 | if firstOctet in self.__tagSetCache: |
||
592 | tagSet = self.__tagSetCache[firstOctet] |
||
593 | else: |
||
594 | # base tag not recovered |
||
595 | tagSet = tag.TagSet((), lastTag) |
||
596 | if firstOctet in self.__tagCache: |
||
597 | self.__tagSetCache[firstOctet] = tagSet |
||
598 | else: |
||
599 | tagSet = lastTag + tagSet |
||
600 | state = stDecodeLength |
||
601 | if state == stDecodeLength: |
||
602 | # Decode length |
||
603 | if not substrate: |
||
604 | raise error.SubstrateUnderrunError( |
||
605 | 'Short octet stream on length decoding' |
||
606 | ) |
||
607 | firstOctet = oct2int(substrate[0]) |
||
608 | if firstOctet == 128: |
||
609 | size = 1 |
||
610 | length = -1 |
||
611 | elif firstOctet < 128: |
||
612 | length, size = firstOctet, 1 |
||
613 | else: |
||
614 | size = firstOctet & 0x7F |
||
615 | # encoded in size bytes |
||
616 | length = 0 |
||
617 | lengthString = substrate[1:size+1] |
||
618 | # missing check on maximum size, which shouldn't be a |
||
619 | # problem, we can handle more than is possible |
||
620 | if len(lengthString) != size: |
||
621 | raise error.SubstrateUnderrunError( |
||
622 | '%s<%s at %s' % |
||
623 | (size, len(lengthString), tagSet) |
||
624 | ) |
||
625 | for char in lengthString: |
||
626 | length = (length << 8) | oct2int(char) |
||
627 | size = size + 1 |
||
628 | state = stGetValueDecoder |
||
629 | substrate = substrate[size:] |
||
630 | if length != -1 and len(substrate) < length: |
||
631 | raise error.SubstrateUnderrunError( |
||
632 | '%d-octet short' % (length - len(substrate)) |
||
633 | ) |
||
634 | if state == stGetValueDecoder: |
||
635 | if asn1Spec is None: |
||
636 | state = stGetValueDecoderByTag |
||
637 | else: |
||
638 | state = stGetValueDecoderByAsn1Spec |
||
639 | # |
||
640 | # There're two ways of creating subtypes in ASN.1 what influences |
||
641 | # decoder operation. These methods are: |
||
642 | # 1) Either base types used in or no IMPLICIT tagging has been |
||
643 | # applied on subtyping. |
||
644 | # 2) Subtype syntax drops base type information (by means of |
||
645 | # IMPLICIT tagging. |
||
646 | # The first case allows for complete tag recovery from substrate |
||
647 | # while the second one requires original ASN.1 type spec for |
||
648 | # decoding. |
||
649 | # |
||
650 | # In either case a set of tags (tagSet) is coming from substrate |
||
651 | # in an incremental, tag-by-tag fashion (this is the case of |
||
652 | # EXPLICIT tag which is most basic). Outermost tag comes first |
||
653 | # from the wire. |
||
654 | # |
||
655 | if state == stGetValueDecoderByTag: |
||
656 | if tagSet in self.__tagMap: |
||
657 | concreteDecoder = self.__tagMap[tagSet] |
||
658 | else: |
||
659 | concreteDecoder = None |
||
660 | if concreteDecoder: |
||
661 | state = stDecodeValue |
||
662 | else: |
||
663 | _k = tagSet[:1] |
||
664 | if _k in self.__tagMap: |
||
665 | concreteDecoder = self.__tagMap[_k] |
||
666 | else: |
||
667 | concreteDecoder = None |
||
668 | if concreteDecoder: |
||
669 | state = stDecodeValue |
||
670 | else: |
||
671 | state = stTryAsExplicitTag |
||
672 | if state == stGetValueDecoderByAsn1Spec: |
||
673 | if isinstance(asn1Spec, (dict, tagmap.TagMap)): |
||
674 | if tagSet in asn1Spec: |
||
675 | __chosenSpec = asn1Spec[tagSet] |
||
676 | else: |
||
677 | __chosenSpec = None |
||
678 | else: |
||
679 | __chosenSpec = asn1Spec |
||
680 | if __chosenSpec is not None and ( |
||
681 | tagSet == __chosenSpec.getTagSet() or \ |
||
682 | tagSet in __chosenSpec.getTagMap() |
||
683 | ): |
||
684 | # use base type for codec lookup to recover untagged types |
||
685 | baseTagSet = __chosenSpec.baseTagSet |
||
686 | if __chosenSpec.typeId is not None and \ |
||
687 | __chosenSpec.typeId in self.__typeMap: |
||
688 | # ambiguous type |
||
689 | concreteDecoder = self.__typeMap[__chosenSpec.typeId] |
||
690 | elif baseTagSet in self.__tagMap: |
||
691 | # base type or tagged subtype |
||
692 | concreteDecoder = self.__tagMap[baseTagSet] |
||
693 | else: |
||
694 | concreteDecoder = None |
||
695 | if concreteDecoder: |
||
696 | asn1Spec = __chosenSpec |
||
697 | state = stDecodeValue |
||
698 | else: |
||
699 | state = stTryAsExplicitTag |
||
700 | elif tagSet == self.__endOfOctetsTagSet: |
||
701 | concreteDecoder = self.__tagMap[tagSet] |
||
702 | state = stDecodeValue |
||
703 | else: |
||
704 | state = stTryAsExplicitTag |
||
705 | if state == stTryAsExplicitTag: |
||
706 | if tagSet and \ |
||
707 | tagSet[0][1] == tag.tagFormatConstructed and \ |
||
708 | tagSet[0][0] != tag.tagClassUniversal: |
||
709 | # Assume explicit tagging |
||
710 | concreteDecoder = explicitTagDecoder |
||
711 | state = stDecodeValue |
||
712 | else: |
||
713 | state = self.defaultErrorState |
||
714 | if state == stDumpRawValue: |
||
715 | concreteDecoder = self.defaultRawDecoder |
||
716 | state = stDecodeValue |
||
717 | if state == stDecodeValue: |
||
718 | if recursiveFlag: |
||
719 | decodeFun = self |
||
720 | else: |
||
721 | decodeFun = None |
||
722 | if length == -1: # indef length |
||
723 | value, substrate = concreteDecoder.indefLenValueDecoder( |
||
724 | fullSubstrate, substrate, asn1Spec, tagSet, length, |
||
725 | stGetValueDecoder, decodeFun |
||
726 | ) |
||
727 | else: |
||
728 | value, _substrate = concreteDecoder.valueDecoder( |
||
729 | fullSubstrate, substrate, asn1Spec, tagSet, length, |
||
730 | stGetValueDecoder, decodeFun |
||
731 | ) |
||
732 | if recursiveFlag: |
||
733 | substrate = substrate[length:] |
||
734 | else: |
||
735 | substrate = _substrate |
||
736 | state = stStop |
||
737 | if state == stErrorCondition: |
||
738 | raise error.PyAsn1Error( |
||
739 | '%s not in asn1Spec: %s' % (tagSet, repr(asn1Spec)) |
||
740 | ) |
||
741 | return value, substrate |
||
742 | |||
743 | decode = Decoder(tagMap, typeMap) |
||
744 | |||
747 |