| Total Complexity | 157 |
| Total Lines | 744 |
| Duplicated Lines | 4.03 % |
| Changes | 0 | ||
Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.
Common duplication problems, and corresponding solutions are:
Complex classes like pyasn1.codec.ber.decoder often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
| 1 | # BER decoder |
||
| 2 | from pyasn1.type import tag, base, univ, char, useful, tagmap |
||
| 3 | from pyasn1.codec.ber import eoo |
||
| 4 | from pyasn1.compat.octets import oct2int, octs2ints |
||
| 5 | from pyasn1 import error |
||
| 6 | |||
| 7 | class AbstractDecoder: |
||
| 8 | protoComponent = None |
||
| 9 | def valueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet, |
||
| 10 | length, state, decodeFun): |
||
| 11 | raise error.PyAsn1Error('Decoder not implemented for %s' % tagSet) |
||
| 12 | |||
| 13 | def indefLenValueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet, |
||
| 14 | length, state, decodeFun): |
||
| 15 | raise error.PyAsn1Error('Indefinite length mode decoder not implemented for %s' % tagSet) |
||
| 16 | |||
| 17 | class AbstractSimpleDecoder(AbstractDecoder): |
||
| 18 | def _createComponent(self, asn1Spec, tagSet, value=None): |
||
| 19 | if asn1Spec is None: |
||
| 20 | return self.protoComponent.clone(value, tagSet) |
||
| 21 | elif value is None: |
||
| 22 | return asn1Spec |
||
| 23 | else: |
||
| 24 | return asn1Spec.clone(value) |
||
| 25 | |||
| 26 | class AbstractConstructedDecoder(AbstractDecoder): |
||
| 27 | def _createComponent(self, asn1Spec, tagSet, value=None): |
||
| 28 | if asn1Spec is None: |
||
| 29 | return self.protoComponent.clone(tagSet) |
||
| 30 | else: |
||
| 31 | return asn1Spec.clone() |
||
| 32 | |||
| 33 | class EndOfOctetsDecoder(AbstractSimpleDecoder): |
||
| 34 | def valueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet, |
||
| 35 | length, state, decodeFun): |
||
| 36 | return eoo.endOfOctets, substrate[:length] |
||
| 37 | |||
| 38 | class ExplicitTagDecoder(AbstractSimpleDecoder): |
||
| 39 | def valueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet, |
||
| 40 | length, state, decodeFun): |
||
| 41 | return decodeFun(substrate[:length], asn1Spec, tagSet, length) |
||
| 42 | |||
| 43 | def indefLenValueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet, |
||
| 44 | length, state, decodeFun): |
||
| 45 | value, substrate = decodeFun(substrate, asn1Spec, tagSet, length) |
||
| 46 | terminator, substrate = decodeFun(substrate) |
||
| 47 | if terminator == eoo.endOfOctets: |
||
| 48 | return value, substrate |
||
| 49 | else: |
||
| 50 | raise error.PyAsn1Error('Missing end-of-octets terminator') |
||
| 51 | |||
| 52 | explicitTagDecoder = ExplicitTagDecoder() |
||
| 53 | |||
| 54 | class IntegerDecoder(AbstractSimpleDecoder): |
||
| 55 | protoComponent = univ.Integer(0) |
||
| 56 | precomputedValues = { |
||
| 57 | '\x00': 0, |
||
| 58 | '\x01': 1, |
||
| 59 | '\x02': 2, |
||
| 60 | '\x03': 3, |
||
| 61 | '\x04': 4, |
||
| 62 | '\x05': 5, |
||
| 63 | '\x06': 6, |
||
| 64 | '\x07': 7, |
||
| 65 | '\x08': 8, |
||
| 66 | '\x09': 9, |
||
| 67 | '\xff': -1, |
||
| 68 | '\xfe': -2, |
||
| 69 | '\xfd': -3, |
||
| 70 | '\xfc': -4, |
||
| 71 | '\xfb': -5 |
||
| 72 | } |
||
| 73 | |||
| 74 | def _valueFilter(self, value): |
||
| 75 | try: |
||
| 76 | return int(value) |
||
| 77 | except OverflowError: |
||
| 78 | return value |
||
| 79 | |||
| 80 | def valueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet, length, |
||
| 81 | state, decodeFun): |
||
| 82 | substrate = substrate[:length] |
||
| 83 | if not substrate: |
||
| 84 | raise error.PyAsn1Error('Empty substrate') |
||
| 85 | if substrate in self.precomputedValues: |
||
| 86 | value = self.precomputedValues[substrate] |
||
| 87 | else: |
||
| 88 | firstOctet = oct2int(substrate[0]) |
||
| 89 | if firstOctet & 0x80: |
||
| 90 | value = -1 |
||
| 91 | else: |
||
| 92 | value = 0 |
||
| 93 | for octet in substrate: |
||
| 94 | value = value << 8 | oct2int(octet) |
||
| 95 | value = self._valueFilter(value) |
||
| 96 | return self._createComponent(asn1Spec, tagSet, value), substrate |
||
| 97 | |||
| 98 | class BooleanDecoder(IntegerDecoder): |
||
| 99 | protoComponent = univ.Boolean(0) |
||
| 100 | def _valueFilter(self, value): |
||
| 101 | if value: |
||
| 102 | return 1 |
||
| 103 | else: |
||
| 104 | return 0 |
||
| 105 | |||
| 106 | class BitStringDecoder(AbstractSimpleDecoder): |
||
| 107 | protoComponent = univ.BitString(()) |
||
| 108 | def valueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet, length, |
||
| 109 | state, decodeFun): |
||
| 110 | substrate = substrate[:length] |
||
| 111 | if tagSet[0][1] == tag.tagFormatSimple: # XXX what tag to check? |
||
| 112 | if not substrate: |
||
| 113 | raise error.PyAsn1Error('Missing initial octet') |
||
| 114 | trailingBits = oct2int(substrate[0]) |
||
| 115 | if trailingBits > 7: |
||
| 116 | raise error.PyAsn1Error( |
||
| 117 | 'Trailing bits overflow %s' % trailingBits |
||
| 118 | ) |
||
| 119 | substrate = substrate[1:] |
||
| 120 | lsb = p = 0; l = len(substrate)-1; b = () |
||
| 121 | while p <= l: |
||
| 122 | if p == l: |
||
| 123 | lsb = trailingBits |
||
| 124 | j = 7 |
||
| 125 | o = oct2int(substrate[p]) |
||
| 126 | while j >= lsb: |
||
| 127 | b = b + ((o>>j)&0x01,) |
||
| 128 | j = j - 1 |
||
| 129 | p = p + 1 |
||
| 130 | return self._createComponent(asn1Spec, tagSet, b), '' |
||
| 131 | r = self._createComponent(asn1Spec, tagSet, ()) |
||
| 132 | if not decodeFun: |
||
| 133 | return r, substrate |
||
| 134 | while substrate: |
||
| 135 | component, substrate = decodeFun(substrate) |
||
| 136 | r = r + component |
||
| 137 | return r, substrate |
||
| 138 | |||
| 139 | View Code Duplication | def indefLenValueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet, |
|
|
|
|||
| 140 | length, state, decodeFun): |
||
| 141 | r = self._createComponent(asn1Spec, tagSet, '') |
||
| 142 | if not decodeFun: |
||
| 143 | return r, substrate |
||
| 144 | while substrate: |
||
| 145 | component, substrate = decodeFun(substrate) |
||
| 146 | if component == eoo.endOfOctets: |
||
| 147 | break |
||
| 148 | r = r + component |
||
| 149 | else: |
||
| 150 | raise error.SubstrateUnderrunError( |
||
| 151 | 'No EOO seen before substrate ends' |
||
| 152 | ) |
||
| 153 | return r, substrate |
||
| 154 | |||
| 155 | class OctetStringDecoder(AbstractSimpleDecoder): |
||
| 156 | protoComponent = univ.OctetString('') |
||
| 157 | def valueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet, length, |
||
| 158 | state, decodeFun): |
||
| 159 | substrate = substrate[:length] |
||
| 160 | if tagSet[0][1] == tag.tagFormatSimple: # XXX what tag to check? |
||
| 161 | return self._createComponent(asn1Spec, tagSet, substrate), '' |
||
| 162 | r = self._createComponent(asn1Spec, tagSet, '') |
||
| 163 | if not decodeFun: |
||
| 164 | return r, substrate |
||
| 165 | while substrate: |
||
| 166 | component, substrate = decodeFun(substrate) |
||
| 167 | r = r + component |
||
| 168 | return r, substrate |
||
| 169 | |||
| 170 | View Code Duplication | def indefLenValueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet, |
|
| 171 | length, state, decodeFun): |
||
| 172 | r = self._createComponent(asn1Spec, tagSet, '') |
||
| 173 | if not decodeFun: |
||
| 174 | return r, substrate |
||
| 175 | while substrate: |
||
| 176 | component, substrate = decodeFun(substrate) |
||
| 177 | if component == eoo.endOfOctets: |
||
| 178 | break |
||
| 179 | r = r + component |
||
| 180 | else: |
||
| 181 | raise error.SubstrateUnderrunError( |
||
| 182 | 'No EOO seen before substrate ends' |
||
| 183 | ) |
||
| 184 | return r, substrate |
||
| 185 | |||
| 186 | class NullDecoder(AbstractSimpleDecoder): |
||
| 187 | protoComponent = univ.Null('') |
||
| 188 | def valueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet, |
||
| 189 | length, state, decodeFun): |
||
| 190 | substrate = substrate[:length] |
||
| 191 | r = self._createComponent(asn1Spec, tagSet) |
||
| 192 | if substrate: |
||
| 193 | raise error.PyAsn1Error('Unexpected substrate for Null') |
||
| 194 | return r, substrate |
||
| 195 | |||
| 196 | class ObjectIdentifierDecoder(AbstractSimpleDecoder): |
||
| 197 | protoComponent = univ.ObjectIdentifier(()) |
||
| 198 | def valueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet, length, |
||
| 199 | state, decodeFun): |
||
| 200 | substrate = substrate[:length] |
||
| 201 | if not substrate: |
||
| 202 | raise error.PyAsn1Error('Empty substrate') |
||
| 203 | oid = (); index = 0 |
||
| 204 | # Get the first subid |
||
| 205 | subId = oct2int(substrate[index]) |
||
| 206 | oid = oid + divmod(subId, 40) |
||
| 207 | |||
| 208 | index = index + 1 |
||
| 209 | substrateLen = len(substrate) |
||
| 210 | |||
| 211 | while index < substrateLen: |
||
| 212 | subId = oct2int(substrate[index]) |
||
| 213 | if subId < 128: |
||
| 214 | oid = oid + (subId,) |
||
| 215 | index = index + 1 |
||
| 216 | else: |
||
| 217 | # Construct subid from a number of octets |
||
| 218 | nextSubId = subId |
||
| 219 | subId = 0 |
||
| 220 | while nextSubId >= 128 and index < substrateLen: |
||
| 221 | subId = (subId << 7) + (nextSubId & 0x7F) |
||
| 222 | index = index + 1 |
||
| 223 | nextSubId = oct2int(substrate[index]) |
||
| 224 | if index == substrateLen: |
||
| 225 | raise error.SubstrateUnderrunError( |
||
| 226 | 'Short substrate for OID %s' % oid |
||
| 227 | ) |
||
| 228 | subId = (subId << 7) + nextSubId |
||
| 229 | oid = oid + (subId,) |
||
| 230 | index = index + 1 |
||
| 231 | return self._createComponent(asn1Spec, tagSet, oid), substrate[index:] |
||
| 232 | |||
| 233 | class RealDecoder(AbstractSimpleDecoder): |
||
| 234 | protoComponent = univ.Real() |
||
| 235 | def valueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet, |
||
| 236 | length, state, decodeFun): |
||
| 237 | substrate = substrate[:length] |
||
| 238 | if not length: |
||
| 239 | raise error.SubstrateUnderrunError('Short substrate for Real') |
||
| 240 | fo = oct2int(substrate[0]); substrate = substrate[1:] |
||
| 241 | if fo & 0x40: # infinite value |
||
| 242 | value = fo & 0x01 and '-inf' or 'inf' |
||
| 243 | elif fo & 0x80: # binary enoding |
||
| 244 | if fo & 0x11 == 0: |
||
| 245 | n = 1 |
||
| 246 | elif fo & 0x01: |
||
| 247 | n = 2 |
||
| 248 | elif fo & 0x02: |
||
| 249 | n = 3 |
||
| 250 | else: |
||
| 251 | n = oct2int(substrate[0]) |
||
| 252 | eo, substrate = substrate[:n], substrate[n:] |
||
| 253 | if not eo or not substrate: |
||
| 254 | raise error.PyAsn1Error('Real exponent screwed') |
||
| 255 | e = 0 |
||
| 256 | while eo: # exponent |
||
| 257 | e <<= 8 |
||
| 258 | e |= oct2int(eo[0]) |
||
| 259 | eo = eo[1:] |
||
| 260 | p = 0 |
||
| 261 | while substrate: # value |
||
| 262 | p <<= 8 |
||
| 263 | p |= oct2int(substrate[0]) |
||
| 264 | substrate = substrate[1:] |
||
| 265 | if fo & 0x40: # sign bit |
||
| 266 | p = -p |
||
| 267 | value = (p, 2, e) |
||
| 268 | elif fo & 0xc0 == 0: # character encoding |
||
| 269 | try: |
||
| 270 | if fo & 0x3 == 0x1: # NR1 |
||
| 271 | value = (int(substrate), 10, 0) |
||
| 272 | elif fo & 0x3 == 0x2: # NR2 |
||
| 273 | value = float(substrate) |
||
| 274 | elif fo & 0x3 == 0x3: # NR3 |
||
| 275 | value = float(substrate) |
||
| 276 | else: |
||
| 277 | raise error.SubstrateUnderrunError( |
||
| 278 | 'Unknown NR (tag %s)' % fo |
||
| 279 | ) |
||
| 280 | except ValueError: |
||
| 281 | raise error.SubstrateUnderrunError( |
||
| 282 | 'Bad character Real syntax' |
||
| 283 | ) |
||
| 284 | elif fo & 0xc0 == 0x40: # special real value |
||
| 285 | pass |
||
| 286 | else: |
||
| 287 | raise error.SubstrateUnderrunError( |
||
| 288 | 'Unknown encoding (tag %s)' % fo |
||
| 289 | ) |
||
| 290 | return self._createComponent(asn1Spec, tagSet, value), substrate |
||
| 291 | |||
| 292 | class SequenceDecoder(AbstractConstructedDecoder): |
||
| 293 | protoComponent = univ.Sequence() |
||
| 294 | def _getComponentTagMap(self, r, idx): |
||
| 295 | try: |
||
| 296 | return r.getComponentTagMapNearPosition(idx) |
||
| 297 | except error.PyAsn1Error: |
||
| 298 | return |
||
| 299 | |||
| 300 | def _getComponentPositionByType(self, r, t, idx): |
||
| 301 | return r.getComponentPositionNearType(t, idx) |
||
| 302 | |||
| 303 | def valueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet, |
||
| 304 | length, state, decodeFun): |
||
| 305 | substrate = substrate[:length] |
||
| 306 | r = self._createComponent(asn1Spec, tagSet) |
||
| 307 | idx = 0 |
||
| 308 | if not decodeFun: |
||
| 309 | return r, substrate |
||
| 310 | while substrate: |
||
| 311 | asn1Spec = self._getComponentTagMap(r, idx) |
||
| 312 | component, substrate = decodeFun( |
||
| 313 | substrate, asn1Spec |
||
| 314 | ) |
||
| 315 | idx = self._getComponentPositionByType( |
||
| 316 | r, component.getEffectiveTagSet(), idx |
||
| 317 | ) |
||
| 318 | r.setComponentByPosition(idx, component, asn1Spec is None) |
||
| 319 | idx = idx + 1 |
||
| 320 | r.setDefaultComponents() |
||
| 321 | r.verifySizeSpec() |
||
| 322 | return r, substrate |
||
| 323 | |||
| 324 | def indefLenValueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet, |
||
| 325 | length, state, decodeFun): |
||
| 326 | r = self._createComponent(asn1Spec, tagSet) |
||
| 327 | idx = 0 |
||
| 328 | while substrate: |
||
| 329 | asn1Spec = self._getComponentTagMap(r, idx) |
||
| 330 | if not decodeFun: |
||
| 331 | return r, substrate |
||
| 332 | component, substrate = decodeFun(substrate, asn1Spec) |
||
| 333 | if component == eoo.endOfOctets: |
||
| 334 | break |
||
| 335 | idx = self._getComponentPositionByType( |
||
| 336 | r, component.getEffectiveTagSet(), idx |
||
| 337 | ) |
||
| 338 | r.setComponentByPosition(idx, component, asn1Spec is None) |
||
| 339 | idx = idx + 1 |
||
| 340 | else: |
||
| 341 | raise error.SubstrateUnderrunError( |
||
| 342 | 'No EOO seen before substrate ends' |
||
| 343 | ) |
||
| 344 | r.setDefaultComponents() |
||
| 345 | r.verifySizeSpec() |
||
| 346 | return r, substrate |
||
| 347 | |||
| 348 | class SequenceOfDecoder(AbstractConstructedDecoder): |
||
| 349 | protoComponent = univ.SequenceOf() |
||
| 350 | def valueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet, |
||
| 351 | length, state, decodeFun): |
||
| 352 | substrate = substrate[:length] |
||
| 353 | r = self._createComponent(asn1Spec, tagSet) |
||
| 354 | asn1Spec = r.getComponentType() |
||
| 355 | idx = 0 |
||
| 356 | if not decodeFun: |
||
| 357 | return r, substrate |
||
| 358 | while substrate: |
||
| 359 | component, substrate = decodeFun( |
||
| 360 | substrate, asn1Spec |
||
| 361 | ) |
||
| 362 | r.setComponentByPosition(idx, component, asn1Spec is None) |
||
| 363 | idx = idx + 1 |
||
| 364 | r.verifySizeSpec() |
||
| 365 | return r, substrate |
||
| 366 | |||
| 367 | def indefLenValueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet, |
||
| 368 | length, state, decodeFun): |
||
| 369 | r = self._createComponent(asn1Spec, tagSet) |
||
| 370 | asn1Spec = r.getComponentType() |
||
| 371 | idx = 0 |
||
| 372 | if not decodeFun: |
||
| 373 | return r, substrate |
||
| 374 | while substrate: |
||
| 375 | component, substrate = decodeFun(substrate, asn1Spec) |
||
| 376 | if component == eoo.endOfOctets: |
||
| 377 | break |
||
| 378 | r.setComponentByPosition(idx, component, asn1Spec is None) |
||
| 379 | idx = idx + 1 |
||
| 380 | else: |
||
| 381 | raise error.SubstrateUnderrunError( |
||
| 382 | 'No EOO seen before substrate ends' |
||
| 383 | ) |
||
| 384 | r.verifySizeSpec() |
||
| 385 | return r, substrate |
||
| 386 | |||
| 387 | class SetDecoder(SequenceDecoder): |
||
| 388 | protoComponent = univ.Set() |
||
| 389 | def _getComponentTagMap(self, r, idx): |
||
| 390 | return r.getComponentTagMap() |
||
| 391 | |||
| 392 | def _getComponentPositionByType(self, r, t, idx): |
||
| 393 | nextIdx = r.getComponentPositionByType(t) |
||
| 394 | if nextIdx is None: |
||
| 395 | return idx |
||
| 396 | else: |
||
| 397 | return nextIdx |
||
| 398 | |||
| 399 | class SetOfDecoder(SequenceOfDecoder): |
||
| 400 | protoComponent = univ.SetOf() |
||
| 401 | |||
| 402 | class ChoiceDecoder(AbstractConstructedDecoder): |
||
| 403 | protoComponent = univ.Choice() |
||
| 404 | def valueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet, |
||
| 405 | length, state, decodeFun): |
||
| 406 | substrate = substrate[:length] |
||
| 407 | r = self._createComponent(asn1Spec, tagSet) |
||
| 408 | if not decodeFun: |
||
| 409 | return r, substrate |
||
| 410 | if r.getTagSet() == tagSet: # explicitly tagged Choice |
||
| 411 | component, substrate = decodeFun( |
||
| 412 | substrate, r.getComponentTagMap() |
||
| 413 | ) |
||
| 414 | else: |
||
| 415 | component, substrate = decodeFun( |
||
| 416 | substrate, r.getComponentTagMap(), tagSet, length, state |
||
| 417 | ) |
||
| 418 | if isinstance(component, univ.Choice): |
||
| 419 | effectiveTagSet = component.getEffectiveTagSet() |
||
| 420 | else: |
||
| 421 | effectiveTagSet = component.getTagSet() |
||
| 422 | r.setComponentByType(effectiveTagSet, component, 0, asn1Spec is None) |
||
| 423 | return r, substrate |
||
| 424 | |||
| 425 | indefLenValueDecoder = valueDecoder |
||
| 426 | |||
| 427 | class AnyDecoder(AbstractSimpleDecoder): |
||
| 428 | protoComponent = univ.Any() |
||
| 429 | def valueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet, |
||
| 430 | length, state, decodeFun): |
||
| 431 | if asn1Spec is None or \ |
||
| 432 | asn1Spec is not None and tagSet != asn1Spec.getTagSet(): |
||
| 433 | # untagged Any container, recover inner header substrate |
||
| 434 | length = length + len(fullSubstrate) - len(substrate) |
||
| 435 | substrate = fullSubstrate |
||
| 436 | substrate = substrate[:length] |
||
| 437 | return self._createComponent(asn1Spec, tagSet, value=substrate), '' |
||
| 438 | |||
| 439 | def indefLenValueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet, |
||
| 440 | length, state, decodeFun): |
||
| 441 | if asn1Spec is not None and tagSet == asn1Spec.getTagSet(): |
||
| 442 | # tagged Any type -- consume header substrate |
||
| 443 | header = '' |
||
| 444 | else: |
||
| 445 | # untagged Any, recover header substrate |
||
| 446 | header = fullSubstrate[:-len(substrate)] |
||
| 447 | |||
| 448 | r = self._createComponent(asn1Spec, tagSet, header) |
||
| 449 | |||
| 450 | # Any components do not inherit initial tag |
||
| 451 | asn1Spec = self.protoComponent |
||
| 452 | |||
| 453 | if not decodeFun: |
||
| 454 | return r, substrate |
||
| 455 | while substrate: |
||
| 456 | component, substrate = decodeFun(substrate, asn1Spec) |
||
| 457 | if component == eoo.endOfOctets: |
||
| 458 | break |
||
| 459 | r = r + component |
||
| 460 | else: |
||
| 461 | raise error.SubstrateUnderrunError( |
||
| 462 | 'No EOO seen before substrate ends' |
||
| 463 | ) |
||
| 464 | return r, substrate |
||
| 465 | |||
| 466 | # character string types |
||
| 467 | class UTF8StringDecoder(OctetStringDecoder): |
||
| 468 | protoComponent = char.UTF8String() |
||
| 469 | class NumericStringDecoder(OctetStringDecoder): |
||
| 470 | protoComponent = char.NumericString() |
||
| 471 | class PrintableStringDecoder(OctetStringDecoder): |
||
| 472 | protoComponent = char.PrintableString() |
||
| 473 | class TeletexStringDecoder(OctetStringDecoder): |
||
| 474 | protoComponent = char.TeletexString() |
||
| 475 | class VideotexStringDecoder(OctetStringDecoder): |
||
| 476 | protoComponent = char.VideotexString() |
||
| 477 | class IA5StringDecoder(OctetStringDecoder): |
||
| 478 | protoComponent = char.IA5String() |
||
| 479 | class GraphicStringDecoder(OctetStringDecoder): |
||
| 480 | protoComponent = char.GraphicString() |
||
| 481 | class VisibleStringDecoder(OctetStringDecoder): |
||
| 482 | protoComponent = char.VisibleString() |
||
| 483 | class GeneralStringDecoder(OctetStringDecoder): |
||
| 484 | protoComponent = char.GeneralString() |
||
| 485 | class UniversalStringDecoder(OctetStringDecoder): |
||
| 486 | protoComponent = char.UniversalString() |
||
| 487 | class BMPStringDecoder(OctetStringDecoder): |
||
| 488 | protoComponent = char.BMPString() |
||
| 489 | |||
| 490 | # "useful" types |
||
| 491 | class GeneralizedTimeDecoder(OctetStringDecoder): |
||
| 492 | protoComponent = useful.GeneralizedTime() |
||
| 493 | class UTCTimeDecoder(OctetStringDecoder): |
||
| 494 | protoComponent = useful.UTCTime() |
||
| 495 | |||
| 496 | tagMap = { |
||
| 497 | eoo.endOfOctets.tagSet: EndOfOctetsDecoder(), |
||
| 498 | univ.Integer.tagSet: IntegerDecoder(), |
||
| 499 | univ.Boolean.tagSet: BooleanDecoder(), |
||
| 500 | univ.BitString.tagSet: BitStringDecoder(), |
||
| 501 | univ.OctetString.tagSet: OctetStringDecoder(), |
||
| 502 | univ.Null.tagSet: NullDecoder(), |
||
| 503 | univ.ObjectIdentifier.tagSet: ObjectIdentifierDecoder(), |
||
| 504 | univ.Enumerated.tagSet: IntegerDecoder(), |
||
| 505 | univ.Real.tagSet: RealDecoder(), |
||
| 506 | univ.Sequence.tagSet: SequenceDecoder(), # conflicts with SequenceOf |
||
| 507 | univ.Set.tagSet: SetDecoder(), # conflicts with SetOf |
||
| 508 | univ.Choice.tagSet: ChoiceDecoder(), # conflicts with Any |
||
| 509 | # character string types |
||
| 510 | char.UTF8String.tagSet: UTF8StringDecoder(), |
||
| 511 | char.NumericString.tagSet: NumericStringDecoder(), |
||
| 512 | char.PrintableString.tagSet: PrintableStringDecoder(), |
||
| 513 | char.TeletexString.tagSet: TeletexStringDecoder(), |
||
| 514 | char.VideotexString.tagSet: VideotexStringDecoder(), |
||
| 515 | char.IA5String.tagSet: IA5StringDecoder(), |
||
| 516 | char.GraphicString.tagSet: GraphicStringDecoder(), |
||
| 517 | char.VisibleString.tagSet: VisibleStringDecoder(), |
||
| 518 | char.GeneralString.tagSet: GeneralStringDecoder(), |
||
| 519 | char.UniversalString.tagSet: UniversalStringDecoder(), |
||
| 520 | char.BMPString.tagSet: BMPStringDecoder(), |
||
| 521 | # useful types |
||
| 522 | useful.GeneralizedTime.tagSet: GeneralizedTimeDecoder(), |
||
| 523 | useful.UTCTime.tagSet: UTCTimeDecoder() |
||
| 524 | } |
||
| 525 | |||
| 526 | # Type-to-codec map for ambiguous ASN.1 types |
||
| 527 | typeMap = { |
||
| 528 | univ.Set.typeId: SetDecoder(), |
||
| 529 | univ.SetOf.typeId: SetOfDecoder(), |
||
| 530 | univ.Sequence.typeId: SequenceDecoder(), |
||
| 531 | univ.SequenceOf.typeId: SequenceOfDecoder(), |
||
| 532 | univ.Choice.typeId: ChoiceDecoder(), |
||
| 533 | univ.Any.typeId: AnyDecoder() |
||
| 534 | } |
||
| 535 | |||
| 536 | ( stDecodeTag, stDecodeLength, stGetValueDecoder, stGetValueDecoderByAsn1Spec, |
||
| 537 | stGetValueDecoderByTag, stTryAsExplicitTag, stDecodeValue, |
||
| 538 | stDumpRawValue, stErrorCondition, stStop ) = [x for x in range(10)] |
||
| 539 | |||
| 540 | class Decoder: |
||
| 541 | defaultErrorState = stErrorCondition |
||
| 542 | # defaultErrorState = stDumpRawValue |
||
| 543 | defaultRawDecoder = AnyDecoder() |
||
| 544 | def __init__(self, tagMap, typeMap={}): |
||
| 545 | self.__tagMap = tagMap |
||
| 546 | self.__typeMap = typeMap |
||
| 547 | self.__endOfOctetsTagSet = eoo.endOfOctets.getTagSet() |
||
| 548 | # Tag & TagSet objects caches |
||
| 549 | self.__tagCache = {} |
||
| 550 | self.__tagSetCache = {} |
||
| 551 | |||
| 552 | def __call__(self, substrate, asn1Spec=None, tagSet=None, |
||
| 553 | length=None, state=stDecodeTag, recursiveFlag=1): |
||
| 554 | fullSubstrate = substrate |
||
| 555 | while state != stStop: |
||
| 556 | if state == stDecodeTag: |
||
| 557 | # Decode tag |
||
| 558 | if not substrate: |
||
| 559 | raise error.SubstrateUnderrunError( |
||
| 560 | 'Short octet stream on tag decoding' |
||
| 561 | ) |
||
| 562 | |||
| 563 | firstOctet = substrate[0] |
||
| 564 | substrate = substrate[1:] |
||
| 565 | if firstOctet in self.__tagCache: |
||
| 566 | lastTag = self.__tagCache[firstOctet] |
||
| 567 | else: |
||
| 568 | t = oct2int(firstOctet) |
||
| 569 | tagClass = t&0xC0 |
||
| 570 | tagFormat = t&0x20 |
||
| 571 | tagId = t&0x1F |
||
| 572 | if tagId == 0x1F: |
||
| 573 | tagId = 0 |
||
| 574 | while 1: |
||
| 575 | if not substrate: |
||
| 576 | raise error.SubstrateUnderrunError( |
||
| 577 | 'Short octet stream on long tag decoding' |
||
| 578 | ) |
||
| 579 | t = oct2int(substrate[0]) |
||
| 580 | tagId = tagId << 7 | (t&0x7F) |
||
| 581 | substrate = substrate[1:] |
||
| 582 | if not t&0x80: |
||
| 583 | break |
||
| 584 | lastTag = tag.Tag( |
||
| 585 | tagClass=tagClass, tagFormat=tagFormat, tagId=tagId |
||
| 586 | ) |
||
| 587 | if tagId < 31: |
||
| 588 | # cache short tags |
||
| 589 | self.__tagCache[firstOctet] = lastTag |
||
| 590 | if tagSet is None: |
||
| 591 | if firstOctet in self.__tagSetCache: |
||
| 592 | tagSet = self.__tagSetCache[firstOctet] |
||
| 593 | else: |
||
| 594 | # base tag not recovered |
||
| 595 | tagSet = tag.TagSet((), lastTag) |
||
| 596 | if firstOctet in self.__tagCache: |
||
| 597 | self.__tagSetCache[firstOctet] = tagSet |
||
| 598 | else: |
||
| 599 | tagSet = lastTag + tagSet |
||
| 600 | state = stDecodeLength |
||
| 601 | if state == stDecodeLength: |
||
| 602 | # Decode length |
||
| 603 | if not substrate: |
||
| 604 | raise error.SubstrateUnderrunError( |
||
| 605 | 'Short octet stream on length decoding' |
||
| 606 | ) |
||
| 607 | firstOctet = oct2int(substrate[0]) |
||
| 608 | if firstOctet == 128: |
||
| 609 | size = 1 |
||
| 610 | length = -1 |
||
| 611 | elif firstOctet < 128: |
||
| 612 | length, size = firstOctet, 1 |
||
| 613 | else: |
||
| 614 | size = firstOctet & 0x7F |
||
| 615 | # encoded in size bytes |
||
| 616 | length = 0 |
||
| 617 | lengthString = substrate[1:size+1] |
||
| 618 | # missing check on maximum size, which shouldn't be a |
||
| 619 | # problem, we can handle more than is possible |
||
| 620 | if len(lengthString) != size: |
||
| 621 | raise error.SubstrateUnderrunError( |
||
| 622 | '%s<%s at %s' % |
||
| 623 | (size, len(lengthString), tagSet) |
||
| 624 | ) |
||
| 625 | for char in lengthString: |
||
| 626 | length = (length << 8) | oct2int(char) |
||
| 627 | size = size + 1 |
||
| 628 | state = stGetValueDecoder |
||
| 629 | substrate = substrate[size:] |
||
| 630 | if length != -1 and len(substrate) < length: |
||
| 631 | raise error.SubstrateUnderrunError( |
||
| 632 | '%d-octet short' % (length - len(substrate)) |
||
| 633 | ) |
||
| 634 | if state == stGetValueDecoder: |
||
| 635 | if asn1Spec is None: |
||
| 636 | state = stGetValueDecoderByTag |
||
| 637 | else: |
||
| 638 | state = stGetValueDecoderByAsn1Spec |
||
| 639 | # |
||
| 640 | # There're two ways of creating subtypes in ASN.1 what influences |
||
| 641 | # decoder operation. These methods are: |
||
| 642 | # 1) Either base types used in or no IMPLICIT tagging has been |
||
| 643 | # applied on subtyping. |
||
| 644 | # 2) Subtype syntax drops base type information (by means of |
||
| 645 | # IMPLICIT tagging. |
||
| 646 | # The first case allows for complete tag recovery from substrate |
||
| 647 | # while the second one requires original ASN.1 type spec for |
||
| 648 | # decoding. |
||
| 649 | # |
||
| 650 | # In either case a set of tags (tagSet) is coming from substrate |
||
| 651 | # in an incremental, tag-by-tag fashion (this is the case of |
||
| 652 | # EXPLICIT tag which is most basic). Outermost tag comes first |
||
| 653 | # from the wire. |
||
| 654 | # |
||
| 655 | if state == stGetValueDecoderByTag: |
||
| 656 | if tagSet in self.__tagMap: |
||
| 657 | concreteDecoder = self.__tagMap[tagSet] |
||
| 658 | else: |
||
| 659 | concreteDecoder = None |
||
| 660 | if concreteDecoder: |
||
| 661 | state = stDecodeValue |
||
| 662 | else: |
||
| 663 | _k = tagSet[:1] |
||
| 664 | if _k in self.__tagMap: |
||
| 665 | concreteDecoder = self.__tagMap[_k] |
||
| 666 | else: |
||
| 667 | concreteDecoder = None |
||
| 668 | if concreteDecoder: |
||
| 669 | state = stDecodeValue |
||
| 670 | else: |
||
| 671 | state = stTryAsExplicitTag |
||
| 672 | if state == stGetValueDecoderByAsn1Spec: |
||
| 673 | if isinstance(asn1Spec, (dict, tagmap.TagMap)): |
||
| 674 | if tagSet in asn1Spec: |
||
| 675 | __chosenSpec = asn1Spec[tagSet] |
||
| 676 | else: |
||
| 677 | __chosenSpec = None |
||
| 678 | else: |
||
| 679 | __chosenSpec = asn1Spec |
||
| 680 | if __chosenSpec is not None and ( |
||
| 681 | tagSet == __chosenSpec.getTagSet() or \ |
||
| 682 | tagSet in __chosenSpec.getTagMap() |
||
| 683 | ): |
||
| 684 | # use base type for codec lookup to recover untagged types |
||
| 685 | baseTagSet = __chosenSpec.baseTagSet |
||
| 686 | if __chosenSpec.typeId is not None and \ |
||
| 687 | __chosenSpec.typeId in self.__typeMap: |
||
| 688 | # ambiguous type |
||
| 689 | concreteDecoder = self.__typeMap[__chosenSpec.typeId] |
||
| 690 | elif baseTagSet in self.__tagMap: |
||
| 691 | # base type or tagged subtype |
||
| 692 | concreteDecoder = self.__tagMap[baseTagSet] |
||
| 693 | else: |
||
| 694 | concreteDecoder = None |
||
| 695 | if concreteDecoder: |
||
| 696 | asn1Spec = __chosenSpec |
||
| 697 | state = stDecodeValue |
||
| 698 | else: |
||
| 699 | state = stTryAsExplicitTag |
||
| 700 | elif tagSet == self.__endOfOctetsTagSet: |
||
| 701 | concreteDecoder = self.__tagMap[tagSet] |
||
| 702 | state = stDecodeValue |
||
| 703 | else: |
||
| 704 | state = stTryAsExplicitTag |
||
| 705 | if state == stTryAsExplicitTag: |
||
| 706 | if tagSet and \ |
||
| 707 | tagSet[0][1] == tag.tagFormatConstructed and \ |
||
| 708 | tagSet[0][0] != tag.tagClassUniversal: |
||
| 709 | # Assume explicit tagging |
||
| 710 | concreteDecoder = explicitTagDecoder |
||
| 711 | state = stDecodeValue |
||
| 712 | else: |
||
| 713 | state = self.defaultErrorState |
||
| 714 | if state == stDumpRawValue: |
||
| 715 | concreteDecoder = self.defaultRawDecoder |
||
| 716 | state = stDecodeValue |
||
| 717 | if state == stDecodeValue: |
||
| 718 | if recursiveFlag: |
||
| 719 | decodeFun = self |
||
| 720 | else: |
||
| 721 | decodeFun = None |
||
| 722 | if length == -1: # indef length |
||
| 723 | value, substrate = concreteDecoder.indefLenValueDecoder( |
||
| 724 | fullSubstrate, substrate, asn1Spec, tagSet, length, |
||
| 725 | stGetValueDecoder, decodeFun |
||
| 726 | ) |
||
| 727 | else: |
||
| 728 | value, _substrate = concreteDecoder.valueDecoder( |
||
| 729 | fullSubstrate, substrate, asn1Spec, tagSet, length, |
||
| 730 | stGetValueDecoder, decodeFun |
||
| 731 | ) |
||
| 732 | if recursiveFlag: |
||
| 733 | substrate = substrate[length:] |
||
| 734 | else: |
||
| 735 | substrate = _substrate |
||
| 736 | state = stStop |
||
| 737 | if state == stErrorCondition: |
||
| 738 | raise error.PyAsn1Error( |
||
| 739 | '%s not in asn1Spec: %s' % (tagSet, repr(asn1Spec)) |
||
| 740 | ) |
||
| 741 | return value, substrate |
||
| 742 | |||
| 743 | decode = Decoder(tagMap, typeMap) |
||
| 744 | |||
| 747 |