mirror of
https://github.com/RYGhub/royalnet.git
synced 2024-11-23 19:44:20 +00:00
Add RoyalAudioFile, automatically downloads the opus file required for discord
This commit is contained in:
parent
992b643f5b
commit
6af6984380
4 changed files with 61 additions and 26 deletions
|
@ -8,3 +8,4 @@ Markdown>=3.1
|
||||||
dateparser>=0.7.1
|
dateparser>=0.7.1
|
||||||
discord.py>=1.0.1
|
discord.py>=1.0.1
|
||||||
youtube_dl>=2019.4.7
|
youtube_dl>=2019.4.7
|
||||||
|
ffmpeg-python>=0.1.17
|
||||||
|
|
|
@ -1,7 +0,0 @@
|
||||||
import discord
|
|
||||||
from .youtubedl import YtdlFile
|
|
||||||
|
|
||||||
|
|
||||||
class DiscordYtdlFile(YtdlFile):
|
|
||||||
def create_audio_source(self):
|
|
||||||
return discord.FFmpegPCMAudio(self.filename)
|
|
40
royalnet/audio/royalaudiofile.py
Normal file
40
royalnet/audio/royalaudiofile.py
Normal file
|
@ -0,0 +1,40 @@
|
||||||
|
import discord
|
||||||
|
import ffmpeg
|
||||||
|
import re
|
||||||
|
import os
|
||||||
|
import typing
|
||||||
|
import logging as _logging
|
||||||
|
from .youtubedl import YtdlFile, YtdlInfo
|
||||||
|
|
||||||
|
|
||||||
|
log = _logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
class RoyalAudioFile(YtdlFile):
|
||||||
|
ytdl_args = {
|
||||||
|
"logger": log, # Log messages to a logging.Logger instance.
|
||||||
|
"quiet": True, # Do not print messages to stdout.
|
||||||
|
"noplaylist": True, # Download single video instead of a playlist if in doubt.
|
||||||
|
"no_warnings": True, # Do not print out anything for warnings.,
|
||||||
|
"format": "bestaudio" # Fetch the best audio format available
|
||||||
|
}
|
||||||
|
|
||||||
|
def __init__(self, info: "YtdlInfo", **ytdl_args):
|
||||||
|
super().__init__(info, outtmpl="%(id)s-%(title)s.%(ext)s", **ytdl_args)
|
||||||
|
# Find the audio_filename with a regex (should be video.opus)
|
||||||
|
self.audio_filename = re.sub(rf"\.{self.info.ext}$", ".opus", self.video_filename)
|
||||||
|
# Convert the video to opus
|
||||||
|
converter = ffmpeg.input(self.video_filename) \
|
||||||
|
.output(self.audio_filename)
|
||||||
|
converter.run()
|
||||||
|
# Delete the video file
|
||||||
|
self.delete_video_file()
|
||||||
|
|
||||||
|
def delete_audio_file(self):
|
||||||
|
# TODO: _might_ be unsafe, test this
|
||||||
|
os.remove(self.audio_filename)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def create_from_url(url, **ytdl_args) -> typing.List["RoyalAudioFile"]:
|
||||||
|
info_list = YtdlInfo.create_from_url(url)
|
||||||
|
return [RoyalAudioFile(info) for info in info_list]
|
|
@ -16,27 +16,28 @@ class InterruptDownload(DownloaderError):
|
||||||
|
|
||||||
|
|
||||||
class YtdlFile:
|
class YtdlFile:
|
||||||
"""A wrapper around a youtube_dl downloaded file."""
|
ytdl_args = {
|
||||||
def __init__(self, info: "YtdlInfo", outtmpl="%(title)s-%(id)s.%(ext)s", progress_hooks=None, **ytdl_args):
|
|
||||||
if progress_hooks is None:
|
|
||||||
progress_hooks = []
|
|
||||||
self.info: "YtdlInfo" = info
|
|
||||||
self.filename: str
|
|
||||||
ytdl = YoutubeDL({
|
|
||||||
"logger": log, # Log messages to a logging.Logger instance.
|
"logger": log, # Log messages to a logging.Logger instance.
|
||||||
"quiet": True, # Do not print messages to stdout.
|
"quiet": True, # Do not print messages to stdout.
|
||||||
"noplaylist": True, # Download single video instead of a playlist if in doubt.
|
"noplaylist": True, # Download single video instead of a playlist if in doubt.
|
||||||
"no_warnings": True, # Do not print out anything for warnings.
|
"no_warnings": True, # Do not print out anything for warnings.
|
||||||
"outtmpl": outtmpl,
|
}
|
||||||
"progress_hooks": progress_hooks,
|
|
||||||
**ytdl_args
|
"""A wrapper around a youtube_dl downloaded file."""
|
||||||
})
|
def __init__(self, info: "YtdlInfo", outtmpl="%(title)s-%(id)s.%(ext)s", **ytdl_args):
|
||||||
|
self.info: "YtdlInfo" = info
|
||||||
|
self.video_filename: str
|
||||||
|
# Create a local args copy
|
||||||
|
ytdl_args["outtmpl"] = outtmpl
|
||||||
|
self.ytdl_args = {**self.ytdl_args, **ytdl_args}
|
||||||
|
# Create the ytdl
|
||||||
|
ytdl = YoutubeDL(ytdl_args)
|
||||||
# Find the file name
|
# Find the file name
|
||||||
self.filename = ytdl.prepare_filename(self.info.__dict__)
|
self.video_filename = ytdl.prepare_filename(self.info.__dict__)
|
||||||
# Download the file
|
# Download the file
|
||||||
ytdl.download([self.info.webpage_url])
|
ytdl.download([self.info.webpage_url])
|
||||||
# Final checks
|
# Final checks
|
||||||
assert os.path.exists(self.filename)
|
assert os.path.exists(self.video_filename)
|
||||||
|
|
||||||
def _stop_download(self):
|
def _stop_download(self):
|
||||||
raise InterruptDownload()
|
raise InterruptDownload()
|
||||||
|
@ -46,9 +47,9 @@ class YtdlFile:
|
||||||
info_list = YtdlInfo.create_from_url(url)
|
info_list = YtdlInfo.create_from_url(url)
|
||||||
return [info.download(outtmpl, progress_hooks, **ytdl_args) for info in info_list]
|
return [info.download(outtmpl, progress_hooks, **ytdl_args) for info in info_list]
|
||||||
|
|
||||||
def delete_file(self):
|
def delete_video_file(self):
|
||||||
# TODO: _might_ be unsafe, test this
|
# TODO: _might_ be unsafe, test this
|
||||||
os.remove(self.filename)
|
os.remove(self.video_filename)
|
||||||
|
|
||||||
|
|
||||||
class YtdlInfo:
|
class YtdlInfo:
|
||||||
|
@ -128,8 +129,8 @@ class YtdlInfo:
|
||||||
return [YtdlInfo(second_info) for second_info in first_info["entries"]]
|
return [YtdlInfo(second_info) for second_info in first_info["entries"]]
|
||||||
return [YtdlInfo(first_info)]
|
return [YtdlInfo(first_info)]
|
||||||
|
|
||||||
def download(self, outtmpl="%(title)s-%(id)s.%(ext)s", progress_hooks=None, **ytdl_args) -> YtdlFile:
|
def download(self, outtmpl="%(title)s-%(id)s.%(ext)s", **ytdl_args) -> YtdlFile:
|
||||||
return YtdlFile(self, outtmpl, progress_hooks=progress_hooks)
|
return YtdlFile(self, outtmpl)
|
||||||
|
|
||||||
def __repr__(self):
|
def __repr__(self):
|
||||||
if self.title:
|
if self.title:
|
||||||
|
|
Loading…
Reference in a new issue