1
|
1 |
|
import os |
2
|
1 |
|
from datetime import datetime, UTC |
3
|
|
|
|
4
|
1 |
|
from ..ans1e import Ans1Encoder, Numbers, Classes, to_utc_time |
5
|
1 |
|
from ..models import Signer |
6
|
|
|
|
7
|
1 |
|
current_dir = os.path.dirname(__file__) |
8
|
|
|
|
9
|
1 |
|
from cryptography.hazmat.primitives import hashes |
10
|
1 |
|
from cryptography.hazmat.primitives.asymmetric import padding |
11
|
|
|
|
12
|
|
|
|
13
|
1 |
|
def create_pkcs7(data, signer: Signer, hash_algorithm): |
14
|
1 |
|
cert_bytes = signer.certificate_bytes() |
15
|
1 |
|
issuer_der = signer.certificate.get_issuer().der() |
16
|
1 |
|
serial = signer.certificate.get_serial_number() |
17
|
|
|
|
18
|
1 |
|
hash_object = hashes.Hash(hash_algorithm) |
19
|
1 |
|
hash_object.update(data) |
20
|
1 |
|
digest = hash_object.finalize() |
21
|
|
|
|
22
|
1 |
|
utctime = to_utc_time(datetime.now(UTC).replace(tzinfo=None)) |
23
|
|
|
|
24
|
1 |
|
e = Ans1Encoder() |
25
|
1 |
|
with e.seq(): |
26
|
1 |
|
e.oid("1.2.840.113549.1.9.3") |
27
|
1 |
|
with e.set(): |
28
|
1 |
|
e.oid("1.2.840.113549.1.7.1") |
29
|
1 |
|
with e.seq(): |
30
|
1 |
|
e.oid("1.2.840.113549.1.9.5") |
31
|
1 |
|
with e.set(): |
32
|
1 |
|
e(utctime, nr=Numbers.UTCTime) |
33
|
1 |
|
with e.seq(): |
34
|
1 |
|
e.oid("1.2.840.113549.1.9.4") |
35
|
1 |
|
with e.set(): |
36
|
1 |
|
e(digest, nr=Numbers.OctetString) |
37
|
1 |
|
signed_attributes = e.output() |
38
|
|
|
|
39
|
1 |
|
e = Ans1Encoder() |
40
|
1 |
|
with e.set(): |
41
|
1 |
|
e.write(signed_attributes) |
42
|
1 |
|
signing_data = e.output() |
43
|
|
|
|
44
|
1 |
|
signature = signer.key.sign( |
45
|
|
|
data=signing_data, |
46
|
|
|
padding=padding.PKCS1v15(), |
47
|
|
|
algorithm=hash_algorithm |
48
|
|
|
) |
49
|
|
|
|
50
|
1 |
|
e = Ans1Encoder() |
51
|
1 |
|
with e.seq(): |
52
|
1 |
|
e.oid('1.2.840.113549.1.7.2') |
53
|
1 |
|
with e.enter(nr=0, cls=Classes.Context): |
54
|
1 |
|
with e.seq(): |
55
|
1 |
|
e(1, nr=Numbers.Integer) |
56
|
1 |
|
with e.set(): |
57
|
1 |
|
with e.seq(): |
58
|
1 |
|
e.oid('1.3.14.3.2.26') |
59
|
1 |
|
e(nr=Numbers.Null) |
60
|
1 |
|
with e.seq(): |
61
|
1 |
|
e.oid("1.2.840.113549.1.7.1") |
62
|
1 |
|
with e.enter(nr=0, cls=Classes.Context): |
63
|
1 |
|
e(data, nr=Numbers.OctetString) |
64
|
1 |
|
with e.enter(nr=0, cls=Classes.Context): |
65
|
1 |
|
e.write(cert_bytes) |
66
|
1 |
|
with e.set(): |
67
|
1 |
|
with e.seq(): |
68
|
1 |
|
e(1, nr=Numbers.Integer) |
69
|
1 |
|
with e.seq(): |
70
|
1 |
|
e.write(issuer_der) |
71
|
1 |
|
e(serial, nr=Numbers.Integer) |
72
|
1 |
|
with e.seq(): |
73
|
1 |
|
e.oid('1.3.14.3.2.26') |
74
|
1 |
|
e(nr=Numbers.Null) |
75
|
1 |
|
with e.enter(nr=0, cls=Classes.Context): |
76
|
1 |
|
e.write(signed_attributes) |
77
|
1 |
|
with e.seq(): |
78
|
1 |
|
e.oid('1.2.840.113549.1.1.1') |
79
|
1 |
|
e(nr=Numbers.Null) |
80
|
1 |
|
e(signature, nr=Numbers.OctetString) |
81
|
|
|
|
82
|
|
|
return e.output() |
83
|
|
|
|