mirror of
https://github.com/RYGhub/royalnet.git
synced 2024-11-23 11:34:18 +00:00
Closes #112: Priority by guild and by most connected members
This commit is contained in:
parent
5ebdf49766
commit
5bbae7da8c
3 changed files with 68 additions and 60 deletions
|
@ -1,5 +1,5 @@
|
|||
from royalnet.commands import *
|
||||
from typing import TYPE_CHECKING, Optional, List
|
||||
from typing import TYPE_CHECKING, Optional, List, Union
|
||||
import asyncio
|
||||
|
||||
try:
|
||||
|
@ -21,36 +21,85 @@ class SummonCommand(Command):
|
|||
async def run(self, args: CommandArgs, data: CommandData) -> None:
|
||||
# This command only runs on Discord!
|
||||
if self.interface.name != "discord":
|
||||
# TODO: use a Herald Event to remotely connect the bot
|
||||
raise UnsupportedError()
|
||||
# noinspection PyUnresolvedReferences
|
||||
message: discord.Message = data.message
|
||||
member: Union[discord.User, discord.Member] = message.author
|
||||
serf: DiscordSerf = self.interface.bot
|
||||
client: discord.Client = serf.client
|
||||
channel_name: Optional[str] = args.joined()
|
||||
|
||||
# If the channel name was passed as an argument...
|
||||
if channel_name != "":
|
||||
# Try to find the specified channel
|
||||
channels: List[discord.abc.GuildChannel] = serf.client.find_channel(channel_name)
|
||||
# TODO: if there are multiple channels, try to find the most appropriate one
|
||||
# TODO: ensure that the channel is a voice channel
|
||||
if len(channels) != 1:
|
||||
raise CommandError("Couldn't decide on a channel to connect to.")
|
||||
else:
|
||||
# Try to find all possible channels
|
||||
channels: List[discord.VoiceChannel] = []
|
||||
for ch in client.get_all_channels():
|
||||
guild: discord.Guild = ch.guild
|
||||
# Ensure the channel is a voice channel
|
||||
if not isinstance(ch, discord.VoiceChannel):
|
||||
continue
|
||||
# Ensure the channel starts with the requested name
|
||||
ch_name: str = ch.name
|
||||
if not ch_name.startswith(channel_name):
|
||||
continue
|
||||
# Ensure that the command author can access the channel
|
||||
if guild.get_member(member.id) is None:
|
||||
continue
|
||||
member_permissions: discord.Permissions = ch.permissions_for(member)
|
||||
if not (member_permissions.connect and member_permissions.speak):
|
||||
continue
|
||||
# Ensure that the bot can access the channel
|
||||
bot_member = guild.get_member(client.user.id)
|
||||
bot_permissions: discord.Permissions = ch.permissions_for(bot_member)
|
||||
if not (bot_permissions.connect and bot_permissions.speak):
|
||||
continue
|
||||
# Found one!
|
||||
channels.append(ch)
|
||||
|
||||
# Ensure at least a single channel is returned
|
||||
if len(channels) == 0:
|
||||
raise InvalidInputError("Could not find any channel to connect to.")
|
||||
elif len(channels) == 1:
|
||||
channel = channels[0]
|
||||
else:
|
||||
# Give priority to channels in the current guild
|
||||
filter_by_guild = False
|
||||
for ch in channels:
|
||||
if ch.guild == message.guild:
|
||||
filter_by_guild = True
|
||||
break
|
||||
if filter_by_guild:
|
||||
new_channels = []
|
||||
for ch in channels:
|
||||
if ch.guild == message.guild:
|
||||
new_channels.append(ch)
|
||||
channels = new_channels
|
||||
|
||||
# Give priority to channels with the most people
|
||||
def people_count(c: discord.VoiceChannel):
|
||||
return len(c.members)
|
||||
channels.sort(key=people_count, reverse=True)
|
||||
|
||||
channel = channels[0]
|
||||
|
||||
else:
|
||||
# Try to use the channel in which the command author is in
|
||||
voice: Optional[discord.VoiceState] = message.author.voice
|
||||
if voice is None:
|
||||
raise CommandError("You must be connected to a voice channel to summon the bot without any arguments.")
|
||||
raise UserError("You must be connected to a voice channel to summon the bot without any arguments.")
|
||||
channel: discord.VoiceChannel = voice.channel
|
||||
|
||||
# Try to connect to the voice channel
|
||||
try:
|
||||
client = await channel.connect()
|
||||
voice: discord.VoiceClient = await channel.connect()
|
||||
except asyncio.TimeoutError:
|
||||
raise ExternalError("Timed out while trying to connect to the channel")
|
||||
except discord.opus.OpusNotLoaded:
|
||||
raise ConfigurationError("[c]libopus[/c] is not loaded in the serf")
|
||||
except discord.ClientException as e:
|
||||
# TODO: handle this someway
|
||||
raise
|
||||
await asyncio.sleep(6)
|
||||
breakpoint()
|
||||
# The bot is already connected to a voice channel
|
||||
# TODO: safely move the bot somewhere else
|
||||
raise CommandError("The bot is already connected in another channel.")
|
||||
|
||||
await data.reply(f"✅ Connected to <#{channel.id}>!")
|
||||
|
|
|
@ -142,47 +142,6 @@ class DiscordSerf(Serf):
|
|||
"""Change the bot presence to ``online`` when the bot is ready."""
|
||||
await cli.change_presence(status=discord.Status.online)
|
||||
|
||||
def find_guild(cli, name: str) -> List[discord.Guild]:
|
||||
"""Find the :class:`discord.Guild`s with the specified name (case insensitive).
|
||||
|
||||
Returns:
|
||||
A :class:`list` of :class:`discord.Guild` having the specified name."""
|
||||
all_guilds: List[discord.Guild] = cli.guilds
|
||||
matching_channels: List[discord.Guild] = []
|
||||
for guild in all_guilds:
|
||||
if guild.name.lower() == name.lower():
|
||||
matching_channels.append(guild)
|
||||
return matching_channels
|
||||
|
||||
def find_channel(cli,
|
||||
name: str,
|
||||
guild: Optional[discord.Guild] = None) -> List[discord.abc.GuildChannel]:
|
||||
"""Find the :class:`TextChannel`s, :class:`VoiceChannel`s or :class:`CategoryChannel`s with the
|
||||
specified name (case insensitive).
|
||||
|
||||
You can specify a guild to only search in that specific guild."""
|
||||
if guild is not None:
|
||||
all_channels = guild.channels
|
||||
else:
|
||||
all_channels: List[discord.abc.GuildChannel] = cli.get_all_channels()
|
||||
matching_channels: List[discord.abc.GuildChannel] = []
|
||||
for channel in all_channels:
|
||||
if not (isinstance(channel, discord.TextChannel)
|
||||
or isinstance(channel, discord.VoiceChannel)
|
||||
or isinstance(channel, discord.CategoryChannel)):
|
||||
continue
|
||||
if channel.name.lower() == name.lower():
|
||||
matching_channels.append(channel)
|
||||
return matching_channels
|
||||
|
||||
def find_voice_client(cli, guild: discord.Guild) -> Optional[discord.VoiceClient]:
|
||||
"""Find the :class:`discord.VoiceClient` belonging to a specific :py:class:`discord.Guild`."""
|
||||
# TODO: the bug I was looking for might be here
|
||||
for voice_client in cli.voice_clients:
|
||||
if voice_client.guild == guild:
|
||||
return voice_client
|
||||
return None
|
||||
|
||||
return DiscordClient
|
||||
|
||||
async def run(self):
|
||||
|
|
|
@ -294,16 +294,16 @@ class Serf:
|
|||
except UserError as e:
|
||||
await data.reply(f"⚠️ {e.message}")
|
||||
except UnsupportedError as e:
|
||||
await data.reply(f"🚫 {e.message}")
|
||||
await data.reply(f"⚠️ {e.message}")
|
||||
except ExternalError as e:
|
||||
await data.reply(f"🚫 {e.message}")
|
||||
await data.reply(f"⚠️ {e.message}")
|
||||
except ConfigurationError as e:
|
||||
await data.reply(f"⛔️ {e.message}")
|
||||
await data.reply(f"⚠️ {e.message}")
|
||||
except CommandError as e:
|
||||
await data.reply(f"⛔️ {e.message}")
|
||||
await data.reply(f"⚠️ {e.message}")
|
||||
except Exception as e:
|
||||
self.sentry_exc(e)
|
||||
error_message = f"🦀 [b]{e.__class__.__name__}[/b] 🦀\n" \
|
||||
error_message = f"⛔ [b]{e.__class__.__name__}[/b]\n" \
|
||||
'\n'.join(e.args)
|
||||
await data.reply(error_message)
|
||||
|
||||
|
|
Loading…
Reference in a new issue