Passed
Push — master ( d6e860...d6796d )
by Leon
02:41
created

ck_epic.EpicgamesClaimer._load_cookies_async()   A

Complexity

Conditions 3

Size

Total Lines 5
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 3
eloc 5
nop 2
dl 0
loc 5
rs 10
c 0
b 0
f 0
1
"""
2
:author @luminoleon
3
cron: 50 1 * * *
4
new Env('Epic');
5
"""
6
7
import argparse
8
import asyncio
9
import base64
10
import datetime
11
import hashlib
12
import hmac
13
import json
14
import os
15
import re
16
import signal
17
import sys
18
import time
19
import urllib
20
from getpass import getpass
21
from json.decoder import JSONDecodeError
22
from typing import Callable, Dict, List, Optional, Tuple, Union
23
24
import requests
25
import schedule
26
from pyppeteer import launch, launcher
27
from pyppeteer.element_handle import ElementHandle
28
from pyppeteer.frame_manager import Frame
29
from pyppeteer.network_manager import Request
30
31
from notify_mtr import send
32
from utils import get_data
33
from utils_env import get_env_str
34
35
__version__ = "1.6.7"
36
37
38
NOTIFICATION_TITLE_START = "Epicgames Claimer:启动成功"
39
NOTIFICATION_TITLE_NEED_LOGIN = "Epicgames Claimer:需要登录"
40
NOTIFICATION_TITLE_CLAIM_SUCCEED = "Epicgames Claimer:领取成功"
41
NOTIFICATION_TITLE_ERROR = "EpicGames Claimer:错误"
42
NOTIFICATION_TITLE_TEST = "EpicGames Claimer:测试"
43
NOTIFICATION_CONTENT_START = "如果你收到了此消息,表示你可以正常接收来自Epicgames Claimer的通知推送"
44
NOTIFICATION_CONTENT_NEED_LOGIN = "未登录或登录信息已失效,请检查并尝试重新登录"
45
NOTIFICATION_CONTENT_CLAIM_SUCCEED = "成功领取到游戏:"
46
NOTIFICATION_CONTENT_OPEN_BROWSER_FAILED = "打开浏览器失败:"
47
NOTIFICATION_CONTENT_LOGIN_FAILED = "登录失败:"
48
NOTIFICATION_CONTENT_CLAIM_FAILED = "领取失败:"
49
NOTIFICATION_CONTENT_TEST = "测试是否通知推送已被正确设置"
50
NOTIFICATION_CONTENT_OWNED_ALL = "所有可领取的每周免费游戏已全部在库中"
51
52
53
if "--enable-automation" in launcher.DEFAULT_ARGS:
54
    launcher.DEFAULT_ARGS.remove("--enable-automation")
55
# Solve the issue of zombie processes
56
if "SIGCHLD" in dir(signal):
57
    signal.signal(signal.SIGCHLD, signal.SIG_IGN)
58
59
60
def get_current_time() -> str:
61
    current_time_string = str(datetime.datetime.now()).split(".")[0]
62
    return current_time_string
63
64
65
def log(text: str, level: str = "info") -> None:
66
    localtime = get_current_time()
67
    if level == "info":
68
        print("[{}  INFO] {}".format(localtime, text))
69
    elif level == "warning":
70
        print("\033[33m[{}  WARN] {}\033[0m".format(localtime, text))
71
    elif level == "error":
72
        print("\033[31m[{} ERROR] {}\033[0m".format(localtime, text))
73
74
75
class WeChat:
76
    def __init__(self, corpid, corpsecret, agentid) -> None:
77
        self.corpid = corpid
78
        self.corpsecret = corpsecret
79
        self.agentid = agentid
80
81
    def get_token(self) -> str:
82
        url = "https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid={}&corpsecret={}".format(
83
            self.corpid, self.corpsecret
84
        )
85
        response = requests.get(url)
86
        if response.status_code == 200:
87
            return response.json()["access_token"]
88
        else:
89
            log("Failed to get access_token.", level="error")
90
            return ""
91
92
    def send_text(self, message, touser="@all") -> str:
93
        url = "https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token={}".format(
94
            self.get_token()
95
        )
96
        data = {
97
            "touser": touser,
98
            "msgtype": "text",
99
            "agentid": self.agentid,
100
            "text": {"content": message},
101
            "safe": 0,
102
        }
103
        send_msges = bytes(json.dumps(data), "utf-8")
104
        response = requests.post(url, send_msges)
105
        return response
106
107
    def send_mpnews(self, title, message, media_id, touser="@all") -> str:
108
        url = "https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token={}".format(
109
            self.get_token()
110
        )
111
        if not message:
112
            message = title
113
        data = {
114
            "touser": touser,
115
            "msgtype": "mpnews",
116
            "agentid": self.agentid,
117
            "mpnews": {
118
                "articles": [
119
                    {
120
                        "title": title,
121
                        "thumb_media_id": media_id,
122
                        "content_source_url": "",
123
                        "content": message.replace("\n", "<br/>"),
124
                        "digest": message,
125
                    }
126
                ]
127
            },
128
            "safe": 0,
129
        }
130
        send_msges = bytes(json.dumps(data), "utf-8")
131
        response = requests.post(url, send_msges)
132
        return response
133
134
135
class Notifications:
136
    def __init__(
137
        self,
138
        serverchan_sendkey: str = None,
139
        bark_push_url: str = "https://api.day.app/push",
140
        bark_device_key: str = None,
141
        telegram_bot_token: str = None,
142
        telegram_chat_id: str = None,
143
        wechat_qywx_am: str = None,
144
        dingtalk_access_token: str = None,
145
        dingtalk_secret: str = None,
146
    ) -> None:
147
        self.serverchan_sendkey = serverchan_sendkey
148
        self.bark_push_url = bark_push_url
149
        self.bark_device_key = bark_device_key
150
        self.telegram_bot_token = telegram_bot_token
151
        self.telegram_chat_id = telegram_chat_id
152
        self.wechat_qywx_am = wechat_qywx_am
153
        self.dingtalk_access_token = dingtalk_access_token
154
        self.dingtalk_secret = dingtalk_secret
155
156
    def push_serverchan(self, title: str, content: str = None) -> None:
157
        if self.serverchan_sendkey != None:
158
            try:
159
                url = "https://sctapi.ftqq.com/{}.send".format(self.serverchan_sendkey)
160
                data = {"title": title}
161
                if content != None:
162
                    data["desp"] = content
163
                requests.post(url, data=data)
164
            except Exception as e:
165
                log("Failed to push to ServerChan: {}".format(e), "error")
166
167
    def push_bark(self, title: str, content: str = None) -> None:
168
        if self.bark_device_key:
169
            try:
170
                response = requests.post(
171
                    url=self.bark_push_url,
172
                    headers={
173
                        "Content-Type": "application/json; charset=utf-8",
174
                    },
175
                    data=json.dumps(
176
                        {
177
                            "body": content,
178
                            "device_key": self.bark_device_key,
179
                            "title": title,
180
                        }
181
                    ),
182
                )
183
                log(f"Bark Response HTTP Status Code: {response.status_code}")
184
                log(f"Bark Response HTTP Response Body: {response.content}")
185
            except Exception as e:
186
                log("Failed to push to Bark: {}".format(e), "error")
187
188
    def push_telegram(self, title: str = None, content: str = None) -> None:
189
        if self.telegram_bot_token:
190
            try:
191
                push_text = f"{title}\n\n{content}" if title else content
192
                response = requests.post(
193
                    url=f"https://api.telegram.org/bot{self.telegram_bot_token}/sendMessage",
194
                    data={"chat_id": self.telegram_chat_id, "text": push_text},
195
                )
196
                log(f"Telegram Response HTTP Status Code: {response.status_code}")
197
                log(f"Telegram Response HTTP Response Body: {response.content}")
198
            except Exception as e:
199
                log("Failed to push to Telegram: {}".format(e), "error")
200
201
    def push_wechat(self, title: str, content: str = None) -> None:
202
        if self.wechat_qywx_am:
203
            try:
204
                qywx_am_ay = re.split(",", self.wechat_qywx_am)
205
                if 4 < len(qywx_am_ay) > 5:
206
                    log("WeChat AM is invalid.", level="error")
207
                    return
208
                corpid = qywx_am_ay[0]
209
                corpsecret = qywx_am_ay[1]
210
                touser = qywx_am_ay[2]
211
                agentid = qywx_am_ay[3]
212
                try:
213
                    media_id = qywx_am_ay[4]
214
                except:
215
                    media_id = None
216
                wx = WeChat(corpid, corpsecret, agentid)
217
                if media_id != None:
218
                    response = wx.send_mpnews(title, content, media_id, touser)
219
                else:
220
                    message = title + "\n\n" + content
221
                    response = wx.send_text(message, touser)
222
                response = response.json()
223
                if response["errmsg"] == "ok":
224
                    log("Successfully sent wechat message.")
225
                else:
226
                    log(
227
                        "Failed to send wechat message. errmsg: {}".format(
228
                            response["errmsg"]
229
                        ),
230
                        level="error",
231
                    )
232
            except Exception as e:
233
                log("Failed to send wechat message. ExceptErrmsg:{}".format(e), "error")
234
235
    def _get_dingtalk_timestamp_and_sign(self) -> Tuple[str, str]:
236
        timestamp = str(round(time.time() * 1000))
237
        secret = self.dingtalk_secret
238
        secret_enc = secret.encode("utf-8")
239
        string_to_sign = "{}\n{}".format(timestamp, secret)
240
        string_to_sign_enc = string_to_sign.encode("utf-8")
241
        hmac_code = hmac.new(
242
            secret_enc, string_to_sign_enc, digestmod=hashlib.sha256
243
        ).digest()
244
        sign = urllib.parse.quote_plus(base64.b64encode(hmac_code))
245
        return timestamp, sign
246
247
    def push_dingtalk(self, title: str, content: str) -> None:
248
        if self.dingtalk_access_token:
249
            try:
250
                headers = {"Content-Type": "application/json; charset=utf-8"}
251
                webhook = "https://oapi.dingtalk.com/robot/send"
252
                params = {"access_token": self.dingtalk_access_token}
253
                if self.dingtalk_secret:
254
                    (
255
                        params["timestamp"],
256
                        params["sign"],
257
                    ) = self._get_dingtalk_timestamp_and_sign()
258
                push_text = f"{title}\n\n{content}" if title else content
259
                data = {"msgtype": "text", "text": {"content": push_text}}
260
                response = requests.post(
261
                    url=webhook, headers=headers, params=params, data=json.dumps(data)
262
                )
263
                response_json = response.json()
264
                errcode = response_json["errcode"]
265
                errmsg = response_json["errmsg"]
266
                if errcode == 0:
267
                    log("Successfully sent DingTalk message")
268
                else:
269
                    log(f"Failed to send Dingtalk message: {errmsg}", level="error")
270
            except Exception as e:
271
                log(f"Failed to send Dingtalk message: {e}", level="error")
272
273
    def notify(self, title: str, content: str = None) -> None:
274
        self.push_serverchan(title, content)
275
        self.push_bark(title, content)
276
        self.push_telegram(title, content)
277
        self.push_wechat(title, content)
278
        self.push_dingtalk(title, content)
279
280
281
class Item:
282
    def __init__(self, title: str, offer_id: str, namespace: str, type: str) -> None:
283
        self.title = title
284
        self.offer_id = offer_id
285
        self.namespace = namespace
286
        self.type = type
287
288
    @property
289
    def purchase_url(self) -> str:
290
        url = "https://www.epicgames.com/store/purchase?lang=en-US&namespace={}&offers={}".format(
291
            self.namespace, self.offer_id
292
        )
293
        return url
294
295
296
class Game:
297
    def __init__(self, base_game: Item, dlcs: List[Item] = []) -> None:
298
        self.base_game = base_game
299
        self.dlcs = dlcs
300
301
    @property
302
    def item_amount(self) -> int:
303
        return len(self.dlcs) + 1
304
305
306
class EpicgamesClaimer:
307
    def __init__(
308
        self,
309
        data_dir: Optional[str] = None,
310
        headless: bool = True,
311
        sandbox: bool = False,
312
        chromium_path: Optional[str] = None,
313
        claimer_notifications: Notifications = None,
314
        timeout: int = 180000,
315
        debug: bool = False,
316
        cookies: str = None,
317
        browser_args: List[str] = [
318
            "--disable-infobars",
319
            "--blink-settings=imagesEnabled=false",
320
            "--no-first-run",
321
            "--disable-gpu",
322
        ],
323
    ) -> None:
324
        self.data_dir = data_dir
325
        self.headless = headless
326
        self.browser_args = browser_args
327
        self.sandbox = sandbox
328
        if not self.sandbox:
329
            self.browser_args.append("--no-sandbox")
330
        self.chromium_path = chromium_path
331
        if "win" in launcher.current_platform() and self.chromium_path == None:
332
            if os.path.exists("chrome-win32"):
333
                self.chromium_path = "chrome-win32/chrome.exe"
334
            elif os.path.exists("chrome-win"):
335
                self.chromium_path = "chrome-win/chrome.exe"
336
        self._loop = asyncio.get_event_loop()
337
        self.browser_opened = False
338
        self.claimer_notifications = (
339
            claimer_notifications if claimer_notifications != None else Notifications()
340
        )
341
        self.timeout = timeout
342
        self.debug = debug
343
        self.cookies = cookies
344
        self.page = None
345
        self.open_browser()
346
347
    def log(self, text: str, level: str = "info") -> None:
348
        localtime = get_current_time()
349
        if level == "info":
350
            print("[{}  INFO] {}".format(localtime, text))
351
        elif level == "warning":
352
            print("\033[33m[{}  WARN] {}\033[0m".format(localtime, text))
353
        elif level == "error":
354
            print("\033[31m[{} ERROR] {}\033[0m".format(localtime, text))
355
        elif level == "debug":
356
            if self.debug:
357
                print("[{} DEBUG] {}".format(localtime, text))
358
359
    async def _headless_stealth_async(self):
360
        await self.page.evaluateOnNewDocument(
361
            "() => {"
362
            "Object.defineProperty(navigator, 'appVersion', {get: () => '5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.3542.0 Safari/537.36',});"
363
            "Object.defineProperty(navigator, 'plugins', {get: () => [{'description': 'Portable Document Format', 'filename': 'internal-pdf-viewer', 'length': 1, 'name': 'Chrome PDF Plugin'}]});"
364
            "Object.defineProperty(navigator, 'languages', {get: () => ['zh-CN', 'zh', 'en'],});"
365
            "const originalQuery = window.navigator.permissions.query;"
366
            "window.navigator.permissions.query = (parameters) => (parameters.name === 'notifications' ? Promise.resolve({ state: Notification.permission }) : originalQuery(parameters));"
367
            "window.chrome = {}; window.chrome.app = {'InstallState':'a', 'RunningState':'b', 'getDetails':'c', 'getIsInstalled':'d'}; window.chrome.csi = function(){}; window.chrome.loadTimes = function(){}; window.chrome.runtime = function(){};"
368
            "const newProto = navigator.__proto__; delete newProto.webdriver; navigator.__proto__ = newProto;"
369
            "Reflect.defineProperty(navigator.connection,'rtt', {get: () => 150, enumerable:true});"
370
            "const getParameter = WebGLRenderingContext.getParameter; WebGLRenderingContext.prototype.getParameter = function(parameter) {if (parameter === 37445) {return 'Intel Open Source Technology Center';}; if (parameter === 37446) {return 'Mesa DRI Intel(R) Ivybridge Mobile ';}; return getParameter(parameter);};"
371
            "['height', 'width'].forEach(property => {const imageDescriptor = Object.getOwnPropertyDescriptor(HTMLImageElement.prototype, property); Object.defineProperty(HTMLImageElement.prototype, property, {...imageDescriptor, get: function() {if (this.complete && this.naturalHeight == 0) {return 16;}; return imageDescriptor.get.apply(this);},});});"
372
            "}"
373
        )
374
        await self.page.evaluateOnNewDocument(
375
            "window.navigator.chrome = {runtime: {}, loadTimes: function() {}, csi: function() {}, app: {}};"
376
        )
377
        await self.page.evaluateOnNewDocument(
378
            "window.navigator.language = {runtime: {}, loadTimes: function() {}, csi: function() {}, app: {}};"
379
        )
380
        await self.page.setExtraHTTPHeaders(
381
            {"Accept-Language": "en-GB,en-US;q=0.9,en;q=0.8"}
382
        )
383
        await self.page.setUserAgent(
384
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.3542.0 Safari/537.36"
385
        )
386
387
    async def _open_browser_async(self) -> None:
388
        if not self.browser_opened:
389
            self.browser = await launch(
390
                options={"args": self.browser_args, "headless": self.headless},
391
                userDataDir=None
392
                if self.data_dir == None
393
                else os.path.abspath(self.data_dir),
394
                executablePath=self.chromium_path,
395
            )
396
            self.page = (await self.browser.pages())[0]
397
            await self.page.setViewport({"width": 1000, "height": 600})
398
            # Async callback functions aren't possible to use (Refer to https://github.com/pyppeteer/pyppeteer/issues/220).
399
            # await self.page.setRequestInterception(True)
400
            # self.page.on('request', self._intercept_request_async)
401
            if self.headless:
402
                await self._headless_stealth_async()
403
            self.browser_opened = True
404
            if self.cookies:
405
                await self._load_cookies_async(self.cookies)
406
            if self.data_dir != None:
407
                cookies_path = os.path.join(self.data_dir, "cookies.json")
408
                if os.path.exists(cookies_path):
409
                    await self._load_cookies_async(cookies_path)
410
                    os.remove(cookies_path)
411
        # await self._refresh_cookies_async()
412
413
    async def _refresh_cookies_async(self) -> None:
414
        await self._navigate_async("https://www.epicgames.com/store/en-US/")
415
416
    async def _intercept_request_async(self, request: Request) -> None:
417
        if request.resourceType in ["image", "media", "font"]:
418
            await request.abort()
419
        else:
420
            await request.continue_()
421
422
    async def _close_browser_async(self):
423
        if self.browser_opened:
424
            if self.cookies:
425
                await self._save_cookies_async(self.cookies)
426
            await self.browser.close()
427
            self.browser_opened = False
428
429
    async def _type_async(
430
        self, selector: str, text: str, sleep: Union[int, float] = 0
431
    ) -> None:
432
        await self.page.waitForSelector(selector)
433
        await asyncio.sleep(sleep)
434
        await self.page.type(selector, text)
435
436
    async def _click_async(
437
        self,
438
        selector: str,
439
        sleep: Union[int, float] = 2,
440
        timeout: int = 30000,
441
        frame_index: int = 0,
442
    ) -> None:
443
        if frame_index == 0:
444
            await self.page.waitForSelector(selector, options={"timeout": timeout})
445
            await asyncio.sleep(sleep)
446
            await self.page.click(selector)
447
        else:
448
            await self.page.waitForSelector(
449
                "iframe:nth-child({})".format(frame_index), options={"timeout": timeout}
450
            )
451
            frame = self.page.frames[frame_index]
452
            await frame.waitForSelector(selector)
453
            await asyncio.sleep(sleep)
454
            await frame.click(selector)
455
456
    async def _get_text_async(self, selector: str) -> str:
457
        await self.page.waitForSelector(selector)
458
        return await (
459
            await (await self.page.querySelector(selector)).getProperty("textContent")
460
        ).jsonValue()
461
462
    async def _get_texts_async(self, selector: str) -> List[str]:
463
        texts = []
464
        try:
465
            await self.page.waitForSelector(selector)
466
            for element in await self.page.querySelectorAll(selector):
467
                texts.append(
468
                    await (await element.getProperty("textContent")).jsonValue()
469
                )
470
        except:
471
            pass
472
        return texts
473
474
    async def _get_element_text_async(self, element: ElementHandle) -> str:
475
        return await (await element.getProperty("textContent")).jsonValue()
476
477
    async def _get_property_async(self, selector: str, property: str) -> str:
478
        await self.page.waitForSelector(selector)
479
        return await self.page.evaluate(
480
            "document.querySelector('{}').getAttribute('{}')".format(selector, property)
481
        )
482
483
    async def _get_links_async(
484
        self, selector: str, filter_selector: str, filter_value: str
485
    ) -> List[str]:
486
        links = []
487
        try:
488
            await self.page.waitForSelector(selector)
489
            elements = await self.page.querySelectorAll(selector)
490
            judgement_texts = await self._get_texts_async(filter_selector)
491
        except:
492
            return []
493
        for element, judgement_text in zip(elements, judgement_texts):
494
            if judgement_text == filter_value:
495
                link = await (await element.getProperty("href")).jsonValue()
496
                links.append(link)
497
        return links
498
499
    async def _find_async(
500
        self, selectors: Union[str, List[str]], timeout: int = None, frame: Frame = None
501
    ) -> Union[bool, int]:
502
        if frame == None:
503
            frame = self.page
504
        if type(selectors) == str:
505
            try:
506
                if timeout == None:
507
                    timeout = 1000
508
                await frame.waitForSelector(selectors, options={"timeout": timeout})
509
                return True
510
            except:
511
                return False
512
        elif type(selectors) == list:
513
            if timeout == None:
514
                timeout = 300000
515
            for _ in range(int(timeout / 1000 / len(selectors))):
516
                for i in range(len(selectors)):
517
                    if await self._find_async(selectors[i], timeout=1000, frame=frame):
518
                        return i
519
            return -1
520
        else:
521
            raise ValueError
522
523
    async def _try_click_async(
524
        self, selector: str, sleep: Union[int, float] = 2
525
    ) -> bool:
526
        try:
527
            await asyncio.sleep(sleep)
528
            await self.page.click(selector)
529
            return True
530
        except:
531
            return False
532
533
    async def _get_elements_async(
534
        self, selector: str
535
    ) -> Union[List[ElementHandle], None]:
536
        try:
537
            await self.page.waitForSelector(selector)
538
            return await self.page.querySelectorAll(selector)
539
        except:
540
            return None
541
542
    async def _wait_for_element_text_change_async(
543
        self, element: ElementHandle, text: str, timeout: int = 30
544
    ) -> None:
545
        if await self._get_element_text_async(element) != text:
546
            return
547
        for _ in range(timeout):
548
            await asyncio.sleep(1)
549
            if await self._get_element_text_async(element) != text:
550
                return
551
        raise TimeoutError(
552
            'Waiting for element "{}" text content change failed: timeout {}s exceeds'.format(
553
                element, timeout
554
            )
555
        )
556
557
    async def _navigate_async(
558
        self, url: str, timeout: int = 30000, reload: bool = True
559
    ) -> None:
560
        if self.page.url == url and not reload:
561
            return
562
        await self.page.goto(url, options={"timeout": timeout})
563
564
    async def _get_json_async(self, url: str, arguments: Dict[str, str] = None) -> dict:
565
        response_text = await self._get_async(url, arguments)
566
        try:
567
            response_json = json.loads(response_text)
568
        except JSONDecodeError:
569
            response_text_partial = (
570
                response_text if len(response_text) <= 96 else response_text[0:96]
571
            )
572
            raise ValueError(
573
                "Epic Games returnes content that cannot be resolved. Response: {} ...".format(
574
                    response_text_partial
575
                )
576
            )
577
        return response_json
578
579
    async def _login_async(
580
        self,
581
        email: str,
582
        password: str,
583
        verifacation_code: str = None,
584
        interactive: bool = True,
585
        remember_me: bool = True,
586
    ) -> None:
587
        self.log("Start to login.", level="debug")
588
        if email == None or email == "":
589
            raise ValueError("Email can't be null.")
590
        if password == None or password == "":
591
            raise ValueError("Password can't be null.")
592
        await self._navigate_async(
593
            "https://www.epicgames.com/store/en-US/", timeout=self.timeout, reload=False
594
        )
595
        await self._click_async("#user", timeout=self.timeout)
596
        await self._click_async("#login-with-epic", timeout=self.timeout)
597
        await self._type_async("#email", email)
598
        await self._type_async("#password", password)
599
        if not remember_me:
600
            await self._click_async("#rememberMe")
601
        await self._click_async("#sign-in[tabindex='0']", timeout=self.timeout)
602
        login_result = await self._find_async(
603
            [
604
                "#talon_frame_login_prod[style*=visible]",
605
                "div.MuiPaper-root[role=alert] h6[class*=subtitle1]",
606
                "input[name=code-input-0]",
607
                "#user",
608
            ],
609
            timeout=self.timeout,
610
        )
611
        if login_result == -1:
612
            raise TimeoutError("Chcek login result timeout.")
613
        elif login_result == 0:
614
            raise PermissionError("CAPTCHA is required for unknown reasons.")
615
        elif login_result == 1:
616
            alert_text = await self._get_text_async(
617
                "div.MuiPaper-root[role=alert] h6[class*=subtitle1]"
618
            )
619
            raise PermissionError("From Epic Games: {}".format(alert_text))
620
        elif login_result == 2:
621
            if interactive:
622
                await self._type_async(
623
                    "input[name=code-input-0]", input("Verification code: ")
624
                )
625
            else:
626
                await self._type_async("input[name=code-input-0]", verifacation_code)
627
            await self._click_async("#continue[tabindex='0']", timeout=self.timeout)
628
            verify_result = await self._find_async(
629
                ["#modal-content div[role*=alert]", "#user"], timeout=self.timeout
630
            )
631
            if verify_result == -1:
632
                raise TimeoutError("Chcek login result timeout.")
633
            elif verify_result == 0:
634
                alert_text = await self._get_text_async(
635
                    "#modal-content div[role*=alert]"
636
                )
637
                raise PermissionError("From Epic Games: {}".format(alert_text))
638
        self.log("Login end.", level="debug")
639
640
    async def _need_login_async(self, use_api: bool = False) -> bool:
641
        need_login = False
642
        if use_api:
643
            page_content_json = await self._get_json_async(
644
                "https://www.epicgames.com/account/v2/ajaxCheckLogin"
645
            )
646
            need_login = page_content_json["needLogin"]
647
        else:
648
            await self._navigate_async(
649
                "https://www.epicgames.com/store/en-US/", timeout=self.timeout
650
            )
651
            if (
652
                await self._get_property_async("#user", "data-component")
653
            ) == "SignedIn":
654
                need_login = False
655
            else:
656
                need_login = True
657
        self.log(f"Need Login: {need_login}.", level="debug")
658
        return need_login
659
660
    async def _get_authentication_method_async(self) -> Optional[str]:
661
        page_content_json = await self._get_json_async(
662
            "https://www.epicgames.com/account/v2/security/settings/ajaxGet"
663
        )
664
        if page_content_json["settings"]["enabled"] == False:
665
            return None
666
        else:
667
            return page_content_json["settings"]["defaultMethod"]
668
669
    def _quit(self, signum=None, frame=None) -> None:
670
        try:
671
            self.close_browser()
672
        except:
673
            pass
674
        exit(1)
675
676
    def _screenshot(self, path: str) -> None:
677
        return self._loop.run_until_complete(self.page.screenshot({"path": path}))
678
679
    async def _post_json_async(
680
        self,
681
        url: str,
682
        data: str,
683
        host: str = "www.epicgames.com",
684
        sleep: Union[int, float] = 2,
685
    ):
686
        await asyncio.sleep(sleep)
687
        if not host in self.page.url:
688
            await self._navigate_async("https://{}".format(host))
689
        response = await self.page.evaluate(
690
            """
691
            xmlhttp = new XMLHttpRequest();
692
            xmlhttp.open("POST", "{}", true);
693
            xmlhttp.setRequestHeader("Content-Type", "application/json;charset=UTF-8");
694
            xmlhttp.send('{}');
695
            xmlhttp.responseText;
696
        """.format(
697
                url, data
698
            )
699
        )
700
        return response
701
702
    async def _post_async(
703
        self,
704
        url: str,
705
        data: dict,
706
        host: str = "www.epicgames.com",
707
        sleep: Union[int, float] = 2,
708
    ) -> str:
709
        await asyncio.sleep(sleep)
710
        if not host in self.page.url:
711
            await self._navigate_async("https://{}".format(host))
712
        evaluate_form = "var form = new FormData();\n"
713
        for key, value in data.items():
714
            evaluate_form += "form.append(`{}`, `{}`);\n".format(key, value)
715
        response = await self.page.evaluate(
716
            evaluate_form
717
            + """
718
            var form = new FormData();
719
            xmlhttp = new XMLHttpRequest();
720
            xmlhttp.open("POST", `{}`, true);
721
            xmlhttp.send(form);
722
            xmlhttp.responseText;
723
        """.format(
724
                url
725
            )
726
        )
727
        return response
728
729
    async def _get_account_id_async(self):
730
        if await self._need_login_async():
731
            return None
732
        else:
733
            await self._navigate_async("https://www.epicgames.com/account/personal")
734
            account_id = (
735
                await self._get_text_async("#personalView div.paragraph-container p")
736
            ).split(": ")[1]
737
            return account_id
738
739
    async def _get_async(
740
        self, url: str, arguments: Dict[str, str] = None, sleep: Union[int, float] = 2
741
    ):
742
        args = ""
743
        if arguments != None:
744
            args = "?"
745
            for key, value in arguments.items():
746
                args += "{}={}&".format(key, value)
747
            args = args.rstrip("&")
748
        await self._navigate_async(url + args)
749
        response_text = await self._get_text_async("body")
750
        await asyncio.sleep(sleep)
751
        return response_text
752
753
    async def _get_game_infos_async(self, url_slug: str):
754
        game_infos = {}
755
        response = await self._get_json_async(
756
            "https://store-content.ak.epicgames.com/api/en-US/content/products/{}".format(
757
                url_slug
758
            )
759
        )
760
        game_infos["product_name"] = response["productName"]
761
        game_infos["namespace"] = response["namespace"]
762
        game_infos["pages"] = []
763
        for page in response["pages"]:
764
            game_info_page = {}
765
            if page["offer"]["hasOffer"]:
766
                game_info_page["offer_id"] = page["offer"]["id"]
767
                game_info_page["namespace"] = page["offer"]["namespace"]
768
                game_infos["pages"].append(game_info_page)
769
        return game_infos
770
771
    def _get_purchase_url(self, namespace: str, offer_id: str):
772
        purchase_url = "https://www.epicgames.com/store/purchase?lang=en-US&namespace={}&offers={}".format(
773
            namespace, offer_id
774
        )
775
        return purchase_url
776
777
    async def _get_weekly_free_base_games_async(self) -> List[Item]:
778
        response_text = await self._get_async(
779
            "https://store-site-backend-static.ak.epicgames.com/freeGamesPromotions"
780
        )
781
        response_json = json.loads(response_text)
782
        base_games = []
783
        for item in response_json["data"]["Catalog"]["searchStore"]["elements"]:
784
            if {"path": "freegames"} in item["categories"]:
785
                if (
786
                    item["price"]["totalPrice"]["discountPrice"] == 0
787
                    and item["price"]["totalPrice"]["originalPrice"] != 0
788
                ):
789
                    if item["offerType"] == "BASE_GAME":
790
                        base_game = Item(
791
                            item["title"], item["id"], item["namespace"], "BASE_GAME"
792
                        )
793
                        base_games.append(base_game)
794
        return base_games
795
796
    async def _get_weekly_free_items_async(
797
        self, user_country: str = "CN"
798
    ) -> List[Item]:
799
        try:
800
            user_country = await self._get_user_country_async()
801
        except:
802
            pass
803
        response_text = await self._get_async(
804
            f"https://store-site-backend-static.ak.epicgames.com/freeGamesPromotions?country={user_country}&allowCountries={user_country}"
805
        )
806
        response_json = json.loads(response_text)
807
        items = []
808
        for item in response_json["data"]["Catalog"]["searchStore"]["elements"]:
809
            if item["status"] == "ACTIVE":
810
                if {"path": "freegames"} in item["categories"]:
811
                    if item["price"]["totalPrice"]["discountPrice"] == 0:
812
                        if item["promotions"] != None:
813
                            if (
814
                                item["promotions"]["promotionalOffers"] != []
815
                                and item["promotions"]["promotionalOffers"] != None
816
                            ):
817
                                items.append(
818
                                    Item(
819
                                        item["title"],
820
                                        item["id"],
821
                                        item["namespace"],
822
                                        item["offerType"],
823
                                    )
824
                                )
825
        return items
826
827
    async def _get_free_dlcs_async(self, namespace: str) -> List[Item]:
828
        args = {
829
            "query": "query searchStoreQuery($namespace: String, $category: String, $freeGame: Boolean, $count: Int){Catalog{searchStore(namespace: $namespace, category: $category, freeGame: $freeGame, count: $count){elements{title id namespace}}}}",
830
            "variables": '{{"namespace": "{}", "category": "digitalextras/book|addons|digitalextras/soundtrack|digitalextras/video", "freeGame": true, "count": 1000}}'.format(
831
                namespace
832
            ),
833
        }
834
        response = await self._get_json_async("https://www.epicgames.com/graphql", args)
835
        free_dlcs = []
836
        for item in response["data"]["Catalog"]["searchStore"]["elements"]:
837
            free_dlc = Item(item["title"], item["id"], item["namespace"], "DLC")
838
            free_dlcs.append(free_dlc)
839
        return free_dlcs
840
841
    async def _get_free_base_game_async(self, namespace: str) -> Optional[Item]:
842
        args = {
843
            "query": "query searchStoreQuery($namespace: String, $category: String, $freeGame: Boolean, $count: Int){Catalog{searchStore(namespace: $namespace, category: $category, freeGame: $freeGame, count: $count){elements{title id namespace}}}}",
844
            "variables": '{{"namespace": "{}", "category": "games/edition/base", "freeGame": true, "count": 1000}}'.format(
845
                namespace
846
            ),
847
        }
848
        response = await self._get_json_async("https://www.epicgames.com/graphql", args)
849
        if len(response["data"]["Catalog"]["searchStore"]["elements"]) > 0:
850
            base_game_info = response["data"]["Catalog"]["searchStore"]["elements"][0]
851
            base_game = Item(
852
                base_game_info["title"],
853
                base_game_info["id"],
854
                base_game_info["namespace"],
855
                "BASE_GAME",
856
            )
857
            return base_game
858
859
    async def _get_weekly_free_games_async(self) -> List[Game]:
860
        free_items = await self._get_weekly_free_items_async()
861
        free_games = []
862
        for item in free_items:
863
            if item.type == "BASE_GAME":
864
                free_dlcs = await self._get_free_dlcs_async(item.namespace)
865
                free_games.append(Game(item, free_dlcs))
866
            elif item.type == "DLC":
867
                free_base_game = await self._get_free_base_game_async(item.namespace)
868
                if free_base_game != None:
869
                    free_dlcs = await self._get_free_dlcs_async(
870
                        free_base_game.namespace
871
                    )
872
                    free_games.append(Game(free_base_game, free_dlcs))
873
            else:
874
                free_base_game = await self._get_free_base_game_async(item.namespace)
875
                if free_base_game == None:
876
                    free_games.append(Game(item))
877
                else:
878
                    free_dlcs = await self._get_free_dlcs_async(
879
                        free_base_game.namespace
880
                    )
881
                    free_games.append(Game(free_base_game, free_dlcs))
882
        return free_games
883
884
    async def _claim_async(self, item: Item) -> None:
885
        async def findx_async(
886
            items: List[Dict[str, Union[str, bool, int]]], timeout: int
887
        ) -> int:
888
            for _ in range(int(timeout / 1000 / (len(items)))):
889
                for i in range(0, len(items)):
890
                    if items[i]["exist"]:
891
                        if await self._find_async(
892
                            items[i]["selector"],
893
                            timeout=1000,
894
                            frame=self.page.frames[items[i]["frame"]],
895
                        ):
896
                            return i
897
                    else:
898
                        if not await self._find_async(
899
                            items[i]["selector"],
900
                            timeout=1000,
901
                            frame=self.page.frames[items[i]["frame"]],
902
                        ):
903
                            return i
904
            return -1
905
906
        await self._navigate_async(item.purchase_url, timeout=self.timeout)
907
        await self._click_async(
908
            "#purchase-app button[class*=confirm]:not([disabled])", timeout=self.timeout
909
        )
910
        await self._try_click_async(
911
            "#purchaseAppContainer div.payment-overlay button.payment-btn--primary"
912
        )
913
        result = await findx_async(
914
            [
915
                {
916
                    "selector": "#purchase-app div[class*=alert]",
917
                    "exist": True,
918
                    "frame": 0,
919
                },
920
                {"selector": "div.MuiDialog-root", "exist": True, "frame": 1},
921
                {"selector": "#purchase-app > div", "exist": False, "frame": 0},
922
            ],
923
            timeout=self.timeout,
924
        )
925
        if result == 0:
926
            message = await self._get_text_async(
927
                "#purchase-app div[class*=alert]:not([disabled])"
928
            )
929
            raise PermissionError(message)
930
        elif result == 1:
931
            raise PermissionError("CAPTCHA is required for unknown reasons")
932
        elif result == -1:
933
            raise TimeoutError("Timeout when claiming")
934
        else:
935
            await asyncio.sleep(2)
936
937
    async def _screenshot_async(self, path: str) -> None:
938
        await self.page.screenshot({"path": path})
939
940
    def add_quit_signal(self):
941
        signal.signal(signal.SIGINT, self._quit)
942
        signal.signal(signal.SIGTERM, self._quit)
943
        if "SIGBREAK" in dir(signal):
944
            signal.signal(signal.SIGBREAK, self._quit)
945
        if "SIGHUP" in dir(signal):
946
            signal.signal(signal.SIGHUP, self._quit)
947
948
    async def _is_owned_async(self, offer_id: str, namespace: str) -> bool:
949
        args = {
950
            "query": "query launcherQuery($namespace: String!, $offerId: String!){Launcher{entitledOfferItems(namespace: $namespace, offerId: $offerId){entitledToAllItemsInOffer}}}",
951
            "variables": '{{"namespace": "{}", "offerId": "{}"}}'.format(
952
                namespace, offer_id
953
            ),
954
        }
955
        response = await self._get_json_async("https://www.epicgames.com/graphql", args)
956
        try:
957
            owned = response["data"]["Launcher"]["entitledOfferItems"][
958
                "entitledToAllItemsInOffer"
959
            ]
960
        except:
961
            raise ValueError("The returned data seems to be incorrect.")
962
        return owned
963
964
    async def _get_user_country_async(self) -> None:
965
        response = await self._get_json_async(
966
            "https://www.epicgames.com/account/v2/personal/ajaxGet"
967
        )
968
        try:
969
            country = response["userInfo"]["country"]["value"]
970
        except:
971
            raise ValueError("The returned data seems to be incorrect.")
972
        return country
973
974
    async def _try_get_webpage_content_async(self) -> Optional[str]:
975
        try:
976
            if self.browser_opened:
977
                webpage_content = await self._get_text_async("body")
978
                return webpage_content
979
        except:
980
            pass
981
982
    def _async_auto_retry(
983
        self,
984
        retries: int,
985
        error_message: str,
986
        error_notification: str,
987
        raise_error: bool = True,
988
    ) -> None:
989
        def retry(func: Callable) -> Callable:
990
            async def wrapper(*arg, **kw):
991
                for i in range(retries):
992
                    try:
993
                        await func(*arg, **kw)
994
                        break
995
                    except Exception as e:
996
                        if i < retries - 1:
997
                            self.log(f"{e}", level="warning")
998
                        else:
999
                            self.log(f"{error_message}{e}", "error")
1000
                            self.claimer_notifications.notify(
1001
                                NOTIFICATION_TITLE_ERROR, f"{error_notification}{e}"
1002
                            )
1003
                            await self._screenshot_async("screenshot.png")
1004
                            if raise_error:
1005
                                raise e
1006
1007
            return wrapper
1008
1009
        return retry
1010
1011
    async def _run_once_async(
1012
        self,
1013
        interactive: bool = True,
1014
        email: str = None,
1015
        password: str = None,
1016
        verification_code: str = None,
1017
        retries: int = 3,
1018
        raise_error: bool = False,
1019
    ) -> List[str]:
1020
        @self._async_auto_retry(
1021
            retries,
1022
            "Failed to open the browser: ",
1023
            NOTIFICATION_CONTENT_OPEN_BROWSER_FAILED,
1024
        )
1025
        async def run_open_browser():
1026
            if not self.browser_opened:
1027
                await self._open_browser_async()
1028
1029
        @self._async_auto_retry(
1030
            retries, "Failed to login: ", NOTIFICATION_CONTENT_LOGIN_FAILED
1031
        )
1032
        async def run_login(
1033
            interactive: bool,
1034
            email: Optional[str],
1035
            password: Optional[str],
1036
            verification_code: str = None,
1037
        ):
1038
            if await self._need_login_async():
1039
                if interactive:
1040
                    self.log("Need login")
1041
                    self.claimer_notifications.notify(
1042
                        NOTIFICATION_TITLE_NEED_LOGIN, NOTIFICATION_CONTENT_NEED_LOGIN
1043
                    )
1044
                    await self._close_browser_async()
1045
                    email = input("Email: ")
1046
                    password = getpass("Password: ")
1047
                    await self._open_browser_async()
1048
                    await self._login_async(email, password)
1049
                    self.log("Login successful")
1050
                else:
1051
                    await self._login_async(
1052
                        email, password, verification_code, interactive=False
1053
                    )
1054
1055
        async def run_claim() -> List[str]:
1056
            claimed_item_titles = []
1057
            owned_item_titles = []
1058
1059
            @self._async_auto_retry(
1060
                retries,
1061
                "Failed to claim one item: ",
1062
                NOTIFICATION_CONTENT_CLAIM_FAILED,
1063
                raise_error=False,
1064
            )
1065
            async def retried_claim(item: Item):
1066
                if not await self._is_owned_async(item.offer_id, item.namespace):
1067
                    await self._claim_async(item)
1068
                    claimed_item_titles.append(item.title)
1069
                    self.log(f"Successfully claimed: {item.title}")
1070
                else:
1071
                    owned_item_titles.append(item.title)
1072
1073
            free_games = await self._get_weekly_free_games_async()
1074
            item_amount = 0
1075
            for game in free_games:
1076
                item_amount += game.item_amount
1077
            for game in free_games:
1078
                await retried_claim(game.base_game)
1079
                for dlc in game.dlcs:
1080
                    await retried_claim(dlc)
1081
            if len(owned_item_titles) == item_amount:
1082
                self.log("All available free games are already in your library")
1083
            if len(claimed_item_titles) != 0:
1084
                claimed_item_titles_string = ""
1085
                for title in claimed_item_titles:
1086
                    claimed_item_titles_string += f"{title}, "
1087
                claimed_item_titles_string = claimed_item_titles_string.rstrip(", ")
1088
                self.claimer_notifications.notify(
1089
                    NOTIFICATION_TITLE_CLAIM_SUCCEED,
1090
                    f"{NOTIFICATION_CONTENT_CLAIM_SUCCEED}{claimed_item_titles_string}",
1091
                )
1092
            if len(claimed_item_titles) + len(owned_item_titles) < item_amount:
1093
                raise PermissionError("Failed to claim some items")
1094
            return claimed_item_titles
1095
1096
        claimed_item_titles = []
1097
        if raise_error:
1098
            await run_open_browser()
1099
            await run_login(interactive, email, password, verification_code)
1100
            claimed_item_titles = await run_claim()
1101
        else:
1102
            try:
1103
                await run_open_browser()
1104
                await run_login(interactive, email, password, verification_code)
1105
                claimed_item_titles = await run_claim()
1106
            except:
1107
                pass
1108
        try:
1109
            await self._close_browser_async()
1110
        except:
1111
            pass
1112
        return claimed_item_titles
1113
1114
    async def _load_cookies_async(self, path: str) -> None:
1115
        with open(path, "r") as cookies_file:
1116
            cookies = cookies_file.read()
1117
            for cookie in json.loads(cookies):
1118
                await self.page.setCookie(cookie)
1119
1120
    async def _save_cookies_async(self, path: str) -> None:
1121
        dir = os.path.dirname(path)
1122
        if not dir == "" and not os.path.exists(dir):
1123
            os.mkdir(dir)
1124
        with open(path, "w") as cookies_file:
1125
            await self.page.cookies()
1126
            cookies = await self.page.cookies()
1127
            cookies_file.write(json.dumps(cookies, separators=(",", ": "), indent=4))
1128
1129
    def open_browser(self) -> None:
1130
        return self._loop.run_until_complete(self._open_browser_async())
1131
1132
    def close_browser(self) -> None:
1133
        return self._loop.run_until_complete(self._close_browser_async())
1134
1135
    def need_login(self) -> bool:
1136
        return self._loop.run_until_complete(self._need_login_async())
1137
1138
    def login(
1139
        self,
1140
        email: str,
1141
        password: str,
1142
        verifacation_code: str = None,
1143
        interactive: bool = True,
1144
        remember_me: bool = True,
1145
    ) -> None:
1146
        return self._loop.run_until_complete(
1147
            self._login_async(
1148
                email, password, verifacation_code, interactive, remember_me
1149
            )
1150
        )
1151
1152
    def get_weekly_free_games(self) -> List[Game]:
1153
        return self._loop.run_until_complete(self._get_weekly_free_games_async())
1154
1155
    def run_once(
1156
        self,
1157
        interactive: bool = True,
1158
        email: str = None,
1159
        password: str = None,
1160
        verification_code: str = None,
1161
        retries: int = 3,
1162
        raise_error: bool = False,
1163
    ) -> List[str]:
1164
        return self._loop.run_until_complete(
1165
            self._run_once_async(
1166
                interactive, email, password, verification_code, retries, raise_error
1167
            )
1168
        )
1169
1170
    def scheduled_run(
1171
        self,
1172
        at: str,
1173
        interactive: bool = True,
1174
        email: str = None,
1175
        password: str = None,
1176
        verification_code: str = None,
1177
        retries: int = 3,
1178
    ) -> None:
1179
        self.add_quit_signal()
1180
        schedule.every().day.at(at).do(
1181
            self.run_once, interactive, email, password, verification_code, retries
1182
        )
1183
        while True:
1184
            schedule.run_pending()
1185
            time.sleep(1)
1186
1187
    def load_cookies(self, path: str) -> None:
1188
        return self._loop.run_until_complete(self._load_cookies_async(path))
1189
1190
    def save_cookies(self, path: str) -> None:
1191
        return self._loop.run_until_complete(self._save_cookies_async(path))
1192
1193
    def navigate(self, url: str, timeout: int = 30000, reload: bool = True) -> None:
1194
        return self._loop.run_until_complete(self._navigate_async(url, timeout, reload))
1195
1196
    def find(self, selector: str, timeout: int = None, frame: Frame = None) -> bool:
1197
        return self._loop.run_until_complete(self._find_async(selector, timeout, frame))
1198
1199
1200
def login(cookies_path: str) -> None:
1201
    claimer = EpicgamesClaimer(
1202
        headless=False,
1203
        sandbox=True,
1204
        browser_args=["--disable-infobars", "--no-first-run"],
1205
    )
1206
    claimer.log("Creating user data, please log in in the browser ...")
1207
    claimer.navigate("https://www.epicgames.com/store", timeout=0)
1208
    claimer.find("#user[data-component=SignedIn]", timeout=0)
1209
    claimer.save_cookies(cookies_path)
1210
    claimer.log("Login successful")
1211
    claimer.close_browser()
1212
1213
1214
def get_args(run_by_main_script: bool = False) -> argparse.Namespace:
1215
    def update_args_from_env(args: argparse.Namespace) -> argparse.Namespace:
1216
        for key in args.__dict__.keys():
1217
            env = os.environ.get(key.upper())
1218
            if env != None:
1219
                if type(args.__dict__[key]) == int:
1220
                    args.__setattr__(key, int(env))
1221
                elif type(args.__dict__[key]) == bool:
1222
                    if env == "true":
1223
                        args.__setattr__(key, True)
1224
                    elif env == "false":
1225
                        args.__setattr__(key, False)
1226
                else:
1227
                    args.__setattr__(key, env)
1228
        return args
1229
1230
    parser = argparse.ArgumentParser(
1231
        description="Claim weekly free games from Epic Games Store."
1232
    )
1233
    parser.add_argument(
1234
        "-n", "--no-headless", action="store_true", help="run the browser with GUI"
1235
    )
1236
    parser.add_argument(
1237
        "-c", "--chromium-path", type=str, help="set path to browser executable"
1238
    )
1239
    parser.add_argument(
1240
        "-r",
1241
        "--run-at",
1242
        type=str,
1243
        help="set daily check and claim time, format to HH:MM, default to the current time",
1244
    )
1245
    parser.add_argument(
1246
        "-o", "--once", action="store_true", help="claim once then exit"
1247
    )
1248
    if run_by_main_script:
1249
        parser.add_argument(
1250
            "-a", "--auto-update", action="store_true", help="enable auto update"
1251
        )
1252
    if not run_by_main_script:
1253
        parser.add_argument(
1254
            "-e",
1255
            "--external-schedule",
1256
            action="store_true",
1257
            help="run in external schedule mode",
1258
        )
1259
    parser.add_argument(
1260
        "-u", "--email", "--username", type=str, help="set username/email"
1261
    )
1262
    parser.add_argument("-p", "--password", type=str, help="set password")
1263
    parser.add_argument(
1264
        "-t", "--verification-code", type=str, help="set verification code (2FA)"
1265
    )
1266
    parser.add_argument("--cookies", type=str, help="set path to cookies file")
1267
    parser.add_argument(
1268
        "-l", "--login", action="store_true", help="create logged-in user data and quit"
1269
    )
1270
    parser.add_argument("-d", "--debug", action="store_true", help="enable debug mode")
1271
    parser.add_argument(
1272
        "-dt",
1273
        "--debug-timeout",
1274
        type=int,
1275
        default=180000,
1276
        help="set timeout in milliseconds",
1277
    )
1278
    parser.add_argument(
1279
        "-dr", "--debug-retries", type=int, default=3, help="set the number of retries"
1280
    )
1281
    parser.add_argument(
1282
        "-dp",
1283
        "--debug-push-test",
1284
        action="store_true",
1285
        help="Push a notification for testing and quit",
1286
    )
1287
    parser.add_argument(
1288
        "-ds",
1289
        "--debug-show-args",
1290
        action="store_true",
1291
        help="Push a notification for testing and quit",
1292
    )
1293
    parser.add_argument(
1294
        "-ps", "--push-serverchan-sendkey", type=str, help="set ServerChan sendkey"
1295
    )
1296
    parser.add_argument(
1297
        "-pbu",
1298
        "--push-bark-url",
1299
        type=str,
1300
        default="https://api.day.app/push",
1301
        help="set Bark server address",
1302
    )
1303
    parser.add_argument(
1304
        "-pbk", "--push-bark-device-key", type=str, help="set Bark device key"
1305
    )
1306
    parser.add_argument(
1307
        "-ptt", "--push-telegram-bot-token", type=str, help="set Telegram bot token"
1308
    )
1309
    parser.add_argument(
1310
        "-pti", "--push-telegram-chat-id", type=str, help="set Telegram chat ID"
1311
    )
1312
    parser.add_argument(
1313
        "-pwx", "--push-wechat-qywx-am", type=str, help="set WeChat QYWX"
1314
    )
1315
    parser.add_argument(
1316
        "-pda",
1317
        "--push-dingtalk-access-token",
1318
        type=str,
1319
        help="set DingTalk access token",
1320
    )
1321
    parser.add_argument(
1322
        "-pds", "--push-dingtalk-secret", type=str, help="set DingTalk secret"
1323
    )
1324
    parser.add_argument(
1325
        "-ns",
1326
        "--no-startup-notification",
1327
        action="store_true",
1328
        help="disable pushing a notification at startup",
1329
    )
1330
    parser.add_argument(
1331
        "-v",
1332
        "--version",
1333
        action="version",
1334
        version=__version__,
1335
        help="print version information and quit",
1336
    )
1337
    args = parser.parse_args()
1338
    args = update_args_from_env(args)
1339
    if args.run_at == None:
1340
        localtime = time.localtime()
1341
        args.run_at = "{0:02d}:{1:02d}".format(localtime.tm_hour, localtime.tm_min)
1342
    if args.email != None and args.password == None:
1343
        raise ValueError("Must input both username and password.")
1344
    if args.email == None and args.password != None:
1345
        raise ValueError("Must input both username and password.")
1346
    args.interactive = True if args.email == None else False
1347
    args.data_dir = (
1348
        "User_Data/Default" if args.interactive else "User_Data/{}".format(args.email)
1349
    )
1350
    if args.debug_push_test:
1351
        test_notifications = Notifications(
1352
            serverchan_sendkey=args.push_serverchan_sendkey,
1353
            bark_push_url=args.push_bark_url,
1354
            bark_device_key=args.push_bark_device_key,
1355
            telegram_bot_token=args.push_telegram_bot_token,
1356
            telegram_chat_id=args.push_telegram_chat_id,
1357
        )
1358
        test_notifications.notify(NOTIFICATION_TITLE_TEST, NOTIFICATION_CONTENT_TEST)
1359
        exit()
1360
    if args.debug_show_args:
1361
        print(args)
1362
        exit()
1363
    if args.login:
1364
        login("User_Data/Default/cookies.json")
1365
        exit()
1366
    return args
1367
1368
1369
def main(
1370
    args: argparse.Namespace = None, raise_error: bool = False
1371
) -> Optional[List[str]]:
1372
    if args == None:
1373
        args = get_args()
1374
    claimer_notifications = Notifications(
1375
        serverchan_sendkey=args.push_serverchan_sendkey,
1376
        bark_push_url=args.push_bark_url,
1377
        bark_device_key=args.push_bark_device_key,
1378
        telegram_bot_token=args.push_telegram_bot_token,
1379
        telegram_chat_id=args.push_telegram_chat_id,
1380
        wechat_qywx_am=args.push_wechat_qywx_am,
1381
        dingtalk_access_token=args.push_dingtalk_access_token,
1382
        dingtalk_secret=args.push_dingtalk_secret,
1383
    )
1384
    claimer = EpicgamesClaimer(
1385
        args.data_dir,
1386
        headless=not args.no_headless,
1387
        chromium_path=args.chromium_path,
1388
        claimer_notifications=claimer_notifications,
1389
        timeout=args.debug_timeout,
1390
        debug=args.debug,
1391
        cookies=args.cookies,
1392
    )
1393
    if args.once:
1394
        return claimer.run_once(
1395
            args.interactive,
1396
            args.email,
1397
            args.password,
1398
            args.verification_code,
1399
            retries=args.debug_retries,
1400
            raise_error=raise_error,
1401
        )
1402
    elif args.external_schedule:
1403
        if not args.no_startup_notification:
1404
            claimer_notifications.notify(
1405
                NOTIFICATION_TITLE_START, NOTIFICATION_CONTENT_START
1406
            )
1407
        claimer.run_once(
1408
            args.interactive,
1409
            args.email,
1410
            args.password,
1411
            args.verification_code,
1412
            retries=args.debug_retries,
1413
            raise_error=raise_error,
1414
        )
1415
    else:
1416
        if not args.no_startup_notification:
1417
            claimer_notifications.notify(
1418
                NOTIFICATION_TITLE_START, NOTIFICATION_CONTENT_START
1419
            )
1420
        claimer.run_once(
1421
            args.interactive,
1422
            args.email,
1423
            args.password,
1424
            args.verification_code,
1425
            retries=args.debug_retries,
1426
        )
1427
        claimer.scheduled_run(
1428
            args.run_at,
1429
            args.interactive,
1430
            args.email,
1431
            args.password,
1432
            args.verification_code,
1433
        )
1434
1435
1436
def start() -> None:
1437
    cwd = os.getcwd()
1438
    sys.path.append(cwd)
1439
    ENV = get_env_str()
1440
    if ENV == "v2p":
1441
        os.chdir("/usr/local/app/script/Lists")
1442
    elif ENV == "ql":
1443
        os.chdir("/ql/config")
1444
    else:
1445
        os.chdir("/tmp")
1446
    args = get_args()
1447
    args.chromium_path = "chromium-browser"
1448
    args.cookies = "User_Data/Default/cookies.json"
1449
    args.interactive = False
1450
    args.once = True
1451
    data = get_data()
1452
    check_item = data.get("EPIC", [])[0]
1453
    args.email = check_item.get("email")
1454
    args.password = check_item.get("password")
1455
    claimed_item_titles = main(args, raise_error=True)
1456
    result_message = (
1457
        f"{NOTIFICATION_CONTENT_CLAIM_SUCCEED}{claimed_item_titles}"
1458
        if len(claimed_item_titles)
1459
        else NOTIFICATION_CONTENT_OWNED_ALL
1460
    )
1461
    send("Epicgames", result_message)
1462
1463
1464
if __name__ == "__main__":
1465
    start()
1466