1
Fork 0
mirror of https://github.com/RYGhub/royalnet.git synced 2024-11-23 19:44:20 +00:00
This commit is contained in:
Steffo 2019-11-12 17:41:14 +01:00
parent 839ae802e2
commit db39874408
65 changed files with 766 additions and 569 deletions

View file

@ -1,3 +0,0 @@
from . import audio, bots, commands, packs, database, utils, error, web, version
__all__ = ["audio", "bots", "commands", "database", "utils", "error", "web", "version"]

View file

@ -94,16 +94,16 @@ def run(telegram: typing.Optional[bool],
network_config = rh.Config(network_address, network_password) network_config = rh.Config(network_address, network_password)
# Create a Alchemy configuration # Create a Alchemy configuration
telegram_db_config: typing.Optional[r.database.DatabaseConfig] = None telegram_db_config: typing.Optional[r.alchemy.DatabaseConfig] = None
discord_db_config: typing.Optional[r.database.DatabaseConfig] = None discord_db_config: typing.Optional[r.alchemy.DatabaseConfig] = None
if database is not None: if database is not None:
telegram_db_config = r.database.DatabaseConfig(database, telegram_db_config = r.alchemy.DatabaseConfig(database,
r.packs.common.tables.User,
r.packs.common.tables.Telegram,
"tg_id")
discord_db_config = r.database.DatabaseConfig(database,
r.packs.common.tables.User, r.packs.common.tables.User,
r.packs.common.tables.Discord, r.packs.common.tables.Telegram,
"tg_id")
discord_db_config = r.alchemy.DatabaseConfig(database,
r.packs.common.tables.User,
r.packs.common.tables.Discord,
"discord_id") "discord_id")
# Import command and star packs # Import command and star packs
@ -136,11 +136,11 @@ def run(telegram: typing.Optional[bool],
for command in enabled_commands: for command in enabled_commands:
click.echo(f"{command.name} - {command.description}") click.echo(f"{command.name} - {command.description}")
click.echo("") click.echo("")
telegram_bot = r.bots.TelegramBot(network_config=network_config, telegram_bot = r.interfaces.TelegramBot(network_config=network_config,
database_config=telegram_db_config, database_config=telegram_db_config,
sentry_dsn=sentry_dsn, sentry_dsn=sentry_dsn,
commands=enabled_commands, commands=enabled_commands,
secrets_name=secrets_name) secrets_name=secrets_name)
telegram_process = multiprocessing.Process(name="Telegram Interface", telegram_process = multiprocessing.Process(name="Telegram Interface",
target=telegram_bot.run_blocking, target=telegram_bot.run_blocking,
args=(verbose,), args=(verbose,),
@ -149,11 +149,11 @@ def run(telegram: typing.Optional[bool],
discord_process: typing.Optional[multiprocessing.Process] = None discord_process: typing.Optional[multiprocessing.Process] = None
if interfaces["discord"]: if interfaces["discord"]:
discord_bot = r.bots.DiscordBot(network_config=network_config, discord_bot = r.interfaces.DiscordBot(network_config=network_config,
database_config=discord_db_config, database_config=discord_db_config,
sentry_dsn=sentry_dsn, sentry_dsn=sentry_dsn,
commands=enabled_commands, commands=enabled_commands,
secrets_name=secrets_name) secrets_name=secrets_name)
discord_process = multiprocessing.Process(name="Discord Interface", discord_process = multiprocessing.Process(name="Discord Interface",
target=discord_bot.run_blocking, target=discord_bot.run_blocking,
args=(verbose,), args=(verbose,),

View file

@ -0,0 +1,12 @@
"""Relational database classes and methods."""
from .alchemy import Alchemy
from .table_dfs import table_dfs
from .errors import *
__all__ = [
"Alchemy",
"table_dfs",
"AlchemyException",
"TableNotFoundException"
]

107
royalnet/alchemy/alchemy.py Normal file
View file

@ -0,0 +1,107 @@
from typing import Set, Dict, Union, Optional
from sqlalchemy import create_engine
from sqlalchemy.engine import Engine
from sqlalchemy.schema import Table
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.ext.declarative.api import DeclarativeMeta
from sqlalchemy.orm import sessionmaker
from contextlib import contextmanager, asynccontextmanager
from royalnet.utils import asyncify
from royalnet.alchemy.errors import TableNotFoundException
class Alchemy:
"""A wrapper around ``sqlalchemy.orm`` that allows the instantiation of multiple engines at once while maintaining
a single declarative class for all of them."""
def __init__(self, database_uri: str, tables: Set):
"""Create a new Alchemy object.
Args:
database_uri: The `database URI <https://docs.sqlalchemy.org/en/13/core/engines.html>`_ .
tables: The :class:`set` of tables to be created and used in the selected database.
Check the tables submodule for more details.
"""
if database_uri.startswith("sqlite"):
raise NotImplementedError("sqlite databases aren't supported, as they can't be used in multithreaded"
" applications")
self._engine: Engine = create_engine(database_uri)
self._Base: DeclarativeMeta = declarative_base(bind=self._engine)
self._Session: sessionmaker = sessionmaker(bind=self._engine)
self._tables: Dict[str, Table] = {}
for table in tables:
name = table.__name__
assert self._tables.get(name) is None
assert isinstance(name, str)
self._tables[name] = type(name, (self._Base, table), {})
self._Base.metadata.create_all()
def get(self, table: Union[str, type]) -> Optional[Table]:
"""Get the table with a specified name or class.
Args:
table: The table name or table class you want to get.
Raises:
TableNotFoundError: if the requested table was not found."""
if isinstance(table, str):
result = self._tables.get(table)
if result is None:
raise TableNotFoundException(f"Table '{table}' isn't present in this Alchemy instance")
return result
elif isinstance(table, type):
name = table.__name__
result = self._tables.get(name)
if result is None:
raise TableNotFoundException(f"Table '{table}' isn't present in this Alchemy instance")
return result
else:
raise TypeError(f"Can't get tables with objects of type '{table.__class__.__qualname__}'")
@contextmanager
def session_cm(self):
"""Create a Session as a context manager (that can be used in ``with`` statements).
The Session will be closed safely when the context manager exits (even in case of error).
Example:
You can use the context manager like this: ::
with alchemy.session_cm() as session:
# Do some stuff
...
# Commit the session
session.commit()
"""
session = self._Session()
try:
yield session
except Exception:
session.rollback()
raise
finally:
session.close()
@asynccontextmanager
async def session_acm(self):
"""Create a Session as a async context manager (that can be used in ``async with`` statements).
The Session will be closed safely when the context manager exits (even in case of error).
Example:
You can use the async context manager like this: ::
async with alchemy.session_acm() as session:
# Do some stuff
...
# Commit the session
await asyncify(session.commit)"""
session = await asyncify(self._Session)
try:
yield session
except Exception:
session.rollback()
raise
finally:
session.close()

View file

@ -0,0 +1,7 @@
class AlchemyException(Exception):
"""Base class for Alchemy exceptions."""
class TableNotFoundException(AlchemyException):
"""The requested table was not found."""

View file

@ -1,14 +1,18 @@
import typing from typing import Optional
from sqlalchemy.inspection import inspect from sqlalchemy.inspection import inspect
from sqlalchemy.schema import Table
def relationshiplinkchain(starting_class, ending_class) -> typing.Optional[tuple]: def table_dfs(starting_table: Table, ending_table: Table) -> tuple:
"""Find the path to follow to get from the starting table to the ending table.""" """Depth-first-search for the path from the starting table to the ending table.
Returns:
A :class:`tuple` containing the path, starting from the starting table and ending at the ending table."""
inspected = set() inspected = set()
def search(_mapper, chain): def search(_mapper, chain):
inspected.add(_mapper) inspected.add(_mapper)
if _mapper.class_ == ending_class: if _mapper.class_ == ending_table:
return chain return chain
relationships = _mapper.relationships relationships = _mapper.relationships
for _relationship in set(relationships): for _relationship in set(relationships):
@ -19,4 +23,4 @@ def relationshiplinkchain(starting_class, ending_class) -> typing.Optional[tuple
return result return result
return () return ()
return search(inspect(starting_class), tuple()) return search(inspect(starting_table), tuple())

View file

@ -1,10 +0,0 @@
"""Video and audio downloading related classes, mainly used for Discord voice bots."""
from . import playmodes
from .ytdlinfo import YtdlInfo
from .ytdlfile import YtdlFile
from .fileaudiosource import FileAudioSource
from .ytdldiscord import YtdlDiscord
from .ytdlmp3 import YtdlMp3
__all__ = ["playmodes", "YtdlInfo", "YtdlFile", "FileAudioSource", "YtdlDiscord", "YtdlMp3"]

View file

@ -1,18 +0,0 @@
class YtdlError(Exception):
pass
class NotFoundError(YtdlError):
pass
class MultipleFilesError(YtdlError):
pass
class MissingInfoError(YtdlError):
pass
class AlreadyDownloadedError(YtdlError):
pass

View file

@ -1,72 +0,0 @@
import contextlib
import os
import typing
import youtube_dl
from .ytdlinfo import YtdlInfo
from .errors import NotFoundError, MultipleFilesError, MissingInfoError, AlreadyDownloadedError
class YtdlFile:
"""Information about a youtube-dl downloaded file."""
_default_ytdl_args = {
"quiet": not __debug__, # Do not print messages to stdout.
"noplaylist": True, # Download single video instead of a playlist if in doubt.
"no_warnings": not __debug__, # Do not print out anything for warnings.
"outtmpl": "%(epoch)s-%(title)s-%(id)s.%(ext)s", # Use the default outtmpl.
"ignoreerrors": True # Ignore unavailable videos
}
def __init__(self,
url: str,
info: typing.Optional[YtdlInfo] = None,
filename: typing.Optional[str] = None):
self.url: str = url
self.info: typing.Optional[YtdlInfo] = info
self.filename: typing.Optional[str] = filename
def has_info(self) -> bool:
return self.info is not None
def is_downloaded(self) -> bool:
return self.filename is not None
@contextlib.contextmanager
def open(self):
if not self.is_downloaded():
raise FileNotFoundError("The file hasn't been downloaded yet.")
with open(self.filename, "r") as file:
yield file
def update_info(self, **ytdl_args) -> None:
infos = YtdlInfo.retrieve_for_url(self.url, **ytdl_args)
if len(infos) == 0:
raise NotFoundError()
elif len(infos) > 1:
raise MultipleFilesError()
self.info = infos[0]
def download_file(self, **ytdl_args) -> None:
if not self.has_info():
raise MissingInfoError()
if self.is_downloaded():
raise AlreadyDownloadedError()
with youtube_dl.YoutubeDL({**self._default_ytdl_args, **ytdl_args}) as ytdl:
filename = ytdl.prepare_filename(self.info.__dict__)
ytdl.download([self.info.webpage_url])
self.filename = filename
def delete(self):
if self.is_downloaded():
os.remove(self.filename)
self.filename = None
@classmethod
def download_from_url(cls, url: str, **ytdl_args) -> typing.List["YtdlFile"]:
infos = YtdlInfo.retrieve_for_url(url, **ytdl_args)
files = []
for info in infos:
file = YtdlFile(url=info.webpage_url, info=info)
file.download_file(**ytdl_args)
files.append(file)
return files

View file

@ -1,140 +0,0 @@
import typing
import datetime
import dateparser
import youtube_dl
import discord
import royalnet.utils as u
class YtdlInfo:
"""A wrapper around youtube_dl extracted info."""
_default_ytdl_args = {
"quiet": True, # Do not print messages to stdout.
"noplaylist": True, # Download single video instead of a playlist if in doubt.
"no_warnings": True, # Do not print out anything for warnings.
"outtmpl": "%(title)s-%(id)s.%(ext)s", # Use the default outtmpl.
"ignoreerrors": True # Ignore unavailable videos
}
def __init__(self, info: typing.Dict[str, typing.Any]):
"""Create a YtdlInfo from the dict returned by the :py:func:`youtube_dl.YoutubeDL.extract_info` function.
Warning:
Does not download the info, for that use :py:func:`royalnet.audio.YtdlInfo.retrieve_for_url`."""
self.id: typing.Optional[str] = info.get("id")
self.uploader: typing.Optional[str] = info.get("uploader")
self.uploader_id: typing.Optional[str] = info.get("uploader_id")
self.uploader_url: typing.Optional[str] = info.get("uploader_url")
self.channel_id: typing.Optional[str] = info.get("channel_id")
self.channel_url: typing.Optional[str] = info.get("channel_url")
self.upload_date: typing.Optional[datetime.datetime] = dateparser.parse(u.ytdldateformat(info.get("upload_date")))
self.license: typing.Optional[str] = info.get("license")
self.creator: typing.Optional[...] = info.get("creator")
self.title: typing.Optional[str] = info.get("title")
self.alt_title: typing.Optional[...] = info.get("alt_title")
self.thumbnail: typing.Optional[str] = info.get("thumbnail")
self.description: typing.Optional[str] = info.get("description")
self.categories: typing.Optional[typing.List[str]] = info.get("categories")
self.tags: typing.Optional[typing.List[str]] = info.get("tags")
self.subtitles: typing.Optional[typing.Dict[str, typing.List[typing.Dict[str, str]]]] = info.get("subtitles")
self.automatic_captions: typing.Optional[dict] = info.get("automatic_captions")
self.duration: typing.Optional[datetime.timedelta] = datetime.timedelta(seconds=info.get("duration", 0))
self.age_limit: typing.Optional[int] = info.get("age_limit")
self.annotations: typing.Optional[...] = info.get("annotations")
self.chapters: typing.Optional[...] = info.get("chapters")
self.webpage_url: typing.Optional[str] = info.get("webpage_url")
self.view_count: typing.Optional[int] = info.get("view_count")
self.like_count: typing.Optional[int] = info.get("like_count")
self.dislike_count: typing.Optional[int] = info.get("dislike_count")
self.average_rating: typing.Optional[...] = info.get("average_rating")
self.formats: typing.Optional[list] = info.get("formats")
self.is_live: typing.Optional[bool] = info.get("is_live")
self.start_time: typing.Optional[float] = info.get("start_time")
self.end_time: typing.Optional[float] = info.get("end_time")
self.series: typing.Optional[str] = info.get("series")
self.season_number: typing.Optional[int] = info.get("season_number")
self.episode_number: typing.Optional[int] = info.get("episode_number")
self.track: typing.Optional[...] = info.get("track")
self.artist: typing.Optional[...] = info.get("artist")
self.extractor: typing.Optional[str] = info.get("extractor")
self.webpage_url_basename: typing.Optional[str] = info.get("webpage_url_basename")
self.extractor_key: typing.Optional[str] = info.get("extractor_key")
self.playlist: typing.Optional[str] = info.get("playlist")
self.playlist_index: typing.Optional[int] = info.get("playlist_index")
self.thumbnails: typing.Optional[typing.List[typing.Dict[str, str]]] = info.get("thumbnails")
self.display_id: typing.Optional[str] = info.get("display_id")
self.requested_subtitles: typing.Optional[...] = info.get("requested_subtitles")
self.requested_formats: typing.Optional[tuple] = info.get("requested_formats")
self.format: typing.Optional[str] = info.get("format")
self.format_id: typing.Optional[str] = info.get("format_id")
self.width: typing.Optional[int] = info.get("width")
self.height: typing.Optional[int] = info.get("height")
self.resolution: typing.Optional[...] = info.get("resolution")
self.fps: typing.Optional[int] = info.get("fps")
self.vcodec: typing.Optional[str] = info.get("vcodec")
self.vbr: typing.Optional[int] = info.get("vbr")
self.stretched_ratio: typing.Optional[...] = info.get("stretched_ratio")
self.acodec: typing.Optional[str] = info.get("acodec")
self.abr: typing.Optional[int] = info.get("abr")
self.ext: typing.Optional[str] = info.get("ext")
@classmethod
def retrieve_for_url(cls, url, **ytdl_args) -> typing.List["YtdlInfo"]:
"""Fetch the info for an url through YoutubeDL.
Returns:
A :py:class:`list` containing the infos for the requested videos."""
# So many redundant options!
ytdl = youtube_dl.YoutubeDL({**cls._default_ytdl_args, **ytdl_args})
first_info = ytdl.extract_info(url=url, download=False)
# No video was found
if first_info is None:
return []
# If it is a playlist, create multiple videos!
if "entries" in first_info and first_info["entries"][0] is not None:
second_info_list = []
for second_info in first_info["entries"]:
if second_info is None:
continue
second_info_list.append(YtdlInfo(second_info))
return second_info_list
return [YtdlInfo(first_info)]
def to_discord_embed(self) -> discord.Embed:
"""Return this info as a :py:class:`discord.Embed`."""
colors = {
"youtube": 0xCC0000,
"soundcloud": 0xFF5400,
"Clyp": 0x3DBEB3,
"Bandcamp": 0x1DA0C3,
"Peertube": 0x0A193C,
}
embed = discord.Embed(title=self.title,
colour=discord.Colour(colors.get(self.extractor, 0x4F545C)),
url=self.webpage_url if self.webpage_url is not None and self.webpage_url.startswith("http") else discord.embeds.EmptyEmbed)
if self.thumbnail:
embed.set_thumbnail(url=self.thumbnail)
if self.uploader:
embed.set_author(name=self.uploader, url=self.uploader_url if self.uploader_url is not None else discord.embeds.EmptyEmbed)
# embed.set_footer(text="Source: youtube-dl", icon_url="https://i.imgur.com/TSvSRYn.png")
if self.duration:
embed.add_field(name="Duration", value=str(self.duration), inline=True)
if self.upload_date:
embed.add_field(name="Published on", value=self.upload_date.strftime("%d %b %Y"), inline=True)
return embed
def __repr__(self):
if self.title:
return f"<YtdlInfo of {self.title}>"
if self.webpage_url:
return f"<YtdlInfo for {self.webpage_url}>"
return f"<YtdlInfo id={self.id} ...>"
def __str__(self):
"""Return the video name."""
if self.title:
return self.title
if self.webpage_url:
return self.webpage_url
return self.id

View file

@ -1,66 +0,0 @@
import typing
import re
import ffmpeg
import os
from .ytdlinfo import YtdlInfo
from .ytdlfile import YtdlFile
from .fileaudiosource import FileAudioSource
class YtdlMp3:
def __init__(self, ytdl_file: YtdlFile):
self.ytdl_file: YtdlFile = ytdl_file
self.mp3_filename: typing.Optional[str] = None
self._fas_spawned: typing.List[FileAudioSource] = []
def pcm_available(self):
return self.mp3_filename is not None and os.path.exists(self.mp3_filename)
def convert_to_mp3(self) -> None:
if not self.ytdl_file.is_downloaded():
raise FileNotFoundError("File hasn't been downloaded yet")
destination_filename = re.sub(r"\.[^.]+$", ".mp3", self.ytdl_file.filename)
out, err = (
ffmpeg.input(self.ytdl_file.filename)
.output(destination_filename, format="mp3")
.overwrite_output()
.run_async()
)
self.mp3_filename = destination_filename
def ready_up(self):
if not self.ytdl_file.has_info():
self.ytdl_file.update_info()
if not self.ytdl_file.is_downloaded():
self.ytdl_file.download_file()
if not self.pcm_available():
self.convert_to_mp3()
def delete(self) -> None:
if self.pcm_available():
for source in self._fas_spawned:
if not source.file.closed:
source.file.close()
os.remove(self.mp3_filename)
self.mp3_filename = None
self.ytdl_file.delete()
@classmethod
def create_from_url(cls, url, **ytdl_args) -> typing.List["YtdlMp3"]:
files = YtdlFile.download_from_url(url, **ytdl_args)
dfiles = []
for file in files:
dfile = YtdlMp3(file)
dfiles.append(dfile)
return dfiles
@classmethod
def create_and_ready_from_url(cls, url, **ytdl_args) -> typing.List["YtdlMp3"]:
dfiles = cls.create_from_url(url, **ytdl_args)
for dfile in dfiles:
dfile.ready_up()
return dfiles
@property
def info(self) -> typing.Optional[YtdlInfo]:
return self.ytdl_file.info

14
royalnet/bard/__init__.py Normal file
View file

@ -0,0 +1,14 @@
from .ytdlinfo import YtdlInfo
from .ytdlfile import YtdlFile
from .ytdlmp3 import YtdlMp3
from .errors import *
__all__ = [
"YtdlInfo",
"YtdlFile",
"YtdlMp3",
"BardError",
"YtdlError",
"NotFoundError",
"MultipleFilesError",
]

14
royalnet/bard/errors.py Normal file
View file

@ -0,0 +1,14 @@
class BardError(Exception):
"""Base class for ``bard`` errors."""
class YtdlError(BardError):
"""Base class for ``youtube_dl`` errors."""
class NotFoundError(YtdlError):
"""The requested resource wasn't found."""
class MultipleFilesError(YtdlError):
"""The resource contains multiple media files."""

104
royalnet/bard/ytdlfile.py Normal file
View file

@ -0,0 +1,104 @@
import os
import youtube_dl
from contextlib import asynccontextmanager
from typing import Optional, List, Dict, Any
from royalnet.utils import asyncify, MultiLock
from asyncio import AbstractEventLoop, get_event_loop
from .ytdlinfo import YtdlInfo
from .errors import NotFoundError, MultipleFilesError
class YtdlFile:
"""A representation of a file download with ``youtube_dl``."""
default_ytdl_args = {
"quiet": not __debug__, # Do not print messages to stdout.
"noplaylist": True, # Download single video instead of a playlist if in doubt.
"no_warnings": not __debug__, # Do not print out anything for warnings.
"outtmpl": "%(epoch)s-%(title)s-%(id)s.%(ext)s", # Use the default outtmpl.
"ignoreerrors": True # Ignore unavailable videos
}
def __init__(self,
url: str,
info: Optional[YtdlInfo] = None,
filename: Optional[str] = None,
ytdl_args: Optional[Dict[str, Any]] = None,
loop: Optional[AbstractEventLoop] = None):
"""Create a YtdlFile instance.
Warning:
Please avoid using directly ``__init__()``, use :meth:`.from_url` instead!"""
self.url: str = url
self.info: Optional[YtdlInfo] = info
self.filename: Optional[str] = filename
self.ytdl_args: Dict[str, Any] = {**self.default_ytdl_args, **ytdl_args}
self.lock: MultiLock = MultiLock()
if not loop:
loop = get_event_loop()
self._loop = loop
@property
def has_info(self) -> bool:
"""Does the YtdlFile have info available?"""
return self.info is not None
async def retrieve_info(self) -> None:
"""Retrieve info about the YtdlFile through ``youtube_dl``."""
if not self.has_info:
infos = await asyncify(YtdlInfo.from_url, self.url, loop=self._loop, **self.ytdl_args)
if len(infos) == 0:
raise NotFoundError()
elif len(infos) > 1:
raise MultipleFilesError()
self.info = infos[0]
@property
def is_downloaded(self) -> bool:
"""Has the file been downloaded yet?"""
return self.filename is not None
async def download_file(self) -> None:
"""Download the file."""
def download():
"""Download function block to be asyncified."""
with youtube_dl.YoutubeDL(self.ytdl_args) as ytdl:
filename = ytdl.prepare_filename(self.info.__dict__)
ytdl.download([self.info.webpage_url])
self.filename = filename
await self.retrieve_info()
async with self.lock.exclusive():
await asyncify(download, loop=self._loop)
@asynccontextmanager
async def aopen(self):
"""Open the downloaded file as an async context manager (and download it if it isn't available yet).
Example:
You can use the async context manager like this: ::
async with ytdlfile.aopen() as file:
b: bytes = file.read()
"""
await self.download_file()
async with self.lock.normal():
with open(self.filename, "rb") as file:
yield file
async def delete_asap(self):
"""As soon as nothing is using the file, delete it."""
async with self.lock.exclusive():
os.remove(self.filename)
self.filename = None
@classmethod
async def from_url(cls, url: str, **ytdl_args) -> List["YtdlFile"]:
"""Create a :class:`list` of :class:`YtdlFile` from a URL."""
infos = await YtdlInfo.from_url(url, **ytdl_args)
files = []
for info in infos:
file = YtdlFile(url=info.webpage_url, info=info, ytdl_args=ytdl_args)
files.append(file)
return files

118
royalnet/bard/ytdlinfo.py Normal file
View file

@ -0,0 +1,118 @@
from asyncio import AbstractEventLoop, get_event_loop
from typing import Optional, Dict, List, Any
from datetime import datetime, timedelta
import dateparser
from youtube_dl import YoutubeDL
from royalnet.utils import ytdldateformat, asyncify
class YtdlInfo:
"""A wrapper around youtube_dl extracted info."""
_default_ytdl_args = {
"quiet": True, # Do not print messages to stdout.
"noplaylist": True, # Download single video instead of a playlist if in doubt.
"no_warnings": True, # Do not print out anything for warnings.
"outtmpl": "%(title)s-%(id)s.%(ext)s", # Use the default outtmpl.
"ignoreerrors": True # Ignore unavailable videos
}
def __init__(self, info: Dict[str, Any]):
"""Create a YtdlInfo from the dict returned by the :py:func:`youtube_dl.YoutubeDL.extract_info` function.
Warning:
Does not download the info, for that use :py:func:`royalnet.audio.YtdlInfo.retrieve_for_url`."""
self.id: Optional[str] = info.get("id")
self.uploader: Optional[str] = info.get("uploader")
self.uploader_id: Optional[str] = info.get("uploader_id")
self.uploader_url: Optional[str] = info.get("uploader_url")
self.channel_id: Optional[str] = info.get("channel_id")
self.channel_url: Optional[str] = info.get("channel_url")
self.upload_date: Optional[datetime] = dateparser.parse(ytdldateformat(info.get("upload_date")))
self.license: Optional[str] = info.get("license")
self.creator: Optional[...] = info.get("creator")
self.title: Optional[str] = info.get("title")
self.alt_title: Optional[...] = info.get("alt_title")
self.thumbnail: Optional[str] = info.get("thumbnail")
self.description: Optional[str] = info.get("description")
self.categories: Optional[List[str]] = info.get("categories")
self.tags: Optional[List[str]] = info.get("tags")
self.subtitles: Optional[Dict[str, List[Dict[str, str]]]] = info.get("subtitles")
self.automatic_captions: Optional[dict] = info.get("automatic_captions")
self.duration: Optional[timedelta] = timedelta(seconds=info.get("duration", 0))
self.age_limit: Optional[int] = info.get("age_limit")
self.annotations: Optional[...] = info.get("annotations")
self.chapters: Optional[...] = info.get("chapters")
self.webpage_url: Optional[str] = info.get("webpage_url")
self.view_count: Optional[int] = info.get("view_count")
self.like_count: Optional[int] = info.get("like_count")
self.dislike_count: Optional[int] = info.get("dislike_count")
self.average_rating: Optional[...] = info.get("average_rating")
self.formats: Optional[list] = info.get("formats")
self.is_live: Optional[bool] = info.get("is_live")
self.start_time: Optional[float] = info.get("start_time")
self.end_time: Optional[float] = info.get("end_time")
self.series: Optional[str] = info.get("series")
self.season_number: Optional[int] = info.get("season_number")
self.episode_number: Optional[int] = info.get("episode_number")
self.track: Optional[...] = info.get("track")
self.artist: Optional[...] = info.get("artist")
self.extractor: Optional[str] = info.get("extractor")
self.webpage_url_basename: Optional[str] = info.get("webpage_url_basename")
self.extractor_key: Optional[str] = info.get("extractor_key")
self.playlist: Optional[str] = info.get("playlist")
self.playlist_index: Optional[int] = info.get("playlist_index")
self.thumbnails: Optional[List[Dict[str, str]]] = info.get("thumbnails")
self.display_id: Optional[str] = info.get("display_id")
self.requested_subtitles: Optional[...] = info.get("requested_subtitles")
self.requested_formats: Optional[tuple] = info.get("requested_formats")
self.format: Optional[str] = info.get("format")
self.format_id: Optional[str] = info.get("format_id")
self.width: Optional[int] = info.get("width")
self.height: Optional[int] = info.get("height")
self.resolution: Optional[...] = info.get("resolution")
self.fps: Optional[int] = info.get("fps")
self.vcodec: Optional[str] = info.get("vcodec")
self.vbr: Optional[int] = info.get("vbr")
self.stretched_ratio: Optional[...] = info.get("stretched_ratio")
self.acodec: Optional[str] = info.get("acodec")
self.abr: Optional[int] = info.get("abr")
self.ext: Optional[str] = info.get("ext")
@classmethod
async def from_url(cls, url, loop: Optional[AbstractEventLoop] = None, **ytdl_args) -> List["YtdlInfo"]:
"""Fetch the info for an url through YoutubeDL.
Returns:
A :py:class:`list` containing the infos for the requested videos."""
if loop is None:
loop: AbstractEventLoop = get_event_loop()
# So many redundant options!
with YoutubeDL({**cls._default_ytdl_args, **ytdl_args}) as ytdl:
first_info = await asyncify(ytdl.extract_info, loop=loop, url=url, download=False)
# No video was found
if first_info is None:
return []
# If it is a playlist, create multiple videos!
if "entries" in first_info and first_info["entries"][0] is not None:
second_info_list = []
for second_info in first_info["entries"]:
if second_info is None:
continue
second_info_list.append(YtdlInfo(second_info))
return second_info_list
return [YtdlInfo(first_info)]
def __repr__(self):
if self.title:
return f"<YtdlInfo of '{self.title}'>"
if self.webpage_url:
return f"<YtdlInfo for '{self.webpage_url}'>"
return f"<YtdlInfo id={self.id} ...>"
def __str__(self):
if self.title:
return self.title
if self.webpage_url:
return self.webpage_url
return self.id

57
royalnet/bard/ytdlmp3.py Normal file
View file

@ -0,0 +1,57 @@
import typing
import re
import ffmpeg
import os
from .ytdlinfo import YtdlInfo
from .ytdlfile import YtdlFile
from royalnet.utils import asyncify, MultiLock
class YtdlMp3:
"""A representation of a YtdlFile conversion to mp3."""
def __init__(self, ytdl_file: YtdlFile):
self.ytdl_file: YtdlFile = ytdl_file
self.mp3_filename: typing.Optional[str] = None
self.lock: MultiLock = MultiLock()
@property
def is_converted(self):
"""Has the file been converted?"""
return self.mp3_filename is not None
async def convert_to_mp3(self) -> None:
"""Convert the file to mp3 with ``ffmpeg``."""
await self.ytdl_file.download_file()
if self.mp3_filename is None:
async with self.ytdl_file.lock.normal():
destination_filename = re.sub(r"\.[^.]+$", ".mp3", self.ytdl_file.filename)
async with self.lock.exclusive():
await asyncify(
ffmpeg.input(self.ytdl_file.filename)
.output(destination_filename, format="mp3")
.overwrite_output()
.run
)
self.mp3_filename = destination_filename
def delete_asap(self) -> None:
"""Delete the mp3 file."""
if self.is_converted:
async with self.lock.exclusive():
os.remove(self.mp3_filename)
self.mp3_filename = None
@classmethod
async def from_url(cls, url, **ytdl_args) -> typing.List["YtdlMp3"]:
"""Create a :class:`list` of :class:`YtdlMp3` from a URL."""
files = await YtdlFile.from_url(url, **ytdl_args)
dfiles = []
for file in files:
dfile = YtdlMp3(file)
dfiles.append(dfile)
return dfiles
@property
def info(self) -> typing.Optional[YtdlInfo]:
"""Shortcut to get the :class:`YtdlInfo` of the object."""
return self.ytdl_file.info

View file

@ -1,5 +1,5 @@
import re import re
import typing from typing import Pattern, AnyStr, Optional, Sequence, Union
from .commanderrors import InvalidInputError from .commanderrors import InvalidInputError
@ -38,7 +38,7 @@ class CommandArgs(list):
raise InvalidInputError(f"Not enough arguments specified (minimum is {require_at_least}).") raise InvalidInputError(f"Not enough arguments specified (minimum is {require_at_least}).")
return " ".join(self) return " ".join(self)
def match(self, pattern: typing.Union[str, typing.Pattern], *flags) -> typing.Sequence[typing.AnyStr]: def match(self, pattern: Union[str, Pattern], *flags) -> Sequence[AnyStr]:
"""Match the :py:func:`royalnet.utils.commandargs.joined` to a regex pattern. """Match the :py:func:`royalnet.utils.commandargs.joined` to a regex pattern.
Parameters: Parameters:
@ -55,7 +55,7 @@ class CommandArgs(list):
raise InvalidInputError("Invalid syntax.") raise InvalidInputError("Invalid syntax.")
return match.groups() return match.groups()
def optional(self, index: int, default=None) -> typing.Optional[str]: def optional(self, index: int, default=None) -> Optional[str]:
"""Get the argument at a specific index, but don't raise an error if nothing is found, instead returning the ``default`` value. """Get the argument at a specific index, but don't raise an error if nothing is found, instead returning the ``default`` value.
Parameters: Parameters:

View file

@ -1,4 +1,4 @@
import typing from typing import Dict, Callable
import warnings import warnings
from .commanderrors import UnsupportedError from .commanderrors import UnsupportedError
from .commandinterface import CommandInterface from .commandinterface import CommandInterface
@ -9,7 +9,7 @@ class CommandData:
def __init__(self, interface: CommandInterface): def __init__(self, interface: CommandInterface):
self._interface: CommandInterface = interface self._interface: CommandInterface = interface
if len(self._interface.command.tables) > 0: if len(self._interface.command.tables) > 0:
self.session = self._interface.alchemy.Session() self.session = self._interface.alchemy._Session()
else: else:
self.session = None self.session = None
@ -40,7 +40,7 @@ class CommandData:
error_if_none: Raise an exception if this is True and the call has no author.""" error_if_none: Raise an exception if this is True and the call has no author."""
raise UnsupportedError("'get_author' is not supported on this platform") raise UnsupportedError("'get_author' is not supported on this platform")
async def keyboard(self, text: str, keyboard: typing.Dict[str, typing.Callable]) -> None: async def keyboard(self, text: str, keyboard: Dict[str, Callable]) -> None:
"""Send a keyboard having the keys of the dict as keys and calling the correspondent values on a press. """Send a keyboard having the keys of the dict as keys and calling the correspondent values on a press.
The function should be passed the :py:class:`CommandData` instance as a argument.""" The function should be passed the :py:class:`CommandData` instance as a argument."""

View file

@ -3,8 +3,8 @@ import asyncio
from .commanderrors import UnsupportedError from .commanderrors import UnsupportedError
if typing.TYPE_CHECKING: if typing.TYPE_CHECKING:
from .command import Command from .command import Command
from ..database import Alchemy from ..alchemy import Alchemy
from ..bots import GenericBot from ..interfaces import GenericBot
class CommandInterface: class CommandInterface:

View file

@ -1,7 +0,0 @@
"""Relational database classes and methods."""
from .alchemy import Alchemy
from .relationshiplinkchain import relationshiplinkchain
from .databaseconfig import DatabaseConfig
__all__ = ["Alchemy", "relationshiplinkchain", "DatabaseConfig"]

View file

@ -1,61 +0,0 @@
import typing
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, scoped_session
from contextlib import contextmanager, asynccontextmanager
from ..utils import asyncify
class Alchemy:
"""A wrapper around SQLAlchemy declarative that allows to use multiple databases at once while maintaining a single table-class for both of them."""
def __init__(self, database_uri: str, tables: typing.Set):
"""Create a new Alchemy object.
Args:
database_uri: The uri of the database, as described at https://docs.sqlalchemy.org/en/13/core/engines.html .
tables: The set of tables to be created and used in the selected database. Check the tables submodule for more details.
"""
if database_uri.startswith("sqlite"):
raise NotImplementedError("Support for sqlite databases is currently missing")
self.engine = create_engine(database_uri)
self.Base = declarative_base(bind=self.engine)
self.Session = sessionmaker(bind=self.engine)
self._create_tables(tables)
def _create_tables(self, tables: typing.Set):
for table in tables:
name = table.__name__
try:
self.__getattribute__(name)
except AttributeError:
# Actually the intended result
# TODO: here is the problem!
self.__setattr__(name, type(name, (self.Base, table), {}))
else:
raise NameError(f"{name} is a reserved name and can't be used as a table name")
self.Base.metadata.create_all()
@contextmanager
def session_cm(self):
"""Use Alchemy as a context manager (to be used in with statements)."""
session = self.Session()
try:
yield session
except Exception:
session.rollback()
raise
finally:
session.close()
@asynccontextmanager
async def session_acm(self):
"""Use Alchemy as a asyncronous context manager (to be used in async with statements)."""
session = await asyncify(self.Session)
try:
yield session
except Exception:
session.rollback()
raise
finally:
session.close()

View file

@ -1,15 +0,0 @@
import typing
class DatabaseConfig:
"""The configuration to be used for the :py:class:`royalnet.database.Alchemy` component of :py:class:`royalnet.bots.GenericBot`."""
def __init__(self,
database_uri: str,
master_table: typing.Type,
identity_table: typing.Type,
identity_column_name: str):
self.database_uri: str = database_uri
self.master_table: typing.Type = master_table
self.identity_table: typing.Type = identity_table
self.identity_column_name: str = identity_column_name

View file

@ -1,6 +1,6 @@
"""Various bot interfaces, and a common class to create new ones.""" """Various bot interfaces, and a common class to create new ones."""
from .generic import GenericBot from .interface import GenericBot
from .telegram import TelegramBot from .telegram import TelegramBot
from .discord import DiscordBot from .discord import DiscordBot

View file

@ -0,0 +1,2 @@
from .create_rich_embed import create_rich_embed
from .escape import escape

View file

@ -0,0 +1,27 @@
from discord import Embed, Colour
from discord.embeds import EmptyEmbed
from royalnet.bard import YtdlInfo
def create_rich_embed(yi: YtdlInfo) -> Embed:
"""Return this info as a :py:class:`discord.Embed`."""
colors = {
"youtube": 0xCC0000,
"soundcloud": 0xFF5400,
"Clyp": 0x3DBEB3,
"Bandcamp": 0x1DA0C3,
}
embed = Embed(title=yi.title,
colour=Colour(colors.get(yi.extractor, 0x4F545C)),
url=yi.webpage_url if (yi.webpage_url and yi.webpage_url.startswith("http")) else EmptyEmbed)
if yi.thumbnail:
embed.set_thumbnail(url=yi.thumbnail)
if yi.uploader:
embed.set_author(name=yi.uploader,
url=yi.uploader_url if yi.uploader_url is not None else EmptyEmbed)
# embed.set_footer(text="Source: youtube-dl", icon_url="https://i.imgur.com/TSvSRYn.png")
if yi.duration:
embed.add_field(name="Duration", value=str(yi.duration), inline=True)
if yi.upload_date:
embed.add_field(name="Published on", value=yi.upload_date.strftime("%d %b %Y"), inline=True)
return embed

View file

@ -2,10 +2,10 @@ import discord
import sentry_sdk import sentry_sdk
import logging as _logging import logging as _logging
from .generic import GenericBot from .generic import GenericBot
from ..utils import * from royalnet.utils import *
from ..error import * from royalnet.error import *
from ..audio import * from royalnet.bard import *
from ..commands import * from royalnet.commands import *
log = _logging.getLogger(__name__) log = _logging.getLogger(__name__)

View file

@ -0,0 +1,18 @@
def escape(string: str) -> str:
"""Escape a string to be sent through Discord, and format it using RoyalCode.
Warning:
Currently escapes everything, even items in code blocks."""
return string.replace("*", "\\*") \
.replace("_", "\\_") \
.replace("`", "\\`") \
.replace("[b]", "**") \
.replace("[/b]", "**") \
.replace("[i]", "_") \
.replace("[/i]", "_") \
.replace("[u]", "__") \
.replace("[/u]", "__") \
.replace("[c]", "`") \
.replace("[/c]", "`") \
.replace("[p]", "```") \
.replace("[/p]", "```")

View file

@ -2,7 +2,7 @@ import discord
class FileAudioSource(discord.AudioSource): class FileAudioSource(discord.AudioSource):
"""A :py:class:`discord.AudioSource` that uses a :py:class:`io.BufferedIOBase` as an input instead of memory. """A :class:`discord.AudioSource` that uses a :class:`io.BufferedIOBase` as an input instead of memory.
The stream should be in the usual PCM encoding. The stream should be in the usual PCM encoding.

View file

@ -1,6 +1,6 @@
import math from math import inf
import random from random import shuffle
import typing from typing import Optional, List, AsyncGenerator, Union
from collections import namedtuple from collections import namedtuple
from .ytdldiscord import YtdlDiscord from .ytdldiscord import YtdlDiscord
from .fileaudiosource import FileAudioSource from .fileaudiosource import FileAudioSource
@ -11,17 +11,17 @@ class PlayMode:
def __init__(self): def __init__(self):
"""Create a new PlayMode and initialize the generator inside.""" """Create a new PlayMode and initialize the generator inside."""
self.now_playing: typing.Optional[YtdlDiscord] = None self.now_playing: Optional[YtdlDiscord] = None
self.generator: typing.AsyncGenerator = self._generate_generator() self.generator: AsyncGenerator = self._generate_generator()
async def next(self) -> typing.Optional[FileAudioSource]: async def next(self) -> Optional[FileAudioSource]:
"""Get the next :py:class:`royalnet.audio.FileAudioSource` from the list and advance it. """Get the next :py:class:`royalnet.audio.FileAudioSource` from the list and advance it.
Returns: Returns:
The next :py:class:`royalnet.audio.FileAudioSource`.""" The next :py:class:`royalnet.audio.FileAudioSource`."""
return await self.generator.__anext__() return await self.generator.__anext__()
def videos_left(self) -> typing.Union[int, float]: def videos_left(self) -> Union[int, float]:
"""Return the number of videos left in the PlayMode. """Return the number of videos left in the PlayMode.
Returns: Returns:
@ -49,7 +49,7 @@ class PlayMode:
"""Delete all :py:class:`royalnet.audio.YtdlDiscord` contained inside this PlayMode.""" """Delete all :py:class:`royalnet.audio.YtdlDiscord` contained inside this PlayMode."""
raise NotImplementedError() raise NotImplementedError()
def queue_preview(self) -> typing.List[YtdlDiscord]: def queue_preview(self) -> List[YtdlDiscord]:
"""Display all the videos in the PlayMode as a list, if possible. """Display all the videos in the PlayMode as a list, if possible.
To be used with ``queue`` packs, for example. To be used with ``queue`` packs, for example.
@ -65,7 +65,7 @@ class PlayMode:
class Playlist(PlayMode): class Playlist(PlayMode):
"""A video list. :py:class:`royalnet.audio.YtdlDiscord` played are removed from the list.""" """A video list. :py:class:`royalnet.audio.YtdlDiscord` played are removed from the list."""
def __init__(self, starting_list: typing.List[YtdlDiscord] = None): def __init__(self, starting_list: List[YtdlDiscord] = None):
"""Create a new Playlist. """Create a new Playlist.
Args: Args:
@ -73,9 +73,9 @@ class Playlist(PlayMode):
super().__init__() super().__init__()
if starting_list is None: if starting_list is None:
starting_list = [] starting_list = []
self.list: typing.List[YtdlDiscord] = starting_list self.list: List[YtdlDiscord] = starting_list
def videos_left(self) -> typing.Union[int, float]: def videos_left(self) -> Union[int, float]:
return len(self.list) return len(self.list)
async def _generate_generator(self): async def _generate_generator(self):
@ -100,14 +100,14 @@ class Playlist(PlayMode):
while self.list: while self.list:
self.list.pop(0).delete() self.list.pop(0).delete()
def queue_preview(self) -> typing.List[YtdlDiscord]: def queue_preview(self) -> List[YtdlDiscord]:
return self.list return self.list
class Pool(PlayMode): class Pool(PlayMode):
"""A random pool. :py:class:`royalnet.audio.YtdlDiscord` are selected in random order and are not repeated until every song has been played at least once.""" """A random pool. :py:class:`royalnet.audio.YtdlDiscord` are selected in random order and are not repeated until every song has been played at least once."""
def __init__(self, starting_pool: typing.List[YtdlDiscord] = None): def __init__(self, starting_pool: List[YtdlDiscord] = None):
"""Create a new Pool. """Create a new Pool.
Args: Args:
@ -115,11 +115,11 @@ class Pool(PlayMode):
super().__init__() super().__init__()
if starting_pool is None: if starting_pool is None:
starting_pool = [] starting_pool = []
self.pool: typing.List[YtdlDiscord] = starting_pool self.pool: List[YtdlDiscord] = starting_pool
self._pool_copy: typing.List[YtdlDiscord] = [] self._pool_copy: List[YtdlDiscord] = []
def videos_left(self) -> typing.Union[int, float]: def videos_left(self) -> Union[int, float]:
return math.inf return inf
async def _generate_generator(self): async def _generate_generator(self):
while True: while True:
@ -128,7 +128,7 @@ class Pool(PlayMode):
yield None yield None
continue continue
self._pool_copy = self.pool.copy() self._pool_copy = self.pool.copy()
random.shuffle(self._pool_copy) shuffle(self._pool_copy)
while self._pool_copy: while self._pool_copy:
next_video = self._pool_copy.pop(0) next_video = self._pool_copy.pop(0)
self.now_playing = next_video self.now_playing = next_video
@ -137,7 +137,7 @@ class Pool(PlayMode):
def add(self, item) -> None: def add(self, item) -> None:
self.pool.append(item) self.pool.append(item)
self._pool_copy.append(item) self._pool_copy.append(item)
random.shuffle(self._pool_copy) shuffle(self._pool_copy)
def delete(self) -> None: def delete(self) -> None:
for item in self.pool: for item in self.pool:
@ -145,9 +145,9 @@ class Pool(PlayMode):
self.pool = None self.pool = None
self._pool_copy = None self._pool_copy = None
def queue_preview(self) -> typing.List[YtdlDiscord]: def queue_preview(self) -> List[YtdlDiscord]:
preview_pool = self.pool.copy() preview_pool = self.pool.copy()
random.shuffle(preview_pool) shuffle(preview_pool)
return preview_pool return preview_pool
@ -156,7 +156,7 @@ class Layers(PlayMode):
Layer = namedtuple("Layer", ["dfile", "source"]) Layer = namedtuple("Layer", ["dfile", "source"])
def __init__(self, starting_layers: typing.List[YtdlDiscord] = None): def __init__(self, starting_layers: List[YtdlDiscord] = None):
super().__init__() super().__init__()
if starting_layers is None: if starting_layers is None:
starting_layers = [] starting_layers = []
@ -164,7 +164,7 @@ class Layers(PlayMode):
for item in starting_layers: for item in starting_layers:
self.add(item) self.add(item)
def videos_left(self) -> typing.Union[int, float]: def videos_left(self) -> Union[int, float]:
return 1 if len(self.layers) > 0 else 0 return 1 if len(self.layers) > 0 else 0
async def _generate_generator(self): async def _generate_generator(self):
@ -209,5 +209,5 @@ class Layers(PlayMode):
item.dfile.delete() item.dfile.delete()
self.layers = None self.layers = None
def queue_preview(self) -> typing.List[YtdlDiscord]: def queue_preview(self) -> List[YtdlDiscord]:
return [layer.dfile for layer in self.layers] return [layer.dfile for layer in self.layers]

View file

@ -1,31 +1,31 @@
import typing from typing import Optional, List
import re from re import sub
import ffmpeg from ffmpeg import input
import os from os import path, remove
from .ytdlinfo import YtdlInfo from royalnet.bard import YtdlInfo
from .ytdlfile import YtdlFile from royalnet.bard import YtdlFile
from .fileaudiosource import FileAudioSource from .fileaudiosource import FileAudioSource
class YtdlDiscord: class YtdlDiscord:
def __init__(self, ytdl_file: YtdlFile): def __init__(self, ytdl_file: YtdlFile):
self.ytdl_file: YtdlFile = ytdl_file self.ytdl_file: YtdlFile = ytdl_file
self.pcm_filename: typing.Optional[str] = None self.pcm_filename: Optional[str] = None
self._fas_spawned: typing.List[FileAudioSource] = [] self._fas_spawned: List[FileAudioSource] = []
def __repr__(self): def __repr__(self):
return f"<{self.__class__.__name__} {self.info.title or 'Unknown Title'} ({'ready' if self.pcm_available() else 'not ready'}," \ return f"<{self.__class__.__name__} {self.info.title or 'Unknown Title'} ({'ready' if self.pcm_available() else 'not ready'}," \
f" {len(self._fas_spawned)} audiosources spawned)>" f" {len(self._fas_spawned)} audiosources spawned)>"
def pcm_available(self): def pcm_available(self):
return self.pcm_filename is not None and os.path.exists(self.pcm_filename) return self.pcm_filename is not None and path.exists(self.pcm_filename)
def convert_to_pcm(self) -> None: def convert_to_pcm(self) -> None:
if not self.ytdl_file.is_downloaded(): if not self.ytdl_file.is_downloaded():
raise FileNotFoundError("File hasn't been downloaded yet") raise FileNotFoundError("File hasn't been downloaded yet")
destination_filename = re.sub(r"\.[^.]+$", ".pcm", self.ytdl_file.filename) destination_filename = sub(r"\.[^.]+$", ".pcm", self.ytdl_file.filename)
( (
ffmpeg.input(self.ytdl_file.filename) input(self.ytdl_file.filename)
.output(destination_filename, format="s16le", ac=2, ar="48000") .output(destination_filename, format="s16le", ac=2, ar="48000")
.overwrite_output() .overwrite_output()
.run(quiet=not __debug__) .run(quiet=not __debug__)
@ -34,7 +34,7 @@ class YtdlDiscord:
def ready_up(self): def ready_up(self):
if not self.ytdl_file.has_info(): if not self.ytdl_file.has_info():
self.ytdl_file.update_info() self.ytdl_file.retrieve_info()
if not self.ytdl_file.is_downloaded(): if not self.ytdl_file.is_downloaded():
self.ytdl_file.download_file() self.ytdl_file.download_file()
if not self.pcm_available(): if not self.pcm_available():
@ -54,12 +54,12 @@ class YtdlDiscord:
for source in self._fas_spawned: for source in self._fas_spawned:
if not source.file.closed: if not source.file.closed:
source.file.close() source.file.close()
os.remove(self.pcm_filename) remove(self.pcm_filename)
self.pcm_filename = None self.pcm_filename = None
self.ytdl_file.delete() self.ytdl_file.delete()
@classmethod @classmethod
def create_from_url(cls, url, **ytdl_args) -> typing.List["YtdlDiscord"]: def create_from_url(cls, url, **ytdl_args) -> List["YtdlDiscord"]:
files = YtdlFile.download_from_url(url, **ytdl_args) files = YtdlFile.download_from_url(url, **ytdl_args)
dfiles = [] dfiles = []
for file in files: for file in files:
@ -68,5 +68,5 @@ class YtdlDiscord:
return dfiles return dfiles
@property @property
def info(self) -> typing.Optional[YtdlInfo]: def info(self) -> Optional[YtdlInfo]:
return self.ytdl_file.info return self.ytdl_file.info

View file

@ -9,7 +9,7 @@ from sentry_sdk.integrations.sqlalchemy import SqlalchemyIntegration
from sentry_sdk.integrations.aiohttp import AioHttpIntegration from sentry_sdk.integrations.aiohttp import AioHttpIntegration
from sentry_sdk.integrations.logging import LoggingIntegration from sentry_sdk.integrations.logging import LoggingIntegration
from ..utils import * from ..utils import *
from ..database import * from ..alchemy import *
from ..commands import * from ..commands import *
from ..error import * from ..error import *
@ -17,7 +17,7 @@ from ..error import *
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
class GenericBot: class Interface:
"""A common bot class, to be used as base for the other more specific classes, such as """A common bot class, to be used as base for the other more specific classes, such as
:py:class:`royalnet.bots.TelegramBot` and :py:class:`royalnet.bots.DiscordBot`. """ :py:class:`royalnet.bots.TelegramBot` and :py:class:`royalnet.bots.DiscordBot`. """
interface_name = NotImplemented interface_name = NotImplemented
@ -29,7 +29,7 @@ class GenericBot:
self._Interface = self._interface_factory() self._Interface = self._interface_factory()
self._Data = self._data_factory() self._Data = self._data_factory()
self.commands = {} self.commands = {}
self.network_handlers: typing.Dict[str, typing.Callable[["GenericBot", typing.Any], self.network_handlers: typing.Dict[str, typing.Callable[["Interface", typing.Any],
typing.Awaitable[typing.Optional[typing.Dict]]]] = {} typing.Awaitable[typing.Optional[typing.Dict]]]] = {}
for SelectedCommand in self.uninitialized_commands: for SelectedCommand in self.uninitialized_commands:
interface = self._Interface() interface = self._Interface()
@ -142,7 +142,7 @@ class GenericBot:
self.identity_column = self.identity_table.__getattribute__(self.identity_table, self.identity_column = self.identity_table.__getattribute__(self.identity_table,
self.uninitialized_database_config.identity_column_name) self.uninitialized_database_config.identity_column_name)
log.debug(f"Identity column: {self.identity_column.__class__.__qualname__}") log.debug(f"Identity column: {self.identity_column.__class__.__qualname__}")
self.identity_chain = relationshiplinkchain(self.master_table, self.identity_table) self.identity_chain = table_dfs(self.master_table, self.identity_table)
log.debug(f"Identity chain: {' -> '.join([str(item) for item in self.identity_chain])}") log.debug(f"Identity chain: {' -> '.join([str(item) for item in self.identity_chain])}")
else: else:
log.info(f"Alchemy: disabled") log.info(f"Alchemy: disabled")

View file

@ -0,0 +1 @@
from .escape import escape

View file

@ -0,0 +1,17 @@
def escape(string: str) -> str:
"""Escape a string to be sent through Telegram (as HTML), and format it using RoyalCode.
Warning:
Currently escapes everything, even items in code blocks."""
return string.replace("<", "&lt;") \
.replace(">", "&gt;") \
.replace("[b]", "<b>") \
.replace("[/b]", "</b>") \
.replace("[i]", "<i>") \
.replace("[/i]", "</i>") \
.replace("[u]", "<b>") \
.replace("[/u]", "</b>") \
.replace("[c]", "<code>") \
.replace("[/c]", "</code>") \
.replace("[p]", "<pre>") \
.replace("[/p]", "</pre>")

View file

@ -6,7 +6,7 @@ import asyncio
import sentry_sdk import sentry_sdk
import logging as _logging import logging as _logging
import warnings import warnings
from .generic import GenericBot from .interface import GenericBot
from ..utils import * from ..utils import *
from ..error import * from ..error import *
from ..commands import * from ..commands import *

View file

@ -1,4 +1,4 @@
from . import common from . import default
__all__ = [ __all__ = [
"common", "common",

View file

@ -4,9 +4,10 @@ from .asyncify import asyncify
from .escaping import telegram_escape, discord_escape from .escaping import telegram_escape, discord_escape
from .safeformat import safeformat from .safeformat import safeformat
from .classdictjanitor import cdj from .classdictjanitor import cdj
from .sleepuntil import sleep_until from .sleep_until import sleep_until
from .formatters import andformat, plusformat, underscorize, ytdldateformat, numberemojiformat, splitstring, ordinalformat from .formatters import andformat, plusformat, underscorize, ytdldateformat, numberemojiformat, splitstring, ordinalformat
from .urluuid import to_urluuid, from_urluuid from .urluuid import to_urluuid, from_urluuid
from .multilock import MultiLock
__all__ = [ __all__ = [
"asyncify", "asyncify",
@ -25,4 +26,5 @@ __all__ = [
"ordinalformat", "ordinalformat",
"to_urluuid", "to_urluuid",
"from_urluuid", "from_urluuid",
"MultiLock",
] ]

View file

@ -3,10 +3,11 @@ import functools
import typing import typing
async def asyncify(function: typing.Callable, *args, **kwargs): async def asyncify(function: typing.Callable, *args, loop: typing.Optional[asyncio.AbstractEventLoop] = None, **kwargs):
"""Convert a function into a coroutine. """Asyncronously run the function in a different thread or process, preventing it from blocking the event loop.
Warning: Warning:
The coroutine cannot be cancelled, and any attempts to do so will result in unexpected outputs.""" If the function has side effects, it may behave strangely."""
loop = asyncio.get_event_loop() if not loop:
loop = asyncio.get_event_loop()
return await loop.run_in_executor(None, functools.partial(function, *args, **kwargs)) return await loop.run_in_executor(None, functools.partial(function, *args, **kwargs))

View file

@ -1,19 +0,0 @@
import typing
def cdj(class_: typing.Any) -> dict:
"""Return a dict of the class attributes without the ``__module__``, ``__dict__``, ``__weakref__`` and ``__doc__`` keys, to be used while generating dynamically SQLAlchemy declarative table classes.
Parameters:
class_: The object that you want to dict-ify.
Returns:
The class dict.
Warning:
You can't dict-ify classes with ``__slots__``!"""
d = dict(class_.__dict__)
del d["__module__"]
del d["__dict__"]
del d["__weakref__"]
del d["__doc__"]
return d

View file

@ -1,37 +0,0 @@
def discord_escape(string: str) -> str:
"""Escape a string to be sent through Discord, and format it using RoyalCode.
Warning:
Currently escapes everything, even items in code blocks."""
return string.replace("*", "\\*") \
.replace("_", "\\_") \
.replace("`", "\\`") \
.replace("[b]", "**") \
.replace("[/b]", "**") \
.replace("[i]", "_") \
.replace("[/i]", "_") \
.replace("[u]", "__") \
.replace("[/u]", "__") \
.replace("[c]", "`") \
.replace("[/c]", "`") \
.replace("[p]", "```") \
.replace("[/p]", "```")
def telegram_escape(string: str) -> str:
"""Escape a string to be sent through Telegram, and format it using RoyalCode.
Warning:
Currently escapes everything, even items in code blocks."""
return string.replace("<", "&lt;") \
.replace(">", "&gt;") \
.replace("[b]", "<b>") \
.replace("[/b]", "</b>") \
.replace("[i]", "<i>") \
.replace("[/i]", "</i>") \
.replace("[u]", "<b>") \
.replace("[/u]", "</b>") \
.replace("[c]", "<code>") \
.replace("[/c]", "</code>") \
.replace("[p]", "<pre>") \
.replace("[/p]", "</pre>")

View file

@ -0,0 +1,41 @@
from asyncio import Event
from contextlib import asynccontextmanager
class MultiLock:
"""A lock that can allow both simultaneous access and exclusive access to a resource."""
def __init__(self):
self._counter: int = 0
self._normal_event: Event = Event()
self._exclusive_event: Event = Event()
self._exclusive_event.set()
def _check_event(self):
if self._counter > 0:
self._normal_event.clear()
else:
self._normal_event.set()
@asynccontextmanager
async def normal(self):
"""Acquire the lock for simultaneous access."""
await self._exclusive_event.wait()
self._counter += 1
self._check_event()
try:
yield
finally:
self._counter -= 1
self._check_event()
@asynccontextmanager
async def exclusive(self):
"""Acquire the lock for exclusive access."""
# TODO: check if this actually works
await self._exclusive_event.wait()
self._exclusive_event.clear()
await self._normal_event.wait()
try:
yield
finally:
self._exclusive_event.set()

View file

@ -3,7 +3,7 @@ import datetime
async def sleep_until(dt: datetime.datetime) -> None: async def sleep_until(dt: datetime.datetime) -> None:
"""Block the call until the specified datetime. """Sleep until the specified datetime.
Warning: Warning:
Accurate only to seconds.""" Accurate only to seconds."""

View file

@ -1,29 +0,0 @@
import re
import markdown2
class RenderError(Exception):
"""An error occurred while trying to render the page."""
def prepare_page_markdown(markdown):
if list(markdown).count(">") > 99:
raise RenderError("Too many nested quotes")
converted_md = markdown2.markdown(markdown.replace("<", "&lt;"),
extras=["spoiler", "tables", "smarty-pants", "fenced-code-blocks"])
converted_md = re.sub(r"{https?://(?:www\.)?(?:youtube\.com/watch\?.*?&?v=|youtu.be/)([0-9A-Za-z-]+).*?}",
r'<div class="youtube-embed">'
r' <iframe src="https://www.youtube-nocookie.com/embed/\1?rel=0&amp;showinfo=0"'
r' frameborder="0"'
r' allow="autoplay; encrypted-media"'
r' allowfullscreen'
r' width="640px"'
r' height="320px">'
r' </iframe>'
r'</div>', converted_md)
converted_md = re.sub(r"{https?://clyp.it/([a-z0-9]+)}",
r'<div class="clyp-embed">'
r' <iframe width="100%" height="160" src="https://clyp.it/\1/widget" frameborder="0">'
r' </iframe>'
r'</div>', converted_md)
return converted_md

View file

@ -1,4 +0,0 @@
semantic = "5.0a93"
if __name__ == "__main__":
print(semantic)

View file

@ -0,0 +1,95 @@
import typing
import uvicorn
import logging
import sentry_sdk
from sentry_sdk.integrations.aiohttp import AioHttpIntegration
from sentry_sdk.integrations.sqlalchemy import SqlalchemyIntegration
from sentry_sdk.integrations.logging import LoggingIntegration
import royalnet
import keyring
from starlette.applications import Starlette
from .star import PageStar, ExceptionStar
log = logging.getLogger(__name__)
class Constellation:
def __init__(self,
secrets_name: str,
database_uri: str,
tables: set,
page_stars: typing.List[typing.Type[PageStar]] = None,
exc_stars: typing.List[typing.Type[ExceptionStar]] = None,
*,
debug: bool = __debug__,):
if page_stars is None:
page_stars = []
if exc_stars is None:
exc_stars = []
self.secrets_name: str = secrets_name
log.info("Creating starlette app...")
self.starlette = Starlette(debug=debug)
log.info(f"Creating alchemy with tables: {' '.join([table.__name__ for table in tables])}")
self.alchemy: royalnet.alchemy.Alchemy = royalnet.alchemy.Alchemy(database_uri=database_uri, tables=tables)
log.info("Registering page_stars...")
for SelectedPageStar in page_stars:
try:
page_star_instance = SelectedPageStar(constellation=self)
except Exception as e:
log.error(f"{e.__class__.__qualname__} during the registration of {SelectedPageStar.__qualname__}")
sentry_sdk.capture_exception(e)
continue
log.info(f"Registering: {page_star_instance.path} -> {page_star_instance.__class__.__name__}")
self.starlette.add_route(page_star_instance.path, page_star_instance.page, page_star_instance.methods)
log.info("Registering exc_stars...")
for SelectedExcStar in exc_stars:
try:
exc_star_instance = SelectedExcStar(constellation=self)
except Exception as e:
log.error(f"{e.__class__.__qualname__} during the registration of {SelectedExcStar.__qualname__}")
sentry_sdk.capture_exception(e)
continue
log.info(f"Registering: {exc_star_instance.error} -> {exc_star_instance.__class__.__name__}")
self.starlette.add_exception_handler(exc_star_instance.error, exc_star_instance.page)
def _init_sentry(self):
sentry_dsn = self.get_secret("sentry")
if sentry_dsn:
# noinspection PyUnreachableCode
if __debug__:
release = "DEV"
else:
release = royalnet.version.semantic
log.info(f"Sentry: enabled (Royalnet {release})")
self.sentry = sentry_sdk.init(sentry_dsn,
integrations=[AioHttpIntegration(),
SqlalchemyIntegration(),
LoggingIntegration(event_level=None)],
release=release)
else:
log.info("Sentry: disabled")
def get_secret(self, username: str):
return keyring.get_password(f"Royalnet/{self.secrets_name}", username)
def set_secret(self, username: str, password: str):
return keyring.set_password(f"Royalnet/{self.secrets_name}", username, password)
def run_blocking(self, address: str, port: int, verbose: bool):
if verbose:
core_logger = logging.root
core_logger.setLevel(logging.DEBUG)
stream_handler = logging.StreamHandler()
stream_handler.formatter = logging.Formatter("{asctime}\t{name}\t{levelname}\t{message}", style="{")
core_logger.addHandler(stream_handler)
core_logger.debug("Logging setup complete.")
self._init_sentry()
log.info(f"Running constellation server on {address}:{port}...")
uvicorn.run(self.starlette, host=address, port=port)

37
royalnet/web/star.py Normal file
View file

@ -0,0 +1,37 @@
import typing
from starlette.requests import Request
from starlette.responses import Response
if typing.TYPE_CHECKING:
from .constellation import Constellation
class Star:
tables: set = {}
def __init__(self, constellation: "Constellation"):
self.constellation: "Constellation" = constellation
async def page(self, request: Request) -> Response:
raise NotImplementedError()
@property
def alchemy(self):
return self.constellation.alchemy
@property
def Session(self):
return self.constellation.alchemy._Session
@property
def session_acm(self):
return self.constellation.alchemy.session_acm
class PageStar(Star):
path: str = NotImplemented
methods: typing.List[str] = ["GET"]
class ExceptionStar(Star):
error: typing.Union[typing.Type[Exception], int]