Passed
Branch BonHowi (00e5ad)
by Bartosz
01:23
created

LeaderboardsCog.update_event_leaderboards()   A

Complexity

Conditions 1

Size

Total Lines 12
Code Lines 11

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 11
dl 0
loc 12
rs 9.85
c 0
b 0
f 0
cc 1
nop 3
1
import discord
2
import pandas as pd
3
from discord.ext import commands, tasks
4
from discord.utils import get
5
from discord_slash import cog_ext, SlashContext
6
import cogs.cogbase as cogbase
7
from cogs.databasecog import DatabaseCog
8
from modules.utils import get_dominant_color
9
10
11
class LeaderboardsCog(cogbase.BaseCog):
    """Cog that keeps the leaderboard channels and members' milestone roles up to date.

    A background task refreshes the "total", "common" and event leaderboards
    every 15 minutes and then re-evaluates every guild member's milestone
    roles. Moderators can trigger the same refresh with the
    ``reloadLeaderboards`` slash command.
    """

    def __init__(self, base):
        super().__init__(base)

        # Number of messages found in the "common" channel; counted once in
        # before_update_leaderboards_loop.
        self.common_total = 0
        self.update_leaderboards_loop.start()

    def cog_unload(self):
        """Stop the periodic refresh task when the cog is unloaded."""
        self.update_leaderboards_loop.cancel()

    # Send leaderboards to specified channel
    async def update_leaderboard(self, channel: int, ch_type: str) -> None:
        """Rebuild the top-15 leaderboard for ``ch_type`` and post it to ``channel``.

        :param channel: ID of the channel the leaderboard is posted to.
        :param ch_type: Column of the spots table to rank by (this file uses
            ``"total"`` and ``"common"``).
        """
        top_ch = self.bot.get_channel(channel)
        spots_df = await DatabaseCog.db_get_spots_df()
        monsters_df = await DatabaseCog.db_get_monster_spots_df()
        # Hard coded because there is only one interesting monster
        event_monster_df = monsters_df.filter(["member_id", "Nightmare"], axis=1)
        spots_df = pd.merge(spots_df, event_monster_df, on=["member_id"])
        # The bot itself must not show up on the leaderboard
        spots_df = spots_df.drop(spots_df[spots_df.member_id == self.bot.user.id].index)
        spots_df["total"] = spots_df["legendary"] * self.legend_multiplier + spots_df["rare"] + (spots_df["Nightmare"])
        spots_df_top = spots_df.sort_values(ch_type, ascending=False).head(15)
        # print_leaderboard relies on a 0-based positional index for ranks
        spots_df_top = spots_df_top.reset_index(drop=True)
        await self.print_leaderboard(top_ch, spots_df_top, ch_type)

        self.create_log_msg(f"Leaderboards updated - {ch_type}")

    async def update_event_leaderboards(self, channel: int, event_monster: str) -> None:
        """Rebuild the top-15 leaderboard for one event monster and post it to ``channel``.

        :param channel: ID of the channel the leaderboard is posted to.
        :param event_monster: Column of the monster-spots table to rank by.
        """
        top_ch = self.bot.get_channel(channel)
        spots_df = await DatabaseCog.db_get_monster_spots_df()
        event_monster_df = spots_df.filter(["member_id", event_monster], axis=1)
        member_names_df = await DatabaseCog.db_get_member_names()

        event_df = pd.merge(event_monster_df, member_names_df, on=["member_id"])
        # The bot itself must not show up on the leaderboard
        event_df = event_df.drop(event_df[event_df.member_id == self.bot.user.id].index)
        event_df = event_df.sort_values(event_monster, ascending=False).head(15)
        # print_leaderboard relies on a 0-based positional index for ranks
        event_df = event_df.reset_index(drop=True)
        await self.print_leaderboard(top_ch, event_df, event_monster)
        self.create_log_msg("Leaderboards updated - event")

    async def print_leaderboard(self, leaderboard_ch, monster_df, monster_type):
        """Replace the contents of ``leaderboard_ch`` with a fresh leaderboard embed.

        :param leaderboard_ch: Channel to purge and post into; if it could not
            be resolved (``None``) the call is logged and skipped.
        :param monster_df: Ranked frame with a 0-based index and the columns
            ``member_id``, ``display_name`` and ``monster_type``.
        :param monster_type: Name of the score column; also used (digits
            stripped, upper-cased) in the embed title.
        """
        if leaderboard_ch is None:
            # An unresolved channel would otherwise raise and stop the refresh loop
            self.create_log_msg(f"Leaderboard channel not found - {monster_type}")
            return

        try:
            await leaderboard_ch.purge()
        except discord.errors.NotFound:
            # Best effort, as in the original: a NotFound during purge is ignored
            pass

        if monster_df.empty:
            # Nothing to rank; monster_df.at[0, ...] below would raise KeyError
            self.create_log_msg(f"No leaderboard data - {monster_type}")
            return

        top_user_id = int(monster_df.at[0, 'member_id'])
        entries = []
        for index, row in monster_df.iterrows():
            if row[monster_type] == 0:
                # Members without any spots are left off the board
                continue
            if row['member_id'] == top_user_id:
                # The leader's whole line is rendered in bold
                entries.append(f"**[{index + 1}]  {row['display_name']} - {row[monster_type]}**")
            else:
                entries.append(f"**[{index + 1}]**  {row['display_name']} - {row[monster_type]}")
        top_print = "\n".join(entries)
        ch_type = ''.join(char for char in monster_type if not char.isdigit())

        embed_kwargs = {"title": f"TOP 15 {ch_type.upper()}", "description": top_print}
        top_user = get(self.bot.get_all_members(), id=top_user_id)
        if top_user is not None:
            # Embed colour follows the leader's avatar; skipped when the leader
            # is no longer visible to the bot (e.g. left the guild)
            embed_kwargs["color"] = get_dominant_color(top_user.avatar_url)
        embed_command = discord.Embed(**embed_kwargs)
        if top_user is not None:
            embed_command.set_thumbnail(url=f'{top_user.avatar_url}')
        dt_string = self.bot.get_current_time()
        embed_command.set_footer(text=f"{dt_string}")
        await leaderboard_ch.send(embed=embed_command)

    # Update member spotting role(total/common)
    async def update_role(self, guild, guild_member, spot_roles: dict, common: bool) -> None:
        """Give ``guild_member`` the milestone role matching their current score.

        :param guild: Guild whose roles are managed.
        :param guild_member: Member to evaluate.
        :param spot_roles: Mapping of role name -> minimum score. The last
            qualifying entry in mapping order is granted, so this presumably
            expects milestones listed in ascending order (verify against config).
        :param common: Rank by the ``common`` column when true, otherwise by
            ``total`` (``legendary * legend_multiplier + rare``).
        """
        roles_type = "common" if common else "total"
        try:
            spots_df = await DatabaseCog.db_get_member_stats(guild_member.id)
            if not common:
                spots_df["total"] = spots_df["legendary"] * self.legend_multiplier + spots_df["rare"]
            # Read the score once instead of once per milestone
            member_score = spots_df.loc[0, roles_type]
            roles_list = [role_name for role_name, threshold in spot_roles.items() if member_score >= threshold]
            if roles_list:
                await self.update_role_ext(guild, roles_list, guild_member)
        except KeyError as e:
            # Raised e.g. when the stats frame has no row 0 or lacks a column
            print(e)

    async def update_role_ext(self, guild, roles_list: list, guild_member) -> None:
        """Add the newest milestone role and remove the one directly before it.

        :param guild: Guild whose roles are managed.
        :param roles_list: Names of all milestone roles the member qualifies
            for; the last entry is granted, the second-to-last is removed.
        :param guild_member: Member whose roles are changed.
        """
        newest_name = roles_list[-1]
        # create_new_role is defined outside this file; presumably it creates
        # the role when it does not exist yet
        await self.create_new_role(guild, newest_name)
        role_new = get(guild.roles, name=newest_name)
        if role_new not in guild_member.roles:
            await guild_member.add_roles(role_new)
        if len(roles_list) > 1:
            previous_name = roles_list[-2]
            await self.create_new_role(guild, previous_name)
            role_old = get(guild.roles, name=previous_name)
            if role_old in guild_member.roles:
                await guild_member.remove_roles(role_old)

    # Update members' spotting roles
    async def update_member_roles(self) -> None:
        """Re-evaluate the total and common milestone roles of every guild member."""
        guild = self.bot.get_guild(self.bot.guild[0])
        spot_roles_total = self.bot.config["total_milestones"][0]
        spot_roles_common = self.bot.config["common_milestones"][0]
        for guild_member in guild.members:
            await self.update_role(guild, guild_member, spot_roles_total, False)
            await self.update_role(guild, guild_member, spot_roles_common, True)

    async def update_leaderboards(self) -> None:
        """Refresh all three leaderboards, then all members' milestone roles."""
        await self.update_leaderboard(self.bot.ch_leaderboards, "total")
        await self.update_leaderboard(self.bot.ch_leaderboards_common, "common")
        await self.update_event_leaderboards(self.bot.ch_leaderboards_event, "Nightmare")
        self.create_log_msg("All leaderboards updated")
        await self.update_member_roles()
        self.create_log_msg("Members' roles updated")

    # NOTE: the original file defined this loop twice with identical bodies;
    # the first definition was shadowed by the second, so one is enough.
    @tasks.loop(minutes=15)
    async def update_leaderboards_loop(self) -> None:
        """Periodic task: refresh leaderboards and roles every 15 minutes."""
        await self.update_leaderboards()

    @update_leaderboards_loop.before_loop
    async def before_update_leaderboards_loop(self) -> None:
        """Wait for the bot to be ready, then count the messages in the common channel."""
        self.create_log_msg("Waiting until Bot is ready")
        # Wait first: before the bot is ready get_channel() returns None, so the
        # count below would be skipped silently on every cold start.
        await self.bot.wait_until_ready()
        common_ch = self.bot.get_channel(self.bot.ch_common)
        try:
            # limit=None walks the whole channel history, which may take a while
            async for _ in common_ch.history(limit=None, oldest_first=True):
                self.common_total += 1
        except AttributeError:
            # Channel not found (None) or not a channel with history; keep going
            pass

    @cog_ext.cog_slash(name="reloadLeaderboards", guild_ids=cogbase.GUILD_IDS,
                       description="Reload leaderboards",
                       default_permission=False,
                       permissions=cogbase.PERMISSION_MODS)
    async def reload_leaderboards(self, ctx: SlashContext) -> None:
        """Slash command: acknowledge the request, then refresh leaderboards and roles."""
        # Reply before the refresh so the interaction is acknowledged promptly
        await ctx.send("Leaderboards reloaded", hidden=True)
        await self.update_leaderboards()
147
148
149
def setup(bot: commands.Bot) -> None:
    """Extension entry point: build the leaderboards cog and register it on ``bot``."""
    leaderboards_cog = LeaderboardsCog(bot)
    bot.add_cog(leaderboards_cog)
151