ck_airport.SspanelQd.checkin()   C
last analyzed

Complexity

Conditions 10

Size

Total Lines 77
Code Lines 55

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 10
eloc 55
nop 3
dl 0
loc 77
rs 5.6727
c 0
b 0
f 0

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

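Commonly applied refactorings include Extract Method and, when the method uses many parameters or temporary variables, Replace Temp with Query, Introduce Parameter Object, and Preserve Whole Object.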

Complexity

Complex methods like ck_airport.SspanelQd.checkin() often do a lot of different things. To break such a method's class down, we need to identify a cohesive component within that class. A common approach to finding such a component is to look for fields/methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# -*- coding: utf-8 -*-
2
"""
3
:author @Icrons
4
cron: 20 10 * * *
5
new Env('机场签到');
6
"""
7
8
import json
9
import re
10
import traceback
11
12
import requests
13
import urllib3
14
15
from notify_mtr import send
16
from utils import get_data
17
18
# Silence urllib3 warnings for the whole process: every request in this file is
# sent with verify=False, which would otherwise emit a warning on each call.
urllib3.disable_warnings()
19
20
21
class SspanelQd:
    """Daily check-in client for SSPanel-style sites.

    ``check_items`` is an iterable of dicts, each providing the keys
    ``"url"``, ``"email"`` and ``"password"`` for one site.
    """

    # Seconds to wait on each HTTP request so one dead site cannot hang the
    # whole scheduled run.
    _TIMEOUT = 30

    _USER_AGENT = (
        "Mozilla/5.0 (Windows NT 10.0; WOW64) "
        "AppleWebKit/537.36 (KHTML, like Gecko) "
        "Chrome/56.0.2924.87 Safari/537.36"
    )

    # Patterns for (level, expiry date, remaining traffic) on the /user page.
    # All three are non-greedy so several entries on one line cannot be
    # swallowed by a single match.
    _INFO_PATTERNS = (
        r'\["Class", "(.*?)"],',
        r'\["Class_Expire", "(.*?)"],',
        r'\["Unused_Traffic", "(.*?)"]',
    )

    def __init__(self, check_items):
        self.check_items = check_items

    @staticmethod
    def _load_json(url, label, text):
        """Parse *text* as JSON, log it in readable form, and return it.

        ``json.loads`` already resolves ``\\uXXXX`` escapes, so the body is
        parsed as-is instead of being pushed through ``unicode_escape``
        (which garbles real non-ASCII text and breaks escaped quotes).

        Raises:
            ValueError: if *text* is not valid JSON; the raw text is logged
                first so the failure can still be diagnosed.
        """
        try:
            data = json.loads(text)
        except ValueError:
            print(f"{url} {label}:{text}")
            raise
        print(f"{url} {label}:{json.dumps(data, ensure_ascii=False)}")
        return data

    @staticmethod
    def _parse_user_info(html):
        """Return ``(level, expire, traffic)`` from *html*, or ``None``.

        ``None`` is returned as soon as any of the three fields is missing.
        """
        values = []
        for pattern in SspanelQd._INFO_PATTERNS:
            match = re.search(pattern, html)
            if match is None:
                return None
            values.append(match.group(1))
        return tuple(values)

    @staticmethod
    def checkin(url, email, password):
        """Log in to one site, perform the check-in, and return a report.

        Args:
            url: Base URL of the site; a trailing ``/`` is ignored.
            email: Account e-mail, passed unencoded.
            password: Account password, passed unencoded.

        Returns:
            A human-readable string that starts with the site URL. Failures
            are reported in the returned text rather than raised.
        """
        url = url.rstrip("/")
        timeout = SspanelQd._TIMEOUT
        user_agent = SspanelQd._USER_AGENT

        # The context manager releases pooled connections on every exit path.
        with requests.session() as session:
            # Initial request to the site root; request failures are turned
            # into a message so that the remaining sites are still processed.
            try:
                session.get(url, verify=False, timeout=timeout)
            except requests.exceptions.ConnectionError:
                return f"{url}\n网络不通"
            except requests.exceptions.ChunkedEncodingError:
                return f"{url}\n分块编码错误"
            except Exception:
                print(f"未知错误,错误信息:\n{traceback.format_exc()}")
                return f"{url}\n未知错误,请查看日志"

            try:
                response = session.post(
                    f"{url}/auth/login",
                    # A dict lets requests form-encode every field, so "@",
                    # "&", "=", "+" and "%" in the credentials stay intact.
                    data={"email": email, "passwd": password, "code": ""},
                    headers={
                        "User-Agent": user_agent,
                        "Content-Type": (
                            "application/x-www-form-urlencoded; charset=UTF-8"
                        ),
                    },
                    verify=False,
                    timeout=timeout,
                )
                login_json = SspanelQd._load_json(
                    url, "接口登录返回信息", response.text
                )
                # ret == 0 is treated as a rejected login; report the
                # server's own message.
                if login_json.get("ret") == 0:
                    return f'{url}\n{login_json.get("msg")}'
            except Exception:
                print(f"登录失败,错误信息:\n{traceback.format_exc()}")
                return f"{url}\n登录失败,请查看日志"

            try:
                response = session.post(
                    f"{url}/user/checkin",
                    headers={
                        "User-Agent": user_agent,
                        "Referer": f"{url}/user",
                    },
                    verify=False,
                    timeout=timeout,
                )
                sign_json = SspanelQd._load_json(
                    url, "接口签到返回信息", response.text
                )
                # Fall back to the whole payload when there is no "msg".
                sign_info = sign_json.get("msg") or sign_json
            except Exception:
                sign_info = "签到失败,请查看日志"
                print(f"签到失败,错误信息:\n{traceback.format_exc()}")
            msg = f"{url}\n{sign_info}"

            # The account details below are only adapted to the editXY theme,
            # so this step is best-effort: on any failure return the plain
            # check-in message.
            try:
                response = session.get(
                    f"{url}/user", verify=False, timeout=timeout
                )
                info = SspanelQd._parse_user_info(response.text)
            except Exception:
                info = None
            if info is None:
                return msg

            level, day, rest = info
            # Use sign_info (not msg) so the URL is not printed twice.
            return (
                f"{url}\n"
                f"- 今日签到信息:{sign_info}\n"
                f"- 用户等级:{level}\n"
                f"- 到期时间:{day}\n"
                f"- 剩余流量:{rest}"
            )

    def main(self):
        """Run the check-in for every configured site.

        Returns:
            The per-site reports, each followed by a blank line. An entry
            with a missing or empty ``url``, ``email`` or ``password`` is
            reported as "配置错误" without any network access.
        """
        reports = []
        for check_item in self.check_items:
            # Site address.
            url = check_item.get("url")
            # Login credentials.
            email = check_item.get("email")
            password = check_item.get("password")
            # Validate before converting: str(None) is the truthy "None".
            if url and email and password:
                msg = self.checkin(str(url), str(email), str(password))
            else:
                msg = "配置错误"
            reports.append(f"{msg}\n\n")
        return "".join(reports)
117
118
119
if __name__ == "__main__":
    # Script entry point: read the "AIRPORT" entries from the shared
    # configuration, run every check-in, then push the combined report.
    _airports = get_data().get("AIRPORT", [])
    _report = SspanelQd(check_items=_airports).main()
    send("机场签到", _report)
124