SequenceDecoder.valueDecoder()   A
last analyzed

Complexity

Conditions 3

Size

Total Lines 20
Code Lines 18

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 3
eloc 18
nop 8
dl 0
loc 20
rs 9.5
c 0
b 0
f 0

How to fix   Many Parameters   

Many Parameters

Methods with many parameters are not only hard to understand; their parameter lists also tend to become inconsistent as soon as you need more or different data.

There are several approaches to avoid long parameter lists:

1
# BER decoder
2
from pyasn1.type import tag, base, univ, char, useful, tagmap
3
from pyasn1.codec.ber import eoo
4
from pyasn1.compat.octets import oct2int, octs2ints
5
from pyasn1 import error
6
7
class AbstractDecoder:
    """Common base of the per-type BER value decoders.

    Concrete decoders override ``valueDecoder`` (definite-length form) and,
    where supported, ``indefLenValueDecoder`` (indefinite-length form).
    Both defaults simply report that the operation is not implemented.
    """

    # Prototype ASN.1 object that subclasses clone when no spec is supplied.
    protoComponent = None

    def valueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet,
                     length, state, decodeFun):
        """Decode a definite-length value; always raises in the base class."""
        message = 'Decoder not implemented for %s' % tagSet
        raise error.PyAsn1Error(message)

    def indefLenValueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet,
                             length, state, decodeFun):
        """Decode an indefinite-length value; always raises in the base class."""
        message = 'Indefinite length mode decoder not implemented for %s' % tagSet
        raise error.PyAsn1Error(message)
16
17
class AbstractSimpleDecoder(AbstractDecoder):
    """Base for decoders of types that carry a single scalar value."""

    def _createComponent(self, asn1Spec, tagSet, value=None):
        """Build the resulting ASN.1 object.

        With a spec, the spec itself is returned when there is no value,
        otherwise a clone of the spec holding ``value``.  Without a spec the
        class prototype is cloned with ``value`` and ``tagSet``.
        """
        if asn1Spec is not None:
            if value is None:
                return asn1Spec
            return asn1Spec.clone(value)
        return self.protoComponent.clone(value, tagSet)
25
26
class AbstractConstructedDecoder(AbstractDecoder):
    """Base for decoders of constructed (container) types."""

    def _createComponent(self, asn1Spec, tagSet, value=None):
        """Return an empty container: a clone of the spec if given, else a
        clone of the class prototype re-tagged with ``tagSet``.

        ``value`` is accepted for signature parity and is not used.
        """
        if asn1Spec is not None:
            return asn1Spec.clone()
        return self.protoComponent.clone(tagSet)
32
33
class EndOfOctetsDecoder(AbstractSimpleDecoder):
    """Decoder for the end-of-octets marker."""

    def valueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet,
                     length, state, decodeFun):
        """Return the ``eoo.endOfOctets`` sentinel and the first ``length``
        octets of ``substrate``."""
        head = substrate[:length]
        return eoo.endOfOctets, head
37
38
class ExplicitTagDecoder(AbstractSimpleDecoder):
    """Decoder that unwraps an EXPLICIT tag and decodes the inner value."""

    def valueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet,
                     length, state, decodeFun):
        """Decode the value contained in the first ``length`` octets."""
        inner = substrate[:length]
        return decodeFun(inner, asn1Spec, tagSet, length)

    def indefLenValueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet,
                             length, state, decodeFun):
        """Decode the inner value, then require an end-of-octets marker.

        Raises ``error.PyAsn1Error`` when the item following the value is
        not the end-of-octets sentinel.
        """
        value, substrate = decodeFun(substrate, asn1Spec, tagSet, length)
        terminator, substrate = decodeFun(substrate)
        if not (terminator == eoo.endOfOctets):
            raise error.PyAsn1Error('Missing end-of-octets terminator')
        return value, substrate
51
52
# Shared instance; Decoder selects it when it assumes explicit tagging.
explicitTagDecoder = ExplicitTagDecoder()
53
54
class IntegerDecoder(AbstractSimpleDecoder):
    """Decoder for INTEGER (also registered for ENUMERATED in ``tagMap``)."""

    protoComponent = univ.Integer(0)

    # Fast path for the most common single-octet encodings.
    # NOTE(review): keys are str literals, so a bytes substrate on Python 3
    # never matches and decoding falls through to the generic loop.
    precomputedValues = {
        '\x00':  0,
        '\x01':  1,
        '\x02':  2,
        '\x03':  3,
        '\x04':  4,
        '\x05':  5,
        '\x06':  6,
        '\x07':  7,
        '\x08':  8,
        '\x09':  9,
        '\xff': -1,
        '\xfe': -2,
        '\xfd': -3,
        '\xfc': -4,
        '\xfb': -5
        }

    def _valueFilter(self, value):
        """Post-process the decoded number; subclasses may override."""
        try:
            return int(value)
        except OverflowError:
            return value

    def valueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet, length,
                     state, decodeFun):
        """Decode a two's-complement big-endian integer.

        Returns ``(component, octets)`` where ``octets`` is the slice of
        ``substrate`` that held the value.  Raises ``error.PyAsn1Error`` on
        empty contents.
        """
        substrate = substrate[:length]
        if not substrate:
            raise error.PyAsn1Error('Empty substrate')
        if substrate in self.precomputedValues:
            value = self.precomputedValues[substrate]
        else:
            # Seed with -1 when the sign bit is set so that the shifts below
            # sign-extend the result.
            if oct2int(substrate[0]) & 0x80:
                value = -1
            else:
                value = 0
            for octet in substrate:
                value = value << 8 | oct2int(octet)
        # Apply the filter on BOTH paths: subclasses (e.g. the Boolean
        # decoder) rely on it to normalise values, and the precomputed
        # shortcut previously bypassed it.
        value = self._valueFilter(value)
        return self._createComponent(asn1Spec, tagSet, value), substrate
97
98
class BooleanDecoder(IntegerDecoder):
    """Decoder for BOOLEAN, reusing the integer decoding logic."""

    protoComponent = univ.Boolean(0)

    def _valueFilter(self, value):
        """Collapse any truthy number to 1 and anything else to 0."""
        if not value:
            return 0
        return 1
105
106
class BitStringDecoder(AbstractSimpleDecoder):
    """Decoder for BIT STRING in primitive and constructed forms."""

    protoComponent = univ.BitString(())

    def valueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet, length,
                     state, decodeFun):
        """Decode a definite-length BIT STRING.

        Primitive form: the first octet gives the number of unused trailing
        bits (0..7) in the last octet; the remaining octets are expanded
        into a tuple of 0/1 values.  Constructed form: the chunks returned
        by ``decodeFun`` are concatenated.

        Raises ``error.PyAsn1Error`` when the initial octet is missing or
        announces more than 7 trailing bits.
        """
        substrate = substrate[:length]
        if tagSet[0][1] == tag.tagFormatSimple:    # XXX what tag to check?
            if not substrate:
                raise error.PyAsn1Error('Missing initial octet')
            trailingBits = oct2int(substrate[0])
            if trailingBits > 7:
                raise error.PyAsn1Error(
                    'Trailing bits overflow %s' % trailingBits
                    )
            substrate = substrate[1:]
            # Collect into a list and convert once; repeated tuple
            # concatenation is quadratic in the number of bits.
            bits = []
            lastPos = len(substrate) - 1
            for pos in range(len(substrate)):
                octet = oct2int(substrate[pos])
                # Only the final octet carries unused low-order bits.
                if pos == lastPos:
                    lowestBit = trailingBits
                else:
                    lowestBit = 0
                shift = 7
                while shift >= lowestBit:
                    bits.append((octet >> shift) & 0x01)
                    shift = shift - 1
            return self._createComponent(asn1Spec, tagSet, tuple(bits)), ''
        r = self._createComponent(asn1Spec, tagSet, ())
        if not decodeFun:
            return r, substrate
        while substrate:
            component, substrate = decodeFun(substrate)
            r = r + component
        return r, substrate

    def indefLenValueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet,
                             length, state, decodeFun):
        """Decode an indefinite-length (constructed) BIT STRING.

        Chunks are concatenated until the end-of-octets sentinel is seen.
        Raises ``error.SubstrateUnderrunError`` if the substrate ends first.
        """
        # Seed with an empty bit tuple, matching the definite-length
        # constructed path above.
        r = self._createComponent(asn1Spec, tagSet, ())
        if not decodeFun:
            return r, substrate
        while substrate:
            component, substrate = decodeFun(substrate)
            if component == eoo.endOfOctets:
                break
            r = r + component
        else:
            raise error.SubstrateUnderrunError(
                'No EOO seen before substrate ends'
                )
        return r, substrate
154
155
class OctetStringDecoder(AbstractSimpleDecoder):
    """Decoder for OCTET STRING and the string types derived from it."""

    protoComponent = univ.OctetString('')

    def valueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet, length,
                     state, decodeFun):
        """Decode a definite-length OCTET STRING.

        Primitive form wraps the contents directly; constructed form
        concatenates the chunks produced by ``decodeFun``.
        """
        substrate = substrate[:length]
        isPrimitive = tagSet[0][1] == tag.tagFormatSimple  # XXX what tag to check?
        if isPrimitive:
            return self._createComponent(asn1Spec, tagSet, substrate), ''
        r = self._createComponent(asn1Spec, tagSet, '')
        if decodeFun:
            while substrate:
                component, substrate = decodeFun(substrate)
                r = r + component
        return r, substrate

    def indefLenValueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet,
                             length, state, decodeFun):
        """Decode an indefinite-length OCTET STRING.

        Concatenates chunks until the end-of-octets sentinel; raises
        ``error.SubstrateUnderrunError`` if the substrate runs out first.
        """
        r = self._createComponent(asn1Spec, tagSet, '')
        if not decodeFun:
            return r, substrate
        terminated = 0
        while substrate:
            component, substrate = decodeFun(substrate)
            if component == eoo.endOfOctets:
                terminated = 1
                break
            r = r + component
        if not terminated:
            raise error.SubstrateUnderrunError(
                'No EOO seen before substrate ends'
                )
        return r, substrate
185
186
class NullDecoder(AbstractSimpleDecoder):
    """Decoder for NULL, whose contents must be empty."""

    protoComponent = univ.Null('')

    def valueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet,
                     length, state, decodeFun):
        """Return a Null component; raise ``error.PyAsn1Error`` when any
        content octets are present."""
        contents = substrate[:length]
        r = self._createComponent(asn1Spec, tagSet)
        if not contents:
            return r, contents
        raise error.PyAsn1Error('Unexpected substrate for Null')
195
196
class ObjectIdentifierDecoder(AbstractSimpleDecoder):
    """Decoder for OBJECT IDENTIFIER."""

    protoComponent = univ.ObjectIdentifier(())

    def valueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet, length,
                     state, decodeFun):
        """Decode the base-128 sub-identifiers of an OID.

        Every sub-identifier is a big-endian sequence of 7-bit groups with
        the high bit set on all octets but the last.  The first
        sub-identifier packs the first two arcs as ``40 * X + Y``.

        Raises ``error.PyAsn1Error`` on empty contents and
        ``error.SubstrateUnderrunError`` when the contents end in the
        middle of a sub-identifier.
        """
        substrate = substrate[:length]
        if not substrate:
            raise error.PyAsn1Error('Empty substrate')

        subIds = []
        subId = 0
        unfinished = 0
        for octet in substrate:
            octet = oct2int(octet)
            subId = (subId << 7) | (octet & 0x7F)
            if octet & 0x80:
                # continuation bit set -- more octets belong to this subid
                unfinished = 1
            else:
                subIds.append(subId)
                subId = 0
                unfinished = 0
        if unfinished:
            # Wrap the tuple so that %s formats it as one argument.
            raise error.SubstrateUnderrunError(
                'Short substrate for OID %s' % (tuple(subIds),)
                )

        # Split the first subid into the two leading arcs.  The first arc is
        # limited to 0, 1 or 2; only under arc 2 may the second arc be >= 40,
        # so a plain divmod(subId, 40) is wrong from 120 upwards.
        first = subIds[0]
        if first < 40:
            head = (0, first)
        elif first < 80:
            head = (1, first - 40)
        else:
            head = (2, first - 80)
        oid = head + tuple(subIds[1:])

        # All contents octets were consumed; return the (empty) remainder.
        return (self._createComponent(asn1Spec, tagSet, oid),
                substrate[len(substrate):])
232
233
class RealDecoder(AbstractSimpleDecoder):
    """Decoder for REAL (binary, special-value and character encodings)."""

    protoComponent = univ.Real()

    def valueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet,
                     length, state, decodeFun):
        """Decode a REAL value.

        The first contents octet selects the encoding:

        * bit 8 set         -- binary: sign, base, scale, exponent, mantissa
        * bits 8..7 == 01   -- special value ('inf' / '-inf')
        * bits 8..7 == 00   -- character (decimal) encoding, NR1..NR3

        Binary values are returned to the component as a
        ``(mantissa, 2, exponent)`` triple.

        Raises ``error.SubstrateUnderrunError`` on empty contents, an
        unknown NR form or malformed decimal text, and
        ``error.PyAsn1Error`` on a malformed binary encoding.
        """
        substrate = substrate[:length]
        if not substrate:
            raise error.SubstrateUnderrunError('Short substrate for Real')
        fo = oct2int(substrate[0])
        substrate = substrate[1:]
        # The binary test must come first: in the binary form bit 7 (0x40)
        # is the sign bit, not the "special value" marker.
        if fo & 0x80:  # binary encoding
            # Bits 2..1 give the exponent length: 1, 2 or 3 octets, or (11)
            # the length is carried in the next octet.
            n = (fo & 0x03) + 1
            if n == 4:
                if not substrate:
                    raise error.PyAsn1Error('Real exponent screwed')
                n = oct2int(substrate[0])
                substrate = substrate[1:]
            eo, substrate = substrate[:n], substrate[n:]
            if not eo or not substrate:
                raise error.PyAsn1Error('Real exponent screwed')
            # The exponent is a two's-complement number: seed with -1 when
            # its sign bit is set so the shifts sign-extend it.
            if oct2int(eo[0]) & 0x80:
                e = -1
            else:
                e = 0
            for octet in eo:
                e = (e << 8) | oct2int(octet)
            # Bits 6..5 select the base; fold bases 8 and 16 into base 2.
            base = (fo >> 4) & 0x03
            if base == 1:      # base 8  == 2 ** 3
                e = e * 3
            elif base == 2:    # base 16 == 2 ** 4
                e = e * 4
            elif base == 3:    # reserved
                raise error.PyAsn1Error('Illegal Real base')
            p = 0
            for octet in substrate:  # unsigned mantissa
                p = (p << 8) | oct2int(octet)
            substrate = substrate[len(substrate):]
            if fo & 0x40:    # sign bit
                p = -p
            # Bits 4..3 hold the binary scaling factor F: mantissa * 2 ** F.
            p = p * 2 ** ((fo >> 2) & 0x03)
            value = (p, 2, e)
        elif fo & 0x40:  # special real value
            if fo & 0x01:
                value = '-inf'
            else:
                value = 'inf'
        else:  # character encoding
            try:
                nr = fo & 0x3
                if nr == 0x1:  # NR1
                    value = (int(substrate), 10, 0)
                elif nr == 0x2:  # NR2
                    value = float(substrate)
                elif nr == 0x3:  # NR3
                    value = float(substrate)
                else:
                    raise error.SubstrateUnderrunError(
                        'Unknown NR (tag %s)' % fo
                        )
            except ValueError:
                raise error.SubstrateUnderrunError(
                    'Bad character Real syntax'
                    )
        return self._createComponent(asn1Spec, tagSet, value), substrate
0 ignored issues
show
introduced by
The variable value does not seem to be defined for all execution paths.
Loading history...
291
292
class SequenceDecoder(AbstractConstructedDecoder):
    """Decoder for SEQUENCE; also the base of the SET decoder."""

    protoComponent = univ.Sequence()

    def _getComponentTagMap(self, r, idx):
        """Return the tag map of the components acceptable near position
        ``idx``, or None when the container raises ``error.PyAsn1Error``."""
        try:
            nearby = r.getComponentTagMapNearPosition(idx)
        except error.PyAsn1Error:
            return None
        return nearby

    def _getComponentPositionByType(self, r, t, idx):
        """Map tag set ``t`` to a component position at or near ``idx``."""
        return r.getComponentPositionNearType(t, idx)

    def valueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet,
                     length, state, decodeFun):
        """Decode a definite-length SEQUENCE.

        Components are decoded one after another, placed at the position
        matching their effective tag set, then defaults are filled in and
        the size constraint is verified.
        """
        substrate = substrate[:length]
        r = self._createComponent(asn1Spec, tagSet)
        if not decodeFun:
            return r, substrate
        position = 0
        while substrate:
            componentSpec = self._getComponentTagMap(r, position)
            component, substrate = decodeFun(substrate, componentSpec)
            position = self._getComponentPositionByType(
                r, component.getEffectiveTagSet(), position
                )
            r.setComponentByPosition(position, component, componentSpec is None)
            position += 1
        r.setDefaultComponents()
        r.verifySizeSpec()
        return r, substrate

    def indefLenValueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet,
                             length, state, decodeFun):
        """Decode an indefinite-length SEQUENCE.

        Same as ``valueDecoder`` but stops at the end-of-octets sentinel;
        raises ``error.SubstrateUnderrunError`` if it never shows up.
        """
        r = self._createComponent(asn1Spec, tagSet)
        position = 0
        terminated = 0
        while substrate:
            componentSpec = self._getComponentTagMap(r, position)
            # The non-recursive bail-out is checked per iteration, i.e. only
            # when there is substrate left.
            if not decodeFun:
                return r, substrate
            component, substrate = decodeFun(substrate, componentSpec)
            if component == eoo.endOfOctets:
                terminated = 1
                break
            position = self._getComponentPositionByType(
                r, component.getEffectiveTagSet(), position
                )
            r.setComponentByPosition(position, component, componentSpec is None)
            position += 1
        if not terminated:
            raise error.SubstrateUnderrunError(
                'No EOO seen before substrate ends'
                )
        r.setDefaultComponents()
        r.verifySizeSpec()
        return r, substrate
347
348
class SequenceOfDecoder(AbstractConstructedDecoder):
    """Decoder for SEQUENCE OF; also the base of the SET OF decoder."""

    protoComponent = univ.SequenceOf()

    def valueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet,
                     length, state, decodeFun):
        """Decode a definite-length SEQUENCE OF.

        Every element is decoded against the container's component type
        and appended in order; the size constraint is verified at the end.
        """
        substrate = substrate[:length]
        r = self._createComponent(asn1Spec, tagSet)
        componentType = r.getComponentType()
        if not decodeFun:
            return r, substrate
        verify = componentType is None
        position = 0
        while substrate:
            component, substrate = decodeFun(substrate, componentType)
            r.setComponentByPosition(position, component, verify)
            position += 1
        r.verifySizeSpec()
        return r, substrate

    def indefLenValueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet,
                             length, state, decodeFun):
        """Decode an indefinite-length SEQUENCE OF.

        Elements are read until the end-of-octets sentinel; raises
        ``error.SubstrateUnderrunError`` if the substrate ends first.
        """
        r = self._createComponent(asn1Spec, tagSet)
        componentType = r.getComponentType()
        if not decodeFun:
            return r, substrate
        verify = componentType is None
        position = 0
        terminated = 0
        while substrate:
            component, substrate = decodeFun(substrate, componentType)
            if component == eoo.endOfOctets:
                terminated = 1
                break
            r.setComponentByPosition(position, component, verify)
            position += 1
        if not terminated:
            raise error.SubstrateUnderrunError(
                'No EOO seen before substrate ends'
                )
        r.verifySizeSpec()
        return r, substrate
386
387
class SetDecoder(SequenceDecoder):
    """Decoder for SET: like SEQUENCE, but components are located by type
    rather than by position."""

    protoComponent = univ.Set()

    def _getComponentTagMap(self, r, idx):
        """Return the container's whole tag map; ``idx`` is not used."""
        return r.getComponentTagMap()

    def _getComponentPositionByType(self, r, t, idx):
        """Return the position registered for tag set ``t``, falling back
        to ``idx`` when the container reports None."""
        position = r.getComponentPositionByType(t)
        if position is not None:
            return position
        return idx
398
399
class SetOfDecoder(SequenceOfDecoder):
    """Decoder for SET OF; only the prototype differs from SEQUENCE OF."""

    protoComponent = univ.SetOf()
401
402
class ChoiceDecoder(AbstractConstructedDecoder):
    """Decoder for CHOICE."""

    protoComponent = univ.Choice()

    def valueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet,
                     length, state, decodeFun):
        """Decode the selected alternative and store it in the CHOICE.

        When the CHOICE's own tag set equals ``tagSet`` the inner value is
        decoded from scratch; otherwise the tag and length already consumed
        belong to the alternative, so they are passed on together with
        ``state``.
        """
        substrate = substrate[:length]
        # NOTE(review): this method is also bound as indefLenValueDecoder;
        # in that mode the dispatcher passes length == -1, so the slice
        # above drops the final octet -- confirm this is intended.
        r = self._createComponent(asn1Spec, tagSet)
        if not decodeFun:
            return r, substrate
        if r.getTagSet() == tagSet:  # explicitly tagged Choice
            component, substrate = decodeFun(substrate, r.getComponentTagMap())
        else:
            component, substrate = decodeFun(
                substrate, r.getComponentTagMap(), tagSet, length, state
                )
        # A nested CHOICE is identified by the tag set of its own selection.
        if isinstance(component, univ.Choice):
            effectiveTagSet = component.getEffectiveTagSet()
        else:
            effectiveTagSet = component.getTagSet()
        r.setComponentByType(effectiveTagSet, component, 0, asn1Spec is None)
        return r, substrate

    # The same routine serves the indefinite-length form.
    indefLenValueDecoder = valueDecoder
426
427
class AnyDecoder(AbstractSimpleDecoder):
    """Decoder for ANY: captures the raw octets instead of interpreting
    them."""

    protoComponent = univ.Any()

    def valueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet,
                     length, state, decodeFun):
        """Capture a definite-length value as raw octets.

        For an untagged ANY (no spec, or ``tagSet`` differs from the spec's
        tag set) the already-consumed tag and length octets are part of the
        value, so they are recovered from ``fullSubstrate``.
        """
        if asn1Spec is None or tagSet != asn1Spec.getTagSet():
            # untagged Any container, recover inner header substrate
            length = length + len(fullSubstrate) - len(substrate)
            substrate = fullSubstrate
        substrate = substrate[:length]
        return self._createComponent(asn1Spec, tagSet, value=substrate), ''

    def indefLenValueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet,
                             length, state, decodeFun):
        """Capture an indefinite-length value.

        Starts from the header octets (untagged ANY) or from nothing
        (tagged ANY) and appends decoded chunks until the end-of-octets
        sentinel.  Raises ``error.SubstrateUnderrunError`` if the substrate
        ends before the sentinel.
        """
        if asn1Spec is not None and tagSet == asn1Spec.getTagSet():
            # tagged Any type -- consume header substrate
            header = ''
        else:
            # untagged Any, recover header substrate.  Slice by explicit
            # length: fullSubstrate[:-len(substrate)] would collapse to an
            # empty string whenever substrate itself is empty.
            header = fullSubstrate[:len(fullSubstrate) - len(substrate)]

        r = self._createComponent(asn1Spec, tagSet, header)

        # Any components do not inherit initial tag
        asn1Spec = self.protoComponent

        if not decodeFun:
            return r, substrate
        while substrate:
            component, substrate = decodeFun(substrate, asn1Spec)
            if component == eoo.endOfOctets:
                break
            r = r + component
        else:
            raise error.SubstrateUnderrunError(
                'No EOO seen before substrate ends'
                )
        return r, substrate
465
466
# character string types
467
class UTF8StringDecoder(OctetStringDecoder):
    """OCTET STRING decoding with a UTF8String prototype."""

    protoComponent = char.UTF8String()


class NumericStringDecoder(OctetStringDecoder):
    """OCTET STRING decoding with a NumericString prototype."""

    protoComponent = char.NumericString()


class PrintableStringDecoder(OctetStringDecoder):
    """OCTET STRING decoding with a PrintableString prototype."""

    protoComponent = char.PrintableString()


class TeletexStringDecoder(OctetStringDecoder):
    """OCTET STRING decoding with a TeletexString prototype."""

    protoComponent = char.TeletexString()


class VideotexStringDecoder(OctetStringDecoder):
    """OCTET STRING decoding with a VideotexString prototype."""

    protoComponent = char.VideotexString()


class IA5StringDecoder(OctetStringDecoder):
    """OCTET STRING decoding with an IA5String prototype."""

    protoComponent = char.IA5String()


class GraphicStringDecoder(OctetStringDecoder):
    """OCTET STRING decoding with a GraphicString prototype."""

    protoComponent = char.GraphicString()


class VisibleStringDecoder(OctetStringDecoder):
    """OCTET STRING decoding with a VisibleString prototype."""

    protoComponent = char.VisibleString()


class GeneralStringDecoder(OctetStringDecoder):
    """OCTET STRING decoding with a GeneralString prototype."""

    protoComponent = char.GeneralString()


class UniversalStringDecoder(OctetStringDecoder):
    """OCTET STRING decoding with a UniversalString prototype."""

    protoComponent = char.UniversalString()


class BMPStringDecoder(OctetStringDecoder):
    """OCTET STRING decoding with a BMPString prototype."""

    protoComponent = char.BMPString()
489
490
# "useful" types
491
class GeneralizedTimeDecoder(OctetStringDecoder):
    """OCTET STRING decoding with a GeneralizedTime prototype."""

    protoComponent = useful.GeneralizedTime()


class UTCTimeDecoder(OctetStringDecoder):
    """OCTET STRING decoding with a UTCTime prototype."""

    protoComponent = useful.UTCTime()
495
496
# Tag-set-to-codec map used when decoding without (or beyond) an ASN.1 spec.
tagMap = {
    eoo.endOfOctets.tagSet: EndOfOctetsDecoder(),
    # universal types
    univ.Integer.tagSet: IntegerDecoder(),
    univ.Boolean.tagSet: BooleanDecoder(),
    univ.BitString.tagSet: BitStringDecoder(),
    univ.OctetString.tagSet: OctetStringDecoder(),
    univ.Null.tagSet: NullDecoder(),
    univ.ObjectIdentifier.tagSet: ObjectIdentifierDecoder(),
    univ.Enumerated.tagSet: IntegerDecoder(),
    univ.Real.tagSet: RealDecoder(),
    univ.Sequence.tagSet: SequenceDecoder(),  # conflicts with SequenceOf
    univ.Set.tagSet: SetDecoder(),            # conflicts with SetOf
    univ.Choice.tagSet: ChoiceDecoder(),      # conflicts with Any
    # character string types
    char.UTF8String.tagSet: UTF8StringDecoder(),
    char.NumericString.tagSet: NumericStringDecoder(),
    char.PrintableString.tagSet: PrintableStringDecoder(),
    char.TeletexString.tagSet: TeletexStringDecoder(),
    char.VideotexString.tagSet: VideotexStringDecoder(),
    char.IA5String.tagSet: IA5StringDecoder(),
    char.GraphicString.tagSet: GraphicStringDecoder(),
    char.VisibleString.tagSet: VisibleStringDecoder(),
    char.GeneralString.tagSet: GeneralStringDecoder(),
    char.UniversalString.tagSet: UniversalStringDecoder(),
    char.BMPString.tagSet: BMPStringDecoder(),
    # useful types
    useful.GeneralizedTime.tagSet: GeneralizedTimeDecoder(),
    useful.UTCTime.tagSet: UTCTimeDecoder()
}
525
526
# Type-to-codec map for ambiguous ASN.1 types
527
# Keyed by typeId because these types cannot be told apart by tag alone
# (see the "conflicts with" notes in tagMap).
typeMap = {
    univ.Set.typeId: SetDecoder(),
    univ.SetOf.typeId: SetOfDecoder(),
    univ.Sequence.typeId: SequenceDecoder(),
    univ.SequenceOf.typeId: SequenceOfDecoder(),
    univ.Choice.typeId: ChoiceDecoder(),
    univ.Any.typeId: AnyDecoder()
}
535
536
# States of the Decoder state machine, numbered 0..9 in this order.
(stDecodeTag,
 stDecodeLength,
 stGetValueDecoder,
 stGetValueDecoderByAsn1Spec,
 stGetValueDecoderByTag,
 stTryAsExplicitTag,
 stDecodeValue,
 stDumpRawValue,
 stErrorCondition,
 stStop) = range(10)
539
540
class Decoder:
    """BER decoder implemented as a small state machine.

    An instance is callable: it consumes one TLV from the head of the
    substrate and returns ``(value, remaining substrate)``.
    """

    # State entered when no codec can be found for a tag set.  Setting it
    # to stDumpRawValue hands such values to ``defaultRawDecoder`` instead
    # of raising.
    defaultErrorState = stErrorCondition
    defaultRawDecoder = AnyDecoder()

    def __init__(self, tagMap, typeMap=None):
        """Store the codec tables.

        tagMap  -- maps tag sets to value decoders.
        typeMap -- maps type ids to value decoders for types that share a
                   tag; defaults to an empty mapping.
        """
        # A fresh dict per instance avoids sharing one mutable default.
        if typeMap is None:
            typeMap = {}
        self.__tagMap = tagMap
        self.__typeMap = typeMap
        self.__endOfOctetsTagSet = eoo.endOfOctets.getTagSet()
        # Tag & TagSet objects caches
        self.__tagCache = {}
        self.__tagSetCache = {}

    def __call__(self, substrate, asn1Spec=None, tagSet=None,
                 length=None, state=stDecodeTag, recursiveFlag=1):
        """Decode one value from ``substrate``.

        substrate     -- octets to decode.
        asn1Spec      -- optional ASN.1 object, dict or TagMap guiding the
                         decoding.
        tagSet        -- tags already read by an outer call, if any.
        length        -- contents length already read (used together with a
                         ``state`` past stDecodeLength).
        state         -- state to start the machine in.
        recursiveFlag -- when false, value decoders get no ``decodeFun`` and
                         so do not descend into components.

        Returns ``(value, substrate)``.  Raises
        ``error.SubstrateUnderrunError`` on short input and
        ``error.PyAsn1Error`` when no codec matches.
        """
        fullSubstrate = substrate
        while state != stStop:
            if state == stDecodeTag:
                # Decode tag
                if not substrate:
                    raise error.SubstrateUnderrunError(
                        'Short octet stream on tag decoding'
                        )

                firstOctet = substrate[0]
                substrate = substrate[1:]
                if firstOctet in self.__tagCache:
                    lastTag = self.__tagCache[firstOctet]
                else:
                    t = oct2int(firstOctet)
                    tagClass = t & 0xC0
                    tagFormat = t & 0x20
                    tagId = t & 0x1F
                    shortForm = tagId != 0x1F
                    if not shortForm:
                        # long form: base-128 tag number in following octets
                        tagId = 0
                        while 1:
                            if not substrate:
                                raise error.SubstrateUnderrunError(
                                    'Short octet stream on long tag decoding'
                                    )
                            t = oct2int(substrate[0])
                            tagId = tagId << 7 | (t & 0x7F)
                            substrate = substrate[1:]
                            if not t & 0x80:
                                break
                    lastTag = tag.Tag(
                        tagClass=tagClass, tagFormat=tagFormat, tagId=tagId
                        )
                    if shortForm:
                        # Cache short tags only.  A long-form tag is not
                        # determined by its first octet alone, so caching it
                        # under that octet (which happened when its number
                        # came out below 31) would corrupt later long tags.
                        self.__tagCache[firstOctet] = lastTag
                if tagSet is None:
                    if firstOctet in self.__tagSetCache:
                        tagSet = self.__tagSetCache[firstOctet]
                    else:
                        # base tag not recovered
                        tagSet = tag.TagSet((), lastTag)
                        if firstOctet in self.__tagCache:
                            self.__tagSetCache[firstOctet] = tagSet
                else:
                    tagSet = lastTag + tagSet
                state = stDecodeLength
            if state == stDecodeLength:
                # Decode length
                if not substrate:
                    raise error.SubstrateUnderrunError(
                        'Short octet stream on length decoding'
                        )
                firstOctet = oct2int(substrate[0])
                if firstOctet == 128:
                    # indefinite form, flagged by length == -1
                    size = 1
                    length = -1
                elif firstOctet < 128:
                    length, size = firstOctet, 1
                else:
                    size = firstOctet & 0x7F
                    # encoded in size bytes
                    length = 0
                    lengthString = substrate[1:size+1]
                    # missing check on maximum size, which shouldn't be a
                    # problem, we can handle more than is possible
                    if len(lengthString) != size:
                        raise error.SubstrateUnderrunError(
                            '%s<%s at %s' %
                            (size, len(lengthString), tagSet)
                            )
                    for lengthOctet in lengthString:
                        length = (length << 8) | oct2int(lengthOctet)
                    size = size + 1
                state = stGetValueDecoder
                substrate = substrate[size:]
                if length != -1 and len(substrate) < length:
                    raise error.SubstrateUnderrunError(
                        '%d-octet short' % (length - len(substrate))
                        )
            if state == stGetValueDecoder:
                if asn1Spec is None:
                    state = stGetValueDecoderByTag
                else:
                    state = stGetValueDecoderByAsn1Spec
            #
            # There are two ways of creating subtypes in ASN.1, and they
            # influence decoder operation:
            # 1) Either base types are used, or no IMPLICIT tagging has been
            #    applied on subtyping.
            # 2) Subtype syntax drops base type information (by means of
            #    IMPLICIT tagging).
            # The first case allows for complete tag recovery from substrate
            # while the second one requires original ASN.1 type spec for
            # decoding.
            #
            # In either case a set of tags (tagSet) is coming from substrate
            # in an incremental, tag-by-tag fashion (this is the case of
            # EXPLICIT tag which is most basic). Outermost tag comes first
            # from the wire.
            #
            if state == stGetValueDecoderByTag:
                if tagSet in self.__tagMap:
                    concreteDecoder = self.__tagMap[tagSet]
                else:
                    concreteDecoder = None
                if concreteDecoder:
                    state = stDecodeValue
                else:
                    # retry with the one-tag prefix of the tag set
                    _k = tagSet[:1]
                    if _k in self.__tagMap:
                        concreteDecoder = self.__tagMap[_k]
                    else:
                        concreteDecoder = None
                    if concreteDecoder:
                        state = stDecodeValue
                    else:
                        state = stTryAsExplicitTag
            if state == stGetValueDecoderByAsn1Spec:
                if isinstance(asn1Spec, (dict, tagmap.TagMap)):
                    if tagSet in asn1Spec:
                        chosenSpec = asn1Spec[tagSet]
                    else:
                        chosenSpec = None
                else:
                    chosenSpec = asn1Spec
                if chosenSpec is not None and (
                       tagSet == chosenSpec.getTagSet() or
                       tagSet in chosenSpec.getTagMap()
                       ):
                    # use base type for codec lookup to recover untagged types
                    baseTagSet = chosenSpec.baseTagSet
                    if chosenSpec.typeId is not None and \
                           chosenSpec.typeId in self.__typeMap:
                        # ambiguous type
                        concreteDecoder = self.__typeMap[chosenSpec.typeId]
                    elif baseTagSet in self.__tagMap:
                        # base type or tagged subtype
                        concreteDecoder = self.__tagMap[baseTagSet]
                    else:
                        concreteDecoder = None
                    if concreteDecoder:
                        asn1Spec = chosenSpec
                        state = stDecodeValue
                    else:
                        state = stTryAsExplicitTag
                elif tagSet == self.__endOfOctetsTagSet:
                    concreteDecoder = self.__tagMap[tagSet]
                    state = stDecodeValue
                else:
                    state = stTryAsExplicitTag
            if state == stTryAsExplicitTag:
                if tagSet and \
                       tagSet[0][1] == tag.tagFormatConstructed and \
                       tagSet[0][0] != tag.tagClassUniversal:
                    # Assume explicit tagging
                    concreteDecoder = explicitTagDecoder
                    state = stDecodeValue
                else:
                    state = self.defaultErrorState
            if state == stDumpRawValue:
                concreteDecoder = self.defaultRawDecoder
                state = stDecodeValue
            if state == stDecodeValue:
                if recursiveFlag:
                    decodeFun = self
                else:
                    decodeFun = None
                if length == -1:  # indef length
                    value, substrate = concreteDecoder.indefLenValueDecoder(
                        fullSubstrate, substrate, asn1Spec, tagSet, length,
                        stGetValueDecoder, decodeFun
                        )
                else:
                    value, _substrate = concreteDecoder.valueDecoder(
                        fullSubstrate, substrate, asn1Spec, tagSet, length,
                        stGetValueDecoder, decodeFun
                        )
                    if recursiveFlag:
                        # skip the contents octets regardless of what the
                        # value decoder handed back
                        substrate = substrate[length:]
                    else:
                        substrate = _substrate
                state = stStop
            if state == stErrorCondition:
                raise error.PyAsn1Error(
                    '%s not in asn1Spec: %s' % (tagSet, repr(asn1Spec))
                    )
        return value, substrate
0 ignored issues
show
introduced by
The variable value does not seem to be defined for all execution paths.
Loading history...
742
743
# Module-level decoder instance built from the tagMap and typeMap tables above.
decode = Decoder(tagMap, typeMap)
744
745
# XXX
746
# non-recursive decoding; return position rather than substrate
747