Passed
Push — master ( ce37c1...0e39ca )
by Leon
02:27
created

ck_epic.run()   B

Complexity

Conditions 5

Size

Total Lines 26
Code Lines 23

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 5
eloc 23
nop 2
dl 0
loc 26
rs 8.8613
c 0
b 0
f 0
1
"""
2
:author @luminoleon
3
cron: 50 1 * * *
4
new Env('Epic');
5
"""
6
7
import argparse
8
import asyncio
9
import base64
10
import datetime
11
import hashlib
12
import hmac
13
import json
14
import os
15
import re
16
import signal
17
import sys
18
import time
19
import urllib
20
from getpass import getpass
21
from json.decoder import JSONDecodeError
22
from typing import Callable, Dict, List, Optional, Tuple, Union
23
24
import requests
25
import schedule
26
from pyppeteer import launch, launcher
27
from pyppeteer.element_handle import ElementHandle
28
from pyppeteer.frame_manager import Frame
29
from pyppeteer.network_manager import Request
30
31
from notify_mtr import send
32
from utils import get_data
33
from utils_env import get_env_str
34
35
__version__ = "1.6.7"
36
37
38
NOTIFICATION_TITLE_START = "Epicgames Claimer:启动成功"
39
NOTIFICATION_TITLE_NEED_LOGIN = "Epicgames Claimer:需要登录"
40
NOTIFICATION_TITLE_CLAIM_SUCCEED = "Epicgames Claimer:领取成功"
41
NOTIFICATION_TITLE_ERROR = "EpicGames Claimer:错误"
42
NOTIFICATION_TITLE_TEST = "EpicGames Claimer:测试"
43
NOTIFICATION_CONTENT_START = "如果你收到了此消息,表示你可以正常接收来自Epicgames Claimer的通知推送"
44
NOTIFICATION_CONTENT_NEED_LOGIN = "未登录或登录信息已失效,请检查并尝试重新登录"
45
NOTIFICATION_CONTENT_CLAIM_SUCCEED = "成功领取到游戏:"
46
NOTIFICATION_CONTENT_OPEN_BROWSER_FAILED = "打开浏览器失败:"
47
NOTIFICATION_CONTENT_LOGIN_FAILED = "登录失败:"
48
NOTIFICATION_CONTENT_CLAIM_FAILED = "领取失败:"
49
NOTIFICATION_CONTENT_TEST = "测试是否通知推送已被正确设置"
50
NOTIFICATION_CONTENT_OWNED_ALL = "所有可领取的每周免费游戏已全部在库中"
51
52
53
if "--enable-automation" in launcher.DEFAULT_ARGS:
54
    launcher.DEFAULT_ARGS.remove("--enable-automation")
55
# Solve the issue of zombie processes
56
if "SIGCHLD" in dir(signal):
57
    signal.signal(signal.SIGCHLD, signal.SIG_IGN)
58
59
60
def get_current_time() -> str:
61
    current_time_string = str(datetime.datetime.now()).split(".")[0]
62
    return current_time_string
63
64
65
def log(text: str, level: str = "info") -> None:
66
    localtime = get_current_time()
67
    if level == "info":
68
        print("[{}  INFO] {}".format(localtime, text))
69
    elif level == "warning":
70
        print("\033[33m[{}  WARN] {}\033[0m".format(localtime, text))
71
    elif level == "error":
72
        print("\033[31m[{} ERROR] {}\033[0m".format(localtime, text))
73
74
75
class WeChat:
76
    def __init__(self, corpid, corpsecret, agentid) -> None:
77
        self.corpid = corpid
78
        self.corpsecret = corpsecret
79
        self.agentid = agentid
80
81
    def get_token(self) -> str:
82
        url = "https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid={}&corpsecret={}".format(
83
            self.corpid, self.corpsecret
84
        )
85
        response = requests.get(url)
86
        if response.status_code == 200:
87
            return response.json()["access_token"]
88
        else:
89
            log("Failed to get access_token.", level="error")
90
            return ""
91
92
    def send_text(self, message, touser="@all") -> str:
93
        url = "https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token={}".format(
94
            self.get_token()
95
        )
96
        data = {
97
            "touser": touser,
98
            "msgtype": "text",
99
            "agentid": self.agentid,
100
            "text": {"content": message},
101
            "safe": 0,
102
        }
103
        send_msges = bytes(json.dumps(data), "utf-8")
104
        response = requests.post(url, send_msges)
105
        return response
106
107
    def send_mpnews(self, title, message, media_id, touser="@all") -> str:
108
        url = "https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token={}".format(
109
            self.get_token()
110
        )
111
        if not message:
112
            message = title
113
        data = {
114
            "touser": touser,
115
            "msgtype": "mpnews",
116
            "agentid": self.agentid,
117
            "mpnews": {
118
                "articles": [
119
                    {
120
                        "title": title,
121
                        "thumb_media_id": media_id,
122
                        "content_source_url": "",
123
                        "content": message.replace("\n", "<br/>"),
124
                        "digest": message,
125
                    }
126
                ]
127
            },
128
            "safe": 0,
129
        }
130
        send_msges = bytes(json.dumps(data), "utf-8")
131
        response = requests.post(url, send_msges)
132
        return response
133
134
135
class Notifications:
136
    def __init__(
137
        self,
138
        serverchan_sendkey: str = None,
139
        bark_push_url: str = "https://api.day.app/push",
140
        bark_device_key: str = None,
141
        telegram_bot_token: str = None,
142
        telegram_chat_id: str = None,
143
        wechat_qywx_am: str = None,
144
        dingtalk_access_token: str = None,
145
        dingtalk_secret: str = None,
146
    ) -> None:
147
        self.serverchan_sendkey = serverchan_sendkey
148
        self.bark_push_url = bark_push_url
149
        self.bark_device_key = bark_device_key
150
        self.telegram_bot_token = telegram_bot_token
151
        self.telegram_chat_id = telegram_chat_id
152
        self.wechat_qywx_am = wechat_qywx_am
153
        self.dingtalk_access_token = dingtalk_access_token
154
        self.dingtalk_secret = dingtalk_secret
155
156
    def push_serverchan(self, title: str, content: str = None) -> None:
157
        if self.serverchan_sendkey != None:
158
            try:
159
                url = "https://sctapi.ftqq.com/{}.send".format(self.serverchan_sendkey)
160
                data = {"title": title}
161
                if content != None:
162
                    data["desp"] = content
163
                requests.post(url, data=data)
164
            except Exception as e:
165
                log("Failed to push to ServerChan: {}".format(e), "error")
166
167
    def push_bark(self, title: str, content: str = None) -> None:
168
        if self.bark_device_key:
169
            try:
170
                response = requests.post(
171
                    url=self.bark_push_url,
172
                    headers={
173
                        "Content-Type": "application/json; charset=utf-8",
174
                    },
175
                    data=json.dumps(
176
                        {
177
                            "body": content,
178
                            "device_key": self.bark_device_key,
179
                            "title": title,
180
                        }
181
                    ),
182
                )
183
                log(f"Bark Response HTTP Status Code: {response.status_code}")
184
                log(f"Bark Response HTTP Response Body: {response.content}")
185
            except Exception as e:
186
                log("Failed to push to Bark: {}".format(e), "error")
187
188
    def push_telegram(self, title: str = None, content: str = None) -> None:
189
        if self.telegram_bot_token:
190
            try:
191
                push_text = f"{title}\n\n{content}" if title else content
192
                response = requests.post(
193
                    url=f"https://api.telegram.org/bot{self.telegram_bot_token}/sendMessage",
194
                    data={"chat_id": self.telegram_chat_id, "text": push_text},
195
                )
196
                log(f"Telegram Response HTTP Status Code: {response.status_code}")
197
                log(f"Telegram Response HTTP Response Body: {response.content}")
198
            except Exception as e:
199
                log("Failed to push to Telegram: {}".format(e), "error")
200
201
    def push_wechat(self, title: str, content: str = None) -> None:
202
        if self.wechat_qywx_am:
203
            try:
204
                qywx_am_ay = re.split(",", self.wechat_qywx_am)
205
                if 4 < len(qywx_am_ay) > 5:
206
                    log("WeChat AM is invalid.", level="error")
207
                    return
208
                corpid = qywx_am_ay[0]
209
                corpsecret = qywx_am_ay[1]
210
                touser = qywx_am_ay[2]
211
                agentid = qywx_am_ay[3]
212
                try:
213
                    media_id = qywx_am_ay[4]
214
                except:
215
                    media_id = None
216
                wx = WeChat(corpid, corpsecret, agentid)
217
                if media_id != None:
218
                    response = wx.send_mpnews(title, content, media_id, touser)
219
                else:
220
                    message = title + "\n\n" + content
221
                    response = wx.send_text(message, touser)
222
                response = response.json()
223
                if response["errmsg"] == "ok":
224
                    log("Successfully sent wechat message.")
225
                else:
226
                    log(
227
                        "Failed to send wechat message. errmsg: {}".format(
228
                            response["errmsg"]
229
                        ),
230
                        level="error",
231
                    )
232
            except Exception as e:
233
                log("Failed to send wechat message. ExceptErrmsg:{}".format(e), "error")
234
235
    def _get_dingtalk_timestamp_and_sign(self) -> Tuple[str, str]:
236
        timestamp = str(round(time.time() * 1000))
237
        secret = self.dingtalk_secret
238
        secret_enc = secret.encode("utf-8")
239
        string_to_sign = "{}\n{}".format(timestamp, secret)
240
        string_to_sign_enc = string_to_sign.encode("utf-8")
241
        hmac_code = hmac.new(
242
            secret_enc, string_to_sign_enc, digestmod=hashlib.sha256
243
        ).digest()
244
        sign = urllib.parse.quote_plus(base64.b64encode(hmac_code))
245
        return timestamp, sign
246
247
    def push_dingtalk(self, title: str, content: str) -> None:
248
        if self.dingtalk_access_token:
249
            try:
250
                headers = {"Content-Type": "application/json; charset=utf-8"}
251
                webhook = "https://oapi.dingtalk.com/robot/send"
252
                params = {"access_token": self.dingtalk_access_token}
253
                if self.dingtalk_secret:
254
                    (
255
                        params["timestamp"],
256
                        params["sign"],
257
                    ) = self._get_dingtalk_timestamp_and_sign()
258
                push_text = f"{title}\n\n{content}" if title else content
259
                data = {"msgtype": "text", "text": {"content": push_text}}
260
                response = requests.post(
261
                    url=webhook, headers=headers, params=params, data=json.dumps(data)
262
                )
263
                response_json = response.json()
264
                errcode = response_json["errcode"]
265
                errmsg = response_json["errmsg"]
266
                if errcode == 0:
267
                    log("Successfully sent DingTalk message")
268
                else:
269
                    log(f"Failed to send Dingtalk message: {errmsg}", level="error")
270
            except Exception as e:
271
                log(f"Failed to send Dingtalk message: {e}", level="error")
272
273
    def notify(self, title: str, content: str = None) -> None:
274
        self.push_serverchan(title, content)
275
        self.push_bark(title, content)
276
        self.push_telegram(title, content)
277
        self.push_wechat(title, content)
278
        self.push_dingtalk(title, content)
279
280
281
class Item:
282
    def __init__(self, title: str, offer_id: str, namespace: str, type: str) -> None:
283
        self.title = title
284
        self.offer_id = offer_id
285
        self.namespace = namespace
286
        self.type = type
287
288
    @property
289
    def purchase_url(self) -> str:
290
        url = "https://www.epicgames.com/store/purchase?lang=en-US&namespace={}&offers={}".format(
291
            self.namespace, self.offer_id
292
        )
293
        return url
294
295
296
class Game:
297
    def __init__(self, base_game: Item, dlcs: List[Item] = []) -> None:
298
        self.base_game = base_game
299
        self.dlcs = dlcs
300
301
    @property
302
    def item_amount(self) -> int:
303
        return len(self.dlcs) + 1
304
305
306
class EpicgamesClaimer:
307
    def __init__(
308
        self,
309
        data_dir: Optional[str] = None,
310
        headless: bool = True,
311
        sandbox: bool = False,
312
        chromium_path: Optional[str] = None,
313
        claimer_notifications: Notifications = None,
314
        timeout: int = 180000,
315
        debug: bool = False,
316
        cookies: str = None,
317
        browser_args: List[str] = [
318
            "--disable-infobars",
319
            "--blink-settings=imagesEnabled=false",
320
            "--no-first-run",
321
            "--disable-gpu",
322
        ],
323
    ) -> None:
324
        self.data_dir = data_dir
325
        self.headless = headless
326
        self.browser_args = browser_args
327
        self.sandbox = sandbox
328
        if not self.sandbox:
329
            self.browser_args.append("--no-sandbox")
330
        self.chromium_path = chromium_path
331
        if "win" in launcher.current_platform() and self.chromium_path == None:
332
            if os.path.exists("chrome-win32"):
333
                self.chromium_path = "chrome-win32/chrome.exe"
334
            elif os.path.exists("chrome-win"):
335
                self.chromium_path = "chrome-win/chrome.exe"
336
        self._loop = asyncio.get_event_loop()
337
        self.browser_opened = False
338
        self.claimer_notifications = (
339
            claimer_notifications if claimer_notifications != None else Notifications()
340
        )
341
        self.timeout = timeout
342
        self.debug = debug
343
        self.cookies = cookies
344
        self.page = None
345
        self.open_browser()
346
347
    def log(self, text: str, level: str = "info") -> None:
348
        localtime = get_current_time()
349
        if level == "info":
350
            print("[{}  INFO] {}".format(localtime, text))
351
        elif level == "warning":
352
            print("\033[33m[{}  WARN] {}\033[0m".format(localtime, text))
353
        elif level == "error":
354
            print("\033[31m[{} ERROR] {}\033[0m".format(localtime, text))
355
        elif level == "debug":
356
            if self.debug:
357
                print("[{} DEBUG] {}".format(localtime, text))
358
359
    async def _headless_stealth_async(self):
360
        original_user_agent = await self.page.evaluate("navigator.userAgent")
361
        user_agent = original_user_agent.replace("Headless", "")
362
        await self.page.evaluateOnNewDocument(
363
            "() => {Object.defineProperty(navigator, 'webdriver', {get: () => false})}"
364
        )
365
        await self.page.evaluateOnNewDocument(
366
            "window.chrome = {'loadTimes': {}, 'csi': {}, 'app': {'isInstalled': false, 'getDetails': {}, 'getIsInstalled': {}, 'installState': {}, 'runningState': {}, 'InstallState': {'DISABLED': 'disabled', 'INSTALLED': 'installed', 'NOT_INSTALLED': 'not_installed'}, 'RunningState': {'CANNOT_RUN': 'cannot_run', 'READY_TO_RUN': 'ready_to_run', 'RUNNING': 'running'}}, 'webstore': {'onDownloadProgress': {'addListener': {}, 'removeListener': {}, 'hasListener': {}, 'hasListeners': {}, 'dispatch': {}}, 'onInstallStageChanged': {'addListener': {}, 'removeListener': {}, 'hasListener': {}, 'hasListeners': {}, 'dispatch': {}}, 'install': {}, 'ErrorCode': {'ABORTED': 'aborted', 'BLACKLISTED': 'blacklisted', 'BLOCKED_BY_POLICY': 'blockedByPolicy', 'ICON_ERROR': 'iconError', 'INSTALL_IN_PROGRESS': 'installInProgress', 'INVALID_ID': 'invalidId', 'INVALID_MANIFEST': 'invalidManifest', 'INVALID_WEBSTORE_RESPONSE': 'invalidWebstoreResponse', 'LAUNCH_FEATURE_DISABLED': 'launchFeatureDisabled', 'LAUNCH_IN_PROGRESS': 'launchInProgress', 'LAUNCH_UNSUPPORTED_EXTENSION_TYPE': 'launchUnsupportedExtensionType', 'MISSING_DEPENDENCIES': 'missingDependencies', 'NOT_PERMITTED': 'notPermitted', 'OTHER_ERROR': 'otherError', 'REQUIREMENT_VIOLATIONS': 'requirementViolations', 'USER_CANCELED': 'userCanceled', 'WEBSTORE_REQUEST_ERROR': 'webstoreRequestError'}, 'InstallStage': {'DOWNLOADING': 'downloading', 'INSTALLING': 'installing'}}}"
367
        )
368
        await self.page.evaluateOnNewDocument(
369
            "() => {Reflect.defineProperty(navigator.connection,'rtt', {get: () => 200, enumerable: true})}"
370
        )
371
        await self.page.evaluateOnNewDocument(
372
            "() => {Object.defineProperty(navigator, 'plugins', {get: () => [{'description': 'Portable Document Format', 'filename': 'internal-pdf-viewer', 'length': 1, 'name': 'Chrome PDF Plugin'}, {'description': '', 'filename': 'mhjfbmdgcfjbbpaeojofohoefgiehjai', 'length': 1, 'name': 'Chromium PDF Viewer'}, {'description': '', 'filename': 'internal-nacl-plugin', 'length': 2, 'name': 'Native Client'}]})}"
373
        )
374
        await self.page.evaluateOnNewDocument(
375
            "() => {const newProto = navigator.__proto__; delete newProto.webdriver; navigator.__proto__ = newProto}"
376
        )
377
        await self.page.evaluateOnNewDocument(
378
            "const getParameter = WebGLRenderingContext.getParameter; WebGLRenderingContext.prototype.getParameter = function(parameter) {if (parameter === 37445) {return 'Intel Open Source Technology Center';}; if (parameter === 37446) {return 'Mesa DRI Intel(R) Ivybridge Mobile ';}; return getParameter(parameter);}"
379
        )
380
        await self.page.evaluateOnNewDocument(
381
            "() => {Reflect.defineProperty(navigator, 'mimeTypes', {get: () => [{type: 'application/pdf', suffixes: 'pdf', description: '', enabledPlugin: Plugin}, {type: 'application/x-google-chrome-pdf', suffixes: 'pdf', description: 'Portable Document Format', enabledPlugin: Plugin}, {type: 'application/x-nacl', suffixes: '', description: 'Native Client Executable', enabledPlugin: Plugin}, {type: 'application/x-pnacl', suffixes: '', description: 'Portable Native Client Executable', enabledPlugin: Plugin}]})}"
382
        )
383
        await self.page.evaluateOnNewDocument(
384
            "() => {const p = {'defaultRequest': null, 'receiver': null}; Reflect.defineProperty(navigator, 'presentation', {get: () => p})}"
385
        )
386
        await self.page.setExtraHTTPHeaders(
387
            {"Accept-Language": "en-GB,en-US;q=0.9,en;q=0.8"}
388
        )
389
        await self.page.setUserAgent(user_agent)
390
391
    async def _open_browser_async(self) -> None:
392
        if not self.browser_opened:
393
            self.browser = await launch(
394
                options={"args": self.browser_args, "headless": self.headless},
395
                userDataDir=None
396
                if self.data_dir == None
397
                else os.path.abspath(self.data_dir),
398
                executablePath=self.chromium_path,
399
            )
400
            self.page = (await self.browser.pages())[0]
401
            await self.page.setViewport({"width": 1000, "height": 600})
402
            # Async callback functions aren't possible to use (Refer to https://github.com/pyppeteer/pyppeteer/issues/220).
403
            # await self.page.setRequestInterception(True)
404
            # self.page.on('request', self._intercept_request_async)
405
            if self.headless:
406
                await self._headless_stealth_async()
407
            self.browser_opened = True
408
            if self.cookies:
409
                await self._load_cookies_async(self.cookies)
410
            if self.data_dir != None:
411
                cookies_path = os.path.join(self.data_dir, "cookies.json")
412
                if os.path.exists(cookies_path):
413
                    await self._load_cookies_async(cookies_path)
414
                    os.remove(cookies_path)
415
        # await self._refresh_cookies_async()
416
417
    async def _refresh_cookies_async(self) -> None:
418
        await self._navigate_async("https://www.epicgames.com/store/en-US/")
419
420
    async def _intercept_request_async(self, request: Request) -> None:
421
        if request.resourceType in ["image", "media", "font"]:
422
            await request.abort()
423
        else:
424
            await request.continue_()
425
426
    async def _close_browser_async(self):
427
        if self.browser_opened:
428
            if self.cookies:
429
                await self._save_cookies_async(self.cookies)
430
            await self.browser.close()
431
            self.browser_opened = False
432
433
    async def _type_async(
434
        self, selector: str, text: str, sleep: Union[int, float] = 0
435
    ) -> None:
436
        await self.page.waitForSelector(selector)
437
        await asyncio.sleep(sleep)
438
        await self.page.type(selector, text)
439
440
    async def _click_async(
441
        self,
442
        selector: str,
443
        sleep: Union[int, float] = 2,
444
        timeout: int = 30000,
445
        frame_index: int = 0,
446
    ) -> None:
447
        if frame_index == 0:
448
            await self.page.waitForSelector(selector, options={"timeout": timeout})
449
            await asyncio.sleep(sleep)
450
            await self.page.click(selector)
451
        else:
452
            await self.page.waitForSelector(
453
                "iframe:nth-child({})".format(frame_index), options={"timeout": timeout}
454
            )
455
            frame = self.page.frames[frame_index]
456
            await frame.waitForSelector(selector)
457
            await asyncio.sleep(sleep)
458
            await frame.click(selector)
459
460
    async def _get_text_async(self, selector: str) -> str:
461
        await self.page.waitForSelector(selector)
462
        return await (
463
            await (await self.page.querySelector(selector)).getProperty("textContent")
464
        ).jsonValue()
465
466
    async def _get_texts_async(self, selector: str) -> List[str]:
467
        texts = []
468
        try:
469
            await self.page.waitForSelector(selector)
470
            for element in await self.page.querySelectorAll(selector):
471
                texts.append(
472
                    await (await element.getProperty("textContent")).jsonValue()
473
                )
474
        except:
475
            pass
476
        return texts
477
478
    async def _get_element_text_async(self, element: ElementHandle) -> str:
479
        return await (await element.getProperty("textContent")).jsonValue()
480
481
    async def _get_property_async(self, selector: str, property: str) -> str:
482
        await self.page.waitForSelector(selector)
483
        return await self.page.evaluate(
484
            "document.querySelector('{}').getAttribute('{}')".format(selector, property)
485
        )
486
487
    async def _get_links_async(
488
        self, selector: str, filter_selector: str, filter_value: str
489
    ) -> List[str]:
490
        links = []
491
        try:
492
            await self.page.waitForSelector(selector)
493
            elements = await self.page.querySelectorAll(selector)
494
            judgement_texts = await self._get_texts_async(filter_selector)
495
        except:
496
            return []
497
        for element, judgement_text in zip(elements, judgement_texts):
498
            if judgement_text == filter_value:
499
                link = await (await element.getProperty("href")).jsonValue()
500
                links.append(link)
501
        return links
502
503
    async def _find_async(
504
        self, selectors: Union[str, List[str]], timeout: int = None, frame: Frame = None
505
    ) -> Union[bool, int]:
506
        if frame == None:
507
            frame = self.page
508
        if type(selectors) == str:
509
            try:
510
                if timeout == None:
511
                    timeout = 1000
512
                await frame.waitForSelector(selectors, options={"timeout": timeout})
513
                return True
514
            except:
515
                return False
516
        elif type(selectors) == list:
517
            if timeout == None:
518
                timeout = 300000
519
            for _ in range(int(timeout / 1000 / len(selectors))):
520
                for i in range(len(selectors)):
521
                    if await self._find_async(selectors[i], timeout=1000, frame=frame):
522
                        return i
523
            return -1
524
        else:
525
            raise ValueError
526
527
    async def _try_click_async(
528
        self, selector: str, sleep: Union[int, float] = 2
529
    ) -> bool:
530
        try:
531
            await asyncio.sleep(sleep)
532
            await self.page.click(selector)
533
            return True
534
        except:
535
            return False
536
537
    async def _get_elements_async(
538
        self, selector: str
539
    ) -> Union[List[ElementHandle], None]:
540
        try:
541
            await self.page.waitForSelector(selector)
542
            return await self.page.querySelectorAll(selector)
543
        except:
544
            return None
545
546
    async def _wait_for_element_text_change_async(
547
        self, element: ElementHandle, text: str, timeout: int = 30
548
    ) -> None:
549
        if await self._get_element_text_async(element) != text:
550
            return
551
        for _ in range(timeout):
552
            await asyncio.sleep(1)
553
            if await self._get_element_text_async(element) != text:
554
                return
555
        raise TimeoutError(
556
            'Waiting for element "{}" text content change failed: timeout {}s exceeds'.format(
557
                element, timeout
558
            )
559
        )
560
561
    async def _navigate_async(
562
        self, url: str, timeout: int = 30000, reload: bool = True
563
    ) -> None:
564
        if self.page.url == url and not reload:
565
            return
566
        await self.page.goto(url, options={"timeout": timeout})
567
568
    async def _get_json_async(self, url: str, arguments: Dict[str, str] = None) -> dict:
569
        response_text = await self._get_async(url, arguments)
570
        try:
571
            response_json = json.loads(response_text)
572
        except JSONDecodeError:
573
            response_text_partial = (
574
                response_text if len(response_text) <= 96 else response_text[0:96]
575
            )
576
            raise ValueError(
577
                "Epic Games returnes content that cannot be resolved. Response: {} ...".format(
578
                    response_text_partial
579
                )
580
            )
581
        return response_json
582
583
    async def _login_async(
584
        self,
585
        email: str,
586
        password: str,
587
        verifacation_code: str = None,
588
        interactive: bool = True,
589
        remember_me: bool = True,
590
    ) -> None:
591
        self.log("Start to login.", level="debug")
592
        if email == None or email == "":
593
            raise ValueError("Email can't be null.")
594
        if password == None or password == "":
595
            raise ValueError("Password can't be null.")
596
        await self._navigate_async(
597
            "https://www.epicgames.com/store/en-US/", timeout=self.timeout, reload=False
598
        )
599
        await self._click_async("#user", timeout=self.timeout)
600
        await self._click_async("#login-with-epic", timeout=self.timeout)
601
        await self._type_async("#email", email)
602
        await self._type_async("#password", password)
603
        if not remember_me:
604
            await self._click_async("#rememberMe")
605
        await self._click_async("#sign-in[tabindex='0']", timeout=self.timeout)
606
        login_result = await self._find_async(
607
            [
608
                "#talon_frame_login_prod[style*=visible]",
609
                "div.MuiPaper-root[role=alert] h6[class*=subtitle1]",
610
                "input[name=code-input-0]",
611
                "#user",
612
            ],
613
            timeout=self.timeout,
614
        )
615
        if login_result == -1:
616
            raise TimeoutError("Chcek login result timeout.")
617
        elif login_result == 0:
618
            raise PermissionError("CAPTCHA is required for unknown reasons.")
619
        elif login_result == 1:
620
            alert_text = await self._get_text_async(
621
                "div.MuiPaper-root[role=alert] h6[class*=subtitle1]"
622
            )
623
            raise PermissionError("From Epic Games: {}".format(alert_text))
624
        elif login_result == 2:
625
            if interactive:
626
                await self._type_async(
627
                    "input[name=code-input-0]", input("Verification code: ")
628
                )
629
            else:
630
                await self._type_async("input[name=code-input-0]", verifacation_code)
631
            await self._click_async("#continue[tabindex='0']", timeout=self.timeout)
632
            verify_result = await self._find_async(
633
                ["#modal-content div[role*=alert]", "#user"], timeout=self.timeout
634
            )
635
            if verify_result == -1:
636
                raise TimeoutError("Chcek login result timeout.")
637
            elif verify_result == 0:
638
                alert_text = await self._get_text_async(
639
                    "#modal-content div[role*=alert]"
640
                )
641
                raise PermissionError("From Epic Games: {}".format(alert_text))
642
        self.log("Login end.", level="debug")
643
644
    async def _need_login_async(self, use_api: bool = False) -> bool:
645
        need_login = False
646
        if use_api:
647
            page_content_json = await self._get_json_async(
648
                "https://www.epicgames.com/account/v2/ajaxCheckLogin"
649
            )
650
            need_login = page_content_json["needLogin"]
651
        else:
652
            await self._navigate_async(
653
                "https://www.epicgames.com/store/en-US/", timeout=self.timeout
654
            )
655
            if (
656
                await self._get_property_async("#user", "data-component")
657
            ) == "SignedIn":
658
                need_login = False
659
            else:
660
                need_login = True
661
        self.log(f"Need Login: {need_login}.", level="debug")
662
        return need_login
663
664
    async def _get_authentication_method_async(self) -> Optional[str]:
665
        page_content_json = await self._get_json_async(
666
            "https://www.epicgames.com/account/v2/security/settings/ajaxGet"
667
        )
668
        if page_content_json["settings"]["enabled"] == False:
669
            return None
670
        else:
671
            return page_content_json["settings"]["defaultMethod"]
672
673
    def _quit(self, signum=None, frame=None) -> None:
674
        try:
675
            self.close_browser()
676
        except:
677
            pass
678
        exit(1)
679
680
    def _screenshot(self, path: str) -> None:
681
        return self._loop.run_until_complete(self.page.screenshot({"path": path}))
682
683
    async def _post_json_async(
684
        self,
685
        url: str,
686
        data: str,
687
        host: str = "www.epicgames.com",
688
        sleep: Union[int, float] = 2,
689
    ):
690
        await asyncio.sleep(sleep)
691
        if not host in self.page.url:
692
            await self._navigate_async("https://{}".format(host))
693
        response = await self.page.evaluate(
694
            """
695
            xmlhttp = new XMLHttpRequest();
696
            xmlhttp.open("POST", "{}", true);
697
            xmlhttp.setRequestHeader("Content-Type", "application/json;charset=UTF-8");
698
            xmlhttp.send('{}');
699
            xmlhttp.responseText;
700
        """.format(
701
                url, data
702
            )
703
        )
704
        return response
705
706
    async def _post_async(
707
        self,
708
        url: str,
709
        data: dict,
710
        host: str = "www.epicgames.com",
711
        sleep: Union[int, float] = 2,
712
    ) -> str:
713
        await asyncio.sleep(sleep)
714
        if not host in self.page.url:
715
            await self._navigate_async("https://{}".format(host))
716
        evaluate_form = "var form = new FormData();\n"
717
        for key, value in data.items():
718
            evaluate_form += "form.append(`{}`, `{}`);\n".format(key, value)
719
        response = await self.page.evaluate(
720
            evaluate_form
721
            + """
722
            var form = new FormData();
723
            xmlhttp = new XMLHttpRequest();
724
            xmlhttp.open("POST", `{}`, true);
725
            xmlhttp.send(form);
726
            xmlhttp.responseText;
727
        """.format(
728
                url
729
            )
730
        )
731
        return response
732
733
    async def _get_account_id_async(self):
734
        if await self._need_login_async():
735
            return None
736
        else:
737
            await self._navigate_async("https://www.epicgames.com/account/personal")
738
            account_id = (
739
                await self._get_text_async("#personalView div.paragraph-container p")
740
            ).split(": ")[1]
741
            return account_id
742
743
    async def _get_async(
744
        self, url: str, arguments: Dict[str, str] = None, sleep: Union[int, float] = 2
745
    ):
746
        args = ""
747
        if arguments != None:
748
            args = "?"
749
            for key, value in arguments.items():
750
                args += "{}={}&".format(key, value)
751
            args = args.rstrip("&")
752
        await self._navigate_async(url + args)
753
        response_text = await self._get_text_async("body")
754
        await asyncio.sleep(sleep)
755
        return response_text
756
757
    async def _get_game_infos_async(self, url_slug: str):
758
        game_infos = {}
759
        response = await self._get_json_async(
760
            "https://store-content.ak.epicgames.com/api/en-US/content/products/{}".format(
761
                url_slug
762
            )
763
        )
764
        game_infos["product_name"] = response["productName"]
765
        game_infos["namespace"] = response["namespace"]
766
        game_infos["pages"] = []
767
        for page in response["pages"]:
768
            game_info_page = {}
769
            if page["offer"]["hasOffer"]:
770
                game_info_page["offer_id"] = page["offer"]["id"]
771
                game_info_page["namespace"] = page["offer"]["namespace"]
772
                game_infos["pages"].append(game_info_page)
773
        return game_infos
774
775
    def _get_purchase_url(self, namespace: str, offer_id: str):
776
        purchase_url = "https://www.epicgames.com/store/purchase?lang=en-US&namespace={}&offers={}".format(
777
            namespace, offer_id
778
        )
779
        return purchase_url
780
781
    async def _get_weekly_free_base_games_async(self) -> List[Item]:
782
        response_text = await self._get_async(
783
            "https://store-site-backend-static.ak.epicgames.com/freeGamesPromotions"
784
        )
785
        response_json = json.loads(response_text)
786
        base_games = []
787
        for item in response_json["data"]["Catalog"]["searchStore"]["elements"]:
788
            if {"path": "freegames"} in item["categories"]:
789
                if (
790
                    item["price"]["totalPrice"]["discountPrice"] == 0
791
                    and item["price"]["totalPrice"]["originalPrice"] != 0
792
                ):
793
                    if item["offerType"] == "BASE_GAME":
794
                        base_game = Item(
795
                            item["title"], item["id"], item["namespace"], "BASE_GAME"
796
                        )
797
                        base_games.append(base_game)
798
        return base_games
799
800
    async def _get_weekly_free_items_async(
801
        self, user_country: str = "CN"
802
    ) -> List[Item]:
803
        try:
804
            user_country = await self._get_user_country_async()
805
        except:
806
            pass
807
        response_text = await self._get_async(
808
            f"https://store-site-backend-static.ak.epicgames.com/freeGamesPromotions?country={user_country}&allowCountries={user_country}"
809
        )
810
        response_json = json.loads(response_text)
811
        items = []
812
        for item in response_json["data"]["Catalog"]["searchStore"]["elements"]:
813
            if item["status"] == "ACTIVE":
814
                if {"path": "freegames"} in item["categories"]:
815
                    if item["price"]["totalPrice"]["discountPrice"] == 0:
816
                        if item["promotions"] != None:
817
                            if (
818
                                item["promotions"]["promotionalOffers"] != []
819
                                and item["promotions"]["promotionalOffers"] != None
820
                            ):
821
                                items.append(
822
                                    Item(
823
                                        item["title"],
824
                                        item["id"],
825
                                        item["namespace"],
826
                                        item["offerType"],
827
                                    )
828
                                )
829
        return items
830
831
    async def _get_free_dlcs_async(self, namespace: str) -> List[Item]:
832
        args = {
833
            "query": "query searchStoreQuery($namespace: String, $category: String, $freeGame: Boolean, $count: Int){Catalog{searchStore(namespace: $namespace, category: $category, freeGame: $freeGame, count: $count){elements{title id namespace}}}}",
834
            "variables": '{{"namespace": "{}", "category": "digitalextras/book|addons|digitalextras/soundtrack|digitalextras/video", "freeGame": true, "count": 1000}}'.format(
835
                namespace
836
            ),
837
        }
838
        response = await self._get_json_async("https://www.epicgames.com/graphql", args)
839
        free_dlcs = []
840
        for item in response["data"]["Catalog"]["searchStore"]["elements"]:
841
            free_dlc = Item(item["title"], item["id"], item["namespace"], "DLC")
842
            free_dlcs.append(free_dlc)
843
        return free_dlcs
844
845
    async def _get_free_base_game_async(self, namespace: str) -> Optional[Item]:
846
        args = {
847
            "query": "query searchStoreQuery($namespace: String, $category: String, $freeGame: Boolean, $count: Int){Catalog{searchStore(namespace: $namespace, category: $category, freeGame: $freeGame, count: $count){elements{title id namespace}}}}",
848
            "variables": '{{"namespace": "{}", "category": "games/edition/base", "freeGame": true, "count": 1000}}'.format(
849
                namespace
850
            ),
851
        }
852
        response = await self._get_json_async("https://www.epicgames.com/graphql", args)
853
        if len(response["data"]["Catalog"]["searchStore"]["elements"]) > 0:
854
            base_game_info = response["data"]["Catalog"]["searchStore"]["elements"][0]
855
            base_game = Item(
856
                base_game_info["title"],
857
                base_game_info["id"],
858
                base_game_info["namespace"],
859
                "BASE_GAME",
860
            )
861
            return base_game
862
863
    async def _get_weekly_free_games_async(self) -> List[Game]:
864
        free_items = await self._get_weekly_free_items_async()
865
        free_games = []
866
        for item in free_items:
867
            if item.type == "BASE_GAME":
868
                free_dlcs = await self._get_free_dlcs_async(item.namespace)
869
                free_games.append(Game(item, free_dlcs))
870
            elif item.type == "DLC":
871
                free_base_game = await self._get_free_base_game_async(item.namespace)
872
                if free_base_game != None:
873
                    free_dlcs = await self._get_free_dlcs_async(
874
                        free_base_game.namespace
875
                    )
876
                    free_games.append(Game(free_base_game, free_dlcs))
877
            else:
878
                free_base_game = await self._get_free_base_game_async(item.namespace)
879
                if free_base_game == None:
880
                    free_games.append(Game(item))
881
                else:
882
                    free_dlcs = await self._get_free_dlcs_async(
883
                        free_base_game.namespace
884
                    )
885
                    free_games.append(Game(free_base_game, free_dlcs))
886
        return free_games
887
888
    async def _claim_async(self, item: Item) -> None:
889
        async def findx_async(
890
            items: List[Dict[str, Union[str, bool, int]]], timeout: int
891
        ) -> int:
892
            for _ in range(int(timeout / 1000 / (len(items)))):
893
                for i in range(0, len(items)):
894
                    if items[i]["exist"]:
895
                        if await self._find_async(
896
                            items[i]["selector"],
897
                            timeout=1000,
898
                            frame=self.page.frames[items[i]["frame"]],
899
                        ):
900
                            return i
901
                    else:
902
                        if not await self._find_async(
903
                            items[i]["selector"],
904
                            timeout=1000,
905
                            frame=self.page.frames[items[i]["frame"]],
906
                        ):
907
                            return i
908
            return -1
909
910
        await self._navigate_async(item.purchase_url, timeout=self.timeout)
911
        await self._click_async(
912
            "#purchase-app button[class*=confirm]:not([disabled])", timeout=self.timeout
913
        )
914
        await self._try_click_async(
915
            "#purchaseAppContainer div.payment-overlay button.payment-btn--primary"
916
        )
917
        result = await findx_async(
918
            [
919
                {
920
                    "selector": "#purchase-app div[class*=alert]",
921
                    "exist": True,
922
                    "frame": 0,
923
                },
924
                {"selector": "div.MuiDialog-root", "exist": True, "frame": 1},
925
                {"selector": "#purchase-app > div", "exist": False, "frame": 0},
926
            ],
927
            timeout=self.timeout,
928
        )
929
        if result == -1:
930
            raise TimeoutError("Timeout when claiming")
931
        elif result == 0:
932
            message = await self._get_text_async(
933
                "#purchase-app div[class*=alert]:not([disabled])"
934
            )
935
            raise PermissionError(message)
936
        elif result == 1:
937
            raise PermissionError("CAPTCHA is required for unknown reasons")
938
        else:
939
            owned = await self._is_owned_async(item.offer_id, item.namespace)
940
            if not owned:
941
                raise RuntimeError(
942
                    "An item was mistakenly considered to have been claimed"
943
                )
944
945
    async def _screenshot_async(self, path: str) -> None:
946
        await self.page.screenshot({"path": path})
947
948
    def add_quit_signal(self):
949
        signal.signal(signal.SIGINT, self._quit)
950
        signal.signal(signal.SIGTERM, self._quit)
951
        if "SIGBREAK" in dir(signal):
952
            signal.signal(signal.SIGBREAK, self._quit)
953
        if "SIGHUP" in dir(signal):
954
            signal.signal(signal.SIGHUP, self._quit)
955
956
    async def _is_owned_async(self, offer_id: str, namespace: str) -> bool:
957
        args = {
958
            "query": "query launcherQuery($namespace: String!, $offerId: String!){Launcher{entitledOfferItems(namespace: $namespace, offerId: $offerId){entitledToAllItemsInOffer}}}",
959
            "variables": '{{"namespace": "{}", "offerId": "{}"}}'.format(
960
                namespace, offer_id
961
            ),
962
        }
963
        response = await self._get_json_async("https://www.epicgames.com/graphql", args)
964
        try:
965
            owned = response["data"]["Launcher"]["entitledOfferItems"][
966
                "entitledToAllItemsInOffer"
967
            ]
968
        except:
969
            raise ValueError("The returned data seems to be incorrect.")
970
        return owned
971
972
    async def _get_user_country_async(self) -> None:
973
        response = await self._get_json_async(
974
            "https://www.epicgames.com/account/v2/personal/ajaxGet"
975
        )
976
        try:
977
            country = response["userInfo"]["country"]["value"]
978
        except:
979
            raise ValueError("The returned data seems to be incorrect.")
980
        return country
981
982
    async def _try_get_webpage_content_async(self) -> Optional[str]:
983
        try:
984
            if self.browser_opened:
985
                webpage_content = await self._get_text_async("body")
986
                return webpage_content
987
        except:
988
            pass
989
990
    def _async_auto_retry(
991
        self,
992
        retries: int,
993
        error_message: str,
994
        error_notification: str,
995
        raise_error: bool = True,
996
    ) -> None:
997
        def retry(func: Callable) -> Callable:
998
            async def wrapper(*arg, **kw):
999
                for i in range(retries):
1000
                    try:
1001
                        await func(*arg, **kw)
1002
                        break
1003
                    except Exception as e:
1004
                        if i < retries - 1:
1005
                            self.log(f"{e}", level="warning")
1006
                        else:
1007
                            self.log(f"{error_message}{e}", "error")
1008
                            self.claimer_notifications.notify(
1009
                                NOTIFICATION_TITLE_ERROR, f"{error_notification}{e}"
1010
                            )
1011
                            await self._screenshot_async("screenshot.png")
1012
                            if raise_error:
1013
                                raise e
1014
1015
            return wrapper
1016
1017
        return retry
1018
1019
    async def _run_once_async(
1020
        self,
1021
        interactive: bool = True,
1022
        email: str = None,
1023
        password: str = None,
1024
        verification_code: str = None,
1025
        retries: int = 3,
1026
        raise_error: bool = False,
1027
    ) -> List[str]:
1028
        @self._async_auto_retry(
1029
            retries,
1030
            "Failed to open the browser: ",
1031
            NOTIFICATION_CONTENT_OPEN_BROWSER_FAILED,
1032
        )
1033
        async def run_open_browser():
1034
            if not self.browser_opened:
1035
                await self._open_browser_async()
1036
1037
        @self._async_auto_retry(
1038
            retries, "Failed to login: ", NOTIFICATION_CONTENT_LOGIN_FAILED
1039
        )
1040
        async def run_login(
1041
            interactive: bool,
1042
            email: Optional[str],
1043
            password: Optional[str],
1044
            verification_code: str = None,
1045
        ):
1046
            if await self._need_login_async():
1047
                if interactive:
1048
                    self.log("Need login")
1049
                    self.claimer_notifications.notify(
1050
                        NOTIFICATION_TITLE_NEED_LOGIN, NOTIFICATION_CONTENT_NEED_LOGIN
1051
                    )
1052
                    await self._close_browser_async()
1053
                    email = input("Email: ")
1054
                    password = getpass("Password: ")
1055
                    await self._open_browser_async()
1056
                    await self._login_async(email, password)
1057
                    self.log("Login successful")
1058
                else:
1059
                    await self._login_async(
1060
                        email, password, verification_code, interactive=False
1061
                    )
1062
1063
        async def run_claim() -> List[str]:
1064
            claimed_item_titles = []
1065
            owned_item_titles = []
1066
1067
            @self._async_auto_retry(
1068
                retries,
1069
                "Failed to claim one item: ",
1070
                NOTIFICATION_CONTENT_CLAIM_FAILED,
1071
                raise_error=False,
1072
            )
1073
            async def retried_claim(item: Item):
1074
                if not await self._is_owned_async(item.offer_id, item.namespace):
1075
                    await self._claim_async(item)
1076
                    claimed_item_titles.append(item.title)
1077
                    self.log(f"Successfully claimed: {item.title}")
1078
                else:
1079
                    owned_item_titles.append(item.title)
1080
1081
            free_games = await self._get_weekly_free_games_async()
1082
            item_amount = 0
1083
            for game in free_games:
1084
                item_amount += game.item_amount
1085
            for game in free_games:
1086
                await retried_claim(game.base_game)
1087
                for dlc in game.dlcs:
1088
                    await retried_claim(dlc)
1089
            if len(owned_item_titles) == item_amount:
1090
                self.log("All available free games are already in your library")
1091
            if len(claimed_item_titles) != 0:
1092
                claimed_item_titles_string = ""
1093
                for title in claimed_item_titles:
1094
                    claimed_item_titles_string += f"{title}, "
1095
                claimed_item_titles_string = claimed_item_titles_string.rstrip(", ")
1096
                self.claimer_notifications.notify(
1097
                    NOTIFICATION_TITLE_CLAIM_SUCCEED,
1098
                    f"{NOTIFICATION_CONTENT_CLAIM_SUCCEED}{claimed_item_titles_string}",
1099
                )
1100
            if len(claimed_item_titles) + len(owned_item_titles) < item_amount:
1101
                raise PermissionError("Failed to claim some items")
1102
            return claimed_item_titles
1103
1104
        claimed_item_titles = []
1105
        if raise_error:
1106
            await run_open_browser()
1107
            await run_login(interactive, email, password, verification_code)
1108
            claimed_item_titles = await run_claim()
1109
        else:
1110
            try:
1111
                await run_open_browser()
1112
                await run_login(interactive, email, password, verification_code)
1113
                claimed_item_titles = await run_claim()
1114
            except:
1115
                pass
1116
        try:
1117
            await self._close_browser_async()
1118
        except:
1119
            pass
1120
        return claimed_item_titles
1121
1122
    async def _load_cookies_async(self, path: str) -> None:
1123
        with open(path, "r") as cookies_file:
1124
            cookies = cookies_file.read()
1125
            for cookie in json.loads(cookies):
1126
                await self.page.setCookie(cookie)
1127
1128
    async def _save_cookies_async(self, path: str) -> None:
1129
        dir = os.path.dirname(path)
1130
        if not dir == "" and not os.path.exists(dir):
1131
            os.mkdir(dir)
1132
        with open(path, "w") as cookies_file:
1133
            await self.page.cookies()
1134
            cookies = await self.page.cookies()
1135
            cookies_file.write(json.dumps(cookies, separators=(",", ": "), indent=4))
1136
1137
    def open_browser(self) -> None:
1138
        return self._loop.run_until_complete(self._open_browser_async())
1139
1140
    def close_browser(self) -> None:
1141
        return self._loop.run_until_complete(self._close_browser_async())
1142
1143
    def need_login(self) -> bool:
1144
        return self._loop.run_until_complete(self._need_login_async())
1145
1146
    def login(
1147
        self,
1148
        email: str,
1149
        password: str,
1150
        verifacation_code: str = None,
1151
        interactive: bool = True,
1152
        remember_me: bool = True,
1153
    ) -> None:
1154
        return self._loop.run_until_complete(
1155
            self._login_async(
1156
                email, password, verifacation_code, interactive, remember_me
1157
            )
1158
        )
1159
1160
    def get_weekly_free_games(self) -> List[Game]:
1161
        return self._loop.run_until_complete(self._get_weekly_free_games_async())
1162
1163
    def run_once(
1164
        self,
1165
        interactive: bool = True,
1166
        email: str = None,
1167
        password: str = None,
1168
        verification_code: str = None,
1169
        retries: int = 3,
1170
        raise_error: bool = False,
1171
    ) -> List[str]:
1172
        return self._loop.run_until_complete(
1173
            self._run_once_async(
1174
                interactive, email, password, verification_code, retries, raise_error
1175
            )
1176
        )
1177
1178
    def scheduled_run(
1179
        self,
1180
        at: str,
1181
        interactive: bool = True,
1182
        email: str = None,
1183
        password: str = None,
1184
        verification_code: str = None,
1185
        retries: int = 3,
1186
    ) -> None:
1187
        self.add_quit_signal()
1188
        schedule.every().day.at(at).do(
1189
            self.run_once, interactive, email, password, verification_code, retries
1190
        )
1191
        while True:
1192
            schedule.run_pending()
1193
            time.sleep(1)
1194
1195
    def load_cookies(self, path: str) -> None:
1196
        return self._loop.run_until_complete(self._load_cookies_async(path))
1197
1198
    def save_cookies(self, path: str) -> None:
1199
        return self._loop.run_until_complete(self._save_cookies_async(path))
1200
1201
    def navigate(self, url: str, timeout: int = 30000, reload: bool = True) -> None:
1202
        return self._loop.run_until_complete(self._navigate_async(url, timeout, reload))
1203
1204
    def find(self, selector: str, timeout: int = None, frame: Frame = None) -> bool:
1205
        return self._loop.run_until_complete(self._find_async(selector, timeout, frame))
1206
1207
    def virtual_console(self) -> None:
1208
        print(
1209
            "You can input JavaScript commands here for testing. Type exit and press Enter to quit."
1210
        )
1211
        while True:
1212
            try:
1213
                command = input("> ")
1214
            except EOFError:
1215
                break
1216
            if command == "exit":
1217
                break
1218
            try:
1219
                result = self._loop.run_until_complete(self.page.evaluate(command))
1220
                print(result)
1221
            except Exception as e:
1222
                print(f"{e}")
1223
1224
1225
def login(cookies_path: str) -> None:
1226
    claimer = EpicgamesClaimer(
1227
        headless=False,
1228
        sandbox=True,
1229
        browser_args=["--disable-infobars", "--no-first-run"],
1230
    )
1231
    claimer.log("Creating user data, please log in in the browser ...")
1232
    claimer.navigate(
1233
        "https://www.epicgames.com/id/login?redirectUrl=https%3A%2F%2Fwww.epicgames.com%2Fstore",
1234
        timeout=0,
1235
    )
1236
    claimer.find("#user[data-component=SignedIn]", timeout=0)
1237
    claimer.save_cookies(cookies_path)
1238
    claimer.log("Login successful")
1239
    claimer.close_browser()
1240
1241
1242
def get_args(run_by_main_script: bool = False) -> argparse.Namespace:
1243
    def update_args_from_env(args: argparse.Namespace) -> argparse.Namespace:
1244
        for key in args.__dict__.keys():
1245
            env = os.environ.get(key.upper())
1246
            if env != None:
1247
                if type(args.__dict__[key]) == int:
1248
                    args.__setattr__(key, int(env))
1249
                elif type(args.__dict__[key]) == bool:
1250
                    if env == "true":
1251
                        args.__setattr__(key, True)
1252
                    elif env == "false":
1253
                        args.__setattr__(key, False)
1254
                else:
1255
                    args.__setattr__(key, env)
1256
        return args
1257
1258
    parser = argparse.ArgumentParser(
1259
        description="Claim weekly free games from Epic Games Store."
1260
    )
1261
    parser.add_argument(
1262
        "-n", "--no-headless", action="store_true", help="run the browser with GUI"
1263
    )
1264
    parser.add_argument(
1265
        "-c", "--chromium-path", type=str, help="set path to browser executable"
1266
    )
1267
    parser.add_argument(
1268
        "-r",
1269
        "--run-at",
1270
        type=str,
1271
        help="set daily check and claim time, format to HH:MM, default to the current time",
1272
    )
1273
    parser.add_argument(
1274
        "-o", "--once", action="store_true", help="claim once then exit"
1275
    )
1276
    if run_by_main_script:
1277
        parser.add_argument(
1278
            "-a", "--auto-update", action="store_true", help="enable auto update"
1279
        )
1280
    if not run_by_main_script:
1281
        parser.add_argument(
1282
            "-e",
1283
            "--external-schedule",
1284
            action="store_true",
1285
            help="run in external schedule mode",
1286
        )
1287
    parser.add_argument(
1288
        "-u", "--email", "--username", type=str, help="set username/email"
1289
    )
1290
    parser.add_argument("-p", "--password", type=str, help="set password")
1291
    parser.add_argument(
1292
        "-t", "--verification-code", type=str, help="set verification code (2FA)"
1293
    )
1294
    parser.add_argument("--cookies", type=str, help="set path to cookies file")
1295
    parser.add_argument(
1296
        "-l", "--login", action="store_true", help="create logged-in user data and quit"
1297
    )
1298
    parser.add_argument("-d", "--debug", action="store_true", help="enable debug mode")
1299
    parser.add_argument(
1300
        "-dt",
1301
        "--debug-timeout",
1302
        type=int,
1303
        default=180000,
1304
        help="set timeout in milliseconds",
1305
    )
1306
    parser.add_argument(
1307
        "-dr", "--debug-retries", type=int, default=3, help="set the number of retries"
1308
    )
1309
    parser.add_argument(
1310
        "-dp",
1311
        "--debug-push-test",
1312
        action="store_true",
1313
        help="Push a notification for testing and quit",
1314
    )
1315
    parser.add_argument(
1316
        "-ds",
1317
        "--debug-show-args",
1318
        action="store_true",
1319
        help="Push a notification for testing and quit",
1320
    )
1321
    parser.add_argument(
1322
        "-ps", "--push-serverchan-sendkey", type=str, help="set ServerChan sendkey"
1323
    )
1324
    parser.add_argument(
1325
        "-pbu",
1326
        "--push-bark-url",
1327
        type=str,
1328
        default="https://api.day.app/push",
1329
        help="set Bark server address",
1330
    )
1331
    parser.add_argument(
1332
        "-pbk", "--push-bark-device-key", type=str, help="set Bark device key"
1333
    )
1334
    parser.add_argument(
1335
        "-ptt", "--push-telegram-bot-token", type=str, help="set Telegram bot token"
1336
    )
1337
    parser.add_argument(
1338
        "-pti", "--push-telegram-chat-id", type=str, help="set Telegram chat ID"
1339
    )
1340
    parser.add_argument(
1341
        "-pwx", "--push-wechat-qywx-am", type=str, help="set WeChat QYWX"
1342
    )
1343
    parser.add_argument(
1344
        "-pda",
1345
        "--push-dingtalk-access-token",
1346
        type=str,
1347
        help="set DingTalk access token",
1348
    )
1349
    parser.add_argument(
1350
        "-pds", "--push-dingtalk-secret", type=str, help="set DingTalk secret"
1351
    )
1352
    parser.add_argument(
1353
        "-ns",
1354
        "--no-startup-notification",
1355
        action="store_true",
1356
        help="disable pushing a notification at startup",
1357
    )
1358
    parser.add_argument(
1359
        "-v",
1360
        "--version",
1361
        action="version",
1362
        version=__version__,
1363
        help="print version information and quit",
1364
    )
1365
    args = parser.parse_args()
1366
    args = update_args_from_env(args)
1367
    if args.run_at == None:
1368
        localtime = time.localtime()
1369
        args.run_at = "{0:02d}:{1:02d}".format(localtime.tm_hour, localtime.tm_min)
1370
    if args.email != None and args.password == None:
1371
        raise ValueError("Must input both username and password.")
1372
    if args.email == None and args.password != None:
1373
        raise ValueError("Must input both username and password.")
1374
    args.interactive = True if args.email == None else False
1375
    args.data_dir = (
1376
        "User_Data/Default" if args.interactive else "User_Data/{}".format(args.email)
1377
    )
1378
    if args.debug_push_test:
1379
        test_notifications = Notifications(
1380
            serverchan_sendkey=args.push_serverchan_sendkey,
1381
            bark_push_url=args.push_bark_url,
1382
            bark_device_key=args.push_bark_device_key,
1383
            telegram_bot_token=args.push_telegram_bot_token,
1384
            telegram_chat_id=args.push_telegram_chat_id,
1385
        )
1386
        test_notifications.notify(NOTIFICATION_TITLE_TEST, NOTIFICATION_CONTENT_TEST)
1387
        exit()
1388
    if args.debug_show_args:
1389
        print(args)
1390
        exit()
1391
    if args.login:
1392
        login("User_Data/Default/cookies.json")
1393
        exit()
1394
    return args
1395
1396
1397
def main(
1398
    args: argparse.Namespace = None, raise_error: bool = False
1399
) -> Optional[List[str]]:
1400
    if args == None:
1401
        args = get_args()
1402
    claimer_notifications = Notifications(
1403
        serverchan_sendkey=args.push_serverchan_sendkey,
1404
        bark_push_url=args.push_bark_url,
1405
        bark_device_key=args.push_bark_device_key,
1406
        telegram_bot_token=args.push_telegram_bot_token,
1407
        telegram_chat_id=args.push_telegram_chat_id,
1408
        wechat_qywx_am=args.push_wechat_qywx_am,
1409
        dingtalk_access_token=args.push_dingtalk_access_token,
1410
        dingtalk_secret=args.push_dingtalk_secret,
1411
    )
1412
    claimer = EpicgamesClaimer(
1413
        args.data_dir,
1414
        headless=not args.no_headless,
1415
        chromium_path=args.chromium_path,
1416
        claimer_notifications=claimer_notifications,
1417
        timeout=args.debug_timeout,
1418
        debug=args.debug,
1419
        cookies=args.cookies,
1420
    )
1421
    if args.once:
1422
        return claimer.run_once(
1423
            args.interactive,
1424
            args.email,
1425
            args.password,
1426
            args.verification_code,
1427
            retries=args.debug_retries,
1428
            raise_error=raise_error,
1429
        )
1430
    elif args.external_schedule:
1431
        if not args.no_startup_notification:
1432
            claimer_notifications.notify(
1433
                NOTIFICATION_TITLE_START, NOTIFICATION_CONTENT_START
1434
            )
1435
        claimer.run_once(
1436
            args.interactive,
1437
            args.email,
1438
            args.password,
1439
            args.verification_code,
1440
            retries=args.debug_retries,
1441
            raise_error=raise_error,
1442
        )
1443
    else:
1444
        if not args.no_startup_notification:
1445
            claimer_notifications.notify(
1446
                NOTIFICATION_TITLE_START, NOTIFICATION_CONTENT_START
1447
            )
1448
        claimer.run_once(
1449
            args.interactive,
1450
            args.email,
1451
            args.password,
1452
            args.verification_code,
1453
            retries=args.debug_retries,
1454
        )
1455
        claimer.scheduled_run(
1456
            args.run_at,
1457
            args.interactive,
1458
            args.email,
1459
            args.password,
1460
            args.verification_code,
1461
        )
1462
1463
1464
# Entry function of SCF
1465
def main_handler(event: Dict[str, str] = None, context: Dict[str, str] = None) -> str:
1466
    cwd = os.getcwd()
1467
    sys.path.append(cwd)
1468
    os.chdir("/tmp")
1469
    args = get_args()
1470
    args.chromium_path = cwd + "/chrome-linux/chrome"
1471
    args.once = True
1472
    claimed_item_titles = main(args, raise_error=True)
1473
    result_message = (
1474
        f"{NOTIFICATION_CONTENT_CLAIM_SUCCEED}{claimed_item_titles}"
1475
        if len(claimed_item_titles)
1476
        else NOTIFICATION_CONTENT_OWNED_ALL
1477
    )
1478
    return result_message
1479
1480
1481
def run(args: argparse.Namespace, check_items: dict) -> str or None:
1482
    args.chromium_path = "chromium-browser"
1483
    args.interactive = False
1484
    args.once = True
1485
1486
    msg_all = ""
1487
    for check_item in check_items:
1488
        args.email = check_item.get("email")
1489
        args.password = check_item.get("password")
1490
        args.data_dir = "User_Data/{}".format(args.email)
1491
        args.cookies = args.data_dir + "/cookies.json"
1492
        if not os.path.exists(args.cookies):
1493
            args.cookies = None
1494
        if not os.path.exists("User_Data"):
1495
            log(
1496
                f"未发现 User_Data 文件夹,判断为初次使用。若遇到人机验证,请在能手动登录浏览器页面的环境(如 Win10)使用 get_cookies.exe/py 获取 cookies.json 并放入 {args.data_dir} 文件夹,然后再尝试",
1497
                level="warning",
1498
            )
1499
        claimed_item_titles = main(args, raise_error=True)
1500
        msg = (
1501
            f"{NOTIFICATION_CONTENT_CLAIM_SUCCEED}{claimed_item_titles}"
1502
            if len(claimed_item_titles)
1503
            else NOTIFICATION_CONTENT_OWNED_ALL
1504
        )
1505
        msg_all += msg + "\n\n"
1506
    return msg_all
1507
1508
1509
def start() -> None:
1510
    args = get_args()
1511
    if args.email and args.password:
1512
        main()
1513
        return
1514
    ENV = get_env_str()
1515
    if ENV == "v2p":
1516
        os.chdir("/usr/local/app/script/Lists")
1517
    elif ENV == "ql":
1518
        os.chdir("/ql/config")
1519
    else:
1520
        main()
1521
        return
1522
    data = get_data()
1523
    _check_items = data.get("EPIC", [])
1524
    res = run(args, check_items=_check_items)
1525
    send("Epicgames", res)
1526
1527
1528
if __name__ == "__main__":
1529
    start()
1530