Completed
Pull Request — master (#137)
by
unknown
03:26
created

wechatpy.crypto.BasePrpCrypto   A

Complexity

Total Complexity 5

Size/Duplication

Total Lines 35
Duplicated Lines 0 %

Test Coverage

Coverage 89.66%
Metric Value
dl 0
loc 35
ccs 26
cts 29
cp 0.8966
rs 10
wmc 5

4 Methods

Rating   Name   Duplication   Size   Complexity  
A _decrypt() 0 12 2
A _encrypt() 0 14 1
A get_random_string() 0 2 1
A __init__() 0 2 1
1
# -*- coding: utf-8 -*-
2 10
from __future__ import absolute_import, unicode_literals
3 10
import struct
4 10
import socket
5 10
import base64
6
7 10
from wechatpy.utils import to_text, to_binary, random_string, byte2int
8 10
from wechatpy.crypto.pkcs7 import PKCS7Encoder
9 10
try:
10 10
    from wechatpy.crypto.cryptography import WeChatCipher
11 5
except ImportError:
12 5
    try:
13 5
        from wechatpy.crypto.pycrypto import WeChatCipher
14
    except ImportError:
15
        raise Exception('You must install either cryptography or PyCrypto!')
16
17
18 10
class BasePrpCrypto(object):
    """Encrypt and decrypt message payloads with a ``WeChatCipher``.

    The plaintext handed to the cipher is laid out as::

        random prefix | 4-byte length (network byte order) | text | id

    ``_decrypt`` skips the first 16 bytes of the decrypted data, so the
    prefix returned by ``get_random_string`` is expected to encode to
    16 bytes.
    """

    def __init__(self, key):
        """Build the cipher used by ``_encrypt`` and ``_decrypt``.

        :param key: value passed unchanged to ``WeChatCipher``.
        """
        self.cipher = WeChatCipher(key)

    def get_random_string(self):
        """Return the result of ``random_string(16)``, used as the prefix."""
        return random_string(16)

    def _encrypt(self, text, _id):
        """Encrypt ``text`` together with ``_id`` and base64-encode the result.

        :param text: payload; converted with ``to_binary`` before use.
        :param _id: identifier appended after the payload; converted with
            ``to_binary``.
        :return: the base64-encoded ciphertext, as returned by
            ``base64.b64encode``.
        """
        text = to_binary(text)
        # b'!I' packs the length as a 4-byte unsigned int in network
        # (big-endian) order, which is what a native 'I' pack of
        # socket.htonl(len(text)) produces as well.
        plain = b''.join([
            to_binary(self.get_random_string()),
            struct.pack(b'!I', len(text)),
            text,
            to_binary(_id),
        ])
        plain = PKCS7Encoder.encode(plain)

        ciphertext = to_binary(self.cipher.encrypt(plain))
        return base64.b64encode(ciphertext)

    def _decrypt(self, text, _id, exception=None):
        """Decrypt a base64-encoded payload and verify its trailing id.

        :param text: base64-encoded ciphertext; converted with ``to_binary``.
        :param _id: identifier the payload must end with. A bytes value is
            decoded with ``to_text`` before the comparison, mirroring the
            ``to_binary`` normalization done in ``_encrypt``.
        :param exception: exception class (or zero-argument callable) to
            raise on an id mismatch; defaults to ``Exception``.
        :return: the embedded text, converted with ``to_text``.
        :raises exception: when the id stored in the payload differs from
            ``_id``.
        """
        text = to_binary(text)
        plain_text = self.cipher.decrypt(base64.b64decode(text))
        # The last byte holds the padding length.
        # NOTE(review): a padding byte of 0 makes the slice below empty, so
        # the length unpack fails with struct.error; malformed input is not
        # validated here.
        padding = byte2int(plain_text[-1])
        # Drop the 16-byte prefix and the trailing padding.
        content = plain_text[16:-padding]
        xml_length = struct.unpack(b'!I', content[:4])[0]
        xml_content = to_text(content[4:xml_length + 4])
        from_id = to_text(content[xml_length + 4:])
        # from_id is text; decode a bytes _id so that bytes and text never
        # compare unequal merely because of their type.
        expected_id = to_text(_id) if isinstance(_id, bytes) else _id
        if from_id != expected_id:
            exception = exception or Exception
            raise exception()
        return xml_content
53