From 638e113edbe341e9ba941a2b43a3270d1aaa1e63 Mon Sep 17 00:00:00 2001 From: Stefano Pigozzi Date: Tue, 30 Jul 2019 22:34:11 +0200 Subject: [PATCH] More progress has been done --- royalnet/audio/__init__.py | 2 +- royalnet/audio/errors.py | 18 ++++++ royalnet/audio/royalpcmaudio.py | 72 ---------------------- royalnet/audio/royalpcmfile.py | 102 -------------------------------- royalnet/audio/youtubedl.py | 71 ---------------------- royalnet/audio/ytdldiscord.py | 2 + royalnet/audio/ytdlfile.py | 72 ++++++++++++++++++++++ royalnet/audio/ytdlinfo.py | 18 +++--- 8 files changed, 103 insertions(+), 254 deletions(-) create mode 100644 royalnet/audio/errors.py delete mode 100644 royalnet/audio/royalpcmaudio.py delete mode 100644 royalnet/audio/royalpcmfile.py delete mode 100644 royalnet/audio/youtubedl.py create mode 100644 royalnet/audio/ytdldiscord.py create mode 100644 royalnet/audio/ytdlfile.py diff --git a/royalnet/audio/__init__.py b/royalnet/audio/__init__.py index 96b3f35b..0fbf4e24 100644 --- a/royalnet/audio/__init__.py +++ b/royalnet/audio/__init__.py @@ -1,7 +1,7 @@ """Video and audio downloading related classes, mainly used for Discord voice bots.""" from .playmodes import PlayMode, Playlist, Pool -from .youtubedl import YtdlFile, YtdlInfo +from .ytdlfile import YtdlFile, YtdlInfo from .royalpcmfile import RoyalPCMFile from .royalpcmaudio import RoyalPCMAudio diff --git a/royalnet/audio/errors.py b/royalnet/audio/errors.py new file mode 100644 index 00000000..e6fddb89 --- /dev/null +++ b/royalnet/audio/errors.py @@ -0,0 +1,18 @@ +class YtdlError(Exception): + pass + + +class NotFoundError(YtdlError): + pass + + +class MultipleFilesError(YtdlError): + pass + + +class MissingInfoError(YtdlError): + pass + + +class AlreadyDownloadedError(YtdlError): + pass diff --git a/royalnet/audio/royalpcmaudio.py b/royalnet/audio/royalpcmaudio.py deleted file mode 100644 index 76ddc7e9..00000000 --- a/royalnet/audio/royalpcmaudio.py +++ /dev/null @@ -1,72 +0,0 @@ -from discord import AudioSource -from discord.opus import Encoder as OpusEncoder -import typing -from .royalpcmfile import RoyalPCMFile - - -class RoyalPCMAudio(AudioSource): - """A :py:class:`discord.AudioSource` that keeps data in a file instead of in memory.""" - - def __init__(self, rpf: "RoyalPCMFile"): - """Create a :py:class:`discord.audio.RoyalPCMAudio` from a :py:class:`royalnet.audio.RoyalPCMFile`. - - Warning: - Not recommended, use :py:func:`royalnet.audio.RoyalPCMAudio.create_from_url` or :py:func:`royalnet.audio.RoyalPCMAudio.create_from_ytsearch` instead.""" - self.rpf: "RoyalPCMFile" = rpf - self._file = open(self.rpf.audio_filename, "rb") - - @staticmethod - def create_from_url(url: str) -> typing.List["RoyalPCMAudio"]: - """Download a file with youtube_dl and create a list of RoyalPCMAudios. - - Parameters: - url: The url of the file to download. - - Returns: - A :py:class:`list` of RoyalPCMAudios, each corresponding to a downloaded video.""" - rpf_list = RoyalPCMFile.create_from_url(url) - return [RoyalPCMAudio(rpf) for rpf in rpf_list] - - @staticmethod - def create_from_ytsearch(search: str, amount: int = 1) -> typing.List["RoyalPCMAudio"]: - """Search a string on YouTube and download the first ``amount`` number of videos, then download those with youtube_dl and create a list of RoyalPCMAudios. - - Parameters: - search: The string to search on YouTube. - amount: The number of videos to download. - - Returns: - A :py:class:`list` of RoyalPCMAudios, each corresponding to a downloaded video.""" - rpf_list = RoyalPCMFile.create_from_ytsearch(search, amount) - return [RoyalPCMAudio(rpf) for rpf in rpf_list] - - def is_opus(self): - """This audio file isn't Opus-encoded, but PCM-encoded. - - Returns: - ``False``.""" - return False - - def read(self): - """Reads 20ms worth of audio. - - If the audio is complete, then returning an empty :py:class:`bytes`-like object to signal this is the way to do so.""" - data: bytes = self._file.read(OpusEncoder.FRAME_SIZE) - # If the file was externally closed, it means it was deleted - if self._file.closed: - return b"" - if len(data) != OpusEncoder.FRAME_SIZE: - # Close the file as soon as the playback is finished - self._file.close() - # Reopen the file, so it can be reused - self._file = open(self.rpf.audio_filename, "rb") - return b"" - return data - - def delete(self): - """Permanently delete the downloaded file.""" - self._file.close() - self.rpf.delete_audio_file() - - def __repr__(self): - return f"" diff --git a/royalnet/audio/royalpcmfile.py b/royalnet/audio/royalpcmfile.py deleted file mode 100644 index 2f7d4171..00000000 --- a/royalnet/audio/royalpcmfile.py +++ /dev/null @@ -1,102 +0,0 @@ -import logging -import ffmpeg -import os -import typing -import time -import datetime -from .youtubedl import YtdlFile, YtdlInfo -from ..error import FileTooBigError -from ..utils import fileformat - - -log = logging.getLogger(__name__) - - -class RoyalPCMFile(YtdlFile): - ytdl_args = { - "format": "bestaudio" # Fetch the best audio format available - } - - def __init__(self, info: "YtdlInfo", **ytdl_args): - # Preemptively initialize info to be able to generate the filename - self.info = info - # If the video is longer than 3 hours, don't download it - if self.info.duration >= datetime.timedelta(hours=3): - raise FileTooBigError("File is over 3 hours long") - # Set the time to generate the filename - self._time = time.time() - # Ensure the file doesn't already exist - if os.path.exists(self.ytdl_filename) or os.path.exists(self.audio_filename): - raise FileExistsError("Can't overwrite file") - # Overwrite the new ytdl_args - self.ytdl_args = {**self.ytdl_args, **ytdl_args} - log.info(f"Now downloading {info.webpage_url}") - super().__init__(info, outtmpl=self.ytdl_filename, **self.ytdl_args) - # Find the audio_filename with a regex (should be video.opus) - log.info(f"Converting {self.video_filename}...") - # Convert the video to pcm - try: - ffmpeg.input(f"./{self.video_filename}") \ - .output(self.audio_filename, format="s16le", ac=2, ar="48000") \ - .overwrite_output() \ - .run(quiet=True) - except ffmpeg.Error as exc: - log.error(f"FFmpeg error: {exc.stderr}") - raise - # Delete the video file - log.info(f"Deleting {self.video_filename}") - self.delete_video_file() - - def __repr__(self): - return f"" - - @staticmethod - def create_from_url(url: str, **ytdl_args) -> typing.List["RoyalPCMFile"]: - """Download a file with youtube_dl and create a list of :py:class:`discord.audio.RoyalPCMFile`. - - Parameters: - url: The url of the file to download. - ytdl_args: Extra arguments to be passed to YoutubeDL while downloading. - - Returns: - A :py:class:`list` of RoyalPCMAudios, each corresponding to a downloaded video.""" - info_list = YtdlInfo.create_from_url(url) - return [RoyalPCMFile(info, **ytdl_args) for info in info_list] - - @staticmethod - def create_from_ytsearch(search: str, amount: int = 1, **ytdl_args) -> typing.List["RoyalPCMFile"]: - """Search a string on YouTube and download the first ``amount`` number of videos, then download those with youtube_dl and create a list of :py:class:`discord.audio.RoyalPCMFile`. - - Parameters: - search: The string to search on YouTube. - amount: The number of videos to download. - ytdl_args: Extra arguments to be passed to YoutubeDL while downloading. - - Returns: - A :py:class:`list` of RoyalPCMFiles, each corresponding to a downloaded video.""" - url = f"ytsearch{amount}:{search}" - info_list = YtdlInfo.create_from_url(url) - return [RoyalPCMFile(info, **ytdl_args) for info in info_list] - - @property - def ytdl_filename(self) -> str: - """ - Returns: - The name of the downloaded video file, as a :py:class:`str`. - - Warning: - It's going to be deleted as soon as the :py:func:`royalnet.audio.RoyalPCMFile.__init__` function has completed, so it's probably not going to be very useful... - """ - return f"./downloads/{fileformat(self.info.title)}-{fileformat(str(int(self._time)))}.ytdl" - - @property - def audio_filename(self) -> str: - """ - Returns: - The name of the downloaded and PCM-converted audio file.""" - return f"./downloads/{fileformat(self.info.title)}-{fileformat(str(int(self._time)))}.pcm" - - def delete_audio_file(self): - """Delete the PCM-converted audio file.""" - log.info(f"Deleting {self.audio_filename}") - os.remove(self.audio_filename) diff --git a/royalnet/audio/youtubedl.py b/royalnet/audio/youtubedl.py deleted file mode 100644 index 2a25dd78..00000000 --- a/royalnet/audio/youtubedl.py +++ /dev/null @@ -1,71 +0,0 @@ -from .ytdlinfo import YtdlInfo - - -class YtdlFile: - """Information about a youtube-dl downloaded file.""" - - def __init__(self, - url: str, - info: YtdlInfo = None, - filename: str): - ... - - - - -class OldYtdlFile: - """A wrapper around a youtube_dl downloaded file.""" - - ytdl_args = { - "logger": log, # Log messages to a logging.Logger instance. - "quiet": True, # Do not print messages to stdout. - "noplaylist": True, # Download single video instead of a playlist if in doubt. - "no_warnings": True, # Do not print out anything for warnings. - } - - def __init__(self, info: "YtdlInfo", outtmpl="%(title)s-%(id)s.%(ext)s", **ytdl_args): - self.info: "YtdlInfo" = info - self.video_filename: str - # Create a local args copy - ytdl_args["outtmpl"] = outtmpl - self.ytdl_args = {**self.ytdl_args, **ytdl_args} - # Create the ytdl - ytdl = YoutubeDL(ytdl_args) - # Find the file name - self.video_filename = ytdl.prepare_filename(self.info.__dict__) - # Download the file - ytdl.download([self.info.webpage_url]) - # Final checks - assert os.path.exists(self.video_filename) - - def __repr__(self): - return f"" - - @staticmethod - def create_from_url(url, outtmpl="%(title)s-%(id)s.%(ext)s", **ytdl_args) -> typing.List["YtdlFile"]: - """Download the videos at the specified url. - - Parameters: - url: The url to download the videos from. - outtmpl: The filename that the downloaded videos are going to have. The name can be formatted according to the `outtmpl documentation `_. - ytdl_args: Other arguments to be passed to the YoutubeDL object. - - Returns: - A :py:class:`list` of YtdlFiles.""" - info_list = YtdlInfo.create_from_url(url) - return [info.download(outtmpl, **ytdl_args) for info in info_list] - - def _stop_download(self): - """I have no clue of what this does, or why is it here. Possibly remove it? - - Raises: - InterruptDownload: ...uhhh, always?""" - raise InterruptDownload() - - def delete_video_file(self): - """Delete the file located at ``self.video_filename``. - - Note: - No checks are done when deleting, so it may try to delete a non-existing file and raise an exception or do some other weird stuff with weird filenames.""" - os.remove(self.video_filename) - diff --git a/royalnet/audio/ytdldiscord.py b/royalnet/audio/ytdldiscord.py new file mode 100644 index 00000000..2271322f --- /dev/null +++ b/royalnet/audio/ytdldiscord.py @@ -0,0 +1,2 @@ +import discord +from .ytdlfile import YtdlFile \ No newline at end of file diff --git a/royalnet/audio/ytdlfile.py b/royalnet/audio/ytdlfile.py new file mode 100644 index 00000000..88647a27 --- /dev/null +++ b/royalnet/audio/ytdlfile.py @@ -0,0 +1,72 @@ +import contextlib +import os +import typing +import youtube_dl +from .ytdlinfo import YtdlInfo +from .errors import NotFoundError, MultipleFilesError, MissingInfoError, AlreadyDownloadedError + + +class YtdlFile: + """Information about a youtube-dl downloaded file.""" + + _default_ytdl_args = { + "quiet": True, # Do not print messages to stdout. + "noplaylist": True, # Download single video instead of a playlist if in doubt. + "no_warnings": True, # Do not print out anything for warnings. + "outtmpl": "%(title)s-%(id)s.%(ext)s" # Use the default outtmpl. + } + + def __init__(self, + url: str, + info: YtdlInfo = None, + filename: str = None): + self.url: str = url + self.info: YtdlInfo = info + self.filename: str = filename + + def has_info(self) -> bool: + return self.info is not None + + def is_downloaded(self) -> bool: + return self.filename is not None and os.path.exists(self.filename) + + @contextlib.contextmanager + def open(self): + if not self.is_downloaded(): + raise FileNotFoundError("The file hasn't been downloaded yet.") + with open(self.filename, "r") as file: + yield file + + def update_info(self, **ytdl_args) -> None: + infos = YtdlInfo.retrieve_for_url(self.url, **ytdl_args) + if len(infos) == 0: + raise NotFoundError() + elif len(infos) > 1: + raise MultipleFilesError() + self.info = infos[0] + + def download_file(self, **ytdl_args) -> None: + if not self.has_info(): + raise MissingInfoError() + if self.is_downloaded(): + raise AlreadyDownloadedError() + with youtube_dl.YoutubeDL({**self._default_ytdl_args, **ytdl_args}) as ytdl: + ytdl.download([self.info.webpage_url]) + self.filename = ytdl.prepare_filename(self.info.__dict__) + + def delete_file(self): + """Delete the file located at ``self.filename``.""" + if not self.is_downloaded(): + raise FileNotFoundError("The file hasn't been downloaded yet.") + os.remove(self.filename) + self.filename = None + + @classmethod + def download_from_url(cls, url: str, **ytdl_args) -> typing.List["YtdlFile"]: + infos = YtdlInfo.retrieve_for_url(url, **ytdl_args) + files = [] + for info in infos: + file = YtdlFile(url=info.webpage_url, info=info) + file.download_file(**ytdl_args) + files.append(file) + return files diff --git a/royalnet/audio/ytdlinfo.py b/royalnet/audio/ytdlinfo.py index 6012e470..b89b22d4 100644 --- a/royalnet/audio/ytdlinfo.py +++ b/royalnet/audio/ytdlinfo.py @@ -9,6 +9,13 @@ import royalnet.utils as u class YtdlInfo: """A wrapper around youtube_dl extracted info.""" + _default_ytdl_args = { + "quiet": True, # Do not print messages to stdout. + "noplaylist": True, # Download single video instead of a playlist if in doubt. + "no_warnings": True, # Do not print out anything for warnings. + "outtmpl": "%(title)s-%(id)s.%(ext)s" # Use the default outtmpl. + } + def __init__(self, info: typing.Dict[str, typing.Any]): """Create a YtdlInfo from the dict returned by the :py:func:`youtube_dl.YoutubeDL.extract_info` function. @@ -71,19 +78,14 @@ class YtdlInfo: self.abr: typing.Optional[int] = info.get("abr") self.ext: typing.Optional[str] = info.get("ext") - @staticmethod - def retrieve_for_url(url, **ytdl_args) -> typing.List["YtdlInfo"]: + @classmethod + def retrieve_for_url(cls, url, **ytdl_args) -> typing.List["YtdlInfo"]: """Fetch the info for an url through YoutubeDL. Returns: A :py:class:`list` containing the infos for the requested videos.""" # So many redundant options! - ytdl = youtube_dl.YoutubeDL({ - "quiet": True, # Do not print messages to stdout. - "noplaylist": True, # Download single video instead of a playlist if in doubt. - "no_warnings": True, # Do not print out anything for warnings. - **ytdl_args - }) + ytdl = youtube_dl.YoutubeDL({**cls._default_ytdl_args, **ytdl_args}) first_info = ytdl.extract_info(url=url, download=False) # If it is a playlist, create multiple videos! if "entries" in first_info: