Passed
Push — main ( c507f0...00b29a )
by Bartosz
02:41 queued 01:13
created

LeaderboardsCog.reload_leaderboards()   A

Complexity

Conditions 1

Size

Total Lines 7
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 7
dl 0
loc 7
rs 10
c 0
b 0
f 0
cc 1
nop 2
1
import discord
2
import pandas as pd
3
from discord.ext import commands, tasks
4
from discord.utils import get
5
from discord_slash import cog_ext, SlashContext
6
import cogs.cogbase as cogbase
7
from cogs.databasecog import DatabaseCog
8
from modules.utils import get_dominant_color
9
10
11
class LeaderboardsCog(cogbase.BaseCog):
12
    def __init__(self, base):
13
        super().__init__(base)
14
15
        self.common_total = 0
16
        self.update_leaderboards_loop.start()
17
18
    def cog_unload(self):
19
        self.update_leaderboards_loop.cancel()
20
21
    # Send leaderboards to specified channel
22
    async def update_leaderboard(self, channel: int, ch_type: str) -> None:
23
        top_ch = self.bot.get_channel(channel)
24
        spots_df = await DatabaseCog.db_get_spots_df()
25
        monsters_df = await DatabaseCog.db_get_monster_spots_df()
26
        # Hard coded because there is only one interesting monster
27
        event_monster_df = monsters_df.filter(["member_id", "Nightmare"], axis=1)
28
        spots_df = pd.merge(spots_df, event_monster_df, on=["member_id"])
29
        spots_df = spots_df.drop(spots_df[spots_df.member_id == self.bot.user.id].index)
30
        spots_df["total"] = spots_df["legendary"] * self.legend_multiplier + spots_df["rare"] + (spots_df["Nightmare"])
31
        spots_df_top = spots_df.sort_values(ch_type, ascending=False).head(15)
32
        spots_df_top = spots_df_top.reset_index(drop=True)
33
        try:
34
            await top_ch.purge()
35
        except discord.errors.NotFound:
36
            pass
37
        top_print = []
38
        top_user_id = int(spots_df_top.at[0, 'member_id'])
39
        for index, row in spots_df_top.iterrows():
40
            if row[ch_type] == 0:
41
                member_stats = ""
42
            elif row['member_id'] == top_user_id:
43
                member_stats = [f"**[{index + 1}]**  {row['display_name']} - **{row[ch_type]}**"]
44
            else:
45
                member_stats = [f"**[{index + 1}]**  {row['display_name']} - {row[ch_type]}"]
46
            top_print.append(member_stats)
47
        top_print = ['\n'.join([elem for elem in sublist]) for sublist in top_print]
48
        top_print = "\n".join(top_print)
49
        ch_type = ''.join([i for i in ch_type if not i.isdigit()])
50
51
        top_user = get(self.bot.get_all_members(), id=top_user_id)
52
        top_user_color = get_dominant_color(top_user.avatar_url)
53
        embed_command = discord.Embed(title=f"TOP 15 {ch_type.upper()}", description=top_print,
54
                                      color=top_user_color)
55
        member = self.bot.get_user(spots_df_top['member_id'].iloc[0])
56
        embed_command.set_thumbnail(url=f'{member.avatar_url}')
57
        dt_string = self.bot.get_current_time()
58
        embed_command.set_footer(text=f"{dt_string}")
59
        await top_ch.send(embed=embed_command)
60
        self.create_log_msg(f"Leaderboards updated - {ch_type}")
61
62
    # Update member spotting role(total/common)
63
    async def update_role(self, guild, guild_member, spot_roles: dict, common: bool) -> None:
64
        roles_type = "common" if common else "total"
65
        try:
66
            spots_df = await DatabaseCog.db_get_member_stats(guild_member.id)
67
            if not common:
68
                spots_df["total"] = spots_df["legendary"] * self.legend_multiplier + spots_df["rare"]
69
            roles_list = [key for (key, value) in spot_roles.items() if spots_df.loc[0, roles_type] >= value]
70
            if roles_list:
71
                await self.update_role_ext(guild, roles_list, guild_member)
72
        except KeyError as e:
73
            print(e)
74
75
    async def update_role_ext(self, guild, roles_list: list, guild_member) -> None:
76
        await self.create_new_role(guild, roles_list[-1])
77
        role_new = get(guild.roles, name=roles_list[-1])
78
        if role_new not in guild_member.roles:
79
            await guild_member.add_roles(role_new)
80
        if len(roles_list) > 1:
81
            await self.create_new_role(guild, roles_list[-2])
82
            role_old = get(guild.roles, name=roles_list[-2])
83
            if role_old in guild_member.roles:
84
                await guild_member.remove_roles(role_old)
85
86
    # Update members' spotting roles
87
    async def update_member_roles(self) -> None:
88
        guild = self.bot.get_guild(self.bot.guild[0])
89
        spot_roles_total = self.bot.config["total_milestones"][0]
90
        spot_roles_common = self.bot.config["common_milestones"][0]
91
        for guild_member in guild.members:
92
            await self.update_role(guild, guild_member, spot_roles_total, False)
93
            await self.update_role(guild, guild_member, spot_roles_common, True)
94
95
    async def update_event_leaderboards(self, channel: int, event_monster: str) -> None:
96
        top_ch = self.bot.get_channel(channel)
97
        spots_df = await DatabaseCog.db_get_monster_spots_df()
98
        event_monster_df = spots_df.filter(["member_id", event_monster], axis=1)
99
        member_names_df = await DatabaseCog.db_get_member_names()
100
101
        event_df = pd.merge(event_monster_df, member_names_df, on=["member_id"])
102
        event_df = event_df.drop(event_df[event_df.member_id == self.bot.user.id].index)
103
        event_df = event_df.sort_values(event_monster, ascending=False).head(15)
104
        event_df = event_df.reset_index(drop=True)
105
        try:
106
            await top_ch.purge()
107
        except discord.errors.NotFound:
108
            pass
109
        top_print = []
110
        top_user_id = int(event_df.at[0, 'member_id'])
111
        for index, row in event_df.iterrows():
112
            if row[event_monster] == 0:
113
                member_stats = ""
114
            elif row['member_id'] == top_user_id:
115
                member_stats = [f"**[{index + 1}]**  {row['display_name']} - **{row[event_monster]}**"]
116
            else:
117
                member_stats = [f"**[{index + 1}]**  {row['display_name']} - {row[event_monster]}"]
118
            top_print.append(member_stats)
119
        top_print = ['\n'.join([elem for elem in sublist]) for sublist in top_print]
120
        top_print = "\n".join(top_print)
121
        ch_type = ''.join([i for i in event_monster if not i.isdigit()])
122
123
        top_user = get(self.bot.get_all_members(), id=top_user_id)
124
        top_user_color = get_dominant_color(top_user.avatar_url)
125
        embed_command = discord.Embed(title=f"TOP 15 {ch_type.upper()}", description=top_print,
126
                                      color=top_user_color)
127
        member = self.bot.get_user(event_df['member_id'].iloc[0])
128
        embed_command.set_thumbnail(url=f'{member.avatar_url}')
129
        dt_string = self.bot.get_current_time()
130
        embed_command.set_footer(text=f"{dt_string}")
131
        await top_ch.send(embed=embed_command)
132
        self.create_log_msg(f"Leaderboards updated - event")
133
134
    async def update_leaderboards(self) -> None:
135
        await self.update_leaderboard(self.bot.ch_leaderboards, "total")
136
        await self.update_leaderboard(self.bot.ch_leaderboards_common, "common")
137
        await self.update_event_leaderboards(self.bot.ch_leaderboards_event, "Nightmare")
138
        self.create_log_msg(f"All leaderboards updated")
139
        await self.update_member_roles()
140
        self.create_log_msg(f"Members' roles updated")
141
142
    @tasks.loop(minutes=15)
143
    async def update_leaderboards_loop(self) -> None:
144
        await self.update_leaderboards()
145
146
    @tasks.loop(minutes=15)
147
    async def update_leaderboards_loop(self) -> None:
148
        await self.update_leaderboards()
149
150
    @update_leaderboards_loop.before_loop
151
    async def before_update_leaderboards_loop(self) -> None:
152
        self.create_log_msg(f"Waiting until Bot is ready")
153
        common_ch = self.bot.get_channel(self.bot.ch_common)
154
        try:
155
            async for _ in common_ch.history(limit=None, oldest_first=True):
156
                self.common_total += 1
157
        except AttributeError:
158
            pass
159
        await self.bot.wait_until_ready()
160
161
    @cog_ext.cog_slash(name="reloadLeaderboards", guild_ids=cogbase.GUILD_IDS,
162
                       description="Reload leaderboards",
163
                       default_permission=False,
164
                       permissions=cogbase.PERMISSION_MODS)
165
    async def reload_leaderboards(self, ctx: SlashContext) -> None:
166
        await ctx.send(f"Leaderboards reloaded", hidden=True)
167
        await self.update_leaderboards()
168
169
170
def setup(bot: commands.Bot) -> None:
171
    bot.add_cog(LeaderboardsCog(bot))
172