1
Fork 0
mirror of https://github.com/RYGhub/royalnet.git synced 2024-11-27 13:34:28 +00:00
This commit is contained in:
Steffo 2020-08-21 02:34:29 +02:00
parent 3f1c8a57d3
commit 3f68729bc3
13 changed files with 22 additions and 605 deletions

View file

@ -53,12 +53,12 @@ class LinkerCommand(rc.Command, metaclass=abc.ABCMeta):
@abc.abstractmethod @abc.abstractmethod
async def get_updatables_of_user(self, session, user: rbt.User) -> List[Updatable]: async def get_updatables_of_user(self, session, user: rbt.User) -> List[Updatable]:
"""Get the updatables of a specific user.""" """Get the updatables of a specific user."""
... raise NotImplementedError()
@abc.abstractmethod @abc.abstractmethod
async def get_updatables(self, session) -> List[Updatable]: async def get_updatables(self, session) -> List[Updatable]:
"""Return a list of all objects that should be updated at this updater cycle.""" """Return a list of all objects that should be updated at this updater cycle."""
... raise NotImplementedError()
@abc.abstractmethod @abc.abstractmethod
async def create(self, async def create(self,
@ -69,37 +69,37 @@ class LinkerCommand(rc.Command, metaclass=abc.ABCMeta):
"""Create a new updatable object for a user. """Create a new updatable object for a user.
This function is responsible for adding the object to the session.""" This function is responsible for adding the object to the session."""
... raise NotImplementedError()
@abc.abstractmethod @abc.abstractmethod
async def update(self, session, obj, change: Callable[[str, Any], Awaitable[None]]): async def update(self, session, obj, change: Callable[[str, Any], Awaitable[None]]):
"""Update a single updatable object. Use the change method to change values on the object!""" """Update a single updatable object. Use the change method to change values on the object!"""
... raise NotImplementedError()
@abc.abstractmethod @abc.abstractmethod
async def on_increase(self, session, obj: Updatable, attribute: str, old: Any, new: Any) -> None: async def on_increase(self, session, obj: Updatable, attribute: str, old: Any, new: Any) -> None:
"""Called when the attribute has increased from the old value.""" """Called when the attribute has increased from the old value."""
... raise NotImplementedError()
@abc.abstractmethod @abc.abstractmethod
async def on_unchanged(self, session, obj: Updatable, attribute: str, old: Any, new: Any) -> None: async def on_unchanged(self, session, obj: Updatable, attribute: str, old: Any, new: Any) -> None:
"""Called when the attribute stayed the same as the old value.""" """Called when the attribute stayed the same as the old value."""
... raise NotImplementedError()
@abc.abstractmethod @abc.abstractmethod
async def on_decrease(self, session, obj: Updatable, attribute: str, old: Any, new: Any) -> None: async def on_decrease(self, session, obj: Updatable, attribute: str, old: Any, new: Any) -> None:
"""Called when the attribute has decreased from the old value.""" """Called when the attribute has decreased from the old value."""
... raise NotImplementedError()
@abc.abstractmethod @abc.abstractmethod
async def on_first(self, session, obj: Updatable, attribute: str, old: None, new: Any) -> None: async def on_first(self, session, obj: Updatable, attribute: str, old: None, new: Any) -> None:
"""Called when the attribute changed from None.""" """Called when the attribute changed from None."""
... raise NotImplementedError()
@abc.abstractmethod @abc.abstractmethod
async def on_reset(self, session, obj: Updatable, attribute: str, old: Any, new: None) -> None: async def on_reset(self, session, obj: Updatable, attribute: str, old: Any, new: None) -> None:
"""Called when the attribute changed to None.""" """Called when the attribute changed to None."""
... raise NotImplementedError()
async def _change(self, async def _change(self,
session, session,

View file

@ -1,27 +1,27 @@
import discord import discord
from typing import * from typing import *
from royalnet.serf.discord.discordserf import DiscordSerf from royalnet.serf.discord.discordserf import DiscordSerf
from royalnet.commands import * import royalnet.commands as rc
class DiscordCvEvent(Event): class DiscordCvEvent(rc.HeraldEvent):
name = "discord_cv" name = "discord_cv"
async def run(self, guild_id: Optional[int] = None, **kwargs) -> dict: async def run(self, guild_id: Optional[int] = None, **kwargs) -> dict:
if not self.interface.name == "discord": if not isinstance(self.parent, DiscordSerf):
raise UnsupportedError() raise rc.UnsupportedError()
# noinspection PyTypeChecker # noinspection PyTypeChecker
serf: DiscordSerf = self.interface.serf serf: DiscordSerf = self.parent
client: discord.Client = serf.client client: discord.Client = serf.client
if guild_id is None: if guild_id is None:
guilds: List[discord.Guild] = client.guilds guilds: List[discord.Guild] = client.guilds
if len(guilds) == 0: if len(guilds) == 0:
raise ConfigurationError("Il bot non è in nessun Server.") raise rc.ConfigurationError("Il bot non è in nessun Server.")
elif len(guilds) > 1: elif len(guilds) > 1:
raise UserError("Non hai specificato di quale Server vuoi vedere le informazioni!") raise rc.UserError("Non hai specificato di quale Server vuoi vedere le informazioni!")
else: else:
guild = guilds[0] guild = guilds[0]
else: else:

View file

@ -1,100 +0,0 @@
import datetime
from typing import *
import discord
import royalnet.commands as rc
import royalnet.serf.discord as rsd
import royalnet.bard.discord as rbd
from ..utils import RoyalQueue, RoyalPool
class DiscordLazyPlayEvent(rc.Event):
name = "discord_lazy_play"
async def run(self,
urls: List[str],
guild_id: Optional[int] = None,
user: Optional[str] = None,
force_color: Optional[int] = None,
**kwargs) -> dict:
if not isinstance(self.serf, rsd.DiscordSerf):
raise rc.UnsupportedError()
serf: rsd.DiscordSerf = self.serf
client: discord.Client = self.serf.client
guild: discord.Guild = client.get_guild(guild_id) if guild_id is not None else None
candidate_players: List[rsd.VoicePlayer] = serf.find_voice_players(guild)
if len(candidate_players) == 0:
raise rc.UserError("Il bot non è in nessun canale vocale.\n"
"Evocalo prima con [c]summon[/c]!")
elif len(candidate_players) == 1:
voice_player = candidate_players[0]
else:
raise rc.CommandError("Non so in che Server riprodurre questo file...\n"
"Invia il comando su Discord, per favore!")
added: List[rbd.YtdlDiscord] = []
too_long: List[rbd.YtdlDiscord] = []
for url in urls:
ytds = await rbd.YtdlDiscord.from_url(url)
if isinstance(voice_player.playing, RoyalQueue):
for index, ytd in enumerate(ytds):
if ytd.info.duration >= datetime.timedelta(seconds=self.config["Play"]["max_song_duration"]):
too_long.append(ytd)
continue
added.append(ytd)
voice_player.playing.contents.append(ytd)
if not voice_player.voice_client.is_playing():
await voice_player.start()
elif isinstance(voice_player.playing, RoyalPool):
for index, ytd in enumerate(ytds):
if ytd.info.duration >= datetime.timedelta(seconds=self.config["Play"]["max_song_duration"]):
too_long.append(ytd)
continue
added.append(ytd)
voice_player.playing.full_pool.append(ytd)
voice_player.playing.remaining_pool.append(ytd)
if not voice_player.voice_client.is_playing():
await voice_player.start()
else:
raise rc.CommandError(f"Non so come aggiungere musica a [c]{voice_player.playing.__class__.__qualname__}[/c]!")
main_channel: discord.TextChannel = client.get_channel(self.config["Discord"]["main_channel_id"])
if len(added) > 0:
if user:
await main_channel.send(rsd.escape(f"▶️ {user} ha aggiunto {len(added)} file _(lazy)_ alla coda:"))
else:
await main_channel.send(rsd.escape(f"▶️ Aggiunt{'o' if len(added) == 1 else 'i'} {len(added)} file "
f"[i](lazy)[/i] alla coda:"))
for ytd in added[:5]:
embed: discord.Embed = ytd.embed()
if force_color:
embed._colour = discord.Colour(force_color)
await main_channel.send(embed=embed)
if len(added) > 5:
await main_channel.send(f"e altri {len(added) - 5}!")
if len(too_long) > 0:
if user:
await main_channel.send(rsd.escape(
f"{len(too_long)} file non {'è' if len(too_long) == 1 else 'sono'}"
f" stat{'o' if len(too_long) == 1 else 'i'} scaricat{'o' if len(too_long) == 1 else 'i'}"
f" perchè durava{'' if len(too_long) == 1 else 'no'}"
f" più di [c]{self.config['Play']['max_song_duration']}[/c] secondi."
))
if len(added) + len(too_long) == 0:
raise rc.InvalidInputError("Non è stato aggiunto nessun file alla coda.")
return {
"added": [{
"title": ytd.info.title,
} for ytd in added],
"too_long": [{
"title": ytd.info.title,
} for ytd in too_long]
}

View file

@ -1,41 +0,0 @@
import discord
from typing import *
import royalnet.commands as rc
from royalnet.serf.discord import *
class DiscordPauseEvent(rc.Event):
name = "discord_pause"
async def run(self,
guild_id: Optional[int] = None,
**kwargs) -> dict:
if not isinstance(self.serf, DiscordSerf):
raise rc.UnsupportedError()
client: discord.Client = self.serf.client
if len(self.serf.voice_players) == 1:
voice_player: VoicePlayer = self.serf.voice_players[0]
else:
if guild_id is None:
# TODO: trovare un modo per riprodurre canzoni su più server da Telegram
raise rc.InvalidInputError("Non so in che Server riprodurre questo file...\n"
"Invia il comando su Discord, per favore!")
guild: discord.Guild = client.get_guild(guild_id)
if guild is None:
raise rc.InvalidInputError("Impossibile trovare il Server specificato.")
candidate_players = self.serf.find_voice_players(guild)
if len(candidate_players) == 0:
raise rc.UserError("Il bot non è in nessun canale vocale.\n"
"Evocalo prima con [c]summon[/c]!")
elif len(candidate_players) == 1:
voice_player = candidate_players[0]
else:
raise rc.CommandError("Non so su che Server saltare canzone...\n"
"Invia il comando su Discord, per favore!")
if voice_player.voice_client.is_paused():
voice_player.voice_client.resume()
return {"action": "resumed"}
else:
voice_player.voice_client.pause()
return {"action": "paused"}

View file

@ -1,102 +0,0 @@
import datetime
from typing import *
import discord
import royalnet.commands as rc
import royalnet.serf.discord as rsd
import royalnet.bard.discord as rbd
from ..utils import RoyalQueue, RoyalPool
class DiscordPlayEvent(rc.Event):
name = "discord_play"
async def run(self,
urls: List[str],
guild_id: Optional[int] = None,
user: Optional[str] = None,
force_color: Optional[int] = None,
**kwargs) -> dict:
if not isinstance(self.serf, rsd.DiscordSerf):
raise rc.UnsupportedError()
serf: rsd.DiscordSerf = self.serf
client: discord.Client = self.serf.client
guild: discord.Guild = client.get_guild(guild_id) if guild_id is not None else None
candidate_players: List[rsd.VoicePlayer] = serf.find_voice_players(guild)
if len(candidate_players) == 0:
raise rc.UserError("Il bot non è in nessun canale vocale.\n"
"Evocalo prima con [c]summon[/c]!")
elif len(candidate_players) == 1:
voice_player = candidate_players[0]
else:
raise rc.CommandError("Non so in che Server riprodurre questo file...\n"
"Invia il comando su Discord, per favore!")
added: List[rbd.YtdlDiscord] = []
too_long: List[rbd.YtdlDiscord] = []
for url in urls:
ytds = await rbd.YtdlDiscord.from_url(url)
if isinstance(voice_player.playing, RoyalQueue):
for index, ytd in enumerate(ytds):
if ytd.info.duration >= datetime.timedelta(seconds=self.config["Play"]["max_song_duration"]):
too_long.append(ytd)
continue
await ytd.convert_to_pcm()
added.append(ytd)
voice_player.playing.contents.append(ytd)
if not voice_player.voice_client.is_playing():
await voice_player.start()
elif isinstance(voice_player.playing, RoyalPool):
for index, ytd in enumerate(ytds):
if ytd.info.duration >= datetime.timedelta(seconds=self.config["Play"]["max_song_duration"]):
too_long.append(ytd)
continue
await ytd.convert_to_pcm()
added.append(ytd)
voice_player.playing.full_pool.append(ytd)
voice_player.playing.remaining_pool.append(ytd)
if not voice_player.voice_client.is_playing():
await voice_player.start()
else:
raise rc.CommandError(f"Non so come aggiungere musica a [c]{voice_player.playing.__class__.__qualname__}[/c]!")
main_channel: discord.TextChannel = client.get_channel(self.config["Discord"]["main_channel_id"])
if len(added) > 0:
if user:
await main_channel.send(rsd.escape(f"▶️ {user} ha aggiunto {len(added)} file alla coda:"))
else:
await main_channel.send(rsd.escape(f"▶️ Aggiunt{'o' if len(added) == 1 else 'i'} {len(added)} file alla"
f" coda:"))
for ytd in added[:5]:
embed: discord.Embed = ytd.embed()
if force_color:
embed._colour = discord.Colour(force_color)
await main_channel.send(embed=embed)
if len(added) > 5:
await main_channel.send(f"e altri {len(added) - 5}!")
if len(too_long) > 0:
if user:
await main_channel.send(rsd.escape(
f"{len(too_long)} file non {'è' if len(too_long) == 1 else 'sono'}"
f" stat{'o' if len(too_long) == 1 else 'i'} scaricat{'o' if len(too_long) == 1 else 'i'}"
f" perchè durava{'' if len(too_long) == 1 else 'no'}"
f" più di [c]{self.config['Play']['max_song_duration']}[/c] secondi."
))
if len(added) + len(too_long) == 0:
raise rc.InvalidInputError("Non è stato aggiunto nessun file alla coda.")
return {
"added": [{
"title": ytd.info.title,
} for ytd in added],
"too_long": [{
"title": ytd.info.title,
} for ytd in too_long]
}

View file

@ -1,48 +0,0 @@
import datetime
from typing import *
import discord
import royalnet.commands as rc
import royalnet.serf.discord as rsd
import royalnet.bard.discord as rbd
from ..utils import RoyalQueue, RoyalPool
class DiscordPlaymodeEvent(rc.Event):
name = "discord_playmode"
async def run(self,
playable_string: str,
guild_id: Optional[int] = None,
user: Optional[str] = None,
**kwargs) -> dict:
if not isinstance(self.serf, rsd.DiscordSerf):
raise rc.UnsupportedError()
serf: rsd.DiscordSerf = self.serf
client: discord.Client = self.serf.client
guild: discord.Guild = client.get_guild(guild_id) if guild_id is not None else None
candidate_players: List[rsd.VoicePlayer] = serf.find_voice_players(guild)
if len(candidate_players) == 0:
raise rc.UserError("Il bot non è in nessun canale vocale.\n"
"Evocalo prima con [c]summon[/c]!")
elif len(candidate_players) == 1:
voice_player = candidate_players[0]
else:
raise rc.CommandError("Non so a che Server cambiare Playable...\n"
"Invia il comando su Discord, per favore!")
if playable_string.upper() == "QUEUE":
playable = await RoyalQueue.create()
elif playable_string.upper() == "POOL":
playable = await RoyalPool.create()
else:
raise rc.InvalidInputError(f"Unknown playable '{playable_string.upper()}'")
await voice_player.change_playing(playable)
return {
"name": f"{playable.__class__.__qualname__}"
}

View file

@ -1,55 +0,0 @@
import discord
import pickle
import base64
from typing import *
import royalnet.commands as rc
from royalnet.serf.discord import *
from ..utils import RoyalQueue
class DiscordQueueEvent(rc.Event):
name = "discord_queue"
async def run(self,
guild_id: Optional[int] = None,
**kwargs) -> dict:
if not isinstance(self.serf, DiscordSerf):
raise rc.UnsupportedError()
client: discord.Client = self.serf.client
if len(self.serf.voice_players) == 1:
voice_player: VoicePlayer = self.serf.voice_players[0]
else:
if guild_id is None:
# TODO: trovare un modo per riprodurre canzoni su più server da Telegram
raise rc.InvalidInputError("Non so in che Server riprodurre questo file...\n"
"Invia il comando su Discord, per favore!")
guild: discord.Guild = client.get_guild(guild_id)
if guild is None:
raise rc.InvalidInputError("Impossibile trovare il Server specificato.")
candidate_players = self.serf.find_voice_players(guild)
if len(candidate_players) == 0:
raise rc.UserError("Il bot non è in nessun canale vocale.\n"
"Evocalo prima con [c]summon[/c]!")
elif len(candidate_players) == 1:
voice_player = candidate_players[0]
else:
raise rc.CommandError("Non so di che Server visualizzare la coda...\n"
"Invia il comando su Discord, per favore!")
if isinstance(voice_player.playing, RoyalQueue):
now_playing = voice_player.playing.now_playing
return {
"type": f"{voice_player.playing.__class__.__qualname__}",
"now_playing": {
"title": now_playing.info.title,
"stringified_base64_pickled_discord_embed": str(base64.b64encode(pickle.dumps(now_playing.embed())),
encoding="ascii")
} if now_playing is not None else None,
"next_up": [{
"title": ytd.info.title,
"stringified_base64_pickled_discord_embed": str(base64.b64encode(pickle.dumps(ytd.embed())),
encoding="ascii")
} for ytd in voice_player.playing.contents]
}
else:
raise rc.CommandError(f"Non so come visualizzare il contenuto di un "
f"[c]{voice_player.playing.__class__.__qualname__}[/c].")

View file

@ -1,38 +0,0 @@
import discord
from typing import *
import royalnet.commands as rc
from royalnet.serf.discord import *
class DiscordSkipEvent(rc.Event):
name = "discord_skip"
async def run(self,
guild_id: Optional[int] = None,
**kwargs) -> dict:
if not isinstance(self.serf, DiscordSerf):
raise rc.UnsupportedError()
client: discord.Client = self.serf.client
if len(self.serf.voice_players) == 1:
voice_player: VoicePlayer = self.serf.voice_players[0]
else:
if guild_id is None:
# TODO: trovare un modo per riprodurre canzoni su più server da Telegram
raise rc.InvalidInputError("Non so in che Server riprodurre questo file...\n"
"Invia il comando su Discord, per favore!")
guild: discord.Guild = client.get_guild(guild_id)
if guild is None:
raise rc.InvalidInputError("Impossibile trovare il Server specificato.")
candidate_players = self.serf.find_voice_players(guild)
if len(candidate_players) == 0:
raise rc.UserError("Il bot non è in nessun canale vocale.\n"
"Evocalo prima con [c]summon[/c]!")
elif len(candidate_players) == 1:
voice_player = candidate_players[0]
else:
raise rc.CommandError("Non so su che Server saltare canzone...\n"
"Invia il comando su Discord, per favore!")
# Stop the playback of the current song
voice_player.voice_client.stop()
# Done!
return {}

View file

@ -1,83 +0,0 @@
from typing import *
from royalnet.commands import *
from royalnet.serf.discord import *
from royalnet.serf.discord.errors import *
from ..utils import RoyalQueue
import discord
class DiscordSummonEvent(Event):
name = "discord_summon"
async def run(self, *,
channel_name: str = "",
channel_id: Optional[int] = None,
guild_id: Optional[int] = None,
user_id: Optional[int] = None,
**kwargs):
if not isinstance(self.serf, DiscordSerf):
raise UnsupportedError()
# Find the guild
if guild_id is not None:
guild: Optional[discord.Guild] = self.serf.client.get_guild(guild_id)
else:
guild = None
# Find the member
if user_id is not None and guild is not None:
member: Optional[Union[discord.User, discord.Member]] = guild.get_member(user_id=user_id)
else:
member = None
# From channel id
if channel_id is not None:
client: discord.Client = self.serf.client
channel = client.get_channel(channel_id)
# Find channel
elif channel_name != "":
# Find accessible_to
accessible_to = [self.serf.client.user]
if member is not None:
accessible_to.append(member)
# Find the channel
channel: Optional["discord.VoiceChannel"] = self.serf.find_channel(channel_type=discord.VoiceChannel,
name=channel_name,
guild=guild,
accessible_to=accessible_to,
required_permissions=["connect", "speak"]
)
elif not (guild_id is None and user_id is None):
if member.voice is not None:
channel = member.voice.channel
else:
channel = None
else:
raise InvalidInputError("Non so a cosa devo connettermi! Specifica il nome di un canale, o entra tu stesso"
" in un canale vocale prima di invocare [c]summon[/c]!")
if channel is None:
raise InvalidInputError("Non ho trovato nessun canale in cui connettermi.")
# Create a new VoicePlayer
vp = VoicePlayer(loop=self.loop)
vp.playing = await RoyalQueue.create()
# Connect to the channel
try:
await vp.connect(channel)
except OpusNotLoadedError:
raise ConfigurationError("libopus non è disponibile sul sistema in cui sta venendo eseguito questo bot,"
" pertanto non è possibile con")
except DiscordTimeoutError:
raise ExternalError("Timeout durante la connessione al canale."
" Forse il bot non ha i permessi per entrarci?")
except GuildAlreadyConnectedError:
raise UserError("Il bot è già connesso in un canale vocale nel Server!\n"
"Spostalo manualmente, o disconnettilo e riinvoca [c]summon[/c]!")
# Add the created VoicePlayer to the list
self.serf.voice_players.append(vp)
# Reply to the request
return {
"channel": {
"id": channel.id,
"name": channel.name,
"guild": {
"name": channel.guild.name,
},
}
}

View file

@ -4,7 +4,7 @@ import royalnet.commands as rc
import datetime import datetime
class PongEvent(rc.Event): class PongEvent(rc.HeraldEvent):
name = "pong" name = "pong"
async def run(self, **kwargs) -> dict: async def run(self, **kwargs) -> dict:

View file

@ -2,21 +2,21 @@ import logging
import telegram import telegram
from typing import * from typing import *
from royalnet.serf.telegram.telegramserf import TelegramSerf, escape from royalnet.serf.telegram.telegramserf import TelegramSerf, escape
from royalnet.commands import * import royalnet.commands as rc
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
class TelegramMessageEvent(Event): class TelegramMessageEvent(rc.HeraldEvent):
name = "telegram_message" name = "telegram_message"
async def run(self, chat_id, text, **kwargs) -> dict: async def run(self, chat_id, text, **kwargs) -> dict:
if not self.interface.name == "telegram": if not isinstance(self.parent, TelegramSerf):
raise UnsupportedError() raise rc.UnsupportedError()
# noinspection PyTypeChecker # noinspection PyTypeChecker
serf: TelegramSerf = self.interface.serf serf: TelegramSerf = self.parent
log.debug("Forwarding message from Herald to Telegram.") log.debug("Forwarding message from Herald to Telegram.")
await serf.api_call(serf.client.send_message, await serf.api_call(serf.client.send_message,

View file

@ -1,54 +0,0 @@
import logging
from typing import Optional, List, AsyncGenerator, Tuple, Any, Dict
from royalnet.bard.discord import YtdlDiscord
from royalnet.serf.discord import Playable
import discord
import random
log = logging.getLogger(__name__)
class RoyalPool(Playable):
"""A pool of :class:`YtdlDiscord` that will be played in a loop."""
def __init__(self, start_with: Optional[List[YtdlDiscord]] = None):
super().__init__()
self.full_pool: List[YtdlDiscord] = []
self.remaining_pool: List[YtdlDiscord] = []
self.now_playing: Optional[YtdlDiscord] = None
if start_with is not None:
self.full_pool = [*self.full_pool, *start_with]
log.debug(f"Created new {self.__class__.__qualname__} containing: {self.full_pool}")
async def _generator(self) \
-> AsyncGenerator[Optional["discord.AudioSource"], Tuple[Tuple[Any, ...], Dict[str, Any]]]:
yield
while True:
if len(self.remaining_pool) == 0:
if len(self.full_pool) == 0:
log.debug(f"Nothing in the pool, yielding None...")
yield None
continue
else:
self.remaining_pool = self.full_pool.copy()
random.shuffle(self.remaining_pool)
log.debug(f"Dequeuing an item...")
# Get the first YtdlDiscord of the queue
self.now_playing: YtdlDiscord = self.remaining_pool.pop(0)
log.debug(f"Yielding FileAudioSource from: {self.now_playing}")
# Create a FileAudioSource from the YtdlDiscord
# If the file hasn't been fetched / downloaded / converted yet, it will do so before yielding
async with self.now_playing.spawn_audiosource() as fas:
# Yield the resulting AudioSource
yield fas
async def destroy(self):
for file in self.full_pool:
log.debug(f"Deleting: {file}")
await file.delete_asap()
log.debug(f"Deleting: {file.ytdl_file}")
await file.ytdl_file.delete_asap()
log.debug(f"Deleted successfully!")
self.full_pool = []
self.remaining_pool = []
self.now_playing = None

View file

@ -1,62 +0,0 @@
import logging
from typing import Optional, List, AsyncGenerator, Tuple, Any, Dict
from royalnet.bard.discord import YtdlDiscord
from royalnet.serf.discord import Playable
import discord
log = logging.getLogger(__name__)
class RoyalQueue(Playable):
"""A queue of :class:`YtdlDiscord` to be played in sequence."""
def __init__(self, start_with: Optional[List[YtdlDiscord]] = None):
super().__init__()
self.contents: List[YtdlDiscord] = []
self.now_playing: Optional[YtdlDiscord] = None
if start_with is not None:
self.contents = [*self.contents, *start_with]
log.debug(f"Created new {self.__class__.__qualname__} containing: {self.contents}")
async def _generator(self) \
-> AsyncGenerator[Optional["discord.AudioSource"], Tuple[Tuple[Any, ...], Dict[str, Any]]]:
yield
while True:
log.debug(f"Dequeuing an item...")
try:
# Try to get the first YtdlDiscord of the queue
self.now_playing: YtdlDiscord = self.contents.pop(0)
except IndexError:
# If there isn't anything, yield None
log.debug(f"Nothing to dequeue, yielding None.")
yield None
continue
log.debug(f"Yielding FileAudioSource from: {self.now_playing}")
# Create a FileAudioSource from the YtdlDiscord
# If the file hasn't been fetched / downloaded / converted yet, it will do so before yielding
async with self.now_playing.spawn_audiosource() as fas:
# Yield the resulting AudioSource
yield fas
# Delete the YtdlDiscord file
log.debug(f"Deleting: {self.now_playing}")
await self.now_playing.delete_asap()
log.debug(f"Deleting: {self.now_playing.ytdl_file}")
await self.now_playing.ytdl_file.delete_asap()
log.debug(f"Deleted successfully!")
self.now_playing = None
async def destroy(self):
if self.now_playing is not None:
log.debug(f"Deleting: {self.now_playing}")
await self.now_playing.delete_asap()
log.debug(f"Deleting: {self.now_playing.ytdl_file}")
await self.now_playing.ytdl_file.delete_asap()
log.debug(f"Deleted successfully!")
self.now_playing = None
for file in self.contents:
log.debug(f"Deleting: {file}")
await file.delete_asap()
log.debug(f"Deleting: {file.ytdl_file}")
await file.ytdl_file.delete_asap()
log.debug(f"Deleted successfully!")
self.contents = []