mirror of
https://github.com/RYGhub/royalnet.git
synced 2024-11-23 19:44:20 +00:00
commit
6925c51b8e
16 changed files with 291 additions and 320 deletions
|
@ -1,8 +1,9 @@
|
|||
"""Video and audio downloading related classes, mainly used for Discord voice bots."""
|
||||
|
||||
from .playmodes import PlayMode, Playlist, Pool
|
||||
from .youtubedl import YtdlFile, YtdlInfo
|
||||
from .royalpcmfile import RoyalPCMFile
|
||||
from .royalpcmaudio import RoyalPCMAudio
|
||||
from . import playmodes
|
||||
from .ytdlinfo import YtdlInfo
|
||||
from .ytdlfile import YtdlFile
|
||||
from .fileaudiosource import FileAudioSource
|
||||
from .ytdldiscord import YtdlDiscord
|
||||
|
||||
__all__ = ["PlayMode", "Playlist", "Pool", "YtdlFile", "YtdlInfo", "RoyalPCMFile", "RoyalPCMAudio"]
|
||||
__all__ = ["playmodes", "YtdlInfo", "YtdlFile", "FileAudioSource", "YtdlDiscord"]
|
||||
|
|
18
royalnet/audio/errors.py
Normal file
18
royalnet/audio/errors.py
Normal file
|
@ -0,0 +1,18 @@
|
|||
class YtdlError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class NotFoundError(YtdlError):
|
||||
pass
|
||||
|
||||
|
||||
class MultipleFilesError(YtdlError):
|
||||
pass
|
||||
|
||||
|
||||
class MissingInfoError(YtdlError):
|
||||
pass
|
||||
|
||||
|
||||
class AlreadyDownloadedError(YtdlError):
|
||||
pass
|
41
royalnet/audio/fileaudiosource.py
Normal file
41
royalnet/audio/fileaudiosource.py
Normal file
|
@ -0,0 +1,41 @@
|
|||
import discord
|
||||
|
||||
|
||||
class FileAudioSource(discord.AudioSource):
|
||||
"""A :py:class:`discord.AudioSource` that uses a :py:class:`io.BufferedIOBase` as an input instead of memory.
|
||||
|
||||
The stream should be in the usual PCM encoding.
|
||||
|
||||
Warning:
|
||||
This AudioSource will consume (and close) the passed stream."""
|
||||
|
||||
def __init__(self, file):
|
||||
self.file = file
|
||||
|
||||
def __repr__(self):
|
||||
if self.file.seekable():
|
||||
return f"<{self.__class__.__name__} @{self.file.tell()}>"
|
||||
else:
|
||||
return f"<{self.__class__.__name__}>"
|
||||
|
||||
def is_opus(self):
|
||||
"""This audio file isn't Opus-encoded, but PCM-encoded.
|
||||
|
||||
Returns:
|
||||
``False``."""
|
||||
return False
|
||||
|
||||
def read(self):
|
||||
"""Reads 20ms worth of audio.
|
||||
|
||||
If the audio is complete, then returning an empty :py:class:`bytes`-like object to signal this is the way to do so."""
|
||||
# If the stream is closed, it should stop playing immediatly
|
||||
if self.file.closed:
|
||||
return b""
|
||||
data: bytes = self.file.read(discord.opus.Encoder.FRAME_SIZE)
|
||||
# If there is no more data to be streamed
|
||||
if len(data) != discord.opus.Encoder.FRAME_SIZE:
|
||||
# Close the file
|
||||
self.file.close()
|
||||
return b""
|
||||
return data
|
|
@ -1,7 +1,8 @@
|
|||
import math
|
||||
import random
|
||||
import typing
|
||||
from .royalpcmaudio import RoyalPCMAudio
|
||||
from .ytdldiscord import YtdlDiscord
|
||||
from .fileaudiosource import FileAudioSource
|
||||
|
||||
|
||||
class PlayMode:
|
||||
|
@ -9,14 +10,14 @@ class PlayMode:
|
|||
|
||||
def __init__(self):
|
||||
"""Create a new PlayMode and initialize the generator inside."""
|
||||
self.now_playing: typing.Optional[RoyalPCMAudio] = None
|
||||
self.now_playing: typing.Optional[YtdlDiscord] = None
|
||||
self.generator: typing.AsyncGenerator = self._generate_generator()
|
||||
|
||||
async def next(self) -> typing.Optional[RoyalPCMAudio]:
|
||||
"""Get the next :py:class:`royalnet.audio.RoyalPCMAudio` from the list and advance it.
|
||||
async def next(self) -> typing.Optional[FileAudioSource]:
|
||||
"""Get the next :py:class:`royalnet.audio.FileAudioSource` from the list and advance it.
|
||||
|
||||
Returns:
|
||||
The next :py:class:`royalnet.audio.RoyalPCMAudio`."""
|
||||
The next :py:class:`royalnet.audio.FileAudioSource`."""
|
||||
return await self.generator.__anext__()
|
||||
|
||||
def videos_left(self) -> typing.Union[int, float]:
|
||||
|
@ -27,30 +28,30 @@ class PlayMode:
|
|||
raise NotImplementedError()
|
||||
|
||||
async def _generate_generator(self):
|
||||
"""Factory function for an async generator that changes the ``now_playing`` property either to a :py:class:`discord.audio.RoyalPCMAudio` or to ``None``, then yields the value it changed it to.
|
||||
"""Factory function for an async generator that changes the ``now_playing`` property either to a :py:class:`royalnet.audio.FileAudioSource` or to ``None``, then yields the value it changed it to.
|
||||
|
||||
Yields:
|
||||
The :py:class:`royalnet.audio.RoyalPCMAudio` to be played next."""
|
||||
The :py:class:`royalnet.audio.FileAudioSource` to be played next."""
|
||||
raise NotImplementedError()
|
||||
# This is needed to make the coroutine an async generator
|
||||
# noinspection PyUnreachableCode
|
||||
yield NotImplemented
|
||||
|
||||
def add(self, item: RoyalPCMAudio) -> None:
|
||||
"""Add a new :py:class:`royalnet.audio.RoyalPCMAudio` to the PlayMode.
|
||||
def add(self, item: YtdlDiscord) -> None:
|
||||
"""Add a new :py:class:`royalnet.audio.YtdlDiscord` to the PlayMode.
|
||||
|
||||
Args:
|
||||
item: The item to add to the PlayMode."""
|
||||
raise NotImplementedError()
|
||||
|
||||
def delete(self) -> None:
|
||||
"""Delete all :py:class:`royalnet.audio.RoyalPCMAudio` contained inside this PlayMode."""
|
||||
"""Delete all :py:class:`royalnet.audio.YtdlDiscord` contained inside this PlayMode."""
|
||||
raise NotImplementedError()
|
||||
|
||||
def queue_preview(self) -> typing.List[RoyalPCMAudio]:
|
||||
def queue_preview(self) -> typing.List[YtdlDiscord]:
|
||||
"""Display all the videos in the PlayMode as a list, if possible.
|
||||
|
||||
To be used with `queue` commands, for example.
|
||||
To be used with ``queue`` commands, for example.
|
||||
|
||||
Raises:
|
||||
NotImplementedError: If a preview can't be generated.
|
||||
|
@ -61,9 +62,9 @@ class PlayMode:
|
|||
|
||||
|
||||
class Playlist(PlayMode):
|
||||
"""A video list. :py:class:`royalnet.audio.RoyalPCMAudio` played are removed from the list."""
|
||||
"""A video list. :py:class:`royalnet.audio.YtdlDiscord` played are removed from the list."""
|
||||
|
||||
def __init__(self, starting_list: typing.List[RoyalPCMAudio] = None):
|
||||
def __init__(self, starting_list: typing.List[YtdlDiscord] = None):
|
||||
"""Create a new Playlist.
|
||||
|
||||
Args:
|
||||
|
@ -71,7 +72,7 @@ class Playlist(PlayMode):
|
|||
super().__init__()
|
||||
if starting_list is None:
|
||||
starting_list = []
|
||||
self.list: typing.List[RoyalPCMAudio] = starting_list
|
||||
self.list: typing.List[YtdlDiscord] = starting_list
|
||||
|
||||
def videos_left(self) -> typing.Union[int, float]:
|
||||
return len(self.list)
|
||||
|
@ -82,9 +83,10 @@ class Playlist(PlayMode):
|
|||
next_video = self.list.pop(0)
|
||||
except IndexError:
|
||||
self.now_playing = None
|
||||
yield None
|
||||
else:
|
||||
self.now_playing = next_video
|
||||
yield self.now_playing
|
||||
yield next_video.spawn_audiosource()
|
||||
if self.now_playing is not None:
|
||||
self.now_playing.delete()
|
||||
|
||||
|
@ -97,14 +99,14 @@ class Playlist(PlayMode):
|
|||
while self.list:
|
||||
self.list.pop(0).delete()
|
||||
|
||||
def queue_preview(self) -> typing.List[RoyalPCMAudio]:
|
||||
def queue_preview(self) -> typing.List[YtdlDiscord]:
|
||||
return self.list
|
||||
|
||||
|
||||
class Pool(PlayMode):
|
||||
"""A :py:class:`royalnet.audio.RoyalPCMAudio` pool. :py:class:`royalnet.audio.RoyalPCMAudio` are selected in random order and are not repeated until every song has been played at least once."""
|
||||
|
||||
def __init__(self, starting_pool: typing.List[RoyalPCMAudio] = None):
|
||||
def __init__(self, starting_pool: typing.List[YtdlDiscord] = None):
|
||||
"""Create a new Pool.
|
||||
|
||||
Args:
|
||||
|
@ -112,8 +114,8 @@ class Pool(PlayMode):
|
|||
super().__init__()
|
||||
if starting_pool is None:
|
||||
starting_pool = []
|
||||
self.pool: typing.List[RoyalPCMAudio] = starting_pool
|
||||
self._pool_copy: typing.List[RoyalPCMAudio] = []
|
||||
self.pool: typing.List[YtdlDiscord] = starting_pool
|
||||
self._pool_copy: typing.List[YtdlDiscord] = []
|
||||
|
||||
def videos_left(self) -> typing.Union[int, float]:
|
||||
return math.inf
|
||||
|
@ -129,7 +131,7 @@ class Pool(PlayMode):
|
|||
while self._pool_copy:
|
||||
next_video = self._pool_copy.pop(0)
|
||||
self.now_playing = next_video
|
||||
yield next_video
|
||||
yield next_video.spawn_audiosource()
|
||||
|
||||
def add(self, item) -> None:
|
||||
self.pool.append(item)
|
||||
|
@ -142,7 +144,7 @@ class Pool(PlayMode):
|
|||
self.pool = None
|
||||
self._pool_copy = None
|
||||
|
||||
def queue_preview(self) -> typing.List[RoyalPCMAudio]:
|
||||
def queue_preview(self) -> typing.List[YtdlDiscord]:
|
||||
preview_pool = self.pool.copy()
|
||||
random.shuffle(preview_pool)
|
||||
return preview_pool
|
|
@ -1,72 +0,0 @@
|
|||
from discord import AudioSource
|
||||
from discord.opus import Encoder as OpusEncoder
|
||||
import typing
|
||||
from .royalpcmfile import RoyalPCMFile
|
||||
|
||||
|
||||
class RoyalPCMAudio(AudioSource):
|
||||
"""A :py:class:`discord.AudioSource` that keeps data in a file instead of in memory."""
|
||||
|
||||
def __init__(self, rpf: "RoyalPCMFile"):
|
||||
"""Create a :py:class:`discord.audio.RoyalPCMAudio` from a :py:class:`royalnet.audio.RoyalPCMFile`.
|
||||
|
||||
Warning:
|
||||
Not recommended, use :py:func:`royalnet.audio.RoyalPCMAudio.create_from_url` or :py:func:`royalnet.audio.RoyalPCMAudio.create_from_ytsearch` instead."""
|
||||
self.rpf: "RoyalPCMFile" = rpf
|
||||
self._file = open(self.rpf.audio_filename, "rb")
|
||||
|
||||
@staticmethod
|
||||
def create_from_url(url: str) -> typing.List["RoyalPCMAudio"]:
|
||||
"""Download a file with youtube_dl and create a list of RoyalPCMAudios.
|
||||
|
||||
Parameters:
|
||||
url: The url of the file to download.
|
||||
|
||||
Returns:
|
||||
A :py:class:`list` of RoyalPCMAudios, each corresponding to a downloaded video."""
|
||||
rpf_list = RoyalPCMFile.create_from_url(url)
|
||||
return [RoyalPCMAudio(rpf) for rpf in rpf_list]
|
||||
|
||||
@staticmethod
|
||||
def create_from_ytsearch(search: str, amount: int = 1) -> typing.List["RoyalPCMAudio"]:
|
||||
"""Search a string on YouTube and download the first ``amount`` number of videos, then download those with youtube_dl and create a list of RoyalPCMAudios.
|
||||
|
||||
Parameters:
|
||||
search: The string to search on YouTube.
|
||||
amount: The number of videos to download.
|
||||
|
||||
Returns:
|
||||
A :py:class:`list` of RoyalPCMAudios, each corresponding to a downloaded video."""
|
||||
rpf_list = RoyalPCMFile.create_from_ytsearch(search, amount)
|
||||
return [RoyalPCMAudio(rpf) for rpf in rpf_list]
|
||||
|
||||
def is_opus(self):
|
||||
"""This audio file isn't Opus-encoded, but PCM-encoded.
|
||||
|
||||
Returns:
|
||||
``False``."""
|
||||
return False
|
||||
|
||||
def read(self):
|
||||
"""Reads 20ms worth of audio.
|
||||
|
||||
If the audio is complete, then returning an empty :py:class:`bytes`-like object to signal this is the way to do so."""
|
||||
data: bytes = self._file.read(OpusEncoder.FRAME_SIZE)
|
||||
# If the file was externally closed, it means it was deleted
|
||||
if self._file.closed:
|
||||
return b""
|
||||
if len(data) != OpusEncoder.FRAME_SIZE:
|
||||
# Close the file as soon as the playback is finished
|
||||
self._file.close()
|
||||
# Reopen the file, so it can be reused
|
||||
self._file = open(self.rpf.audio_filename, "rb")
|
||||
return b""
|
||||
return data
|
||||
|
||||
def delete(self):
|
||||
"""Permanently delete the downloaded file."""
|
||||
self._file.close()
|
||||
self.rpf.delete_audio_file()
|
||||
|
||||
def __repr__(self):
|
||||
return f"<RoyalPCMAudio {self.rpf.audio_filename}>"
|
|
@ -1,102 +0,0 @@
|
|||
import logging
|
||||
import ffmpeg
|
||||
import os
|
||||
import typing
|
||||
import time
|
||||
import datetime
|
||||
from .youtubedl import YtdlFile, YtdlInfo
|
||||
from ..error import FileTooBigError
|
||||
from ..utils import fileformat
|
||||
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class RoyalPCMFile(YtdlFile):
|
||||
ytdl_args = {
|
||||
"format": "bestaudio" # Fetch the best audio format available
|
||||
}
|
||||
|
||||
def __init__(self, info: "YtdlInfo", **ytdl_args):
|
||||
# Preemptively initialize info to be able to generate the filename
|
||||
self.info = info
|
||||
# If the video is longer than 3 hours, don't download it
|
||||
if self.info.duration >= datetime.timedelta(hours=3):
|
||||
raise FileTooBigError("File is over 3 hours long")
|
||||
# Set the time to generate the filename
|
||||
self._time = time.time()
|
||||
# Ensure the file doesn't already exist
|
||||
if os.path.exists(self.ytdl_filename) or os.path.exists(self.audio_filename):
|
||||
raise FileExistsError("Can't overwrite file")
|
||||
# Overwrite the new ytdl_args
|
||||
self.ytdl_args = {**self.ytdl_args, **ytdl_args}
|
||||
log.info(f"Now downloading {info.webpage_url}")
|
||||
super().__init__(info, outtmpl=self.ytdl_filename, **self.ytdl_args)
|
||||
# Find the audio_filename with a regex (should be video.opus)
|
||||
log.info(f"Converting {self.video_filename}...")
|
||||
# Convert the video to pcm
|
||||
try:
|
||||
ffmpeg.input(f"./{self.video_filename}") \
|
||||
.output(self.audio_filename, format="s16le", ac=2, ar="48000") \
|
||||
.overwrite_output() \
|
||||
.run(quiet=True)
|
||||
except ffmpeg.Error as exc:
|
||||
log.error(f"FFmpeg error: {exc.stderr}")
|
||||
raise
|
||||
# Delete the video file
|
||||
log.info(f"Deleting {self.video_filename}")
|
||||
self.delete_video_file()
|
||||
|
||||
def __repr__(self):
|
||||
return f"<RoyalPCMFile {self.audio_filename}>"
|
||||
|
||||
@staticmethod
|
||||
def create_from_url(url: str, **ytdl_args) -> typing.List["RoyalPCMFile"]:
|
||||
"""Download a file with youtube_dl and create a list of :py:class:`discord.audio.RoyalPCMFile`.
|
||||
|
||||
Parameters:
|
||||
url: The url of the file to download.
|
||||
ytdl_args: Extra arguments to be passed to YoutubeDL while downloading.
|
||||
|
||||
Returns:
|
||||
A :py:class:`list` of RoyalPCMAudios, each corresponding to a downloaded video."""
|
||||
info_list = YtdlInfo.create_from_url(url)
|
||||
return [RoyalPCMFile(info, **ytdl_args) for info in info_list]
|
||||
|
||||
@staticmethod
|
||||
def create_from_ytsearch(search: str, amount: int = 1, **ytdl_args) -> typing.List["RoyalPCMFile"]:
|
||||
"""Search a string on YouTube and download the first ``amount`` number of videos, then download those with youtube_dl and create a list of :py:class:`discord.audio.RoyalPCMFile`.
|
||||
|
||||
Parameters:
|
||||
search: The string to search on YouTube.
|
||||
amount: The number of videos to download.
|
||||
ytdl_args: Extra arguments to be passed to YoutubeDL while downloading.
|
||||
|
||||
Returns:
|
||||
A :py:class:`list` of RoyalPCMFiles, each corresponding to a downloaded video."""
|
||||
url = f"ytsearch{amount}:{search}"
|
||||
info_list = YtdlInfo.create_from_url(url)
|
||||
return [RoyalPCMFile(info, **ytdl_args) for info in info_list]
|
||||
|
||||
@property
|
||||
def ytdl_filename(self) -> str:
|
||||
"""
|
||||
Returns:
|
||||
The name of the downloaded video file, as a :py:class:`str`.
|
||||
|
||||
Warning:
|
||||
It's going to be deleted as soon as the :py:func:`royalnet.audio.RoyalPCMFile.__init__` function has completed, so it's probably not going to be very useful...
|
||||
"""
|
||||
return f"./downloads/{fileformat(self.info.title)}-{fileformat(str(int(self._time)))}.ytdl"
|
||||
|
||||
@property
|
||||
def audio_filename(self) -> str:
|
||||
"""
|
||||
Returns:
|
||||
The name of the downloaded and PCM-converted audio file."""
|
||||
return f"./downloads/{fileformat(self.info.title)}-{fileformat(str(int(self._time)))}.pcm"
|
||||
|
||||
def delete_audio_file(self):
|
||||
"""Delete the PCM-converted audio file."""
|
||||
log.info(f"Deleting {self.audio_filename}")
|
||||
os.remove(self.audio_filename)
|
76
royalnet/audio/ytdldiscord.py
Normal file
76
royalnet/audio/ytdldiscord.py
Normal file
|
@ -0,0 +1,76 @@
|
|||
import typing
|
||||
import discord
|
||||
import re
|
||||
import ffmpeg
|
||||
import os
|
||||
from .ytdlinfo import YtdlInfo
|
||||
from .ytdlfile import YtdlFile
|
||||
from .fileaudiosource import FileAudioSource
|
||||
|
||||
|
||||
class YtdlDiscord:
|
||||
def __init__(self, ytdl_file: YtdlFile):
|
||||
self.ytdl_file: YtdlFile = ytdl_file
|
||||
self.pcm_filename: typing.Optional[str] = None
|
||||
self._fas_spawned: typing.List[FileAudioSource] = []
|
||||
|
||||
def pcm_available(self):
|
||||
return self.pcm_filename is not None and os.path.exists(self.pcm_filename)
|
||||
|
||||
def convert_to_pcm(self) -> None:
|
||||
if not self.ytdl_file.is_downloaded():
|
||||
raise FileNotFoundError("File hasn't been downloaded yet")
|
||||
destination_filename = re.sub(r"\.[^.]+$", ".pcm", self.ytdl_file.filename)
|
||||
(
|
||||
ffmpeg.input(self.ytdl_file.filename)
|
||||
.output(destination_filename, format="s16le", ac=2, ar="48000")
|
||||
.overwrite_output()
|
||||
.run(quiet=True)
|
||||
)
|
||||
self.pcm_filename = destination_filename
|
||||
|
||||
def ready_up(self):
|
||||
if not self.ytdl_file.has_info():
|
||||
self.ytdl_file.update_info()
|
||||
if not self.ytdl_file.is_downloaded():
|
||||
self.ytdl_file.download_file()
|
||||
if not self.pcm_available():
|
||||
self.convert_to_pcm()
|
||||
|
||||
def spawn_audiosource(self) -> discord.AudioSource:
|
||||
if not self.pcm_available():
|
||||
raise FileNotFoundError("File hasn't been converted to PCM yet")
|
||||
stream = open(self.pcm_filename, "rb")
|
||||
source = FileAudioSource(stream)
|
||||
# FIXME: it's a intentional memory leak
|
||||
self._fas_spawned.append(source)
|
||||
return source
|
||||
|
||||
def delete(self) -> None:
|
||||
if self.pcm_available():
|
||||
for source in self._fas_spawned:
|
||||
if not source.file.closed:
|
||||
source.file.close()
|
||||
os.remove(self.pcm_filename)
|
||||
self.pcm_filename = None
|
||||
self.ytdl_file.delete()
|
||||
|
||||
@classmethod
|
||||
def create_from_url(cls, url, **ytdl_args) -> typing.List["YtdlDiscord"]:
|
||||
files = YtdlFile.download_from_url(url, **ytdl_args)
|
||||
dfiles = []
|
||||
for file in files:
|
||||
dfile = YtdlDiscord(file)
|
||||
dfiles.append(dfile)
|
||||
return dfiles
|
||||
|
||||
@classmethod
|
||||
def create_and_ready_from_url(cls, url, **ytdl_args) -> typing.List["YtdlDiscord"]:
|
||||
dfiles = cls.create_from_url(url, **ytdl_args)
|
||||
for dfile in dfiles:
|
||||
dfile.ready_up()
|
||||
return dfiles
|
||||
|
||||
@property
|
||||
def info(self) -> typing.Optional[YtdlInfo]:
|
||||
return self.ytdl_file.info
|
70
royalnet/audio/ytdlfile.py
Normal file
70
royalnet/audio/ytdlfile.py
Normal file
|
@ -0,0 +1,70 @@
|
|||
import contextlib
|
||||
import os
|
||||
import typing
|
||||
import youtube_dl
|
||||
from .ytdlinfo import YtdlInfo
|
||||
from .errors import NotFoundError, MultipleFilesError, MissingInfoError, AlreadyDownloadedError
|
||||
|
||||
|
||||
class YtdlFile:
|
||||
"""Information about a youtube-dl downloaded file."""
|
||||
|
||||
_default_ytdl_args = {
|
||||
"quiet": True, # Do not print messages to stdout.
|
||||
"noplaylist": True, # Download single video instead of a playlist if in doubt.
|
||||
"no_warnings": True, # Do not print out anything for warnings.
|
||||
"outtmpl": "%(title)s-%(id)s.%(ext)s" # Use the default outtmpl.
|
||||
}
|
||||
|
||||
def __init__(self,
|
||||
url: str,
|
||||
info: typing.Optional[YtdlInfo] = None,
|
||||
filename: typing.Optional[str] = None):
|
||||
self.url: str = url
|
||||
self.info: typing.Optional[YtdlInfo] = info
|
||||
self.filename: typing.Optional[str] = filename
|
||||
|
||||
def has_info(self) -> bool:
|
||||
return self.info is not None
|
||||
|
||||
def is_downloaded(self) -> bool:
|
||||
return self.filename is not None and os.path.exists(self.filename)
|
||||
|
||||
@contextlib.contextmanager
|
||||
def open(self):
|
||||
if not self.is_downloaded():
|
||||
raise FileNotFoundError("The file hasn't been downloaded yet.")
|
||||
with open(self.filename, "r") as file:
|
||||
yield file
|
||||
|
||||
def update_info(self, **ytdl_args) -> None:
|
||||
infos = YtdlInfo.retrieve_for_url(self.url, **ytdl_args)
|
||||
if len(infos) == 0:
|
||||
raise NotFoundError()
|
||||
elif len(infos) > 1:
|
||||
raise MultipleFilesError()
|
||||
self.info = infos[0]
|
||||
|
||||
def download_file(self, **ytdl_args) -> None:
|
||||
if not self.has_info():
|
||||
raise MissingInfoError()
|
||||
if self.is_downloaded():
|
||||
raise AlreadyDownloadedError()
|
||||
with youtube_dl.YoutubeDL({**self._default_ytdl_args, **ytdl_args}) as ytdl:
|
||||
ytdl.download([self.info.webpage_url])
|
||||
self.filename = ytdl.prepare_filename(self.info.__dict__)
|
||||
|
||||
def delete(self):
|
||||
if self.is_downloaded():
|
||||
os.remove(self.filename)
|
||||
self.filename = None
|
||||
|
||||
@classmethod
|
||||
def download_from_url(cls, url: str, **ytdl_args) -> typing.List["YtdlFile"]:
|
||||
infos = YtdlInfo.retrieve_for_url(url, **ytdl_args)
|
||||
files = []
|
||||
for info in infos:
|
||||
file = YtdlFile(url=info.webpage_url, info=info)
|
||||
file.download_file(**ytdl_args)
|
||||
files.append(file)
|
||||
return files
|
|
@ -1,95 +1,33 @@
|
|||
import typing
|
||||
import logging as _logging
|
||||
import discord
|
||||
import os
|
||||
import dateparser
|
||||
import datetime
|
||||
from youtube_dl import YoutubeDL
|
||||
from ..utils import ytdldateformat
|
||||
|
||||
log = _logging.getLogger(__name__)
|
||||
|
||||
|
||||
class DownloaderError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class InterruptDownload(DownloaderError):
|
||||
"""Raised from a progress_hook to interrupt the video download."""
|
||||
|
||||
|
||||
class YtdlFile:
|
||||
"""A wrapper around a youtube_dl downloaded file."""
|
||||
|
||||
ytdl_args = {
|
||||
"logger": log, # Log messages to a logging.Logger instance.
|
||||
"quiet": True, # Do not print messages to stdout.
|
||||
"noplaylist": True, # Download single video instead of a playlist if in doubt.
|
||||
"no_warnings": True, # Do not print out anything for warnings.
|
||||
}
|
||||
|
||||
def __init__(self, info: "YtdlInfo", outtmpl="%(title)s-%(id)s.%(ext)s", **ytdl_args):
|
||||
self.info: "YtdlInfo" = info
|
||||
self.video_filename: str
|
||||
# Create a local args copy
|
||||
ytdl_args["outtmpl"] = outtmpl
|
||||
self.ytdl_args = {**self.ytdl_args, **ytdl_args}
|
||||
# Create the ytdl
|
||||
ytdl = YoutubeDL(ytdl_args)
|
||||
# Find the file name
|
||||
self.video_filename = ytdl.prepare_filename(self.info.__dict__)
|
||||
# Download the file
|
||||
ytdl.download([self.info.webpage_url])
|
||||
# Final checks
|
||||
assert os.path.exists(self.video_filename)
|
||||
|
||||
def __repr__(self):
|
||||
return f"<YtdlFile {self.video_filename}>"
|
||||
|
||||
@staticmethod
|
||||
def create_from_url(url, outtmpl="%(title)s-%(id)s.%(ext)s", **ytdl_args) -> typing.List["YtdlFile"]:
|
||||
"""Download the videos at the specified url.
|
||||
|
||||
Parameters:
|
||||
url: The url to download the videos from.
|
||||
outtmpl: The filename that the downloaded videos are going to have. The name can be formatted according to the `outtmpl documentation <https://github.com/ytdl-org/youtube-dl/blob/master/README.md#output-template>`_.
|
||||
ytdl_args: Other arguments to be passed to the YoutubeDL object.
|
||||
|
||||
Returns:
|
||||
A :py:class:`list` of YtdlFiles."""
|
||||
info_list = YtdlInfo.create_from_url(url)
|
||||
return [info.download(outtmpl, **ytdl_args) for info in info_list]
|
||||
|
||||
def _stop_download(self):
|
||||
"""I have no clue of what this does, or why is it here. Possibly remove it?
|
||||
|
||||
Raises:
|
||||
InterruptDownload: ...uhhh, always?"""
|
||||
raise InterruptDownload()
|
||||
|
||||
def delete_video_file(self):
|
||||
"""Delete the file located at ``self.video_filename``.
|
||||
|
||||
Note:
|
||||
No checks are done when deleting, so it may try to delete a non-existing file and raise an exception or do some other weird stuff with weird filenames."""
|
||||
os.remove(self.video_filename)
|
||||
import dateparser
|
||||
import youtube_dl
|
||||
import discord
|
||||
import royalnet.utils as u
|
||||
|
||||
|
||||
class YtdlInfo:
|
||||
"""A wrapper around youtube_dl extracted info."""
|
||||
|
||||
_default_ytdl_args = {
|
||||
"quiet": True, # Do not print messages to stdout.
|
||||
"noplaylist": True, # Download single video instead of a playlist if in doubt.
|
||||
"no_warnings": True, # Do not print out anything for warnings.
|
||||
"outtmpl": "%(title)s-%(id)s.%(ext)s" # Use the default outtmpl.
|
||||
}
|
||||
|
||||
def __init__(self, info: typing.Dict[str, typing.Any]):
|
||||
"""Create a YtdlInfo from the dict returned by the :py:func:`youtube_dl.YoutubeDL.extract_info` function.
|
||||
|
||||
Warning:
|
||||
Does not download the info, for that use :py:func:`royalnet.audio.YtdlInfo.create_from_url`."""
|
||||
Does not download the info, for that use :py:func:`royalnet.audio.YtdlInfo.retrieve_for_url`."""
|
||||
self.id: typing.Optional[str] = info.get("id")
|
||||
self.uploader: typing.Optional[str] = info.get("uploader")
|
||||
self.uploader_id: typing.Optional[str] = info.get("uploader_id")
|
||||
self.uploader_url: typing.Optional[str] = info.get("uploader_url")
|
||||
self.channel_id: typing.Optional[str] = info.get("channel_id")
|
||||
self.channel_url: typing.Optional[str] = info.get("channel_url")
|
||||
self.upload_date: typing.Optional[datetime.datetime] = dateparser.parse(ytdldateformat(info.get("upload_date")))
|
||||
self.upload_date: typing.Optional[datetime.datetime] = dateparser.parse(u.ytdldateformat(info.get("upload_date")))
|
||||
self.license: typing.Optional[str] = info.get("license")
|
||||
self.creator: typing.Optional[...] = info.get("creator")
|
||||
self.title: typing.Optional[str] = info.get("title")
|
||||
|
@ -140,26 +78,22 @@ class YtdlInfo:
|
|||
self.abr: typing.Optional[int] = info.get("abr")
|
||||
self.ext: typing.Optional[str] = info.get("ext")
|
||||
|
||||
@staticmethod
|
||||
def create_from_url(url, **ytdl_args) -> typing.List["YtdlInfo"]:
|
||||
@classmethod
|
||||
def retrieve_for_url(cls, url, **ytdl_args) -> typing.List["YtdlInfo"]:
|
||||
"""Fetch the info for an url through YoutubeDL.
|
||||
|
||||
Returns:
|
||||
A :py:class:`list` containing the infos for the requested videos."""
|
||||
# So many redundant options!
|
||||
ytdl = YoutubeDL({
|
||||
"logger": log, # Log messages to a logging.Logger instance.
|
||||
"quiet": True, # Do not print messages to stdout.
|
||||
"noplaylist": True, # Download single video instead of a playlist if in doubt.
|
||||
"no_warnings": True, # Do not print out anything for warnings.
|
||||
**ytdl_args
|
||||
})
|
||||
ytdl = youtube_dl.YoutubeDL({**cls._default_ytdl_args, **ytdl_args})
|
||||
first_info = ytdl.extract_info(url=url, download=False)
|
||||
# If it is a playlist, create multiple videos!
|
||||
if "entries" in first_info:
|
||||
return [YtdlInfo(second_info) for second_info in first_info["entries"]]
|
||||
return [YtdlInfo(first_info)]
|
||||
|
||||
def download(self, outtmpl="%(title)s-%(id)s.%(ext)s", **ytdl_args) -> YtdlFile:
|
||||
return YtdlFile(self, outtmpl, **ytdl_args)
|
||||
|
||||
def to_discord_embed(self) -> discord.Embed:
|
||||
"""Return this info as a :py:class:`discord.Embed`."""
|
||||
embed = discord.Embed(title=self.title,
|
||||
colour=discord.Colour(0xcc0000),
|
||||
url=self.webpage_url)
|
||||
|
@ -179,6 +113,7 @@ class YtdlInfo:
|
|||
return f"<YtdlInfo id={self.id} ...>"
|
||||
|
||||
def __str__(self):
|
||||
"""Return the video name."""
|
||||
if self.title:
|
||||
return self.title
|
||||
if self.webpage_url:
|
|
@ -8,7 +8,7 @@ from ..utils import asyncify, Call, Command, discord_escape
|
|||
from ..error import UnregisteredError, NoneFoundError, TooManyFoundError, InvalidConfigError, RoyalnetResponseError
|
||||
from ..network import RoyalnetConfig, Request, ResponseSuccess, ResponseError
|
||||
from ..database import DatabaseConfig
|
||||
from ..audio import PlayMode, Playlist, RoyalPCMAudio
|
||||
from ..audio import playmodes, YtdlDiscord
|
||||
|
||||
log = _logging.getLogger(__name__)
|
||||
|
||||
|
@ -31,7 +31,7 @@ class DiscordBot(GenericBot):
|
|||
def _init_voice(self):
|
||||
"""Initialize the variables needed for the connection to voice chat."""
|
||||
log.debug(f"Creating music_data dict")
|
||||
self.music_data: typing.Dict[discord.Guild, PlayMode] = {}
|
||||
self.music_data: typing.Dict[discord.Guild, playmodes.PlayMode] = {}
|
||||
|
||||
def _call_factory(self) -> typing.Type[Call]:
|
||||
log.debug(f"Creating DiscordCall")
|
||||
|
@ -100,7 +100,7 @@ class DiscordBot(GenericBot):
|
|||
# Create a music_data entry, if it doesn't exist; default is a Playlist
|
||||
if not self.music_data.get(channel.guild):
|
||||
log.debug(f"Creating music_data for {channel.guild}")
|
||||
self.music_data[channel.guild] = Playlist()
|
||||
self.music_data[channel.guild] = playmodes.Playlist()
|
||||
|
||||
@staticmethod
|
||||
async def on_message(message: discord.Message):
|
||||
|
@ -216,12 +216,12 @@ class DiscordBot(GenericBot):
|
|||
await self.client.connect()
|
||||
# TODO: how to stop?
|
||||
|
||||
async def add_to_music_data(self, audio_sources: typing.List[RoyalPCMAudio], guild: discord.Guild):
|
||||
"""Add a file to the corresponding music_data object."""
|
||||
async def add_to_music_data(self, dfiles: typing.List[YtdlDiscord], guild: discord.Guild):
|
||||
"""Add a list of :py:class:`royalnet.audio.YtdlDiscord` to the corresponding music_data object."""
|
||||
guild_music_data = self.music_data[guild]
|
||||
for audio_source in audio_sources:
|
||||
log.debug(f"Adding {audio_source} to music_data")
|
||||
guild_music_data.add(audio_source)
|
||||
for dfile in dfiles:
|
||||
log.debug(f"Adding {dfile} to music_data")
|
||||
guild_music_data.add(dfile)
|
||||
if guild_music_data.now_playing is None:
|
||||
await self.advance_music_data(guild)
|
||||
|
||||
|
@ -229,7 +229,7 @@ class DiscordBot(GenericBot):
|
|||
"""Try to play the next song, while it exists. Otherwise, just return."""
|
||||
guild_music_data = self.music_data[guild]
|
||||
voice_client = self.client.find_voice_client_by_guild(guild)
|
||||
next_source: RoyalPCMAudio = await guild_music_data.next()
|
||||
next_source: discord.AudioSource = await guild_music_data.next()
|
||||
await self.update_activity_with_source_title()
|
||||
if next_source is None:
|
||||
log.debug(f"Ending playback chain")
|
||||
|
@ -252,16 +252,14 @@ class DiscordBot(GenericBot):
|
|||
log.debug(f"Updating current Activity: setting to None, as multiple guilds are using the bot")
|
||||
await self.client.change_presence(status=discord.Status.online)
|
||||
return
|
||||
# FIXME: PyCharm faulty inspection?
|
||||
# noinspection PyUnresolvedReferences
|
||||
play_mode: PlayMode = list(self.music_data.items())[0][1]
|
||||
play_mode: playmodes.PlayMode = self.music_data[list(self.music_data)[0]]
|
||||
now_playing = play_mode.now_playing
|
||||
if now_playing is None:
|
||||
# No songs are playing now
|
||||
log.debug(f"Updating current Activity: setting to None, as nothing is currently being played")
|
||||
await self.client.change_presence(status=discord.Status.online)
|
||||
return
|
||||
log.debug(f"Updating current Activity: listening to {now_playing.rpf.info.title}")
|
||||
await self.client.change_presence(activity=discord.Activity(name=now_playing.rpf.info.title,
|
||||
log.debug(f"Updating current Activity: listening to {now_playing.info.title}")
|
||||
await self.client.change_presence(activity=discord.Activity(name=now_playing.info.title,
|
||||
type=discord.ActivityType.listening),
|
||||
status=discord.Status.online)
|
||||
|
|
|
@ -94,7 +94,7 @@ class CvNH(NetworkHandler):
|
|||
except AttributeError:
|
||||
pass
|
||||
elif member.activity.type == discord.ActivityType.streaming:
|
||||
message += f" | 📡 {member.activity.url})"
|
||||
message += f" | 📡 {member.activity.url}"
|
||||
elif member.activity.type == discord.ActivityType.listening:
|
||||
if isinstance(member.activity, discord.Spotify):
|
||||
if member.activity.title == member.activity.album:
|
||||
|
|
|
@ -6,11 +6,17 @@ import pickle
|
|||
from ..utils import Command, Call, NetworkHandler, asyncify
|
||||
from ..network import Request, ResponseSuccess
|
||||
from ..error import TooManyFoundError, NoneFoundError
|
||||
from ..audio import RoyalPCMAudio
|
||||
from ..audio import YtdlDiscord
|
||||
if typing.TYPE_CHECKING:
|
||||
from ..bots import DiscordBot
|
||||
|
||||
|
||||
ytdl_args = {
|
||||
"format": "bestaudio",
|
||||
"outtmpl": f"./downloads/%(autonumber)s_%(title)s.%(ext)s"
|
||||
}
|
||||
|
||||
|
||||
class PlayNH(NetworkHandler):
|
||||
message_type = "music_play"
|
||||
|
||||
|
@ -32,16 +38,16 @@ class PlayNH(NetworkHandler):
|
|||
raise Exception("No music_data for this guild")
|
||||
# Start downloading
|
||||
if data["url"].startswith("http://") or data["url"].startswith("https://"):
|
||||
audio_sources: typing.List[RoyalPCMAudio] = await asyncify(RoyalPCMAudio.create_from_url, data["url"])
|
||||
dfiles: typing.List[YtdlDiscord] = await asyncify(YtdlDiscord.create_and_ready_from_url, data["url"], **ytdl_args)
|
||||
else:
|
||||
audio_sources = await asyncify(RoyalPCMAudio.create_from_ytsearch, data["url"])
|
||||
await bot.add_to_music_data(audio_sources, guild)
|
||||
dfiles = await asyncify(YtdlDiscord.create_and_ready_from_url, f"ytsearch:{data['url']}", **ytdl_args)
|
||||
await bot.add_to_music_data(dfiles, guild)
|
||||
# Create response dictionary
|
||||
response = {
|
||||
"videos": [{
|
||||
"title": source.rpf.info.title,
|
||||
"discord_embed_pickle": str(pickle.dumps(source.rpf.info.to_discord_embed()))
|
||||
} for source in audio_sources]
|
||||
"title": dfile.info.title,
|
||||
"discord_embed_pickle": str(pickle.dumps(dfile.info.to_discord_embed()))
|
||||
} for dfile in dfiles]
|
||||
}
|
||||
return ResponseSuccess(response)
|
||||
|
||||
|
|
|
@ -1,9 +1,8 @@
|
|||
import typing
|
||||
import asyncio
|
||||
from ..utils import Command, Call, NetworkHandler
|
||||
from ..network import Request, ResponseSuccess
|
||||
from ..error import NoneFoundError, TooManyFoundError, CurrentlyDisabledError
|
||||
from ..audio import Playlist, Pool
|
||||
from ..error import NoneFoundError, TooManyFoundError
|
||||
from ..audio.playmodes import Playlist, Pool
|
||||
if typing.TYPE_CHECKING:
|
||||
from ..bots import DiscordBot
|
||||
|
||||
|
@ -30,8 +29,7 @@ class PlaymodeNH(NetworkHandler):
|
|||
if data["mode_name"] == "playlist":
|
||||
bot.music_data[guild] = Playlist()
|
||||
elif data["mode_name"] == "pool":
|
||||
# bot.music_data[guild] = Pool()
|
||||
raise CurrentlyDisabledError("Bug: https://github.com/royal-games/royalnet/issues/61")
|
||||
bot.music_data[guild] = Pool()
|
||||
else:
|
||||
raise ValueError("No such PlayMode")
|
||||
return ResponseSuccess()
|
||||
|
|
|
@ -37,8 +37,8 @@ class QueueNH(NetworkHandler):
|
|||
"type": playmode.__class__.__name__,
|
||||
"queue":
|
||||
{
|
||||
"strings": [str(element.rpf.info) for element in queue],
|
||||
"pickled_embeds": str(pickle.dumps([element.rpf.info.to_discord_embed() for element in queue]))
|
||||
"strings": [str(dfile.info) for dfile in queue],
|
||||
"pickled_embeds": str(pickle.dumps([dfile.info.to_discord_embed() for dfile in queue]))
|
||||
}
|
||||
})
|
||||
|
||||
|
|
|
@ -12,7 +12,7 @@ class VideoinfoCommand(Command):
|
|||
@classmethod
|
||||
async def common(cls, call: Call):
|
||||
url = call.args[0]
|
||||
info_list = await asyncify(YtdlInfo.create_from_url, url)
|
||||
info_list = await asyncify(YtdlInfo.retrieve_for_url, url)
|
||||
for info in info_list:
|
||||
info_dict = info.__dict__
|
||||
message = f"🔍 Dati di [b]{info}[/b]:\n"
|
||||
|
|
|
@ -23,7 +23,7 @@ log.setLevel(logging.INFO)
|
|||
commands = [PingCommand, ShipCommand, SmecdsCommand, ColorCommand, CiaoruoziCommand, SyncCommand,
|
||||
DiarioCommand, RageCommand, ReminderCommand, KvactiveCommand, KvCommand,
|
||||
KvrollCommand, SummonCommand, PlayCommand, SkipCommand, PlaymodeCommand,
|
||||
VideochannelCommand, CvCommand, PauseCommand, QueueCommand, RoyalnetprofileCommand]
|
||||
VideochannelCommand, CvCommand, PauseCommand, QueueCommand, RoyalnetprofileCommand, VideoinfoCommand]
|
||||
|
||||
# noinspection PyUnreachableCode
|
||||
if __debug__:
|
||||
|
|
Loading…
Reference in a new issue