api_rssbot   A
last analyzed

Complexity

Total Complexity 13

Size/Duplication

Total Lines 83
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 58
dl 0
loc 83
rs 10
c 0
b 0
f 0
wmc 13

2 Methods

Rating   Name   Duplication   Size   Complexity  
A RssRobot.remove_old_history() 0 7 1
D RssRobot.main() 0 54 12
1
# -*- encoding: utf-8 -*-
2
"""
3
cron: 22 6-22/2 * * *
4
new Env('RSS 订阅');
5
"""
6
7
from datetime import datetime, timedelta, timezone
8
from time import mktime
9
10
import feedparser
11
12
from notify_mtr import send
13
from utils_models import History, Rss, db
14
15
16
class RssRobot:
    """Poll every subscribed RSS feed and push newly published entries.

    Each run prunes old history rows, fetches every feed stored in ``Rss``,
    and sends the entries that are both recent enough (``rss.before`` days)
    and not already recorded in ``History``. Entries are packed into
    notifications of at most ``BATCH_SIZE`` items each.
    """

    # Maximum number of entries packed into one notification message.
    BATCH_SIZE = 20

    def main(self):
        """Run one polling pass over all subscribed feeds.

        Sends notifications through ``send`` and records every pushed link
        in ``History``. Returns ``None``.
        """
        self.remove_old_history()

        # Links already recorded today. A set gives O(1) membership tests and
        # is extended below so a link is never pushed twice within one run.
        # NOTE(review): only rows whose publish_at equals today's date are
        # consulted, although history is kept for a week -- confirm whether
        # entries pushed on earlier days are meant to be eligible again.
        seen_urls = {
            rss_history.url
            for rss_history in History.select().where(
                History.publish_at == datetime.now().strftime("%Y-%m-%d")
            )
        }

        no = 0  # total number of entries queued/sent during this run
        msg = ""  # notification text accumulated since the last flush
        for rss in Rss.select():
            rss_history_list = []
            feed = feedparser.parse(rss.feed)
            title = True  # the feed title still has to be written into msg
            c_no = 1  # per-feed running number shown in front of each entry
            for entry in feed.entries:
                pub_t = self._published_at(rss, entry)
                if pub_t is None:
                    # No usable date: the age cannot be checked, skip the
                    # entry instead of aborting the whole run.
                    continue

                link = entry.link
                if rss.url == "https://www.zhihu.com":
                    # Site-specific handling: keep only the part of the link
                    # that precedes "/answer".
                    link = link.split("/answer")[0]

                if link in seen_urls:
                    continue

                age_seconds = (
                    datetime.now(timezone.utc).timestamp() - pub_t.timestamp()
                )
                # rss.before is expressed in days (86400 seconds each).
                if age_seconds >= rss.before * 86400:
                    continue

                if title:
                    msg += f"<b>{rss.title.strip()}</b>\n"
                    title = False
                msg = f'{msg}{c_no}. <a href="{link}">{entry.title}</a>\n'
                no += 1
                c_no += 1
                seen_urls.add(link)
                rss_history_list.append(History(url=link))

                if no % self.BATCH_SIZE == 0:
                    send("RSS 订阅", msg)
                    msg = ""
                    # The next message starts empty, so the feed title must be
                    # written again if this feed has more entries.
                    title = True

            if rss_history_list:
                with db.atomic():
                    History.bulk_create(rss_history_list, batch_size=10)

        # msg is emptied on every flush, so anything left here is unsent.
        if msg:
            send("RSS 订阅", msg)

    @staticmethod
    def _published_at(rss, entry):
        """Return the entry's publication time as a naive datetime, or None.

        Falls back to ``updated_parsed`` when ``published_parsed`` is missing;
        returns ``None`` when the entry carries neither.
        """
        parsed = entry.get("published_parsed") or entry.get("updated_parsed")
        if parsed is None:
            return None

        # NOTE(review): mktime() interprets the struct_time as local time --
        # confirm this matches the time zone of the parsed feed dates.
        pub_t = datetime.fromtimestamp(mktime(parsed))

        # Site-specific handling: force the current year and shift by 8 hours.
        if rss.url == "https://www.foreverblog.cn":
            pub_t = pub_t.replace(
                year=datetime.now(timezone.utc).year
            ) + timedelta(hours=8)
        return pub_t

    @staticmethod
    def remove_old_history():
        """Delete history rows older than seven days."""
        # Only the most recent week of records is kept.
        cutoff = datetime.now() - timedelta(days=7)
        History.delete().where(
            History.publish_at < cutoff.strftime("%Y-%m-%d")
        ).execute()
79
80
81
# Script entry point: run a single polling pass when executed directly.
if __name__ == "__main__":
    robot = RssRobot()
    robot.main()
83