mirror of
https://github.com/RYGhub/royalnet.git
synced 2024-11-23 19:44:20 +00:00
More progress has been done
This commit is contained in:
parent
a98b6c79fc
commit
638e113edb
8 changed files with 103 additions and 254 deletions
|
@ -1,7 +1,7 @@
|
|||
"""Video and audio downloading related classes, mainly used for Discord voice bots."""
|
||||
|
||||
from .playmodes import PlayMode, Playlist, Pool
|
||||
from .youtubedl import YtdlFile, YtdlInfo
|
||||
from .ytdlfile import YtdlFile, YtdlInfo
|
||||
from .royalpcmfile import RoyalPCMFile
|
||||
from .royalpcmaudio import RoyalPCMAudio
|
||||
|
||||
|
|
18
royalnet/audio/errors.py
Normal file
18
royalnet/audio/errors.py
Normal file
|
@ -0,0 +1,18 @@
|
|||
class YtdlError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class NotFoundError(YtdlError):
|
||||
pass
|
||||
|
||||
|
||||
class MultipleFilesError(YtdlError):
|
||||
pass
|
||||
|
||||
|
||||
class MissingInfoError(YtdlError):
|
||||
pass
|
||||
|
||||
|
||||
class AlreadyDownloadedError(YtdlError):
|
||||
pass
|
|
@ -1,72 +0,0 @@
|
|||
from discord import AudioSource
|
||||
from discord.opus import Encoder as OpusEncoder
|
||||
import typing
|
||||
from .royalpcmfile import RoyalPCMFile
|
||||
|
||||
|
||||
class RoyalPCMAudio(AudioSource):
|
||||
"""A :py:class:`discord.AudioSource` that keeps data in a file instead of in memory."""
|
||||
|
||||
def __init__(self, rpf: "RoyalPCMFile"):
|
||||
"""Create a :py:class:`discord.audio.RoyalPCMAudio` from a :py:class:`royalnet.audio.RoyalPCMFile`.
|
||||
|
||||
Warning:
|
||||
Not recommended, use :py:func:`royalnet.audio.RoyalPCMAudio.create_from_url` or :py:func:`royalnet.audio.RoyalPCMAudio.create_from_ytsearch` instead."""
|
||||
self.rpf: "RoyalPCMFile" = rpf
|
||||
self._file = open(self.rpf.audio_filename, "rb")
|
||||
|
||||
@staticmethod
|
||||
def create_from_url(url: str) -> typing.List["RoyalPCMAudio"]:
|
||||
"""Download a file with youtube_dl and create a list of RoyalPCMAudios.
|
||||
|
||||
Parameters:
|
||||
url: The url of the file to download.
|
||||
|
||||
Returns:
|
||||
A :py:class:`list` of RoyalPCMAudios, each corresponding to a downloaded video."""
|
||||
rpf_list = RoyalPCMFile.create_from_url(url)
|
||||
return [RoyalPCMAudio(rpf) for rpf in rpf_list]
|
||||
|
||||
@staticmethod
|
||||
def create_from_ytsearch(search: str, amount: int = 1) -> typing.List["RoyalPCMAudio"]:
|
||||
"""Search a string on YouTube and download the first ``amount`` number of videos, then download those with youtube_dl and create a list of RoyalPCMAudios.
|
||||
|
||||
Parameters:
|
||||
search: The string to search on YouTube.
|
||||
amount: The number of videos to download.
|
||||
|
||||
Returns:
|
||||
A :py:class:`list` of RoyalPCMAudios, each corresponding to a downloaded video."""
|
||||
rpf_list = RoyalPCMFile.create_from_ytsearch(search, amount)
|
||||
return [RoyalPCMAudio(rpf) for rpf in rpf_list]
|
||||
|
||||
def is_opus(self):
|
||||
"""This audio file isn't Opus-encoded, but PCM-encoded.
|
||||
|
||||
Returns:
|
||||
``False``."""
|
||||
return False
|
||||
|
||||
def read(self):
|
||||
"""Reads 20ms worth of audio.
|
||||
|
||||
If the audio is complete, then returning an empty :py:class:`bytes`-like object to signal this is the way to do so."""
|
||||
data: bytes = self._file.read(OpusEncoder.FRAME_SIZE)
|
||||
# If the file was externally closed, it means it was deleted
|
||||
if self._file.closed:
|
||||
return b""
|
||||
if len(data) != OpusEncoder.FRAME_SIZE:
|
||||
# Close the file as soon as the playback is finished
|
||||
self._file.close()
|
||||
# Reopen the file, so it can be reused
|
||||
self._file = open(self.rpf.audio_filename, "rb")
|
||||
return b""
|
||||
return data
|
||||
|
||||
def delete(self):
|
||||
"""Permanently delete the downloaded file."""
|
||||
self._file.close()
|
||||
self.rpf.delete_audio_file()
|
||||
|
||||
def __repr__(self):
|
||||
return f"<RoyalPCMAudio {self.rpf.audio_filename}>"
|
|
@ -1,102 +0,0 @@
|
|||
import logging
|
||||
import ffmpeg
|
||||
import os
|
||||
import typing
|
||||
import time
|
||||
import datetime
|
||||
from .youtubedl import YtdlFile, YtdlInfo
|
||||
from ..error import FileTooBigError
|
||||
from ..utils import fileformat
|
||||
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class RoyalPCMFile(YtdlFile):
|
||||
ytdl_args = {
|
||||
"format": "bestaudio" # Fetch the best audio format available
|
||||
}
|
||||
|
||||
def __init__(self, info: "YtdlInfo", **ytdl_args):
|
||||
# Preemptively initialize info to be able to generate the filename
|
||||
self.info = info
|
||||
# If the video is longer than 3 hours, don't download it
|
||||
if self.info.duration >= datetime.timedelta(hours=3):
|
||||
raise FileTooBigError("File is over 3 hours long")
|
||||
# Set the time to generate the filename
|
||||
self._time = time.time()
|
||||
# Ensure the file doesn't already exist
|
||||
if os.path.exists(self.ytdl_filename) or os.path.exists(self.audio_filename):
|
||||
raise FileExistsError("Can't overwrite file")
|
||||
# Overwrite the new ytdl_args
|
||||
self.ytdl_args = {**self.ytdl_args, **ytdl_args}
|
||||
log.info(f"Now downloading {info.webpage_url}")
|
||||
super().__init__(info, outtmpl=self.ytdl_filename, **self.ytdl_args)
|
||||
# Find the audio_filename with a regex (should be video.opus)
|
||||
log.info(f"Converting {self.video_filename}...")
|
||||
# Convert the video to pcm
|
||||
try:
|
||||
ffmpeg.input(f"./{self.video_filename}") \
|
||||
.output(self.audio_filename, format="s16le", ac=2, ar="48000") \
|
||||
.overwrite_output() \
|
||||
.run(quiet=True)
|
||||
except ffmpeg.Error as exc:
|
||||
log.error(f"FFmpeg error: {exc.stderr}")
|
||||
raise
|
||||
# Delete the video file
|
||||
log.info(f"Deleting {self.video_filename}")
|
||||
self.delete_video_file()
|
||||
|
||||
def __repr__(self):
|
||||
return f"<RoyalPCMFile {self.audio_filename}>"
|
||||
|
||||
@staticmethod
|
||||
def create_from_url(url: str, **ytdl_args) -> typing.List["RoyalPCMFile"]:
|
||||
"""Download a file with youtube_dl and create a list of :py:class:`discord.audio.RoyalPCMFile`.
|
||||
|
||||
Parameters:
|
||||
url: The url of the file to download.
|
||||
ytdl_args: Extra arguments to be passed to YoutubeDL while downloading.
|
||||
|
||||
Returns:
|
||||
A :py:class:`list` of RoyalPCMAudios, each corresponding to a downloaded video."""
|
||||
info_list = YtdlInfo.create_from_url(url)
|
||||
return [RoyalPCMFile(info, **ytdl_args) for info in info_list]
|
||||
|
||||
@staticmethod
|
||||
def create_from_ytsearch(search: str, amount: int = 1, **ytdl_args) -> typing.List["RoyalPCMFile"]:
|
||||
"""Search a string on YouTube and download the first ``amount`` number of videos, then download those with youtube_dl and create a list of :py:class:`discord.audio.RoyalPCMFile`.
|
||||
|
||||
Parameters:
|
||||
search: The string to search on YouTube.
|
||||
amount: The number of videos to download.
|
||||
ytdl_args: Extra arguments to be passed to YoutubeDL while downloading.
|
||||
|
||||
Returns:
|
||||
A :py:class:`list` of RoyalPCMFiles, each corresponding to a downloaded video."""
|
||||
url = f"ytsearch{amount}:{search}"
|
||||
info_list = YtdlInfo.create_from_url(url)
|
||||
return [RoyalPCMFile(info, **ytdl_args) for info in info_list]
|
||||
|
||||
@property
|
||||
def ytdl_filename(self) -> str:
|
||||
"""
|
||||
Returns:
|
||||
The name of the downloaded video file, as a :py:class:`str`.
|
||||
|
||||
Warning:
|
||||
It's going to be deleted as soon as the :py:func:`royalnet.audio.RoyalPCMFile.__init__` function has completed, so it's probably not going to be very useful...
|
||||
"""
|
||||
return f"./downloads/{fileformat(self.info.title)}-{fileformat(str(int(self._time)))}.ytdl"
|
||||
|
||||
@property
|
||||
def audio_filename(self) -> str:
|
||||
"""
|
||||
Returns:
|
||||
The name of the downloaded and PCM-converted audio file."""
|
||||
return f"./downloads/{fileformat(self.info.title)}-{fileformat(str(int(self._time)))}.pcm"
|
||||
|
||||
def delete_audio_file(self):
|
||||
"""Delete the PCM-converted audio file."""
|
||||
log.info(f"Deleting {self.audio_filename}")
|
||||
os.remove(self.audio_filename)
|
|
@ -1,71 +0,0 @@
|
|||
from .ytdlinfo import YtdlInfo
|
||||
|
||||
|
||||
class YtdlFile:
|
||||
"""Information about a youtube-dl downloaded file."""
|
||||
|
||||
def __init__(self,
|
||||
url: str,
|
||||
info: YtdlInfo = None,
|
||||
filename: str):
|
||||
...
|
||||
|
||||
|
||||
|
||||
|
||||
class OldYtdlFile:
|
||||
"""A wrapper around a youtube_dl downloaded file."""
|
||||
|
||||
ytdl_args = {
|
||||
"logger": log, # Log messages to a logging.Logger instance.
|
||||
"quiet": True, # Do not print messages to stdout.
|
||||
"noplaylist": True, # Download single video instead of a playlist if in doubt.
|
||||
"no_warnings": True, # Do not print out anything for warnings.
|
||||
}
|
||||
|
||||
def __init__(self, info: "YtdlInfo", outtmpl="%(title)s-%(id)s.%(ext)s", **ytdl_args):
|
||||
self.info: "YtdlInfo" = info
|
||||
self.video_filename: str
|
||||
# Create a local args copy
|
||||
ytdl_args["outtmpl"] = outtmpl
|
||||
self.ytdl_args = {**self.ytdl_args, **ytdl_args}
|
||||
# Create the ytdl
|
||||
ytdl = YoutubeDL(ytdl_args)
|
||||
# Find the file name
|
||||
self.video_filename = ytdl.prepare_filename(self.info.__dict__)
|
||||
# Download the file
|
||||
ytdl.download([self.info.webpage_url])
|
||||
# Final checks
|
||||
assert os.path.exists(self.video_filename)
|
||||
|
||||
def __repr__(self):
|
||||
return f"<YtdlFile {self.video_filename}>"
|
||||
|
||||
@staticmethod
|
||||
def create_from_url(url, outtmpl="%(title)s-%(id)s.%(ext)s", **ytdl_args) -> typing.List["YtdlFile"]:
|
||||
"""Download the videos at the specified url.
|
||||
|
||||
Parameters:
|
||||
url: The url to download the videos from.
|
||||
outtmpl: The filename that the downloaded videos are going to have. The name can be formatted according to the `outtmpl documentation <https://github.com/ytdl-org/youtube-dl/blob/master/README.md#output-template>`_.
|
||||
ytdl_args: Other arguments to be passed to the YoutubeDL object.
|
||||
|
||||
Returns:
|
||||
A :py:class:`list` of YtdlFiles."""
|
||||
info_list = YtdlInfo.create_from_url(url)
|
||||
return [info.download(outtmpl, **ytdl_args) for info in info_list]
|
||||
|
||||
def _stop_download(self):
|
||||
"""I have no clue of what this does, or why is it here. Possibly remove it?
|
||||
|
||||
Raises:
|
||||
InterruptDownload: ...uhhh, always?"""
|
||||
raise InterruptDownload()
|
||||
|
||||
def delete_video_file(self):
|
||||
"""Delete the file located at ``self.video_filename``.
|
||||
|
||||
Note:
|
||||
No checks are done when deleting, so it may try to delete a non-existing file and raise an exception or do some other weird stuff with weird filenames."""
|
||||
os.remove(self.video_filename)
|
||||
|
2
royalnet/audio/ytdldiscord.py
Normal file
2
royalnet/audio/ytdldiscord.py
Normal file
|
@ -0,0 +1,2 @@
|
|||
import discord
|
||||
from .ytdlfile import YtdlFile
|
72
royalnet/audio/ytdlfile.py
Normal file
72
royalnet/audio/ytdlfile.py
Normal file
|
@ -0,0 +1,72 @@
|
|||
import contextlib
|
||||
import os
|
||||
import typing
|
||||
import youtube_dl
|
||||
from .ytdlinfo import YtdlInfo
|
||||
from .errors import NotFoundError, MultipleFilesError, MissingInfoError, AlreadyDownloadedError
|
||||
|
||||
|
||||
class YtdlFile:
|
||||
"""Information about a youtube-dl downloaded file."""
|
||||
|
||||
_default_ytdl_args = {
|
||||
"quiet": True, # Do not print messages to stdout.
|
||||
"noplaylist": True, # Download single video instead of a playlist if in doubt.
|
||||
"no_warnings": True, # Do not print out anything for warnings.
|
||||
"outtmpl": "%(title)s-%(id)s.%(ext)s" # Use the default outtmpl.
|
||||
}
|
||||
|
||||
def __init__(self,
|
||||
url: str,
|
||||
info: YtdlInfo = None,
|
||||
filename: str = None):
|
||||
self.url: str = url
|
||||
self.info: YtdlInfo = info
|
||||
self.filename: str = filename
|
||||
|
||||
def has_info(self) -> bool:
|
||||
return self.info is not None
|
||||
|
||||
def is_downloaded(self) -> bool:
|
||||
return self.filename is not None and os.path.exists(self.filename)
|
||||
|
||||
@contextlib.contextmanager
|
||||
def open(self):
|
||||
if not self.is_downloaded():
|
||||
raise FileNotFoundError("The file hasn't been downloaded yet.")
|
||||
with open(self.filename, "r") as file:
|
||||
yield file
|
||||
|
||||
def update_info(self, **ytdl_args) -> None:
|
||||
infos = YtdlInfo.retrieve_for_url(self.url, **ytdl_args)
|
||||
if len(infos) == 0:
|
||||
raise NotFoundError()
|
||||
elif len(infos) > 1:
|
||||
raise MultipleFilesError()
|
||||
self.info = infos[0]
|
||||
|
||||
def download_file(self, **ytdl_args) -> None:
|
||||
if not self.has_info():
|
||||
raise MissingInfoError()
|
||||
if self.is_downloaded():
|
||||
raise AlreadyDownloadedError()
|
||||
with youtube_dl.YoutubeDL({**self._default_ytdl_args, **ytdl_args}) as ytdl:
|
||||
ytdl.download([self.info.webpage_url])
|
||||
self.filename = ytdl.prepare_filename(self.info.__dict__)
|
||||
|
||||
def delete_file(self):
|
||||
"""Delete the file located at ``self.filename``."""
|
||||
if not self.is_downloaded():
|
||||
raise FileNotFoundError("The file hasn't been downloaded yet.")
|
||||
os.remove(self.filename)
|
||||
self.filename = None
|
||||
|
||||
@classmethod
|
||||
def download_from_url(cls, url: str, **ytdl_args) -> typing.List["YtdlFile"]:
|
||||
infos = YtdlInfo.retrieve_for_url(url, **ytdl_args)
|
||||
files = []
|
||||
for info in infos:
|
||||
file = YtdlFile(url=info.webpage_url, info=info)
|
||||
file.download_file(**ytdl_args)
|
||||
files.append(file)
|
||||
return files
|
|
@ -9,6 +9,13 @@ import royalnet.utils as u
|
|||
class YtdlInfo:
|
||||
"""A wrapper around youtube_dl extracted info."""
|
||||
|
||||
_default_ytdl_args = {
|
||||
"quiet": True, # Do not print messages to stdout.
|
||||
"noplaylist": True, # Download single video instead of a playlist if in doubt.
|
||||
"no_warnings": True, # Do not print out anything for warnings.
|
||||
"outtmpl": "%(title)s-%(id)s.%(ext)s" # Use the default outtmpl.
|
||||
}
|
||||
|
||||
def __init__(self, info: typing.Dict[str, typing.Any]):
|
||||
"""Create a YtdlInfo from the dict returned by the :py:func:`youtube_dl.YoutubeDL.extract_info` function.
|
||||
|
||||
|
@ -71,19 +78,14 @@ class YtdlInfo:
|
|||
self.abr: typing.Optional[int] = info.get("abr")
|
||||
self.ext: typing.Optional[str] = info.get("ext")
|
||||
|
||||
@staticmethod
|
||||
def retrieve_for_url(url, **ytdl_args) -> typing.List["YtdlInfo"]:
|
||||
@classmethod
|
||||
def retrieve_for_url(cls, url, **ytdl_args) -> typing.List["YtdlInfo"]:
|
||||
"""Fetch the info for an url through YoutubeDL.
|
||||
|
||||
Returns:
|
||||
A :py:class:`list` containing the infos for the requested videos."""
|
||||
# So many redundant options!
|
||||
ytdl = youtube_dl.YoutubeDL({
|
||||
"quiet": True, # Do not print messages to stdout.
|
||||
"noplaylist": True, # Download single video instead of a playlist if in doubt.
|
||||
"no_warnings": True, # Do not print out anything for warnings.
|
||||
**ytdl_args
|
||||
})
|
||||
ytdl = youtube_dl.YoutubeDL({**cls._default_ytdl_args, **ytdl_args})
|
||||
first_info = ytdl.extract_info(url=url, download=False)
|
||||
# If it is a playlist, create multiple videos!
|
||||
if "entries" in first_info:
|
||||
|
|
Loading…
Reference in a new issue