1
|
|
|
# BER encoder |
2
|
|
|
from pyasn1.type import base, tag, univ, char, useful |
3
|
|
|
from pyasn1.codec.ber import eoo |
4
|
|
|
from pyasn1.compat.octets import int2oct, ints2octs, null, str2octs |
5
|
|
|
from pyasn1 import error |
6
|
|
|
|
7
|
|
|
class Error(Exception): pass |
8
|
|
|
|
9
|
|
|
class AbstractItemEncoder: |
10
|
|
|
supportIndefLenMode = 1 |
11
|
|
|
def encodeTag(self, t, isConstructed): |
12
|
|
|
tagClass, tagFormat, tagId = t.asTuple() # this is a hotspot |
13
|
|
|
v = tagClass | tagFormat |
14
|
|
|
if isConstructed: |
15
|
|
|
v = v|tag.tagFormatConstructed |
16
|
|
|
if tagId < 31: |
17
|
|
|
return int2oct(v|tagId) |
18
|
|
|
else: |
19
|
|
|
s = int2oct(tagId&0x7f) |
20
|
|
|
tagId = tagId >> 7 |
21
|
|
|
while tagId: |
22
|
|
|
s = int2oct(0x80|(tagId&0x7f)) + s |
23
|
|
|
tagId = tagId >> 7 |
24
|
|
|
return int2oct(v|0x1F) + s |
25
|
|
|
|
26
|
|
|
def encodeLength(self, length, defMode): |
27
|
|
|
if not defMode and self.supportIndefLenMode: |
28
|
|
|
return int2oct(0x80) |
29
|
|
|
if length < 0x80: |
30
|
|
|
return int2oct(length) |
31
|
|
|
else: |
32
|
|
|
substrate = null |
33
|
|
|
while length: |
34
|
|
|
substrate = int2oct(length&0xff) + substrate |
35
|
|
|
length = length >> 8 |
36
|
|
|
substrateLen = len(substrate) |
37
|
|
|
if substrateLen > 126: |
38
|
|
|
raise Error('Length octets overflow (%d)' % substrateLen) |
39
|
|
|
return int2oct(0x80 | substrateLen) + substrate |
40
|
|
|
|
41
|
|
|
def encodeValue(self, encodeFun, value, defMode, maxChunkSize): |
42
|
|
|
raise Error('Not implemented') |
43
|
|
|
|
44
|
|
|
def _encodeEndOfOctets(self, encodeFun, defMode): |
45
|
|
|
if defMode or not self.supportIndefLenMode: |
46
|
|
|
return null |
47
|
|
|
else: |
48
|
|
|
return encodeFun(eoo.endOfOctets, defMode) |
49
|
|
|
|
50
|
|
|
def encode(self, encodeFun, value, defMode, maxChunkSize): |
51
|
|
|
substrate, isConstructed = self.encodeValue( |
52
|
|
|
encodeFun, value, defMode, maxChunkSize |
53
|
|
|
) |
54
|
|
|
tagSet = value.getTagSet() |
55
|
|
|
if tagSet: |
56
|
|
|
if not isConstructed: # primitive form implies definite mode |
57
|
|
|
defMode = 1 |
58
|
|
|
return self.encodeTag( |
59
|
|
|
tagSet[-1], isConstructed |
60
|
|
|
) + self.encodeLength( |
61
|
|
|
len(substrate), defMode |
62
|
|
|
) + substrate + self._encodeEndOfOctets(encodeFun, defMode) |
63
|
|
|
else: |
64
|
|
|
return substrate # untagged value |
65
|
|
|
|
66
|
|
|
class EndOfOctetsEncoder(AbstractItemEncoder): |
67
|
|
|
def encodeValue(self, encodeFun, value, defMode, maxChunkSize): |
68
|
|
|
return null, 0 |
69
|
|
|
|
70
|
|
|
class ExplicitlyTaggedItemEncoder(AbstractItemEncoder): |
71
|
|
|
def encodeValue(self, encodeFun, value, defMode, maxChunkSize): |
72
|
|
|
if isinstance(value, base.AbstractConstructedAsn1Item): |
73
|
|
|
value = value.clone(tagSet=value.getTagSet()[:-1], |
74
|
|
|
cloneValueFlag=1) |
75
|
|
|
else: |
76
|
|
|
value = value.clone(tagSet=value.getTagSet()[:-1]) |
77
|
|
|
return encodeFun(value, defMode, maxChunkSize), 1 |
78
|
|
|
|
79
|
|
|
explicitlyTaggedItemEncoder = ExplicitlyTaggedItemEncoder() |
80
|
|
|
|
81
|
|
|
class IntegerEncoder(AbstractItemEncoder): |
82
|
|
|
supportIndefLenMode = 0 |
83
|
|
|
def encodeValue(self, encodeFun, value, defMode, maxChunkSize): |
84
|
|
|
octets = [] |
85
|
|
|
value = int(value) # to save on ops on asn1 type |
86
|
|
|
while 1: |
87
|
|
|
octets.insert(0, value & 0xff) |
88
|
|
|
if value == 0 or value == -1: |
89
|
|
|
break |
90
|
|
|
value = value >> 8 |
91
|
|
|
if value == 0 and octets[0] & 0x80: |
92
|
|
|
octets.insert(0, 0) |
93
|
|
|
while len(octets) > 1 and \ |
94
|
|
|
(octets[0] == 0 and octets[1] & 0x80 == 0 or \ |
95
|
|
|
octets[0] == 0xff and octets[1] & 0x80 != 0): |
96
|
|
|
del octets[0] |
97
|
|
|
return ints2octs(octets), 0 |
98
|
|
|
|
99
|
|
|
class BitStringEncoder(AbstractItemEncoder): |
100
|
|
|
def encodeValue(self, encodeFun, value, defMode, maxChunkSize): |
101
|
|
|
if not maxChunkSize or len(value) <= maxChunkSize*8: |
102
|
|
|
r = {}; l = len(value); p = 0; j = 7 |
103
|
|
|
while p < l: |
104
|
|
|
i, j = divmod(p, 8) |
105
|
|
|
r[i] = r.get(i,0) | value[p]<<(7-j) |
106
|
|
|
p = p + 1 |
107
|
|
|
keys = list(r); keys.sort() |
108
|
|
|
return int2oct(7-j) + ints2octs([r[k] for k in keys]), 0 |
109
|
|
|
else: |
110
|
|
|
pos = 0; substrate = null |
111
|
|
|
while 1: |
112
|
|
|
# count in octets |
113
|
|
|
v = value.clone(value[pos*8:pos*8+maxChunkSize*8]) |
114
|
|
|
if not v: |
115
|
|
|
break |
116
|
|
|
substrate = substrate + encodeFun(v, defMode, maxChunkSize) |
117
|
|
|
pos = pos + maxChunkSize |
118
|
|
|
return substrate, 1 |
119
|
|
|
|
120
|
|
|
class OctetStringEncoder(AbstractItemEncoder): |
121
|
|
|
def encodeValue(self, encodeFun, value, defMode, maxChunkSize): |
122
|
|
|
if not maxChunkSize or len(value) <= maxChunkSize: |
123
|
|
|
return value.asOctets(), 0 |
124
|
|
|
else: |
125
|
|
|
pos = 0; substrate = null |
126
|
|
|
while 1: |
127
|
|
|
v = value.clone(value[pos:pos+maxChunkSize]) |
128
|
|
|
if not v: |
129
|
|
|
break |
130
|
|
|
substrate = substrate + encodeFun(v, defMode, maxChunkSize) |
131
|
|
|
pos = pos + maxChunkSize |
132
|
|
|
return substrate, 1 |
133
|
|
|
|
134
|
|
|
class NullEncoder(AbstractItemEncoder): |
135
|
|
|
supportIndefLenMode = 0 |
136
|
|
|
def encodeValue(self, encodeFun, value, defMode, maxChunkSize): |
137
|
|
|
return null, 0 |
138
|
|
|
|
139
|
|
|
class ObjectIdentifierEncoder(AbstractItemEncoder): |
140
|
|
|
supportIndefLenMode = 0 |
141
|
|
|
precomputedValues = { |
142
|
|
|
(1, 3, 6, 1, 2): (43, 6, 1, 2), |
143
|
|
|
(1, 3, 6, 1, 4): (43, 6, 1, 4) |
144
|
|
|
} |
145
|
|
|
def encodeValue(self, encodeFun, value, defMode, maxChunkSize): |
146
|
|
|
oid = value.asTuple() |
147
|
|
|
if oid[:5] in self.precomputedValues: |
148
|
|
|
octets = self.precomputedValues[oid[:5]] |
149
|
|
|
index = 5 |
150
|
|
|
else: |
151
|
|
|
if len(oid) < 2: |
152
|
|
|
raise error.PyAsn1Error('Short OID %s' % value) |
153
|
|
|
|
154
|
|
|
# Build the first twos |
155
|
|
|
index = 0 |
156
|
|
|
subid = oid[index] * 40 |
157
|
|
|
subid = subid + oid[index+1] |
158
|
|
|
if subid < 0 or subid > 0xff: |
159
|
|
|
raise error.PyAsn1Error( |
160
|
|
|
'Initial sub-ID overflow %s in OID %s' % (oid[index:], value) |
161
|
|
|
) |
162
|
|
|
octets = (subid,) |
163
|
|
|
index = index + 2 |
164
|
|
|
|
165
|
|
|
# Cycle through subids |
166
|
|
|
for subid in oid[index:]: |
167
|
|
|
if subid > -1 and subid < 128: |
168
|
|
|
# Optimize for the common case |
169
|
|
|
octets = octets + (subid & 0x7f,) |
170
|
|
|
elif subid < 0 or subid > 0xFFFFFFFF: |
171
|
|
|
raise error.PyAsn1Error( |
172
|
|
|
'SubId overflow %s in %s' % (subid, value) |
173
|
|
|
) |
174
|
|
|
else: |
175
|
|
|
# Pack large Sub-Object IDs |
176
|
|
|
res = (subid & 0x7f,) |
177
|
|
|
subid = subid >> 7 |
178
|
|
|
while subid > 0: |
179
|
|
|
res = (0x80 | (subid & 0x7f),) + res |
180
|
|
|
subid = subid >> 7 |
181
|
|
|
# Add packed Sub-Object ID to resulted Object ID |
182
|
|
|
octets += res |
183
|
|
|
|
184
|
|
|
return ints2octs(octets), 0 |
185
|
|
|
|
186
|
|
|
class RealEncoder(AbstractItemEncoder): |
187
|
|
|
def encodeValue(self, encodeFun, value, defMode, maxChunkSize): |
188
|
|
|
if value.isPlusInfinity(): |
189
|
|
|
return int2oct(0x40), 0 |
190
|
|
|
if value.isMinusInfinity(): |
191
|
|
|
return int2oct(0x41), 0 |
192
|
|
|
m, b, e = value |
193
|
|
|
if not m: |
194
|
|
|
return null, 0 |
195
|
|
|
if b == 10: |
196
|
|
|
return str2octs('\x03%dE%s%d' % (m, e == 0 and '+' or '', e)), 0 |
197
|
|
|
elif b == 2: |
198
|
|
|
fo = 0x80 # binary enoding |
199
|
|
|
if m < 0: |
200
|
|
|
fo = fo | 0x40 # sign bit |
201
|
|
|
m = -m |
202
|
|
|
while int(m) != m: # drop floating point |
203
|
|
|
m *= 2 |
204
|
|
|
e -= 1 |
205
|
|
|
while m & 0x1 == 0: # mantissa normalization |
206
|
|
|
m >>= 1 |
207
|
|
|
e += 1 |
208
|
|
|
eo = null |
209
|
|
|
while e: |
210
|
|
|
eo = int2oct(e&0xff) + eo |
211
|
|
|
e >>= 8 |
212
|
|
|
n = len(eo) |
213
|
|
|
if n > 0xff: |
214
|
|
|
raise error.PyAsn1Error('Real exponent overflow') |
215
|
|
|
if n == 1: |
216
|
|
|
pass |
217
|
|
|
elif n == 2: |
218
|
|
|
fo |= 1 |
219
|
|
|
elif n == 3: |
220
|
|
|
fo |= 2 |
221
|
|
|
else: |
222
|
|
|
fo |= 3 |
223
|
|
|
eo = int2oct(n//0xff+1) + eo |
224
|
|
|
po = null |
225
|
|
|
while m: |
226
|
|
|
po = int2oct(m&0xff) + po |
227
|
|
|
m >>= 8 |
228
|
|
|
substrate = int2oct(fo) + eo + po |
229
|
|
|
return substrate, 0 |
230
|
|
|
else: |
231
|
|
|
raise error.PyAsn1Error('Prohibited Real base %s' % b) |
232
|
|
|
|
233
|
|
|
class SequenceEncoder(AbstractItemEncoder): |
234
|
|
|
def encodeValue(self, encodeFun, value, defMode, maxChunkSize): |
235
|
|
|
value.setDefaultComponents() |
236
|
|
|
value.verifySizeSpec() |
237
|
|
|
substrate = null; idx = len(value) |
238
|
|
|
while idx > 0: |
239
|
|
|
idx = idx - 1 |
240
|
|
|
if value[idx] is None: # Optional component |
241
|
|
|
continue |
242
|
|
|
component = value.getDefaultComponentByPosition(idx) |
243
|
|
|
if component is not None and component == value[idx]: |
244
|
|
|
continue |
245
|
|
|
substrate = encodeFun( |
246
|
|
|
value[idx], defMode, maxChunkSize |
247
|
|
|
) + substrate |
248
|
|
|
return substrate, 1 |
249
|
|
|
|
250
|
|
|
class SequenceOfEncoder(AbstractItemEncoder): |
251
|
|
|
def encodeValue(self, encodeFun, value, defMode, maxChunkSize): |
252
|
|
|
value.verifySizeSpec() |
253
|
|
|
substrate = null; idx = len(value) |
254
|
|
|
while idx > 0: |
255
|
|
|
idx = idx - 1 |
256
|
|
|
substrate = encodeFun( |
257
|
|
|
value[idx], defMode, maxChunkSize |
258
|
|
|
) + substrate |
259
|
|
|
return substrate, 1 |
260
|
|
|
|
261
|
|
|
class ChoiceEncoder(AbstractItemEncoder): |
262
|
|
|
def encodeValue(self, encodeFun, value, defMode, maxChunkSize): |
263
|
|
|
return encodeFun(value.getComponent(), defMode, maxChunkSize), 1 |
264
|
|
|
|
265
|
|
|
class AnyEncoder(OctetStringEncoder): |
266
|
|
|
def encodeValue(self, encodeFun, value, defMode, maxChunkSize): |
267
|
|
|
return value.asOctets(), defMode == 0 |
268
|
|
|
|
269
|
|
|
tagMap = { |
270
|
|
|
eoo.endOfOctets.tagSet: EndOfOctetsEncoder(), |
271
|
|
|
univ.Boolean.tagSet: IntegerEncoder(), |
272
|
|
|
univ.Integer.tagSet: IntegerEncoder(), |
273
|
|
|
univ.BitString.tagSet: BitStringEncoder(), |
274
|
|
|
univ.OctetString.tagSet: OctetStringEncoder(), |
275
|
|
|
univ.Null.tagSet: NullEncoder(), |
276
|
|
|
univ.ObjectIdentifier.tagSet: ObjectIdentifierEncoder(), |
277
|
|
|
univ.Enumerated.tagSet: IntegerEncoder(), |
278
|
|
|
univ.Real.tagSet: RealEncoder(), |
279
|
|
|
# Sequence & Set have same tags as SequenceOf & SetOf |
280
|
|
|
univ.SequenceOf.tagSet: SequenceOfEncoder(), |
281
|
|
|
univ.SetOf.tagSet: SequenceOfEncoder(), |
282
|
|
|
univ.Choice.tagSet: ChoiceEncoder(), |
283
|
|
|
# character string types |
284
|
|
|
char.UTF8String.tagSet: OctetStringEncoder(), |
285
|
|
|
char.NumericString.tagSet: OctetStringEncoder(), |
286
|
|
|
char.PrintableString.tagSet: OctetStringEncoder(), |
287
|
|
|
char.TeletexString.tagSet: OctetStringEncoder(), |
288
|
|
|
char.VideotexString.tagSet: OctetStringEncoder(), |
289
|
|
|
char.IA5String.tagSet: OctetStringEncoder(), |
290
|
|
|
char.GraphicString.tagSet: OctetStringEncoder(), |
291
|
|
|
char.VisibleString.tagSet: OctetStringEncoder(), |
292
|
|
|
char.GeneralString.tagSet: OctetStringEncoder(), |
293
|
|
|
char.UniversalString.tagSet: OctetStringEncoder(), |
294
|
|
|
char.BMPString.tagSet: OctetStringEncoder(), |
295
|
|
|
# useful types |
296
|
|
|
useful.GeneralizedTime.tagSet: OctetStringEncoder(), |
297
|
|
|
useful.UTCTime.tagSet: OctetStringEncoder() |
298
|
|
|
} |
299
|
|
|
|
300
|
|
|
# Type-to-codec map for ambiguous ASN.1 types |
301
|
|
|
typeMap = { |
302
|
|
|
univ.Set.typeId: SequenceEncoder(), |
303
|
|
|
univ.SetOf.typeId: SequenceOfEncoder(), |
304
|
|
|
univ.Sequence.typeId: SequenceEncoder(), |
305
|
|
|
univ.SequenceOf.typeId: SequenceOfEncoder(), |
306
|
|
|
univ.Choice.typeId: ChoiceEncoder(), |
307
|
|
|
univ.Any.typeId: AnyEncoder() |
308
|
|
|
} |
309
|
|
|
|
310
|
|
|
class Encoder: |
311
|
|
|
def __init__(self, tagMap, typeMap={}): |
312
|
|
|
self.__tagMap = tagMap |
313
|
|
|
self.__typeMap = typeMap |
314
|
|
|
|
315
|
|
|
def __call__(self, value, defMode=1, maxChunkSize=0): |
316
|
|
|
tagSet = value.getTagSet() |
317
|
|
|
if len(tagSet) > 1: |
318
|
|
|
concreteEncoder = explicitlyTaggedItemEncoder |
319
|
|
|
else: |
320
|
|
|
if value.typeId is not None and value.typeId in self.__typeMap: |
321
|
|
|
concreteEncoder = self.__typeMap[value.typeId] |
322
|
|
|
elif tagSet in self.__tagMap: |
323
|
|
|
concreteEncoder = self.__tagMap[tagSet] |
324
|
|
|
else: |
325
|
|
|
baseTagSet = value.baseTagSet |
326
|
|
|
if baseTagSet in self.__tagMap: |
327
|
|
|
concreteEncoder = self.__tagMap[baseTagSet] |
328
|
|
|
else: |
329
|
|
|
raise Error('No encoder for %s' % value) |
330
|
|
|
return concreteEncoder.encode( |
331
|
|
|
self, value, defMode, maxChunkSize |
332
|
|
|
) |
333
|
|
|
|
334
|
|
|
encode = Encoder(tagMap, typeMap) |
335
|
|
|
|