Completed
Pull Request — master (#350)
by
unknown
20:30 queued 22s
created

get_external_ip()   A

Complexity

Conditions 2

Size

Total Lines 10

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 8
CRAP Score 2.032

Importance

Changes 0
Metric Value
cc 2
dl 0
loc 10
ccs 8
cts 10
cp 0.8
crap 2.032
rs 9.4285
c 0
b 0
f 0
1
# -*- coding: utf-8 -*-
2 10
from __future__ import absolute_import, unicode_literals
3 10
4 10
import base64
5 10
import copy
6
import hashlib
7 10
import socket
8
9 10
import six
10
from cryptography.hazmat.backends import default_backend
11
from cryptography.hazmat.primitives import serialization
12 10
from cryptography.hazmat.primitives import hashes
13 10
from cryptography.hazmat.primitives.asymmetric import padding
14 10
15 10
16 10
from wechatpy.utils import to_binary, to_text
17
18
19 10
def format_url(params, api_key=None):
    """
    Build the ``k=v&k=v`` byte string that is fed to the signature hash.

    :param params: mapping of parameter names to values
    :param api_key: optional API key; when truthy it is appended last
                    as ``key=<api_key>``
    :return: the ``b"&"``-joined pairs, ordered by sorted parameter name
    """
    pairs = []
    for name in sorted(params):
        value = params[name]
        # Falsy values (empty string, None, 0, ...) are left out entirely.
        if not value:
            continue
        pairs.append(to_binary('{0}={1}'.format(name, value)))
    if api_key:
        pairs.append(to_binary('key={0}'.format(api_key)))
    return b"&".join(pairs)
24 10
25
26
def calculate_signature(params, api_key):
    """
    Compute the upper-case hexadecimal MD5 signature of ``params``.

    :param params: mapping of parameters to sign
    :param api_key: API key passed through to ``format_url``
    :return: the upper-cased MD5 hex digest, passed through ``to_text``
    """
    payload = format_url(params, api_key)
    digest = hashlib.md5(payload).hexdigest()
    return to_text(digest.upper())
29
30 10
31 10
def _check_signature(params, api_key):
    """
    Check that the ``sign`` entry of ``params`` matches a freshly computed one.

    :param params: mapping that may carry a ``sign`` entry; it is not modified
    :param api_key: API key used to recompute the signature
    :return: ``True`` when the supplied signature equals the computed one
    """
    # Work on a copy so that removing 'sign' does not touch the caller's data.
    unsigned = copy.deepcopy(params)
    received = unsigned.pop('sign', '')
    expected = calculate_signature(unsigned, api_key)
    return received == expected
35 10
36 10
37
def dict_to_xml(d, sign):
    """
    Render ``d`` plus a trailing ``<sign>`` element as an ``<xml>`` document.

    Integer values and all-digit strings are written as plain element text;
    every other value is wrapped in a CDATA section.

    :param d: mapping of element names to values
    :param sign: signature placed in the final ``<sign>`` element
    :return: the XML document as a single string
    """
    plain_template = '<{0}>{1}</{0}>\n'
    cdata_template = '<{0}><![CDATA[{1}]]></{0}>\n'

    parts = ['<xml>\n']
    # Iterate in sorted order so the output is deterministic
    # (avoids test errors on Py3k).
    for key in sorted(d):
        value = d[key]
        is_numeric = isinstance(value, six.integer_types) or (
            isinstance(value, six.string_types) and value.isdigit()
        )
        template = plain_template if is_numeric else cdata_template
        parts.append(template.format(to_text(key), to_text(value)))
    parts.append('<sign><![CDATA[{0}]]></sign>\n</xml>'.format(to_text(sign)))
    return ''.join(parts)
50 10
51 10
52 10
def get_external_ip():
    """
    Return the local IP address used to reach the WeChat Pay API host.

    A UDP socket is "connected" to ``api.mch.weixin.qq.com`` and the local
    address the socket ends up bound to is returned.

    :return: the local address as a string, or ``'127.0.0.1'`` when name
             resolution or the connect call raises ``socket.error``
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        wechat_ip = socket.gethostbyname('api.mch.weixin.qq.com')
        sock.connect((wechat_ip, 80))
        # For AF_INET sockets getsockname() yields (address, port).
        return sock.getsockname()[0]
    except socket.error:
        return '127.0.0.1'
    finally:
        # Always release the socket, including on the error path
        # (previously it was only closed when everything succeeded).
        sock.close()
62
63
64
def rsa_encrypt(data, pem, b64_encode=True):
    """
    Encrypt ``data`` with an RSA public key using OAEP (SHA-1) padding.

    :param data: value to encrypt; anything that is not ``bytes`` is
                 UTF-8 encoded first
    :param pem: RSA public key content in PEM form (text or bytes)
    :param b64_encode: whether to base64-encode the output
    :return: base64 text when ``b64_encode`` is true, otherwise the value
             returned by the key's ``encrypt`` call
    """
    if not isinstance(data, bytes):
        data = data.encode('utf-8')
    if not isinstance(pem, bytes):
        pem = pem.encode('utf-8')

    key = serialization.load_pem_public_key(pem, backend=default_backend())
    oaep = padding.OAEP(
        mgf=padding.MGF1(hashes.SHA1()),
        algorithm=hashes.SHA1(),
        label=b'',
    )
    ciphertext = key.encrypt(data, padding=oaep)

    if not b64_encode:
        return ciphertext
    return base64.b64encode(ciphertext).decode('utf-8')
86
87
88
def rsa_decrypt(encrypted_data, pem, password=None):
    """
    Decrypt ``encrypted_data`` with an RSA private key using OAEP (SHA-1) padding.

    :param encrypted_data: ciphertext bytes to decrypt (this function does
                           not base64-decode its input)
    :param pem: RSA private key content in PEM form (text or bytes)
    :param password: pass phrase of the private key, or ``None`` when the
                     key is not encrypted; text is UTF-8 encoded
    :return: the value returned by the key's ``decrypt`` call
    """
    pem = pem.encode('utf-8') if not isinstance(pem, bytes) else pem
    # The key loader only accepts a bytes-like pass phrase; this module uses
    # unicode_literals, so plain literals arrive as text and must be encoded.
    if isinstance(password, six.text_type):
        password = password.encode('utf-8')
    private_key = serialization.load_pem_private_key(pem, password, backend=default_backend())
    return private_key.decrypt(
        encrypted_data,
        padding=padding.OAEP(
            mgf=padding.MGF1(hashes.SHA1()),
            algorithm=hashes.SHA1(),
            label=b'',
        )
    )
107