1 | # -*- coding: utf-8 -*- |
||
2 | from __future__ import absolute_import, unicode_literals |
||
3 | import unittest |
||
4 | |||
5 | import xmltodict |
||
6 | |||
7 | from wechatpy.enterprise import crypto as _crypto |
||
8 | from wechatpy.enterprise.crypto import WeChatCrypto |
||
9 | |||
10 | |||
11 | class PrpCryptoMock(_crypto.PrpCrypto): |
||
12 | |||
13 | def get_random_string(self): |
||
14 | return '1234567890123456' |
||
15 | |||
16 | |||
17 | class CryptoTestCase(unittest.TestCase): |
||
18 | |||
19 | token = '123456' |
||
20 | encoding_aes_key = 'kWxPEV2UEDyxWpmPdKC3F4dgPDmOvfKX1HGnEUDS1aR' |
||
21 | corp_id = 'wx49f0ab532d5d035a' |
||
22 | |||
23 | def test_check_signature_should_ok(self): |
||
24 | signature = 'dd6b9c95b495b3f7e2901bfbc76c664930ffdb96' |
||
25 | timestamp = '1411443780' |
||
26 | nonce = '437374425' |
||
27 | echo_str = '4ByGGj+sVCYcvGeQYhaKIk1o0pQRNbRjxybjTGblXrBaXlTXeOo1+bXFXDQQb1o6co6Yh9Bv41n7hOchLF6p+Q==' # NOQA |
||
28 | |||
29 | crypto = WeChatCrypto(self.token, self.encoding_aes_key, self.corp_id) |
||
30 | echo_str = crypto.check_signature( |
||
31 | signature, |
||
32 | timestamp, |
||
33 | nonce, |
||
34 | echo_str |
||
35 | ) |
||
36 | |||
37 | def test_check_signature_should_fail(self): |
||
38 | from wechatpy.exceptions import InvalidSignatureException |
||
39 | |||
40 | signature = 'dd6b9c95b495b3f7e2901bfbc76c664930ffdb96' |
||
41 | timestamp = '1411443780' |
||
42 | nonce = '437374424' |
||
43 | echo_str = '4ByGGj+sVCYcvGeQYhaKIk1o0pQRNbRjxybjTGblXrBaXlTXeOo1+bXFXDQQb1o6co6Yh9Bv41n7hOchLF6p+Q==' # NOQA |
||
44 | |||
45 | crypto = WeChatCrypto(self.token, self.encoding_aes_key, self.corp_id) |
||
46 | self.assertRaises( |
||
47 | InvalidSignatureException, |
||
48 | crypto.check_signature, |
||
49 | signature, timestamp, nonce, echo_str |
||
50 | ) |
||
51 | |||
52 | def test_encrypt_message(self): |
||
53 | origin_crypto = _crypto.PrpCrypto |
||
54 | _crypto.PrpCrypto = PrpCryptoMock |
||
55 | |||
56 | nonce = '461056294' |
||
57 | timestamp = '1411525903' |
||
58 | reply = """<xml> |
||
59 | <MsgType><![CDATA[text]]></MsgType> |
||
60 | <Content><![CDATA[test]]></Content> |
||
61 | <FromUserName><![CDATA[wx49f0ab532d5d035a]]></FromUserName> |
||
62 | <ToUserName><![CDATA[messense]]></ToUserName> |
||
63 | <AgentID>1</AgentID> |
||
64 | <CreateTime>1411525903</CreateTime> |
||
65 | </xml>""" |
||
66 | |||
67 | expected = """<xml> |
||
68 | <Encrypt><![CDATA[9s4gMv99m88kKTh/H8IdkOiMg6bisoy3ypwy9H4hvSPe9nsGaqyw5hhSjdYbcrKk+j3nba4HMOTzHrluLBYqxgNcBqGsL8GqxlhZgURnAtObvesEl5nZ+uBE8bviY0LWke8Zy9V/QYKxNV2FqllNXcfmstttyIkMKCCmVbCFM2JTF5wY0nFhHZSjPUL2Q1qvSUCUld+/WIXrx0oyKQmpB6o8NRrrNrsDf03oxI1p9FxUgMnwKKZeOA/uu+2IEvEBtb7muXsVbwbgX05UPPJvFurDXafG0RQyPR+mf1nDnAtQmmNOuiR5MIkdQ39xn1vWwi1O5oazPoQJz0nTYjxxEE8kv3kFxtAGVRe3ypD3WeK2XeFYFMNMpatF9XiKzHo3]]></Encrypt> |
||
69 | <MsgSignature><![CDATA[407518b7649e86ef23978113f92d27afa9296533]]></MsgSignature> |
||
70 | <TimeStamp>1411525903</TimeStamp> |
||
71 | <Nonce><![CDATA[461056294]]></Nonce> |
||
72 | </xml>""" |
||
73 | |||
74 | crypto = WeChatCrypto(self.token, self.encoding_aes_key, self.corp_id) |
||
75 | encrypted = crypto.encrypt_message(reply, nonce, timestamp) |
||
76 | |||
77 | _crypto.PrpCrypto = origin_crypto |
||
78 | |||
79 | self.assertEqual(expected, encrypted) |
||
80 | |||
81 | View Code Duplication | def test_decrypt_message(self): |
|
0 ignored issues
–
show
Duplication
introduced
by
![]() |
|||
82 | xml = """<xml><ToUserName><![CDATA[wx49f0ab532d5d035a]]></ToUserName> |
||
83 | <Encrypt><![CDATA[RgqEoJj5A4EMYlLvWO1F86ioRjZfaex/gePD0gOXTxpsq5Yj4GNglrBb8I2BAJVODGajiFnXBu7mCPatfjsu6IHCrsTyeDXzF6Bv283dGymzxh6ydJRvZsryDyZbLTE7rhnus50qGPMfp2wASFlzEgMW9z1ef/RD8XzaFYgm7iTdaXpXaG4+BiYyolBug/gYNx410cvkKR2/nPwBiT+P4hIiOAQqGp/TywZBtDh1yCF2KOd0gpiMZ5jSw3e29mTvmUHzkVQiMS6td7vXUaWOMZnYZlF3So2SjHnwh4jYFxdgpkHHqIrH/54SNdshoQgWYEvccTKe7FS709/5t6NMxuGhcUGAPOQipvWTT4dShyqio7mlsl5noTrb++x6En749zCpQVhDpbV6GDnTbcX2e8K9QaNWHp91eBdCRxthuL0=]]></Encrypt> |
||
84 | <AgentID><![CDATA[1]]></AgentID> |
||
85 | </xml>""" |
||
86 | |||
87 | signature = '74d92dfeb87ba7c714f89d98870ae5eb62dff26d' |
||
88 | timestamp = '1411525903' |
||
89 | nonce = '461056294' |
||
90 | |||
91 | crypto = WeChatCrypto(self.token, self.encoding_aes_key, self.corp_id) |
||
92 | msg = crypto.decrypt_message(xml, signature, timestamp, nonce) |
||
93 | msg_dict = xmltodict.parse(msg)['xml'] |
||
94 | self.assertEqual('test', msg_dict['Content']) |
||
95 | self.assertEqual('messense', msg_dict['FromUserName']) |
||
96 | |||
97 | View Code Duplication | def test_decrypt_binary_message(self): |
|
0 ignored issues
–
show
|
|||
98 | xml = b"""<xml><ToUserName><![CDATA[wx49f0ab532d5d035a]]></ToUserName> |
||
99 | <Encrypt><![CDATA[RgqEoJj5A4EMYlLvWO1F86ioRjZfaex/gePD0gOXTxpsq5Yj4GNglrBb8I2BAJVODGajiFnXBu7mCPatfjsu6IHCrsTyeDXzF6Bv283dGymzxh6ydJRvZsryDyZbLTE7rhnus50qGPMfp2wASFlzEgMW9z1ef/RD8XzaFYgm7iTdaXpXaG4+BiYyolBug/gYNx410cvkKR2/nPwBiT+P4hIiOAQqGp/TywZBtDh1yCF2KOd0gpiMZ5jSw3e29mTvmUHzkVQiMS6td7vXUaWOMZnYZlF3So2SjHnwh4jYFxdgpkHHqIrH/54SNdshoQgWYEvccTKe7FS709/5t6NMxuGhcUGAPOQipvWTT4dShyqio7mlsl5noTrb++x6En749zCpQVhDpbV6GDnTbcX2e8K9QaNWHp91eBdCRxthuL0=]]></Encrypt> |
||
100 | <AgentID><![CDATA[1]]></AgentID> |
||
101 | </xml>""" |
||
102 | |||
103 | signature = '74d92dfeb87ba7c714f89d98870ae5eb62dff26d' |
||
104 | timestamp = '1411525903' |
||
105 | nonce = '461056294' |
||
106 | |||
107 | crypto = WeChatCrypto(self.token, self.encoding_aes_key, self.corp_id) |
||
108 | msg = crypto.decrypt_message(xml, signature, timestamp, nonce) |
||
109 | msg_dict = xmltodict.parse(msg)['xml'] |
||
110 | self.assertEqual('test', msg_dict['Content']) |
||
111 | self.assertEqual('messense', msg_dict['FromUserName']) |
||
112 | |||
113 | def test_wxa_decrypt_message(self): |
||
114 | from wechatpy.crypto import WeChatWxaCrypto |
||
115 | |||
116 | appid = 'wx4f4bc4dec97d474b' |
||
117 | session_key = 'tiihtNczf5v6AKRyjwEUhQ==' |
||
118 | encrypted_data = 'CiyLU1Aw2KjvrjMdj8YKliAjtP4gsMZMQmRzooG2xrDcvSnxIMXFufNstNGTyaGS9uT5geRa0W4oTOb1WT7fJlAC' \ |
||
119 | '+oNPdbB+3hVbJSRgv+4lGOETKUQz6OYStslQ142dNCuabNPGBzlooOmB231qMM85d2' \ |
||
120 | '/fV6ChevvXvQP8Hkue1poOFtnEtpyxVLW1zAo6/1Xx1COxFvrc2d7UL/lmHInNlxuacJXwu0fjpXfz' \ |
||
121 | '/YqYzBIBzD6WUfTIF9GRHpOn/Hz7saL8xz+W//FRAUid1OksQaQx4CMs8LOddcQhULW4ucetDf96JcR3g0gfRK4PC7E' \ |
||
122 | '/r7Z6xNrXd2UIeorGj5Ef7b1pJAYB6Y5anaHqZ9J6nKEBvB4DnNLIVWSgARns' \ |
||
123 | '/8wR2SiRS7MNACwTyrGvt9ts8p12PKFdlqYTopNHR1Vf7XjfhQlVsAJdNiKdYmYVoKlaRv85IfVunYzO0IKXsyl7JCUjCpoG' \ |
||
124 | '20f0a04COwfneQAGGwd5oa+T8yO5hzuyDb/XcxxmK01EpqOyuxINew==' |
||
125 | iv = 'r7BXXKkLb8qrSNn05n0qiA==' |
||
126 | |||
127 | crypto = WeChatWxaCrypto(session_key, iv, appid) |
||
128 | decrypted = crypto.decrypt_message(encrypted_data) |
||
129 | self.assertEqual(appid, decrypted['watermark']['appid']) |
||
130 | |||
131 | def test_refund_notify_decrypt_message(self): |
||
132 | from wechatpy.crypto import WeChatRefundCrypto |
||
133 | |||
134 | appid = 'wx4f4bc4dec97d474b' |
||
135 | mch_id = '12345678' |
||
136 | api_key = 'OF4Ne3znh8KqL0V4NqmALQa0uXMYKcEk' |
||
137 | xml = ''' |
||
138 | <xml> |
||
139 | <return_code>SUCCESS</return_code> |
||
140 | <appid><![CDATA[wx4f4bc4dec97d474b]]></appid> |
||
141 | <mch_id><![CDATA[12345678]]></mch_id> |
||
142 | <nonce_str><![CDATA[c1c237efc87ee8dfcb3f9124b63a4d94]]></nonce_str> |
||
143 | <req_info><![CDATA[51A27s9LxbaEv52nBOrqa0FCs3HVIM31Wc3BpNqSCP5MQlS8HL1Vl7mZ21043B7PZy7adtOZGtZv44lGl0U8JXlVFL4Ltxsw6+pACjZBB0Qi1LUDjFlP2IyTxYX4eC57kxnCphDdIyRkU67kOWW/HN/efggTUlaPJkoL5kVe9Y1MZF55IHR4lwGI7WtVLtxATX1tkAVZF+/ri0WlPXo9+GkqeB2/id6Ozw09Tqp8C/TKbmKBpRhTCRU2TzfQoLa6fjbgETFHJ795aEMyUZPQYbbsz6U9Rj9lUzEh8+Oc8KYTXa9zVk+nnFr82+yjffb3d36ylTe0y/VmYf90rf9cbiJQdM00zZ8Qa99SlMTs2wjOO4Mi4Btlg1PebsLF8TkqbSZkRJDGs61UunUB8Km1b/VeBxXuv4D3H7DiVZ9Fj7lq2j07zFDpxxlhhen7kZvMWWO3lrfFpA9c4eFOqfF9SxcU9nBmHfDD1g/dsrMl6+a71xQHJ/gn80fZOefUBv92ggqutsuJ4gd4OqkmO0XUs8e4XQyLpra+LvpmahsU5lBIZuQcMq3FHrqe8hXTfp3+3qym2wGfvPI75jx41VZwmBQy9izB1Id+A5lUeXC5QJy+ryDu/dWkhB+aMjkElaI/Epmcc/IEPaI73W6Hc58hNednOM536rPnn+GDc98PdGszpLTjmaZSu8dtqwPaym66yRqFQowzFoBbooik57iHUFBCeWya7XcXYL9WKhRF3uIZM+Kfv9/Zs2pF+w9/EJpltuzPpT1GP2VTMSHz45zwphFlS2lX8wHyZ+k9n2dptzQqpOCPedt5iQAmAe1oNQMxNdPDH9g68x9UzWD1kfK/dPO+3NPAh0XG+io4x15fIknNf7JLjpAuWicMMzCdwC4hhqeYOzQ+pC9f2xJXi5YViNKIZY69oO8eoHYRitLZrHLgx3oB2zimmqX6BI+DMklKAzAqqgGmbFxcVTvxZB6cMCx+H7TTWXnjAG+6A3SrCcxg0l5+UB93pkAYR7SVJ+VaZoiBqlqnZvb7IM17n9XaaFntlzvbbFCIa7OnOQb5Lvw=]]></req_info> |
||
144 | </xml>''' |
||
145 | refund_crypto = WeChatRefundCrypto(api_key) |
||
146 | data = refund_crypto.decrypt_message(xml, appid, mch_id) |
||
147 | self.assertEqual('1234', data['out_refund_no']) |
||
148 | self.assertEqual('2020010418301551404339261814', data['out_trade_no']) |
||
149 | self.assertEqual('4200000483202001042069747825', data['transaction_id']) |
||
150 | self.assertEqual('50300103182020010413419046597', data['refund_id']) |
||
151 |