Completed
Pull Request — master (#350)
by
unknown
20:58
created

format_url()   A

Complexity

Conditions 4

Size

Total Lines 5

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 5
CRAP Score 4

Importance

Changes 0
Metric Value
cc 4
dl 0
loc 5
ccs 5
cts 5
cp 1
crap 4
rs 9.2
c 0
b 0
f 0
1
# -*- coding: utf-8 -*-
2 10
from __future__ import absolute_import, unicode_literals
3 10
4 10
import base64
5 10
import copy
6
import hashlib
7 10
import socket
8
9 10
import six
10
11
from wechatpy.utils import to_binary, to_text
12 10
13 10
14 10
def format_url(params, api_key=None):
15 10
    data = [to_binary('{0}={1}'.format(k, params[k])) for k in sorted(params) if params[k]]
16 10
    if api_key:
17
        data.append(to_binary('key={0}'.format(api_key)))
18
    return b"&".join(data)
19 10
20 10
21 10
def calculate_signature(params, api_key):
22
    url = format_url(params, api_key)
23
    return to_text(hashlib.md5(url).hexdigest().upper())
24 10
25
26
def _check_signature(params, api_key):
27
    _params = copy.deepcopy(params)
28
    sign = _params.pop('sign', '')
29
    return sign == calculate_signature(_params, api_key)
30 10
31 10
32 10
def dict_to_xml(d, sign):
33
    xml = ['<xml>\n']
34 10
    for k in sorted(d):
35 10
        # use sorted to avoid test error on Py3k
36 10
        v = d[k]
37
        if isinstance(v, six.integer_types) or (isinstance(v, six.string_types) and v.isdigit()):
38 10
            xml.append('<{0}>{1}</{0}>\n'.format(to_text(k), to_text(v)))
39
        else:
40
            xml.append(
41 10
                '<{0}><![CDATA[{1}]]></{0}>\n'.format(to_text(k), to_text(v))
42 10
            )
43
    xml.append('<sign><![CDATA[{0}]]></sign>\n</xml>'.format(to_text(sign)))
44
    return ''.join(xml)
45 10
46 10
47 10
def get_external_ip():
48 10
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
49 10
    try:
50 10
        wechat_ip = socket.gethostbyname('api.mch.weixin.qq.com')
51 10
        sock.connect((wechat_ip, 80))
52 10
        addr, port = sock.getsockname()
53
        sock.close()
54
        return addr
55
    except socket.error:
56
        return '127.0.0.1'
57
58
59
def rsa_encrypt(data, pem):
60
    """
61
    加密
62
    :param data: 待加密字符串
63
    :param pem: RSA key 内容
64
    :return:
65
    """
66
    encoded_data = data.encode() if not isinstance(data, bytes) else data
67
    try:
68
        from Crypto.PublicKey import RSA
69
        from Crypto.Cipher import PKCS1_OAEP
70
        rsakey = RSA.importKey(pem)
71
        cipher = PKCS1_OAEP.new(rsakey)
72
        encrypted_data = cipher.encrypt(encoded_data)
73
    except ModuleNotFoundError:
74
        from cryptography.hazmat.backends import default_backend
75
        from cryptography.hazmat.primitives import serialization
76
        from cryptography.hazmat.primitives import hashes
77
        from cryptography.hazmat.primitives.asymmetric import padding
78
79
        private_key = serialization.load_pem_private_key(pem, password=None, backend=default_backend())
80
        encrypted_data = private_key.sign(
81
            data=encoded_data,
82
            padding=padding.OAEP(
83
                mgf=padding.MGF1(hashes.SHA1()),
84
                algorithm=hashes.SHA1(),
85
                label=None,
86
            ),
87
            algorithm=hashes.SHA1()
88
        )
89
    except ModuleNotFoundError:
90
        raise ModuleNotFoundError('either crypto or cryptography is required')
91
    return base64.b64encode(encrypted_data).decode()
92