Completed
Pull Request — master (#350)
by
unknown
18:05
created

_check_signature()   A

Complexity

Conditions 1

Size

Total Lines 4

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 1
CRAP Score 1.4218

Importance

Changes 0
Metric Value
cc 1
dl 0
loc 4
ccs 1
cts 4
cp 0.25
crap 1.4218
rs 10
c 0
b 0
f 0
1
# -*- coding: utf-8 -*-
2 10
from __future__ import absolute_import, unicode_literals
3 10
4 10
import base64
5 10
import copy
6
import hashlib
7 10
import socket
8
9 10
import six
10
from Crypto.PublicKey import RSA
11
12 10
from wechatpy.utils import to_binary, to_text
13 10
from Crypto.Cipher import PKCS1_OAEP
14 10
15 10
16 10
def format_url(params, api_key=None):
17
    data = [to_binary('{0}={1}'.format(k, params[k])) for k in sorted(params) if params[k]]
18
    if api_key:
19 10
        data.append(to_binary('key={0}'.format(api_key)))
20 10
    return b"&".join(data)
21 10
22
23
def calculate_signature(params, api_key):
24 10
    url = format_url(params, api_key)
25
    return to_text(hashlib.md5(url).hexdigest().upper())
26
27
28
def _check_signature(params, api_key):
29
    _params = copy.deepcopy(params)
30 10
    sign = _params.pop('sign', '')
31 10
    return sign == calculate_signature(_params, api_key)
32 10
33
34 10
def dict_to_xml(d, sign):
35 10
    xml = ['<xml>\n']
36 10
    for k in sorted(d):
37
        # use sorted to avoid test error on Py3k
38 10
        v = d[k]
39
        if isinstance(v, six.integer_types) or (isinstance(v, six.string_types) and v.isdigit()):
40
            xml.append('<{0}>{1}</{0}>\n'.format(to_text(k), to_text(v)))
41 10
        else:
42 10
            xml.append(
43
                '<{0}><![CDATA[{1}]]></{0}>\n'.format(to_text(k), to_text(v))
44
            )
45 10
    xml.append('<sign><![CDATA[{0}]]></sign>\n</xml>'.format(to_text(sign)))
46 10
    return ''.join(xml)
47 10
48 10
49 10
def get_external_ip():
50 10
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
51 10
    try:
52 10
        wechat_ip = socket.gethostbyname('api.mch.weixin.qq.com')
53
        sock.connect((wechat_ip, 80))
54
        addr, port = sock.getsockname()
55
        sock.close()
56
        return addr
57
    except socket.error:
58
        return '127.0.0.1'
59
60
61
def rsa_encrypt(data, pem):
62
    """
63
    加密
64
    :param data: 待加密字符串
65
    :param pem: RSA key 内容
66
    :return:
67
    """
68
    rsakey = RSA.importKey(pem)
69
    cipher = PKCS1_OAEP.new(rsakey)
70
    return base64.b64encode(cipher.encrypt(data.encode())).decode()
71