Passed
Push — master ( 2edf0a...c3d8b2 )
by Leon
02:07
created

ck_euserv   A

Complexity

Total Complexity 38

Size/Duplication

Total Lines 425
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 255
dl 0
loc 425
rs 9.36
c 0
b 0
f 0
wmc 38

10 Methods

Rating   Name   Duplication   Size   Complexity  
C EUserv.handle_captcha_solved_result() 0 50 11
A EUserv.captcha_solver() 0 35 1
A EUserv.get_pin_from_mailparser() 0 19 1
A EUserv.check() 0 11 4
A EUserv.get_servers() 0 23 4
B EUserv.main() 0 30 6
A EUserv.__init__() 0 10 1
B EUserv.renew() 0 73 2
A EUserv.get_captcha_solver_usage() 0 10 1
B EUserv.login() 0 79 6

1 Function

Rating   Name   Duplication   Size   Complexity  
A log() 0 3 1
1
# -*- coding: utf-8 -*-
2
"""
3
:author @ZetaoYang
4
cron: 0 10 */7 * *
5
new Env('EUserv');
6
"""
7
8
import base64
9
import json
10
import re
11
import time
12
13
import requests
14
from bs4 import BeautifulSoup
15
16
from notify_mtr import send
17
from utils import get_data
18
19
#
20
# SPDX-FileCopyrightText: (c) 2020-2021 CokeMine & Its repository contributors
21
# SPDX-FileCopyrightText: (c) 2021 A beam of light
22
#
23
# SPDX-License-Identifier: GPL-3.0-or-later
24
#
25
26
"""
27
euserv auto-renew script
28
29
ChangeLog
30
31
v2021.09.30
32
- Captcha automatic recognition using TrueCaptcha API
33
- Email notification
34
- Add login failure retry mechanism
35
- reformat log info
36
37
v2021.11.06
38
- Receive renew PIN(6-digits) using mailparser parsed data download url
39
  workflow: auto-forward your EUserv PIN email to your mailparser inbox 
40
  -> parsing PIN via mailparser -> get PIN from mailparser
41
- Update kc2_security_password_get_token request
42
43
v2021.11.26
44
- Adjust TrueCaptcha constraint parameters for high availability.
45
  Plus, the CAPTCHA of EUserv is currently case-insensitive, so the above adjustment works.
46
"""
47
48
# default value is TrueCaptcha demo credential,
49
# you can use your own credentials by setting the environment variables:
50
# userid and apikey
51
# demo: https://apitruecaptcha.org/demo
52
# demo2: https://apitruecaptcha.org/demo2
53
# demo apikey also has a limit of 100 times per day
54
# {
55
# 'error': '101.0 above free usage limit 100 per day and no balance',
56
# 'requestId': '7690c065-70e0-4757-839b-5fd8381e65c7'
57
# }
58
59
60
# Run log buffer; starts empty and is only ever appended to by log().
desp = ""


def log(info: str):
    """Append ``info`` followed by a newline to the module-level ``desp`` buffer."""
    global desp
    desp = "".join((desp, info, "\n"))
66
67
68
class EUserv:
69
    def __init__(self, check_items):
70
        self.check_items = check_items
71
        self.BASE_URL = "https://support.euserv.com/index.iphp"
72
        self.UA = (
73
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) "
74
            "Chrome/94.0.4606.61 Safari/537.36 "
75
        )
76
        self.CHECK_CAPTCHA_SOLVER_USAGE = True
77
        self.MAILPARSER_DOWNLOAD_BASE_URL = "https://files.mailparser.io/d/"
78
        self.WAITING_TIME_OF_PIN = 15
79
80
    def captcha_solver(
        self, captcha_image_url: str, session: requests.Session, userid, apikey
    ) -> dict:
        """Download the CAPTCHA image and submit it to the TrueCaptcha API.

        TrueCaptcha API doc: https://apitruecaptcha.org/api
        Free to use 100 requests per day.

        :param captcha_image_url: URL of the CAPTCHA image; fetched with ``session`` so the
            request carries that session's cookies.
        :param session: the ``requests`` session used for the EUserv login.
        :param userid: TrueCaptcha user id.
        :param apikey: TrueCaptcha API key.
        :return: the decoded JSON response, e.g.::

            {
                "result": "",   # or 0
                "conf": 0.85,
                "usage": 0,
                "requestId": "ed0006e5-69f0-4617-b698-97dc054f9022",
                "version": "dev2"
            }
        """
        image = session.get(captcha_image_url)
        # b64encode() returns ASCII-only bytes; decode them properly instead of
        # slicing the ``b'...'`` repr of the bytes object.
        encoded_image = base64.b64encode(image.content).decode("ascii")

        # "case": "mixed" together with "mode": "human" can sometimes cause internal
        # errors in the TrueCaptcha server, so a more relaxed constraint
        # (lower + default) is used. The EUserv CAPTCHA is currently
        # case-insensitive, so this works.
        payload = {
            "userid": userid,
            "apikey": apikey,
            # case sensitivity of text (upper | lower | mixed)
            "case": "lower",
            # use human or AI (human | default)
            "mode": "default",
            "data": encoded_image,
        }
        reply = requests.post(
            url="https://api.apitruecaptcha.org/one/gettext", json=payload
        )
        return reply.json()
115
116
    def handle_captcha_solved_result(self, solved: dict) -> str:
        """Turn a CAPTCHA solver response into the text to submit as the answer.

        The CAPTCHA sometimes appears as a very simple binary arithmetic expression,
        and the recognition does not always return the result of the calculation
        directly, so such expressions are computed here.

        A result containing ``"RESULT  IS"`` is treated as a demo-apikey response and
        the recognized text is extracted from it first.

        :param solved: decoded JSON response of the solver; must contain a string
            under the ``"result"`` key.
        :return: for ``<digits><op><digits>`` (``op`` one of ``X``, ``x``, ``+``, ``-``;
            whitespace around the operands is ignored) the computed value as a string,
            otherwise the recognized text unchanged.
        :raises KeyError: if ``solved`` has no ``"result"`` key.
        :raises ValueError: if the result is not a string, or the demo-apikey wrapper
            cannot be parsed.
        """
        import operator

        if "result" not in solved:
            print(f"[Captcha Solver] Returned JSON: {solved}")
            log("[Captcha Solver] Failed to find parsed results!")
            raise KeyError("[Captcha Solver] Failed to find parsed results!")

        solved_result = solved["result"]
        if not isinstance(solved_result, str):
            print(f"[Captcha Solver] Returned JSON: {solved}")
            log("[Captcha Solver] Service Exception!")
            raise ValueError("[Captcha Solver] Service Exception!")

        if "RESULT  IS" in solved_result:
            log("[Captcha Solver] You are using the demo apikey.")
            print("There is no guarantee that demo apikey will work in the future!")
            # because using demo apikey
            matches = re.findall(r"RESULT  IS . (.*) .", solved_result)
            if not matches:
                # Wrapper marker present but not in the expected shape: report it as a
                # service problem instead of failing with a bare IndexError.
                print(f"[Captcha Solver] Returned JSON: {solved}")
                log("[Captcha Solver] Service Exception!")
                raise ValueError("[Captcha Solver] Service Exception!")
            text = matches[0]
        else:
            # using your own apikey
            log("[Captcha Solver] You are using your own apikey.")
            text = solved_result

        # Checked in this order; both "X" and "x" mean multiplication.
        operations = (
            ("X", operator.mul),
            ("x", operator.mul),
            ("+", operator.add),
            ("-", operator.sub),
        )
        for symbol, compute in operations:
            left_part, found, right_part = text.partition(symbol)
            if not found:
                continue
            left_part, right_part = left_part.strip(), right_part.strip()
            if left_part.isdecimal() and right_part.isdecimal():
                # int() instead of eval(): never executes solver-provided text and
                # accepts operands with leading zeros such as "08".
                return str(compute(int(left_part), int(right_part)))
            # These symbols ("X", "x", "+", "-") do not appear at the same time, so the
            # text merely contains an arithmetic symbol and is not an expression.
            return text
        return text
166
167
    def get_captcha_solver_usage(self, userid: str, apikey: str) -> dict:
        """Query the TrueCaptcha usage endpoint for the given credentials.

        :param userid: sent as the ``username`` query parameter.
        :param apikey: sent as the ``apikey`` query parameter.
        :return: the response body parsed as JSON.
        """
        query = dict(username=userid, apikey=apikey)
        reply = requests.get(
            url="https://api.apitruecaptcha.org/one/getusage", params=query
        )
        return json.loads(reply.text)
177
178
    def get_pin_from_mailparser(self, url_id: str) -> str:
        """Download the mailparser parsed data and return the PIN of its first entry.

        Expected response format::

            [
                {
                    "id": "83b95f50f6202fb03950afbe00975eab",
                    "received_at": "2021-11-06 02:30:07",   # per the mailparser account timezone setting (UTC here)
                    "processed_at": "2021-11-06 02:30:07",
                    "pin": "123456"
                }
            ]

        :param url_id: identifier appended to ``MAILPARSER_DOWNLOAD_BASE_URL``.
        :return: value of the ``"pin"`` field of the first list element.
        """
        download_url = "{}{}".format(self.MAILPARSER_DOWNLOAD_BASE_URL, url_id)
        # Mailparser parsed data download can use Basic Authentication:
        # pass auth=("<your mailparser username>", "<your mailparser password>").
        entries = requests.get(download_url).json()
        first_entry = entries[0]
        return first_entry["pin"]
197
198
    def login(
        self,
        username: str,
        password: str,
        userid: str,
        apikey: str,
    ) -> tuple:
        """Log in to the EUserv customer panel, solving the CAPTCHA when one is shown.

        :param username: EUserv login, sent as the ``email`` form field.
        :param password: EUserv password.
        :param userid: TrueCaptcha user id, used only when a CAPTCHA is shown.
        :param apikey: TrueCaptcha API key, used only when a CAPTCHA is shown.
        :return: ``(sess_id, session)``; ``sess_id`` is ``"-1"`` when the login failed
            (no session id found, unexpected page, or CAPTCHA rejected).
        :raises requests.HTTPError: if the login POST returns an error status.
        """
        headers = {"user-agent": self.UA, "origin": "https://www.euserv.com"}
        url = self.BASE_URL
        captcha_image_url = "https://support.euserv.com/securimage_show.php"
        captcha_prompt = (
            "To finish the login process please solve the following captcha."
        )
        session = requests.Session()

        sess = session.get(url, headers=headers)
        found_ids = re.findall("PHPSESSID=(\\w{10,100});", str(sess.headers))
        if not found_ids:
            # Without a session id nothing below can work; report it through the
            # method's own failure convention instead of raising IndexError.
            log("[EUserv] Failed to obtain PHPSESSID from the login page")
            return "-1", session
        sess_id = found_ids[0]

        # visit png (kept from the original flow; presumably mimics a browser page load)
        session.get("https://support.euserv.com/pic/logo_small.png", headers=headers)

        login_data = {
            "email": username,
            "password": password,
            "form_selected_language": "en",
            "Submit": "Login",
            "subaction": "login",
            "sess_id": sess_id,
        }
        f = session.post(url, headers=headers, data=login_data)
        f.raise_for_status()

        # Either marker on the returned page means we are already logged in.
        if "Hello" in f.text or "Confirm or change your customer data here" in f.text:
            return sess_id, session

        # Neither logged in nor asked for a CAPTCHA: treat as a failed login.
        if captcha_prompt not in f.text:
            return "-1", session

        log("[Captcha Solver] 进行验证码识别...")
        solved_result = self.captcha_solver(captcha_image_url, session, userid, apikey)
        captcha_code = self.handle_captcha_solved_result(solved_result)
        log("[Captcha Solver] 识别的验证码是: {}".format(captcha_code))

        if self.CHECK_CAPTCHA_SOLVER_USAGE:
            # The usage figures are informational only, so a failing lookup (network
            # error, non-JSON body, error payload instead of the usual list) must not
            # abort a login whose CAPTCHA has already been solved.
            try:
                usage = self.get_captcha_solver_usage(userid, apikey)
                log(
                    "[Captcha Solver] current date {0} api usage count: {1}".format(
                        usage[0]["date"], usage[0]["count"]
                    )
                )
            except (
                requests.RequestException,
                ValueError,
                LookupError,
                TypeError,
            ) as exc:
                log(f"[Captcha Solver] Failed to query api usage: {exc!r}")

        f2 = session.post(
            url,
            headers=headers,
            data={
                "subaction": "login",
                "sess_id": sess_id,
                "captcha_code": captcha_code,
            },
        )
        # The CAPTCHA prompt being shown again means the submitted code was rejected.
        if captcha_prompt not in f2.text:
            log("[Captcha Solver] 验证通过")
            return sess_id, session

        log("[Captcha Solver] 验证失败")
        return "-1", session
277
278
    def get_servers(self, sess_id: str, session: requests.session) -> dict:
        """Scrape the customer order table and report which servers can be renewed.

        :param sess_id: session id appended to the panel URL as ``sess_id``.
        :param session: logged-in ``requests`` session.
        :return: mapping of server id text to a flag that is ``True`` when the row's
            action cell does not contain "Contract extension possible from".
        :raises requests.HTTPError: if the page request returns an error status.
        """
        page = session.get(
            url=f"{self.BASE_URL}?sess_id=" + sess_id,
            headers={"user-agent": self.UA, "origin": "https://www.euserv.com"},
        )
        page.raise_for_status()

        rows = BeautifulSoup(page.text, "html.parser").select(
            "#kc2_order_customer_orders_tab_content_1 .kc2_order_table.kc2_content_table tr"
        )
        servers = {}
        for row in rows:
            id_cells = row.select(".td-z1-sp1-kc")
            # Only rows with exactly one server-id cell are order rows.
            if len(id_cells) != 1:
                continue
            action_text = row.select(".td-z1-sp2-kc .kc2_order_action_container")[
                0
            ].get_text()
            servers[id_cells[0].get_text()] = (
                "Contract extension possible from" not in action_text
            )
        return servers
301
302
    def renew(
        self,
        sess_id: str,
        session: requests.session,
        order_id: str,
        mailparser_dl_url_id: str,
    ) -> bool:
        """Run the contract-extension flow for one order, authorizing with the mailed PIN.

        Steps: open the contract details, open the security dialog, wait and read the PIN
        from mailparser, exchange the PIN for a token, submit the extension.

        :param sess_id: EUserv session id.
        :param session: logged-in ``requests`` session.
        :param order_id: order number of the contract to extend.
        :param mailparser_dl_url_id: mailparser download id used to fetch the PIN.
        :return: ``False`` when the token request does not report ``"success"``,
            otherwise ``True``.
        :raises requests.HTTPError: if the token request returns an error status.
        """
        endpoint = self.BASE_URL
        request_headers = {
            "user-agent": self.UA,
            "Host": "support.euserv.com",
            "origin": "https://support.euserv.com",
            "Referer": self.BASE_URL,
        }
        dialog_prefix = "kc2_customer_contract_details_extend_contract_"

        def submit(form):
            # Every step is a form POST to the same endpoint with the same headers.
            return session.post(endpoint, headers=request_headers, data=form)

        # Step 1: open the contract details of the order.
        submit(
            {
                "Submit": "Extend contract",
                "sess_id": sess_id,
                "ord_no": order_id,
                "subaction": "choose_order",
                "choose_order_subaction": "show_contract_details",
            }
        )

        # Step 2: pop up the 'Security Check' window; it triggers 'send PIN' automatically.
        submit(
            {
                "sess_id": sess_id,
                "subaction": "show_kc2_security_password_dialog",
                "prefix": dialog_prefix,
                "type": "1",
            }
        )

        # Optional step, not performed here: 'Send new PIN to your Email-Address' can be
        # triggered with subaction "kc2_security_password_send_pin" and
        # ident f"kc2_customer_contract_details_extend_contract_{order_id}";
        # the JSON reply carries "rc" == "100" on success.

        # Step 3: give mailparser WAITING_TIME_OF_PIN seconds to parse the PIN e-mail.
        time.sleep(self.WAITING_TIME_OF_PIN)
        pin = self.get_pin_from_mailparser(mailparser_dl_url_id)
        log(f"[MailParser] PIN: {pin}")

        # Step 4: use the PIN instead of the password to get a token.
        token_reply = submit(
            {
                "auth": pin,
                "sess_id": sess_id,
                "subaction": "kc2_security_password_get_token",
                "prefix": dialog_prefix,
                "type": 1,
                "ident": f"{dialog_prefix}{order_id}",
            }
        )
        token_reply.raise_for_status()
        token_payload = json.loads(token_reply.text)
        if token_payload["rs"] != "success":
            return False

        # Step 5: submit the extension with the token.
        submit(
            {
                "sess_id": sess_id,
                "ord_id": order_id,
                "subaction": "kc2_customer_contract_details_extend_contract_term",
                "token": token_payload["token"]["value"],
            }
        )
        time.sleep(5)
        return True
375
376
    def check(self, sess_id: str, session: requests.session):
        """Re-read the server list and log every server that is still flagged renewable.

        A server still flagged by ``get_servers`` is logged as a failed renewal; when
        none is flagged, a single success line is logged instead.

        :param sess_id: EUserv session id.
        :param session: logged-in ``requests`` session.
        """
        print("Checking.......")
        still_renewable = [
            server_id
            for server_id, renewable in self.get_servers(sess_id, session).items()
            if renewable
        ]
        for server_id in still_renewable:
            log("[EUserv] ServerID: %s Renew Failed!" % server_id)

        if not still_renewable:
            log("[EUserv] ALL Work Done! Enjoy~")
387
388
    def main(self):
        """Log in to every configured account and try to renew each renewable server.

        Each entry of ``self.check_items`` is read with ``.get`` for the keys
        ``username``, ``password``, ``userid``, ``apikey`` and ``mailparser_dl_url_id``.
        Accounts whose login fails are logged and skipped.

        :return: the module-level ``desp`` log text accumulated so far.
        """
        # enumerate() keeps the account number correct even when an account is skipped;
        # a manually incremented counter was bypassed by the ``continue`` below.
        for account_no, check_item in enumerate(self.check_items, start=1):
            username = check_item.get("username")
            password = check_item.get("password")
            userid = check_item.get("userid")
            apikey = check_item.get("apikey")
            mailparser_dl_url_id = check_item.get("mailparser_dl_url_id")

            log("*" * 12)
            log("[EUserv] 正在续费第 %d 个账号" % account_no)
            sessid, s = self.login(username, password, userid, apikey)
            if sessid == "-1":
                log("[EUserv] 第 %d 个账号登陆失败,请检查登录信息" % account_no)
                continue

            servers = self.get_servers(sessid, s)
            log(
                "[EUserv] 检测到第 {} 个账号有 {} 台 VPS,正在尝试续期".format(
                    account_no, len(servers)
                )
            )
            for server_id, renewable in servers.items():
                if not renewable:
                    log("[EUserv] ServerID: %s does not need to be renewed" % server_id)
                    continue
                if self.renew(sessid, s, server_id, mailparser_dl_url_id):
                    log(
                        "[EUserv] ServerID: %s has been successfully renewed!"
                        % server_id
                    )
                else:
                    log("[EUserv] ServerID: %s Renew Error!" % server_id)

            # Pause before re-reading the server list to verify the renewals.
            time.sleep(15)
            self.check(sessid, s)
            time.sleep(5)
            log("*" * 12)
        return desp
418
419
420
if __name__ == "__main__":
    # Read the "EUSERV" account list from the shared configuration (empty list when the
    # key is absent), run the renewal flow and send the collected log as a notification.
    accounts = get_data().get("EUSERV", [])
    report = EUserv(check_items=accounts).main()
    send("EUserv", report)
425