mirror of
https://github.com/RYGhub/royalnet.git
synced 2024-11-23 19:44:20 +00:00
ok im getting tired
This commit is contained in:
parent
bb0507d79e
commit
d570e09e90
3 changed files with 46 additions and 27 deletions
|
@ -1,7 +1,8 @@
|
||||||
import math
|
import math
|
||||||
import random
|
import random
|
||||||
import typing
|
import typing
|
||||||
from .royalpcmaudio import RoyalPCMAudio
|
from .ytdldiscord import YtdlDiscord
|
||||||
|
from .fileaudiosource import FileAudioSource
|
||||||
|
|
||||||
|
|
||||||
class PlayMode:
|
class PlayMode:
|
||||||
|
@ -9,14 +10,14 @@ class PlayMode:
|
||||||
|
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
"""Create a new PlayMode and initialize the generator inside."""
|
"""Create a new PlayMode and initialize the generator inside."""
|
||||||
self.now_playing: typing.Optional[RoyalPCMAudio] = None
|
self.now_playing: typing.Optional[YtdlDiscord] = None
|
||||||
self.generator: typing.AsyncGenerator = self._generate_generator()
|
self.generator: typing.AsyncGenerator = self._generate_generator()
|
||||||
|
|
||||||
async def next(self) -> typing.Optional[RoyalPCMAudio]:
|
async def next(self) -> typing.Optional[FileAudioSource]:
|
||||||
"""Get the next :py:class:`royalnet.audio.RoyalPCMAudio` from the list and advance it.
|
"""Get the next :py:class:`royalnet.audio.FileAudioSource` from the list and advance it.
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
The next :py:class:`royalnet.audio.RoyalPCMAudio`."""
|
The next :py:class:`royalnet.audio.FileAudioSource`."""
|
||||||
return await self.generator.__anext__()
|
return await self.generator.__anext__()
|
||||||
|
|
||||||
def videos_left(self) -> typing.Union[int, float]:
|
def videos_left(self) -> typing.Union[int, float]:
|
||||||
|
@ -27,30 +28,30 @@ class PlayMode:
|
||||||
raise NotImplementedError()
|
raise NotImplementedError()
|
||||||
|
|
||||||
async def _generate_generator(self):
|
async def _generate_generator(self):
|
||||||
"""Factory function for an async generator that changes the ``now_playing`` property either to a :py:class:`discord.audio.RoyalPCMAudio` or to ``None``, then yields the value it changed it to.
|
"""Factory function for an async generator that changes the ``now_playing`` property either to a :py:class:`royalnet.audio.FileAudioSource` or to ``None``, then yields the value it changed it to.
|
||||||
|
|
||||||
Yields:
|
Yields:
|
||||||
The :py:class:`royalnet.audio.RoyalPCMAudio` to be played next."""
|
The :py:class:`royalnet.audio.FileAudioSource` to be played next."""
|
||||||
raise NotImplementedError()
|
raise NotImplementedError()
|
||||||
# This is needed to make the coroutine an async generator
|
# This is needed to make the coroutine an async generator
|
||||||
# noinspection PyUnreachableCode
|
# noinspection PyUnreachableCode
|
||||||
yield NotImplemented
|
yield NotImplemented
|
||||||
|
|
||||||
def add(self, item: RoyalPCMAudio) -> None:
|
def add(self, item: YtdlDiscord) -> None:
|
||||||
"""Add a new :py:class:`royalnet.audio.RoyalPCMAudio` to the PlayMode.
|
"""Add a new :py:class:`royalnet.audio.YtdlDiscord` to the PlayMode.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
item: The item to add to the PlayMode."""
|
item: The item to add to the PlayMode."""
|
||||||
raise NotImplementedError()
|
raise NotImplementedError()
|
||||||
|
|
||||||
def delete(self) -> None:
|
def delete(self) -> None:
|
||||||
"""Delete all :py:class:`royalnet.audio.RoyalPCMAudio` contained inside this PlayMode."""
|
"""Delete all :py:class:`royalnet.audio.YtdlDiscord` contained inside this PlayMode."""
|
||||||
raise NotImplementedError()
|
raise NotImplementedError()
|
||||||
|
|
||||||
def queue_preview(self) -> typing.List[RoyalPCMAudio]:
|
def queue_preview(self) -> typing.List[YtdlDiscord]:
|
||||||
"""Display all the videos in the PlayMode as a list, if possible.
|
"""Display all the videos in the PlayMode as a list, if possible.
|
||||||
|
|
||||||
To be used with `queue` commands, for example.
|
To be used with ``queue`` commands, for example.
|
||||||
|
|
||||||
Raises:
|
Raises:
|
||||||
NotImplementedError: If a preview can't be generated.
|
NotImplementedError: If a preview can't be generated.
|
||||||
|
@ -61,9 +62,9 @@ class PlayMode:
|
||||||
|
|
||||||
|
|
||||||
class Playlist(PlayMode):
|
class Playlist(PlayMode):
|
||||||
"""A video list. :py:class:`royalnet.audio.RoyalPCMAudio` played are removed from the list."""
|
"""A video list. :py:class:`royalnet.audio.YtdlDiscord` played are removed from the list."""
|
||||||
|
|
||||||
def __init__(self, starting_list: typing.List[RoyalPCMAudio] = None):
|
def __init__(self, starting_list: typing.List[YtdlDiscord] = None):
|
||||||
"""Create a new Playlist.
|
"""Create a new Playlist.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
|
@ -71,7 +72,7 @@ class Playlist(PlayMode):
|
||||||
super().__init__()
|
super().__init__()
|
||||||
if starting_list is None:
|
if starting_list is None:
|
||||||
starting_list = []
|
starting_list = []
|
||||||
self.list: typing.List[RoyalPCMAudio] = starting_list
|
self.list: typing.List[YtdlDiscord] = starting_list
|
||||||
|
|
||||||
def videos_left(self) -> typing.Union[int, float]:
|
def videos_left(self) -> typing.Union[int, float]:
|
||||||
return len(self.list)
|
return len(self.list)
|
||||||
|
@ -97,14 +98,14 @@ class Playlist(PlayMode):
|
||||||
while self.list:
|
while self.list:
|
||||||
self.list.pop(0).delete()
|
self.list.pop(0).delete()
|
||||||
|
|
||||||
def queue_preview(self) -> typing.List[RoyalPCMAudio]:
|
def queue_preview(self) -> typing.List[YtdlDiscord]:
|
||||||
return self.list
|
return self.list
|
||||||
|
|
||||||
|
|
||||||
class Pool(PlayMode):
|
class Pool(PlayMode):
|
||||||
"""A :py:class:`royalnet.audio.RoyalPCMAudio` pool. :py:class:`royalnet.audio.RoyalPCMAudio` are selected in random order and are not repeated until every song has been played at least once."""
|
"""A :py:class:`royalnet.audio.RoyalPCMAudio` pool. :py:class:`royalnet.audio.RoyalPCMAudio` are selected in random order and are not repeated until every song has been played at least once."""
|
||||||
|
|
||||||
def __init__(self, starting_pool: typing.List[RoyalPCMAudio] = None):
|
def __init__(self, starting_pool: typing.List[YtdlDiscord] = None):
|
||||||
"""Create a new Pool.
|
"""Create a new Pool.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
|
@ -112,8 +113,8 @@ class Pool(PlayMode):
|
||||||
super().__init__()
|
super().__init__()
|
||||||
if starting_pool is None:
|
if starting_pool is None:
|
||||||
starting_pool = []
|
starting_pool = []
|
||||||
self.pool: typing.List[RoyalPCMAudio] = starting_pool
|
self.pool: typing.List[YtdlDiscord] = starting_pool
|
||||||
self._pool_copy: typing.List[RoyalPCMAudio] = []
|
self._pool_copy: typing.List[YtdlDiscord] = []
|
||||||
|
|
||||||
def videos_left(self) -> typing.Union[int, float]:
|
def videos_left(self) -> typing.Union[int, float]:
|
||||||
return math.inf
|
return math.inf
|
||||||
|
@ -142,7 +143,7 @@ class Pool(PlayMode):
|
||||||
self.pool = None
|
self.pool = None
|
||||||
self._pool_copy = None
|
self._pool_copy = None
|
||||||
|
|
||||||
def queue_preview(self) -> typing.List[RoyalPCMAudio]:
|
def queue_preview(self) -> typing.List[YtdlDiscord]:
|
||||||
preview_pool = self.pool.copy()
|
preview_pool = self.pool.copy()
|
||||||
random.shuffle(preview_pool)
|
random.shuffle(preview_pool)
|
||||||
return preview_pool
|
return preview_pool
|
||||||
|
|
|
@ -2,6 +2,7 @@ import typing
|
||||||
import discord
|
import discord
|
||||||
import re
|
import re
|
||||||
import ffmpeg
|
import ffmpeg
|
||||||
|
import os
|
||||||
from .ytdlfile import YtdlFile
|
from .ytdlfile import YtdlFile
|
||||||
from .fileaudiosource import FileAudioSource
|
from .fileaudiosource import FileAudioSource
|
||||||
|
|
||||||
|
@ -11,6 +12,9 @@ class YtdlDiscord:
|
||||||
self.ytdl_file: YtdlFile = ytdl_file
|
self.ytdl_file: YtdlFile = ytdl_file
|
||||||
self.pcm_filename: typing.Optional[str] = None
|
self.pcm_filename: typing.Optional[str] = None
|
||||||
|
|
||||||
|
def pcm_available(self):
|
||||||
|
return self.pcm_filename is not None and os.path.exists(self.pcm_filename)
|
||||||
|
|
||||||
def convert_to_pcm(self) -> None:
|
def convert_to_pcm(self) -> None:
|
||||||
if not self.ytdl_file.is_downloaded():
|
if not self.ytdl_file.is_downloaded():
|
||||||
raise FileNotFoundError("File hasn't been downloaded yet")
|
raise FileNotFoundError("File hasn't been downloaded yet")
|
||||||
|
@ -23,6 +27,22 @@ class YtdlDiscord:
|
||||||
)
|
)
|
||||||
self.pcm_filename = destination_filename
|
self.pcm_filename = destination_filename
|
||||||
|
|
||||||
|
def ready_up(self):
|
||||||
|
if not self.ytdl_file.has_info():
|
||||||
|
self.ytdl_file.update_info()
|
||||||
|
if not self.ytdl_file.is_downloaded():
|
||||||
|
self.ytdl_file.download_file()
|
||||||
|
if not self.pcm_available():
|
||||||
|
self.convert_to_pcm()
|
||||||
|
|
||||||
def to_audiosource(self) -> discord.AudioSource:
|
def to_audiosource(self) -> discord.AudioSource:
|
||||||
|
if not self.pcm_available():
|
||||||
|
raise FileNotFoundError("File hasn't been converted to PCM yet")
|
||||||
stream = open(self.pcm_filename, "rb")
|
stream = open(self.pcm_filename, "rb")
|
||||||
return FileAudioSource(stream)
|
return FileAudioSource(stream)
|
||||||
|
|
||||||
|
def delete(self) -> None:
|
||||||
|
if self.pcm_available():
|
||||||
|
os.remove(self.pcm_filename)
|
||||||
|
self.pcm_filename = None
|
||||||
|
self.ytdl_file.delete()
|
||||||
|
|
|
@ -54,12 +54,10 @@ class YtdlFile:
|
||||||
ytdl.download([self.info.webpage_url])
|
ytdl.download([self.info.webpage_url])
|
||||||
self.filename = ytdl.prepare_filename(self.info.__dict__)
|
self.filename = ytdl.prepare_filename(self.info.__dict__)
|
||||||
|
|
||||||
def delete_file(self):
|
def delete(self):
|
||||||
"""Delete the file located at ``self.filename``."""
|
if self.is_downloaded():
|
||||||
if not self.is_downloaded():
|
os.remove(self.filename)
|
||||||
raise FileNotFoundError("The file hasn't been downloaded yet.")
|
self.filename = None
|
||||||
os.remove(self.filename)
|
|
||||||
self.filename = None
|
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def download_from_url(cls, url: str, **ytdl_args) -> typing.List["YtdlFile"]:
|
def download_from_url(cls, url: str, **ytdl_args) -> typing.List["YtdlFile"]:
|
||||||
|
|
Loading…
Reference in a new issue