1
Fork 0
mirror of https://github.com/RYGhub/royalnet.git synced 2024-11-23 19:44:20 +00:00

Completato #83

* Start moving some stuff around

* Maybe this was a bad idea afterall

* This might actually work

* Fix leftover bugs

* Port a few commands to the new format

* MUCH STUFF

VERY DOGE
This commit is contained in:
Steffo 2019-08-25 17:32:28 +02:00 committed by GitHub
parent 30e8e60e29
commit 372e35a994
Signed by: github
GPG key ID: 4AEE18F83AFDEB23
60 changed files with 1148 additions and 1099 deletions

View file

@ -1,3 +1,4 @@
from . import audio, bots, commands, database, network, utils, error, web, version from . import audio, bots, database, network, utils, error, web, version
from royalnet import commands
__all__ = ["audio", "bots", "commands", "database", "network", "utils", "error", "web", "version"] __all__ = ["audio", "bots", "commands", "database", "network", "utils", "error", "web", "version"]

View file

@ -5,6 +5,6 @@ from .ytdlinfo import YtdlInfo
from .ytdlfile import YtdlFile from .ytdlfile import YtdlFile
from .fileaudiosource import FileAudioSource from .fileaudiosource import FileAudioSource
from .ytdldiscord import YtdlDiscord from .ytdldiscord import YtdlDiscord
from .ytdlvorbis import YtdlVorbis from .ytdlmp3 import YtdlMp3
__all__ = ["playmodes", "YtdlInfo", "YtdlFile", "FileAudioSource", "YtdlDiscord", "YtdlVorbis"] __all__ = ["playmodes", "YtdlInfo", "YtdlFile", "FileAudioSource", "YtdlDiscord", "YtdlMp3"]

View file

@ -7,16 +7,16 @@ from .ytdlfile import YtdlFile
from .fileaudiosource import FileAudioSource from .fileaudiosource import FileAudioSource
class YtdlVorbis: class YtdlMp3:
def __init__(self, ytdl_file: YtdlFile): def __init__(self, ytdl_file: YtdlFile):
self.ytdl_file: YtdlFile = ytdl_file self.ytdl_file: YtdlFile = ytdl_file
self.vorbis_filename: typing.Optional[str] = None self.mp3_filename: typing.Optional[str] = None
self._fas_spawned: typing.List[FileAudioSource] = [] self._fas_spawned: typing.List[FileAudioSource] = []
def pcm_available(self): def pcm_available(self):
return self.vorbis_filename is not None and os.path.exists(self.vorbis_filename) return self.mp3_filename is not None and os.path.exists(self.mp3_filename)
def convert_to_vorbis(self) -> None: def convert_to_mp3(self) -> None:
if not self.ytdl_file.is_downloaded(): if not self.ytdl_file.is_downloaded():
raise FileNotFoundError("File hasn't been downloaded yet") raise FileNotFoundError("File hasn't been downloaded yet")
destination_filename = re.sub(r"\.[^.]+$", ".mp3", self.ytdl_file.filename) destination_filename = re.sub(r"\.[^.]+$", ".mp3", self.ytdl_file.filename)
@ -26,7 +26,7 @@ class YtdlVorbis:
.overwrite_output() .overwrite_output()
.run() .run()
) )
self.vorbis_filename = destination_filename self.mp3_filename = destination_filename
def ready_up(self): def ready_up(self):
if not self.ytdl_file.has_info(): if not self.ytdl_file.has_info():
@ -34,28 +34,28 @@ class YtdlVorbis:
if not self.ytdl_file.is_downloaded(): if not self.ytdl_file.is_downloaded():
self.ytdl_file.download_file() self.ytdl_file.download_file()
if not self.pcm_available(): if not self.pcm_available():
self.convert_to_vorbis() self.convert_to_mp3()
def delete(self) -> None: def delete(self) -> None:
if self.pcm_available(): if self.pcm_available():
for source in self._fas_spawned: for source in self._fas_spawned:
if not source.file.closed: if not source.file.closed:
source.file.close() source.file.close()
os.remove(self.vorbis_filename) os.remove(self.mp3_filename)
self.vorbis_filename = None self.mp3_filename = None
self.ytdl_file.delete() self.ytdl_file.delete()
@classmethod @classmethod
def create_from_url(cls, url, **ytdl_args) -> typing.List["YtdlVorbis"]: def create_from_url(cls, url, **ytdl_args) -> typing.List["YtdlMp3"]:
files = YtdlFile.download_from_url(url, **ytdl_args) files = YtdlFile.download_from_url(url, **ytdl_args)
dfiles = [] dfiles = []
for file in files: for file in files:
dfile = YtdlVorbis(file) dfile = YtdlMp3(file)
dfiles.append(dfile) dfiles.append(dfile)
return dfiles return dfiles
@classmethod @classmethod
def create_and_ready_from_url(cls, url, **ytdl_args) -> typing.List["YtdlVorbis"]: def create_and_ready_from_url(cls, url, **ytdl_args) -> typing.List["YtdlMp3"]:
dfiles = cls.create_from_url(url, **ytdl_args) dfiles = cls.create_from_url(url, **ytdl_args)
for dfile in dfiles: for dfile in dfiles:
dfile.ready_up() dfile.ready_up()

View file

@ -1,14 +1,13 @@
import discord import discord
import asyncio
import typing import typing
import logging as _logging import logging as _logging
from .generic import GenericBot from .generic import GenericBot
from ..commands import NullCommand from ..utils import *
from ..utils import asyncify, Call, Command, discord_escape from ..error import *
from ..error import UnregisteredError, NoneFoundError, TooManyFoundError, InvalidConfigError, RoyalnetResponseError from ..network import *
from ..network import RoyalnetConfig, Request, ResponseSuccess, ResponseError from ..database import *
from ..database import DatabaseConfig from ..audio import *
from ..audio import playmodes, YtdlDiscord from ..commands import *
log = _logging.getLogger(__name__) log = _logging.getLogger(__name__)
@ -25,7 +24,6 @@ class DiscordConfig:
class DiscordBot(GenericBot): class DiscordBot(GenericBot):
"""A bot that connects to `Discord <https://discordapp.com/>`_.""" """A bot that connects to `Discord <https://discordapp.com/>`_."""
interface_name = "discord" interface_name = "discord"
def _init_voice(self): def _init_voice(self):
@ -33,40 +31,30 @@ class DiscordBot(GenericBot):
log.debug(f"Creating music_data dict") log.debug(f"Creating music_data dict")
self.music_data: typing.Dict[discord.Guild, playmodes.PlayMode] = {} self.music_data: typing.Dict[discord.Guild, playmodes.PlayMode] = {}
def _call_factory(self) -> typing.Type[Call]: def _interface_factory(self) -> typing.Type[CommandInterface]:
log.debug(f"Creating DiscordCall") # noinspection PyPep8Naming
GenericInterface = super()._interface_factory()
# noinspection PyMethodParameters # noinspection PyMethodParameters,PyAbstractClass
class DiscordCall(Call): class DiscordInterface(GenericInterface):
interface_name = self.interface_name name = self.interface_name
interface_obj = self prefix = "!"
interface_prefix = "!"
alchemy = self.alchemy return DiscordInterface
async def reply(call, text: str): def _data_factory(self) -> typing.Type[CommandData]:
# TODO: don't escape characters inside [c][/c] blocks # noinspection PyMethodParameters,PyAbstractClass
await call.channel.send(discord_escape(text)) class DiscordData(CommandData):
def __init__(data, interface: CommandInterface, message: discord.Message):
data._interface = interface
data.message = message
async def net_request(call, request: Request, destination: str) -> dict: async def reply(data, text: str):
if self.network is None: await data.message.channel.send(discord_escape(text))
raise InvalidConfigError("Royalnet is not enabled on this bot")
response_dict: dict = await self.network.request(request.to_dict(), destination)
if "type" not in response_dict:
raise RoyalnetResponseError("Response is missing a type")
elif response_dict["type"] == "ResponseSuccess":
response: typing.Union[ResponseSuccess, ResponseError] = ResponseSuccess.from_dict(response_dict)
elif response_dict["type"] == "ResponseError":
response = ResponseError.from_dict(response_dict)
else:
raise RoyalnetResponseError("Response type is unknown")
response.raise_on_error()
return response.data
async def get_author(call, error_if_none=False): async def get_author(data, error_if_none=False):
message: discord.Message = call.kwargs["message"] user: discord.Member = data.message.author
user: discord.Member = message.author query = data._interface.session.query(self.master_table)
query = call.session.query(self.master_table)
for link in self.identity_chain: for link in self.identity_chain:
query = query.join(link.mapper.class_) query = query.join(link.mapper.class_)
query = query.filter(self.identity_column == user.id) query = query.filter(self.identity_column == user.id)
@ -75,7 +63,7 @@ class DiscordBot(GenericBot):
raise UnregisteredError("Author is not registered") raise UnregisteredError("Author is not registered")
return result return result
return DiscordCall return DiscordData
def _bot_factory(self) -> typing.Type[discord.Client]: def _bot_factory(self) -> typing.Type[discord.Client]:
"""Create a custom DiscordClient class inheriting from :py:class:`discord.Client`.""" """Create a custom DiscordClient class inheriting from :py:class:`discord.Client`."""
@ -109,20 +97,25 @@ class DiscordBot(GenericBot):
if not text: if not text:
return return
# Skip non-command updates # Skip non-command updates
if not text.startswith(self.command_prefix): if not text.startswith("!"):
return return
# Skip bot messages # Skip bot messages
author: typing.Union[discord.User] = message.author author: typing.Union[discord.User] = message.author
if author.bot: if author.bot:
return return
# Start typing
with message.channel.typing():
# Find and clean parameters # Find and clean parameters
command_text, *parameters = text.split(" ") command_text, *parameters = text.split(" ")
# Don't use a case-sensitive command name # Don't use a case-sensitive command name
command_name = command_text.lower() command_name = command_text.lower()
# Find the command
try:
command = self.commands[command_name]
except KeyError:
# Skip the message
return
# Call the command # Call the command
await self.call(command_name, message.channel, parameters, message=message) with message.channel.typing():
await command.run(CommandArgs(parameters), self._Data(interface=command.interface, message=message))
async def on_ready(cli): async def on_ready(cli):
log.debug("Connection successful, client is ready") log.debug("Connection successful, client is ready")
@ -148,12 +141,14 @@ class DiscordBot(GenericBot):
def find_channel_by_name(cli, def find_channel_by_name(cli,
name: str, name: str,
guild: typing.Optional[discord.Guild] = None) -> discord.abc.GuildChannel: guild: typing.Optional[discord.Guild] = None) -> discord.abc.GuildChannel:
"""Find the :py:class:`TextChannel`, :py:class:`VoiceChannel` or :py:class:`CategoryChannel` with the specified name. """Find the :py:class:`TextChannel`, :py:class:`VoiceChannel` or :py:class:`CategoryChannel` with the
specified name.
Case-insensitive. Case-insensitive.
Guild is optional, but the method will raise a :py:exc:`TooManyFoundError` if none is specified and there is more than one channel with the same name. Guild is optional, but the method will raise a :py:exc:`TooManyFoundError` if none is specified and
Will also raise a :py:exc:`NoneFoundError` if no channels are found.""" there is more than one channel with the same name. Will also raise a :py:exc:`NoneFoundError` if no
channels are found. """
if guild is not None: if guild is not None:
all_channels = guild.channels all_channels = guild.channels
else: else:
@ -194,16 +189,10 @@ class DiscordBot(GenericBot):
discord_config: DiscordConfig, discord_config: DiscordConfig,
royalnet_config: typing.Optional[RoyalnetConfig] = None, royalnet_config: typing.Optional[RoyalnetConfig] = None,
database_config: typing.Optional[DatabaseConfig] = None, database_config: typing.Optional[DatabaseConfig] = None,
command_prefix: str = "!", commands: typing.List[typing.Type[Command]] = None):
commands: typing.List[typing.Type[Command]] = None,
missing_command: typing.Type[Command] = NullCommand,
error_command: typing.Type[Command] = NullCommand):
super().__init__(royalnet_config=royalnet_config, super().__init__(royalnet_config=royalnet_config,
database_config=database_config, database_config=database_config,
command_prefix=command_prefix, commands=commands)
commands=commands,
missing_command=missing_command,
error_command=error_command)
self._discord_config = discord_config self._discord_config = discord_config
self._init_client() self._init_client()
self._init_voice() self._init_voice()

View file

@ -2,48 +2,71 @@ import sys
import typing import typing
import asyncio import asyncio
import logging import logging
from ..utils import Command, NetworkHandler, Call from ..utils import *
from ..commands import NullCommand from ..network import *
from ..network import RoyalnetLink, Request, Response, ResponseError, RoyalnetConfig from ..database import *
from ..database import Alchemy, DatabaseConfig, relationshiplinkchain from ..commands import *
from ..error import *
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
class GenericBot: class GenericBot:
"""A generic bot class, to be used as base for the other more specific classes, such as :ref:`royalnet.bots.TelegramBot` and :ref:`royalnet.bots.DiscordBot`.""" """A generic bot class, to be used as base for the other more specific classes, such as
:ref:`royalnet.bots.TelegramBot` and :ref:`royalnet.bots.DiscordBot`. """
interface_name = NotImplemented interface_name = NotImplemented
def _init_commands(self, def _init_commands(self, commands: typing.List[typing.Type[Command]]) -> None:
command_prefix: str, """Generate the ``commands`` dictionary required to handle incoming messages, and the ``network_handlers``
commands: typing.List[typing.Type[Command]], dictionary required to handle incoming requests. """
missing_command: typing.Type[Command], log.debug(f"Now binding commands")
error_command: typing.Type[Command]) -> None: self._Interface = self._interface_factory()
"""Generate the ``commands`` dictionary required to handle incoming messages, and the ``network_handlers`` dictionary required to handle incoming requests.""" self._Data = self._data_factory()
log.debug(f"Now generating commands") self.commands = {}
self.command_prefix = command_prefix
self.commands: typing.Dict[str, typing.Type[Command]] = {}
self.network_handlers: typing.Dict[str, typing.Type[NetworkHandler]] = {} self.network_handlers: typing.Dict[str, typing.Type[NetworkHandler]] = {}
for command in commands: for SelectedCommand in commands:
lower_command_name = command.command_name.lower() interface = self._Interface()
self.commands[f"{command_prefix}{lower_command_name}"] = command self.commands[f"{interface.prefix}{SelectedCommand.name}"] = SelectedCommand(interface)
self.network_handlers = {**self.network_handlers, **command.network_handler_dict()} log.debug(f"Successfully bound commands")
self.missing_command: typing.Type[Command] = missing_command
self.error_command: typing.Type[Command] = error_command
log.debug(f"Successfully generated commands")
def _call_factory(self) -> typing.Type[Call]: def _interface_factory(self) -> typing.Type[CommandInterface]:
"""Create the TelegramCall class, representing a command call. It should inherit from :py:class:`royalnet.utils.Call`. # noinspection PyAbstractClass,PyMethodParameters
class GenericInterface(CommandInterface):
alchemy = self.alchemy
bot = self
loop = self.loop
Returns: def register_net_handler(ci, message_type: str, network_handler: typing.Callable):
The created TelegramCall class.""" self.network_handlers[message_type] = network_handler
def unregister_net_handler(ci, message_type: str):
del self.network_handlers[message_type]
async def net_request(ci, request: Request, destination: str) -> dict:
if self.network is None:
raise InvalidConfigError("Royalnet is not enabled on this bot")
response_dict: dict = await self.network.request(request.to_dict(), destination)
if "type" not in response_dict:
raise RoyalnetResponseError("Response is missing a type")
elif response_dict["type"] == "ResponseSuccess":
response: typing.Union[ResponseSuccess, ResponseError] = ResponseSuccess.from_dict(response_dict)
elif response_dict["type"] == "ResponseError":
response = ResponseError.from_dict(response_dict)
else:
raise RoyalnetResponseError("Response type is unknown")
response.raise_on_error()
return response.data
return GenericInterface
def _data_factory(self) -> typing.Type[CommandData]:
raise NotImplementedError() raise NotImplementedError()
def _init_royalnet(self, royalnet_config: RoyalnetConfig): def _init_royalnet(self, royalnet_config: RoyalnetConfig):
"""Create a :py:class:`royalnet.network.RoyalnetLink`, and run it as a :py:class:`asyncio.Task`.""" """Create a :py:class:`royalnet.network.RoyalnetLink`, and run it as a :py:class:`asyncio.Task`."""
self.network: RoyalnetLink = RoyalnetLink(royalnet_config.master_uri, royalnet_config.master_secret, self.interface_name, self.network: RoyalnetLink = RoyalnetLink(royalnet_config.master_uri, royalnet_config.master_secret,
self._network_handler) self.interface_name, self._network_handler)
log.debug(f"Running RoyalnetLink {self.network}") log.debug(f"Running RoyalnetLink {self.network}")
self.loop.create_task(self.network.run()) self.loop.create_task(self.network.run())
@ -74,16 +97,18 @@ class GenericBot:
_, exc, _ = sys.exc_info() _, exc, _ = sys.exc_info()
log.debug(f"Exception {exc} in {network_handler}") log.debug(f"Exception {exc} in {network_handler}")
return ResponseError("exception_in_handler", return ResponseError("exception_in_handler",
f"An exception was raised in {network_handler} for {request.handler}. Check extra_info for details.", f"An exception was raised in {network_handler} for {request.handler}. Check "
f"extra_info for details.",
extra_info={ extra_info={
"type": exc.__class__.__name__, "type": exc.__class__.__name__,
"str": str(exc) "str": str(exc)
}).to_dict() }).to_dict()
def _init_database(self, commands: typing.List[typing.Type[Command]], database_config: DatabaseConfig): def _init_database(self, commands: typing.List[typing.Type[Command]], database_config: DatabaseConfig):
"""Create an :py:class:`royalnet.database.Alchemy` with the tables required by the commands. Then, find the chain that links the ``master_table`` to the ``identity_table``.""" """Create an :py:class:`royalnet.database.Alchemy` with the tables required by the commands. Then,
find the chain that links the ``master_table`` to the ``identity_table``. """
log.debug(f"Initializing database") log.debug(f"Initializing database")
required_tables = set() required_tables = {database_config.master_table, database_config.identity_table}
for command in commands: for command in commands:
required_tables = required_tables.union(command.require_alchemy_tables) required_tables = required_tables.union(command.require_alchemy_tables)
log.debug(f"Found {len(required_tables)} required tables") log.debug(f"Found {len(required_tables)} required tables")
@ -98,10 +123,7 @@ class GenericBot:
def __init__(self, *, def __init__(self, *,
royalnet_config: typing.Optional[RoyalnetConfig] = None, royalnet_config: typing.Optional[RoyalnetConfig] = None,
database_config: typing.Optional[DatabaseConfig] = None, database_config: typing.Optional[DatabaseConfig] = None,
command_prefix: str,
commands: typing.List[typing.Type[Command]] = None, commands: typing.List[typing.Type[Command]] = None,
missing_command: typing.Type[Command] = NullCommand,
error_command: typing.Type[Command] = NullCommand,
loop: asyncio.AbstractEventLoop = None): loop: asyncio.AbstractEventLoop = None):
if loop is None: if loop is None:
self.loop = asyncio.get_event_loop() self.loop = asyncio.get_event_loop()
@ -116,35 +138,12 @@ class GenericBot:
self._init_database(commands=commands, database_config=database_config) self._init_database(commands=commands, database_config=database_config)
if commands is None: if commands is None:
commands = [] commands = []
self._init_commands(command_prefix, commands, missing_command=missing_command, error_command=error_command) self._init_commands(commands)
self._Call = self._call_factory()
if royalnet_config is None: if royalnet_config is None:
self.network = None self.network = None
else: else:
self._init_royalnet(royalnet_config=royalnet_config) self._init_royalnet(royalnet_config=royalnet_config)
async def call(self, command_name: str, channel, parameters: typing.List[str] = None, **kwargs):
"""Call the command with the specified name.
If it doesn't exist, call ``self.missing_command``.
If an exception is raised during the execution of the command, call ``self.error_command``."""
log.debug(f"Trying to call {command_name}")
if parameters is None:
parameters = []
try:
command: typing.Type[Command] = self.commands[command_name]
except KeyError:
log.debug(f"Calling missing_command because {command_name} does not exist")
command = self.missing_command
try:
await self._Call(channel, command, parameters, **kwargs).run()
except Exception as exc:
log.debug(f"Calling error_command because of an error in {command_name}")
await self._Call(channel, self.error_command,
exception=exc,
previous_command=command, **kwargs).run()
async def run(self): async def run(self):
"""A blocking coroutine that should make the bot start listening to commands and requests.""" """A blocking coroutine that should make the bot start listening to commands and requests."""
raise NotImplementedError() raise NotImplementedError()

View file

@ -1,14 +1,13 @@
import telegram import telegram
import telegram.utils.request import telegram.utils.request
import asyncio
import typing import typing
import logging as _logging import logging as _logging
from .generic import GenericBot from .generic import GenericBot
from ..commands import NullCommand from ..utils import *
from ..utils import asyncify, Call, Command, telegram_escape from ..error import *
from ..error import UnregisteredError, InvalidConfigError, RoyalnetResponseError from ..network import *
from ..network import RoyalnetConfig, Request, ResponseSuccess, ResponseError from ..database import *
from ..database import DatabaseConfig from ..commands import *
log = _logging.getLogger(__name__) log = _logging.getLogger(__name__)
@ -31,43 +30,36 @@ class TelegramBot(GenericBot):
self.client = telegram.Bot(self._telegram_config.token, request=request) self.client = telegram.Bot(self._telegram_config.token, request=request)
self._offset: int = -100 self._offset: int = -100
def _call_factory(self) -> typing.Type[Call]: def _interface_factory(self) -> typing.Type[CommandInterface]:
# noinspection PyMethodParameters # noinspection PyPep8Naming
class TelegramCall(Call): GenericInterface = super()._interface_factory()
interface_name = self.interface_name
interface_obj = self
interface_prefix = "/"
alchemy = self.alchemy # noinspection PyMethodParameters,PyAbstractClass
class TelegramInterface(GenericInterface):
name = self.interface_name
prefix = "/"
async def reply(call, text: str): return TelegramInterface
await asyncify(call.channel.send_message, telegram_escape(text),
def _data_factory(self) -> typing.Type[CommandData]:
# noinspection PyMethodParameters,PyAbstractClass
class TelegramData(CommandData):
def __init__(data, interface: CommandInterface, update: telegram.Update):
data._interface = interface
data.update = update
async def reply(data, text: str):
await asyncify(data.update.effective_chat.send_message, telegram_escape(text),
parse_mode="HTML", parse_mode="HTML",
disable_web_page_preview=True) disable_web_page_preview=True)
async def net_request(call, request: Request, destination: str) -> dict: async def get_author(data, error_if_none=False):
if self.network is None: user: telegram.User = data.update.effective_user
raise InvalidConfigError("Royalnet is not enabled on this bot")
response_dict: dict = await self.network.request(request.to_dict(), destination)
if "type" not in response_dict:
raise RoyalnetResponseError("Response is missing a type")
elif response_dict["type"] == "ResponseSuccess":
response: typing.Union[ResponseSuccess, ResponseError] = ResponseSuccess.from_dict(response_dict)
elif response_dict["type"] == "ResponseError":
response = ResponseError.from_dict(response_dict)
else:
raise RoyalnetResponseError("Response type is unknown")
response.raise_on_error()
return response.data
async def get_author(call, error_if_none=False):
update: telegram.Update = call.kwargs["update"]
user: telegram.User = update.effective_user
if user is None: if user is None:
if error_if_none: if error_if_none:
raise UnregisteredError("No author for this message") raise UnregisteredError("No author for this message")
return None return None
query = call.session.query(self.master_table) query = data._interface.session.query(self.master_table)
for link in self.identity_chain: for link in self.identity_chain:
query = query.join(link.mapper.class_) query = query.join(link.mapper.class_)
query = query.filter(self.identity_column == user.id) query = query.filter(self.identity_column == user.id)
@ -76,22 +68,16 @@ class TelegramBot(GenericBot):
raise UnregisteredError("Author is not registered") raise UnregisteredError("Author is not registered")
return result return result
return TelegramCall return TelegramData
def __init__(self, *, def __init__(self, *,
telegram_config: TelegramConfig, telegram_config: TelegramConfig,
royalnet_config: typing.Optional[RoyalnetConfig] = None, royalnet_config: typing.Optional[RoyalnetConfig] = None,
database_config: typing.Optional[DatabaseConfig] = None, database_config: typing.Optional[DatabaseConfig] = None,
command_prefix: str = "/", commands: typing.List[typing.Type[Command]] = None):
commands: typing.List[typing.Type[Command]] = None,
missing_command: typing.Type[Command] = NullCommand,
error_command: typing.Type[Command] = NullCommand):
super().__init__(royalnet_config=royalnet_config, super().__init__(royalnet_config=royalnet_config,
database_config=database_config, database_config=database_config,
command_prefix=command_prefix, commands=commands)
commands=commands,
missing_command=missing_command,
error_command=error_command)
self._telegram_config = telegram_config self._telegram_config = telegram_config
self._init_client() self._init_client()
@ -108,15 +94,21 @@ class TelegramBot(GenericBot):
if text is None: if text is None:
return return
# Skip non-command updates # Skip non-command updates
if not text.startswith(self.command_prefix): if not text.startswith("/"):
return return
# Find and clean parameters # Find and clean parameters
command_text, *parameters = text.split(" ") command_text, *parameters = text.split(" ")
command_name = command_text.replace(f"@{self.client.username}", "").lower() command_name = command_text.replace(f"@{self.client.username}", "").lower()
# Send a typing notification # Send a typing notification
self.client.send_chat_action(update.message.chat, telegram.ChatAction.TYPING) update.message.chat.send_action(telegram.ChatAction.TYPING)
# Call the command # Find the command
await self.call(command_name, update.message.chat, parameters, update=update) try:
command = self.commands[command_name]
except KeyError:
# Skip the message
return
# Run the command
await command.run(CommandArgs(parameters), self._Data(interface=command.interface, update=update))
async def run(self): async def run(self):
while True: while True:
@ -134,12 +126,3 @@ class TelegramBot(GenericBot):
self._offset = last_updates[-1].update_id + 1 self._offset = last_updates[-1].update_id + 1
except IndexError: except IndexError:
pass pass
@property
def botfather_command_string(self) -> str:
"""Generate a string to be pasted in the "Edit Commands" BotFather prompt."""
string = ""
for command_key in self.commands:
command = self.commands[command_key]
string += f"{command.command_name} - {command.command_description}\n"
return string

View file

@ -1,39 +1,6 @@
"""Commands that can be used in bots. from .commandinterface import CommandInterface
from .command import Command
from .commanddata import CommandData
from .commandargs import CommandArgs
These probably won't suit your needs, as they are tailored for the bots of the Royal Games gaming community, but they may be useful to develop new ones.""" __all__ = ["CommandInterface", "Command", "CommandData", "CommandArgs"]
from .null import NullCommand
from .ping import PingCommand
from .ship import ShipCommand
from .smecds import SmecdsCommand
from .ciaoruozi import CiaoruoziCommand
from .color import ColorCommand
from .sync import SyncCommand
from .diario import DiarioCommand
from .rage import RageCommand
from .dateparser import DateparserCommand
from .author import AuthorCommand
from .reminder import ReminderCommand
from .kvactive import KvactiveCommand
from .kv import KvCommand
from .kvroll import KvrollCommand
from .videoinfo import VideoinfoCommand
from .summon import SummonCommand
from .play import PlayCommand
from .skip import SkipCommand
from .playmode import PlaymodeCommand
from .videochannel import VideochannelCommand
from .missing import MissingCommand
from .cv import CvCommand
from .pause import PauseCommand
from .queue import QueueCommand
from .royalnetprofile import RoyalnetprofileCommand
from .id import IdCommand
from .dlmusic import DlmusicCommand
__all__ = ["NullCommand", "PingCommand", "ShipCommand", "SmecdsCommand", "CiaoruoziCommand", "ColorCommand",
"SyncCommand", "DiarioCommand", "RageCommand", "DateparserCommand", "AuthorCommand", "ReminderCommand",
"KvactiveCommand", "KvCommand", "KvrollCommand", "VideoinfoCommand", "SummonCommand", "PlayCommand",
"SkipCommand", "PlaymodeCommand", "VideochannelCommand", "MissingCommand", "CvCommand", "PauseCommand",
"QueueCommand", "RoyalnetprofileCommand", "IdCommand", "DlmusicCommand"]

View file

@ -1,22 +0,0 @@
from ..utils import Command, Call
from telegram import Update, User
class CiaoruoziCommand(Command):
command_name = "ciaoruozi"
command_description = "Saluta Ruozi, anche se non è più in RYG."
command_syntax = ""
@classmethod
async def common(cls, call: "Call"):
await call.reply("👋 Ciao Ruozi!")
@classmethod
async def telegram(cls, call: Call):
update: Update = call.kwargs["update"]
user: User = update.effective_user
if user.id == 112437036:
await call.reply("👋 Ciao me!")
else:
await call.reply("👋 Ciao Ruozi!")

View file

@ -1,14 +0,0 @@
from ..utils import Command, Call
class ColorCommand(Command):
command_name = "color"
command_description = "Invia un colore in chat...?"
command_syntax = ""
@classmethod
async def common(cls, call: Call):
await call.reply("""
[i]I am sorry, unknown error occured during working with your request, Admin were notified[/i]
""")

View file

@ -0,0 +1,27 @@
import typing
from ..error import UnsupportedError
from .commandinterface import CommandInterface
from .commandargs import CommandArgs
from .commanddata import CommandData
class Command:
name: str = NotImplemented
"""The main name of the command.
To have ``/example`` on Telegram, the name should be ``example``."""
description: str = NotImplemented
"""A small description of the command, to be displayed when the command is being autocompleted."""
syntax: str = ""
"""The syntax of the command, to be displayed when a :py:exc:`royalnet.error.InvalidInputError` is raised,
in the format ``(required_arg) [optional_arg]``."""
require_alchemy_tables: typing.Set = set()
"""A set of :py:class:`royalnet.database` tables that must exist for this command to work."""
def __init__(self, interface: CommandInterface):
self.interface = interface
async def run(self, args: CommandArgs, data: CommandData) -> None:
raise UnsupportedError(f"Command {self.name} can't be called on {self.interface.name}.")

View file

@ -38,7 +38,7 @@ class CommandArgs(list):
raise InvalidInputError("Not enough arguments") raise InvalidInputError("Not enough arguments")
return " ".join(self) return " ".join(self)
def match(self, pattern: typing.Pattern) -> typing.Sequence[typing.AnyStr]: def match(self, pattern: typing.Union[str, typing.Pattern]) -> typing.Sequence[typing.AnyStr]:
"""Match the :py:func:`royalnet.utils.commandargs.joined` to a regex pattern. """Match the :py:func:`royalnet.utils.commandargs.joined` to a regex pattern.
Parameters: Parameters:

View file

@ -0,0 +1,18 @@
class CommandData:
async def reply(self, text: str) -> None:
"""Send a text message to the channel where the call was made.
Parameters:
text: The text to be sent, possibly formatted in the weird undescribed markup that I'm using."""
raise NotImplementedError()
async def get_author(self, error_if_none: bool = False):
"""Try to find the identifier of the user that sent the message.
That probably means, the database row identifying the user.
Parameters:
error_if_none: Raise a :py:exc:`royalnet.error.UnregisteredError` if this is True and the call has no author.
Raises:
:py:exc:`royalnet.error.UnregisteredError` if ``error_if_none`` is set to True and no author is found."""
raise NotImplementedError()

View file

@ -0,0 +1,33 @@
import typing
import asyncio
if typing.TYPE_CHECKING:
from ..database import Alchemy
from ..bots import GenericBot
class CommandInterface:
name: str = NotImplemented
prefix: str = NotImplemented
alchemy: "Alchemy" = NotImplemented
bot: "GenericBot" = NotImplemented
loop: asyncio.AbstractEventLoop = NotImplemented
def __init__(self):
self.session = self.alchemy.Session()
def register_net_handler(self, message_type: str, network_handler: typing.Callable):
"""Register a new handler for messages received through Royalnet."""
raise NotImplementedError()
def unregister_net_handler(self, message_type: str):
"""Remove a Royalnet handler."""
raise NotImplementedError()
async def net_request(self, message, destination: str) -> dict:
"""Send data through a :py:class:`royalnet.network.RoyalnetLink` and wait for a
:py:class:`royalnet.network.Reply`.
Parameters:
message: The data to be sent. Must be :py:mod:`pickle`-able.
destination: The destination of the request, either in UUID format or node name."""
raise NotImplementedError()

View file

@ -1,209 +0,0 @@
import re
import datetime
import telegram
import typing
import os
import aiohttp
from ..utils import Command, Call
from ..error import InvalidInputError, InvalidConfigError, ExternalError
from ..database.tables import Royal, Diario, Alias
from ..utils import asyncify
# NOTE: Requires imgur api key for image upload, get one at https://apidocs.imgur.com
class DiarioCommand(Command):
command_name = "diario"
command_description = "Aggiungi una citazione al Diario."
command_syntax = "[!] \"(testo)\" --[autore], [contesto]"
require_alchemy_tables = {Royal, Diario, Alias}
@classmethod
async def _telegram_to_imgur(cls, photosizes: typing.List[telegram.PhotoSize], caption="") -> str:
# Select the largest photo
largest_photo = sorted(photosizes, key=lambda p: p.width * p.height)[-1]
# Get the photo url
photo_file: telegram.File = await asyncify(largest_photo.get_file)
# Forward the url to imgur, as an upload
try:
imgur_api_key = os.environ["IMGUR_CLIENT_ID"]
except KeyError:
raise InvalidConfigError("Missing IMGUR_CLIENT_ID envvar, can't upload images to imgur.")
async with aiohttp.request("post", "https://api.imgur.com/3/upload", data={
"image": photo_file.file_path,
"type": "URL",
"title": "Diario image",
"description": caption
}, headers={
"Authorization": f"Client-ID {imgur_api_key}"
}) as request:
response = await request.json()
if not response["success"]:
raise ExternalError("imgur returned an error in the image upload.")
return response["data"]["link"]
@classmethod
async def common(cls, call: Call):
# Find the creator of the quotes
creator = await call.get_author(error_if_none=True)
# Recreate the full sentence
raw_text = " ".join(call.args)
# Pass the sentence through the diario regex
match = re.match(r'(!)? *["«‘“‛‟❛❝〝"`]([^"]+)["»’”❜❞〞"`] *(?:(?:-{1,2}|—) *([\w ]+))?(?:, *([^ ].*))?', raw_text)
# Find the corresponding matches
if match is not None:
spoiler = bool(match.group(1))
text = match.group(2)
quoted = match.group(3)
context = match.group(4)
# Otherwise, consider everything part of the text
else:
spoiler = False
text = raw_text
quoted = None
context = None
timestamp = datetime.datetime.now()
# Ensure there is some text
if not text:
raise InvalidInputError("Missing text.")
# Or a quoted
if not quoted:
quoted = None
if not context:
context = None
# Find if there's a Royalnet account associated with the quoted name
if quoted is not None:
quoted_alias = await asyncify(call.session.query(call.alchemy.Alias).filter_by(alias=quoted.lower()).one_or_none)
else:
quoted_alias = None
quoted_account = quoted_alias.royal if quoted_alias is not None else None
if quoted_alias is not None and quoted_account is None:
await call.reply("⚠️ Il nome dell'autore è ambiguo, quindi la riga non è stata aggiunta.\n"
"Per piacere, ripeti il comando con un nome più specifico!")
return
# Create the diario quote
diario = call.alchemy.Diario(creator=creator,
quoted_account=quoted_account,
quoted=quoted,
text=text,
context=context,
timestamp=timestamp,
media_url=None,
spoiler=spoiler)
call.session.add(diario)
await asyncify(call.session.commit)
await call.reply(f"{str(diario)}")
@classmethod
async def telegram(cls, call: Call):
update: telegram.Update = call.kwargs["update"]
message: telegram.Message = update.message
reply: telegram.Message = message.reply_to_message
creator = await call.get_author()
# noinspection PyUnusedLocal
quoted_account: typing.Optional[call.alchemy.Telegram]
# noinspection PyUnusedLocal
quoted: typing.Optional[str]
# noinspection PyUnusedLocal
text: typing.Optional[str]
# noinspection PyUnusedLocal
context: typing.Optional[str]
# noinspection PyUnusedLocal
timestamp: datetime.datetime
# noinspection PyUnusedLocal
media_url: typing.Optional[str]
# noinspection PyUnusedLocal
spoiler: bool
if creator is None:
await call.reply("⚠️ Devi essere registrato a Royalnet per usare questo comando!")
return
if reply is not None:
# Get the message text
text = reply.text
# Check if there's an image associated with the reply
photosizes: typing.Optional[typing.List[telegram.PhotoSize]] = reply.photo
if photosizes:
# Text is a caption
text = reply.caption
media_url = await cls._telegram_to_imgur(photosizes, text if text is not None else "")
else:
media_url = None
# Ensure there is a text or an image
if not (text or media_url):
raise InvalidInputError("Missing text.")
# Find the Royalnet account associated with the sender
quoted_tg = await asyncify(call.session.query(call.alchemy.Telegram)
.filter_by(tg_id=reply.from_user.id)
.one_or_none)
quoted_account = quoted_tg.royal if quoted_tg is not None else None
# Find the quoted name to assign
quoted_user: telegram.User = reply.from_user
quoted = quoted_user.full_name
# Get the timestamp
timestamp = reply.date
# Set the other properties
spoiler = False
context = None
else:
# Get the current timestamp
timestamp = datetime.datetime.now()
# Get the message text
raw_text = " ".join(call.args)
# Check if there's an image associated with the reply
photosizes: typing.Optional[typing.List[telegram.PhotoSize]] = message.photo
if photosizes:
media_url = await cls._telegram_to_imgur(photosizes, raw_text if raw_text is not None else "")
else:
media_url = None
# Parse the text, if it exists
if raw_text:
# Pass the sentence through the diario regex
match = re.match(r'(!)? *["«‘“‛‟❛❝〝"`]([^"]+)["»’”❜❞〞"`] *(?:(?:-{1,2}|—) *([\w ]+))?(?:, *([^ ].*))?',
raw_text)
# Find the corresponding matches
if match is not None:
spoiler = bool(match.group(1))
text = match.group(2)
quoted = match.group(3)
context = match.group(4)
# Otherwise, consider everything part of the text
else:
spoiler = False
text = raw_text
quoted = None
context = None
# Ensure there's a quoted
if not quoted:
quoted = None
if not context:
context = None
# Find if there's a Royalnet account associated with the quoted name
if quoted is not None:
quoted_alias = await asyncify(
call.session.query(call.alchemy.Alias)
.filter_by(alias=quoted.lower()).one_or_none)
else:
quoted_alias = None
quoted_account = quoted_alias.royal if quoted_alias is not None else None
else:
text = None
quoted = None
quoted_account = None
spoiler = False
context = None
# Ensure there is a text or an image
if not (text or media_url):
raise InvalidInputError("Missing text.")
# Create the diario quote
diario = call.alchemy.Diario(creator=creator,
quoted_account=quoted_account,
quoted=quoted,
text=text,
context=context,
timestamp=timestamp,
media_url=media_url,
spoiler=spoiler)
call.session.add(diario)
await asyncify(call.session.commit)
await call.reply(f"{str(diario)}")

View file

@ -1,34 +0,0 @@
import asyncio
import typing
import urllib.parse
from ..utils import Command, Call, asyncify
from ..audio import YtdlVorbis
ytdl_args = {
"format": "bestaudio",
"outtmpl": f"./downloads/%(title)s.%(ext)s"
}
seconds_before_deletion = 15*60
class DlmusicCommand(Command):
command_name = "dlmusic"
command_description = "Scarica un video."
command_syntax = "(url)"
@classmethod
async def common(cls, call: Call):
url = call.args.joined()
if url.startswith("http://") or url.startswith("https://"):
vfiles: typing.List[YtdlVorbis] = await asyncify(YtdlVorbis.create_and_ready_from_url, url, **ytdl_args)
else:
vfiles = await asyncify(YtdlVorbis.create_and_ready_from_url, f"ytsearch:{url}", **ytdl_args)
for vfile in vfiles:
await call.reply(f"⬇️ https://scaleway.steffo.eu/{urllib.parse.quote(vfile.vorbis_filename.replace('./downloads/', './musicbot_cache/'))}")
await asyncio.sleep(seconds_before_deletion)
for vfile in vfiles:
vfile.delete()

View file

@ -1,21 +0,0 @@
import asyncio
from ..utils import Command, Call
from ..error import InvalidInputError
class PingCommand(Command):
command_name = "ping"
command_description = "Ping pong dopo un po' di tempo!"
command_syntax = "[time_to_wait]"
@classmethod
async def common(cls, call: Call):
try:
time = int(call.args[0])
except InvalidInputError:
time = 0
except ValueError:
raise InvalidInputError("time_to_wait is not a number")
await asyncio.sleep(time)
await call.reply("🏓 Pong!")

View file

@ -1,84 +0,0 @@
import typing
import asyncio
import pickle
from ..utils import Command, Call, NetworkHandler, asyncify
from ..network import Request, ResponseSuccess
from ..error import TooManyFoundError, NoneFoundError
from ..audio import YtdlDiscord
if typing.TYPE_CHECKING:
from ..bots import DiscordBot
ytdl_args = {
"format": "bestaudio",
"outtmpl": f"./downloads/%(title)s.%(ext)s"
}
class PlayNH(NetworkHandler):
message_type = "music_play"
@classmethod
async def discord(cls, bot: "DiscordBot", data: dict):
"""Handle a play Royalnet request. That is, add audio to a PlayMode."""
# Find the matching guild
if data["guild_name"]:
guild = bot.client.find_guild(data["guild_name"])
else:
if len(bot.music_data) == 0:
raise NoneFoundError("No voice clients active")
if len(bot.music_data) > 1:
raise TooManyFoundError("Multiple guilds found")
guild = list(bot.music_data)[0]
# Ensure the guild has a PlayMode before adding the file to it
if not bot.music_data.get(guild):
# TODO: change Exception
raise Exception("No music_data for this guild")
# Start downloading
if data["url"].startswith("http://") or data["url"].startswith("https://"):
dfiles: typing.List[YtdlDiscord] = await asyncify(YtdlDiscord.create_and_ready_from_url, data["url"], **ytdl_args)
else:
dfiles = await asyncify(YtdlDiscord.create_and_ready_from_url, f"ytsearch:{data['url']}", **ytdl_args)
await bot.add_to_music_data(dfiles, guild)
# Create response dictionary
response = {
"videos": [{
"title": dfile.info.title,
"discord_embed_pickle": str(pickle.dumps(dfile.info.to_discord_embed()))
} for dfile in dfiles]
}
return ResponseSuccess(response)
async def notify_on_timeout(call: Call, url: str, time: float, repeat: bool = False):
"""Send a message after a while to let the user know that the bot is still downloading the files and hasn't crashed."""
while True:
await asyncio.sleep(time)
await call.reply(f" Il download di [c]{url}[/c] sta richiedendo più tempo del solito, ma è ancora in corso!")
if not repeat:
break
class PlayCommand(Command):
command_name = "play"
command_description = "Riproduce una canzone in chat vocale."
command_syntax = "[ [guild] ] (url)"
network_handlers = [PlayNH]
@classmethod
async def common(cls, call: Call):
guild_name, url = call.args.match(r"(?:\[(.+)])?\s*<?(.+)>?")
download_task = call.loop.create_task(call.net_request(Request("music_play", {"url": url, "guild_name": guild_name}), "discord"))
notify_task = call.loop.create_task(notify_on_timeout(call, url, time=30, repeat=True))
try:
data: dict = await download_task
finally:
notify_task.cancel()
for video in data["videos"]:
if call.interface_name == "discord":
# This is one of the unsafest things ever
embed = pickle.loads(eval(video["discord_embed_pickle"]))
await call.channel.send(content="✅ Aggiunto alla coda:", embed=embed)
else:
await call.reply(f"✅ [i]{video['title']}[/i] scaricato e aggiunto alla coda.")

View file

@ -1,20 +0,0 @@
import random
from ..utils import Command, Call
MAD = ["MADDEN MADDEN MADDEN MADDEN",
"EA bad, praise Geraldo!",
"Stai sfogando la tua ira sul bot!",
"Basta, io cambio gilda!",
"Fondiamo la RRYG!"]
class RageCommand(Command):
command_name = "rage"
command_description = "Arrabbiati con qualcosa, possibilmente una software house."
command_syntax = ""
@classmethod
async def common(cls, call: Call):
await call.reply(f"😠 {random.sample(MAD, 1)[0]}")

View file

@ -0,0 +1,30 @@
"""Commands that can be used in bots.
These probably won't suit your needs, as they are tailored for the bots of the Royal Games gaming community, but they
may be useful to develop new ones."""
from .ping import PingCommand
from .ciaoruozi import CiaoruoziCommand
from .color import ColorCommand
from .cv import CvCommand
from .diario import DiarioCommand
from .mp3 import Mp3Command
from .summon import SummonCommand
from .pause import PauseCommand
from .play import PlayCommand
from .playmode import PlaymodeCommand
from .queue import QueueCommand
from .reminder import ReminderCommand
__all__ = ["PingCommand",
"CiaoruoziCommand",
"ColorCommand",
"CvCommand",
"DiarioCommand",
"Mp3Command",
"SummonCommand",
"PauseCommand",
"PlayCommand",
"PlaymodeCommand",
"QueueCommand",
"ReminderCommand"]

View file

@ -0,0 +1,24 @@
import typing
import telegram
from ..command import Command
from ..commandargs import CommandArgs
from ..commanddata import CommandData
class CiaoruoziCommand(Command):
name: str = "ciaoruozi"
description: str = "Saluta Ruozi, un leggendario essere che una volta era in Royal Games."
syntax: str = ""
require_alchemy_tables: typing.Set = set()
async def run(self, args: CommandArgs, data: CommandData) -> None:
if self.interface.name == "telegram":
update: telegram.Update = data.update
user: telegram.User = update.effective_user
if user.id == 112437036:
await data.reply("👋 Ciao me!")
return
await data.reply("👋 Ciao Ruozi!")

View file

@ -0,0 +1,15 @@
import typing
from ..command import Command
from ..commandargs import CommandArgs
from ..commanddata import CommandData
class ColorCommand(Command):
name: str = "color"
description: str = "Invia un colore in chat...?"
async def run(self, args: CommandArgs, data: CommandData) -> None:
await data.reply("""
[i]I am sorry, unknown error occured during working with your request, Admin were notified[/i]
""")

View file

@ -1,11 +1,13 @@
import typing
import discord import discord
import asyncio import typing
from ..utils import Command, Call, NetworkHandler, andformat from ..command import Command
from ..network import Request, ResponseSuccess from ..commandinterface import CommandInterface
from ..error import NoneFoundError, TooManyFoundError from ..commandargs import CommandArgs
if typing.TYPE_CHECKING: from ..commanddata import CommandData
from ..bots import DiscordBot from ...network import Request, ResponseSuccess
from ...utils import NetworkHandler, andformat
from ...bots import DiscordBot
from ...error import *
class CvNH(NetworkHandler): class CvNH(NetworkHandler):
@ -41,14 +43,14 @@ class CvNH(NetworkHandler):
# Edit the message, sorted by channel # Edit the message, sorted by channel
for channel in sorted(channels, key=lambda c: -c): for channel in sorted(channels, key=lambda c: -c):
members_in_channels[channel].sort(key=lambda x: x.nick if x.nick is not None else x.name) members_in_channels[channel].sort(key=lambda x: x.nick if x.nick is not None else x.name)
if channel == 0: if channel == 0 and len(members_in_channels[0]) > 0:
message += "[b]Non in chat vocale:[/b]\n" message += "[b]Non in chat vocale:[/b]\n"
else: else:
message += f"[b]In #{channels[channel].name}:[/b]\n" message += f"[b]In #{channels[channel].name}:[/b]\n"
for member in members_in_channels[channel]: for member in members_in_channels[channel]:
member: typing.Union[discord.User, discord.Member] member: typing.Union[discord.User, discord.Member]
# Ignore not-connected non-notable members # Ignore not-connected non-notable members
if not data["full"] and channel == 0 and len(member.roles) < 2: if not data["everyone"] and channel == 0 and len(member.roles) < 2:
continue continue
# Ignore offline members # Ignore offline members
if member.status == discord.Status.offline and member.voice is None: if member.status == discord.Status.offline and member.voice is None:
@ -113,15 +115,20 @@ class CvNH(NetworkHandler):
class CvCommand(Command): class CvCommand(Command):
name: str = "cv"
command_name = "cv" description: str = "Elenca le persone attualmente connesse alla chat vocale."
command_description = "Elenca le persone attualmente connesse alla chat vocale."
command_syntax = "[guildname]"
network_handlers = [CvNH] syntax: str = "[guildname] "
@classmethod def __init__(self, interface: CommandInterface):
async def common(cls, call: Call): super().__init__(interface)
guild_name = call.args.optional(0) interface.register_net_handler("discord_cv", CvNH)
response = await call.net_request(Request("discord_cv", {"guild_name": guild_name, "full": False}), "discord")
await call.reply(response["response"]) async def run(self, args: CommandArgs, data: CommandData) -> None:
# noinspection PyTypeChecker
guild_name, everyone = args.match(r"(?:\[(.+)])?\s*(\S+)?\s*")
response = await self.interface.net_request(Request("discord_cv", {"guild_name": guild_name,
"everyone": bool(everyone)}),
destination="discord")
await data.reply(response["response"])

View file

@ -0,0 +1,212 @@
import typing
import re
import datetime
import telegram
import os
import aiohttp
from ..command import Command
from ..commandargs import CommandArgs
from ..commanddata import CommandData
from ...database.tables import Royal, Diario, Alias
from ...utils import asyncify
from ...error import *
async def to_imgur(photosizes: typing.List[telegram.PhotoSize], caption="") -> str:
# Select the largest photo
largest_photo = sorted(photosizes, key=lambda p: p.width * p.height)[-1]
# Get the photo url
photo_file: telegram.File = await asyncify(largest_photo.get_file)
# Forward the url to imgur, as an upload
try:
imgur_api_key = os.environ["IMGUR_CLIENT_ID"]
except KeyError:
raise InvalidConfigError("Missing IMGUR_CLIENT_ID envvar, can't upload images to imgur.")
async with aiohttp.request("post", "https://api.imgur.com/3/upload", data={
"image": photo_file.file_path,
"type": "URL",
"title": "Diario image",
"description": caption
}, headers={
"Authorization": f"Client-ID {imgur_api_key}"
}) as request:
response = await request.json()
if not response["success"]:
raise ExternalError("imgur returned an error in the image upload.")
return response["data"]["link"]
class DiarioCommand(Command):
name: str = "diario"
description: str = "Aggiungi una citazione al Diario."
syntax = "[!] \"(testo)\" --[autore], [contesto]"
require_alchemy_tables = {Royal, Diario, Alias}
async def run(self, args: CommandArgs, data: CommandData) -> None:
if self.interface.name == "telegram":
update: telegram.Update = data.update
message: telegram.Message = update.message
reply: telegram.Message = message.reply_to_message
creator = await data.get_author()
# noinspection PyUnusedLocal
quoted: typing.Optional[str]
# noinspection PyUnusedLocal
text: typing.Optional[str]
# noinspection PyUnusedLocal
context: typing.Optional[str]
# noinspection PyUnusedLocal
timestamp: datetime.datetime
# noinspection PyUnusedLocal
media_url: typing.Optional[str]
# noinspection PyUnusedLocal
spoiler: bool
if creator is None:
await data.reply("⚠️ Devi essere registrato a Royalnet per usare questo comando!")
return
if reply is not None:
# Get the message text
text = reply.text
# Check if there's an image associated with the reply
photosizes: typing.Optional[typing.List[telegram.PhotoSize]] = reply.photo
if photosizes:
# Text is a caption
text = reply.caption
media_url = await to_imgur(photosizes, text if text is not None else "")
else:
media_url = None
# Ensure there is a text or an image
if not (text or media_url):
raise InvalidInputError("Missing text.")
# Find the Royalnet account associated with the sender
quoted_tg = await asyncify(self.interface.session.query(self.interface.alchemy.Telegram)
.filter_by(tg_id=reply.from_user.id)
.one_or_none)
quoted_account = quoted_tg.royal if quoted_tg is not None else None
# Find the quoted name to assign
quoted_user: telegram.User = reply.from_user
quoted = quoted_user.full_name
# Get the timestamp
timestamp = reply.date
# Set the other properties
spoiler = False
context = None
else:
# Get the current timestamp
timestamp = datetime.datetime.now()
# Get the message text
raw_text = " ".join(args)
# Check if there's an image associated with the reply
photosizes: typing.Optional[typing.List[telegram.PhotoSize]] = message.photo
if photosizes:
media_url = await to_imgur(photosizes, raw_text if raw_text is not None else "")
else:
media_url = None
# Parse the text, if it exists
if raw_text:
# Pass the sentence through the diario regex
match = re.match(
r'(!)? *["«‘“‛‟❛❝〝"`]([^"]+)["»’”❜❞〞"`] *(?:(?:-{1,2}|—) *([\w ]+))?(?:, *([^ ].*))?',
raw_text)
# Find the corresponding matches
if match is not None:
spoiler = bool(match.group(1))
text = match.group(2)
quoted = match.group(3)
context = match.group(4)
# Otherwise, consider everything part of the text
else:
spoiler = False
text = raw_text
quoted = None
context = None
# Ensure there's a quoted
if not quoted:
quoted = None
if not context:
context = None
# Find if there's a Royalnet account associated with the quoted name
if quoted is not None:
quoted_alias = await asyncify(
self.interface.session.query(self.interface.alchemy.Alias)
.filter_by(alias=quoted.lower()).one_or_none)
else:
quoted_alias = None
quoted_account = quoted_alias.royal if quoted_alias is not None else None
else:
text = None
quoted = None
quoted_account = None
spoiler = False
context = None
# Ensure there is a text or an image
if not (text or media_url):
raise InvalidInputError("Missing text.")
# Create the diario quote
diario = self.interface.alchemy.Diario(creator=creator,
quoted_account=quoted_account,
quoted=quoted,
text=text,
context=context,
timestamp=timestamp,
media_url=media_url,
spoiler=spoiler)
self.interface.session.add(diario)
await asyncify(self.interface.session.commit)
await data.reply(f"{str(diario)}")
else:
# Find the creator of the quotes
creator = await data.get_author(error_if_none=True)
# Recreate the full sentence
raw_text = " ".join(args)
# Pass the sentence through the diario regex
match = re.match(r'(!)? *["«‘“‛‟❛❝〝"`]([^"]+)["»’”❜❞〞"`] *(?:(?:-{1,2}|—) *([\w ]+))?(?:, *([^ ].*))?',
raw_text)
# Find the corresponding matches
if match is not None:
spoiler = bool(match.group(1))
text = match.group(2)
quoted = match.group(3)
context = match.group(4)
# Otherwise, consider everything part of the text
else:
spoiler = False
text = raw_text
quoted = None
context = None
timestamp = datetime.datetime.now()
# Ensure there is some text
if not text:
raise InvalidInputError("Missing text.")
# Or a quoted
if not quoted:
quoted = None
if not context:
context = None
# Find if there's a Royalnet account associated with the quoted name
if quoted is not None:
quoted_alias = await asyncify(
self.interface.session.query(self.interface.alchemy.Alias)
.filter_by(alias=quoted.lower())
.one_or_none)
else:
quoted_alias = None
quoted_account = quoted_alias.royal if quoted_alias is not None else None
if quoted_alias is not None and quoted_account is None:
await data.reply("⚠️ Il nome dell'autore è ambiguo, quindi la riga non è stata aggiunta.\n"
"Per piacere, ripeti il comando con un nome più specifico!")
return
# Create the diario quote
diario = self.interface.alchemy.Diario(creator=creator,
quoted_account=quoted_account,
quoted=quoted,
text=text,
context=context,
timestamp=timestamp,
media_url=None,
spoiler=spoiler)
self.interface.session.add(diario)
await asyncify(self.interface.session.commit)
await data.reply(f"{str(diario)}")

View file

@ -0,0 +1,40 @@
import typing
import urllib.parse
import asyncio
from ..command import Command
from ..commandargs import CommandArgs
from ..commanddata import CommandData
from ...utils import asyncify
from ...audio import YtdlMp3
class Mp3Command(Command):
name: str = "mp3"
description: str = "Scarica un video con youtube-dl e invialo in chat."
syntax = "(ytdlstring)"
ytdl_args = {
"format": "bestaudio",
"outtmpl": f"./downloads/%(title)s.%(ext)s"
}
seconds_before_deletion = 15 * 60
async def run(self, args: CommandArgs, data: CommandData) -> None:
url = args.joined()
if url.startswith("http://") or url.startswith("https://"):
vfiles: typing.List[YtdlMp3] = await asyncify(YtdlMp3.create_and_ready_from_url,
url,
**self.ytdl_args)
else:
vfiles = await asyncify(YtdlMp3.create_and_ready_from_url, f"ytsearch:{url}", **self.ytdl_args)
for vfile in vfiles:
await data.reply(f"⬇️ Il file richiesto può essere scaricato a:\n"
f"https://scaleway.steffo.eu/{urllib.parse.quote(vfile.mp3_filename.replace('./downloads/', './musicbot_cache/'))}\n"
f"Verrà eliminato tra {self.seconds_before_deletion} secondi.")
await asyncio.sleep(self.seconds_before_deletion)
for vfile in vfiles:
vfile.delete()
await data.reply(f"⏹ Il file {vfile.info.title} è scaduto ed è stato eliminato.")

View file

@ -1,10 +1,14 @@
import typing import typing
import discord import discord
from ..network import Request, ResponseSuccess from ..command import Command
from ..utils import Command, Call, NetworkHandler from ..commandinterface import CommandInterface
from ..error import TooManyFoundError, NoneFoundError from ..commandargs import CommandArgs
from ..commanddata import CommandData
from ...utils import NetworkHandler
from ...network import Request, ResponseSuccess
from ...error import NoneFoundError
if typing.TYPE_CHECKING: if typing.TYPE_CHECKING:
from ..bots import DiscordBot from ...bots import DiscordBot
class PauseNH(NetworkHandler): class PauseNH(NetworkHandler):
@ -32,22 +36,24 @@ class PauseNH(NetworkHandler):
voice_client._player.resume() voice_client._player.resume()
else: else:
voice_client._player.pause() voice_client._player.pause()
return ResponseSuccess({"resume": resume}) return ResponseSuccess({"resumed": resume})
class PauseCommand(Command): class PauseCommand(Command):
name: str = "pause"
command_name = "pause" description: str = "Mette in pausa o riprende la riproduzione della canzone attuale."
command_description = "Mette in pausa o riprende la riproduzione della canzone attuale."
command_syntax = "[ [guild] ]"
network_handlers = [PauseNH] syntax = "[ [guild] ]"
@classmethod def __init__(self, interface: CommandInterface):
async def common(cls, call: Call): super().__init__(interface)
guild, = call.args.match(r"(?:\[(.+)])?") interface.register_net_handler(PauseNH.message_type, PauseNH)
response = await call.net_request(Request("music_pause", {"guild_name": guild}), "discord")
if response["resume"]: async def run(self, args: CommandArgs, data: CommandData) -> None:
await call.reply(f"▶️ Riproduzione ripresa.") guild, = args.match(r"(?:\[(.+)])?")
response = await self.interface.net_request(Request("music_pause", {"guild_name": guild}), "discord")
if response["resumed"]:
await data.reply(f"▶️ Riproduzione ripresa.")
else: else:
await call.reply(f"⏸ Riproduzione messa in pausa.") await data.reply(f"⏸ Riproduzione messa in pausa.")

View file

@ -0,0 +1,13 @@
import typing
from ..command import Command
from ..commandargs import CommandArgs
from ..commanddata import CommandData
class PingCommand(Command):
name: str = "ping"
description: str = "Gioca a ping-pong con il bot."
async def run(self, args: CommandArgs, data: CommandData) -> None:
await data.reply("🏓 Pong!")

View file

@ -0,0 +1,75 @@
import typing
import pickle
from ..command import Command
from ..commandinterface import CommandInterface
from ..commandargs import CommandArgs
from ..commanddata import CommandData
from ...utils import NetworkHandler, asyncify
from ...network import Request, ResponseSuccess
from ...error import *
from ...audio import YtdlDiscord
if typing.TYPE_CHECKING:
from ...bots import DiscordBot
class PlayNH(NetworkHandler):
message_type = "music_play"
ytdl_args = {
"format": "bestaudio",
"outtmpl": f"./downloads/%(title)s.%(ext)s"
}
@classmethod
async def discord(cls, bot: "DiscordBot", data: dict):
"""Handle a play Royalnet request. That is, add audio to a PlayMode."""
# Find the matching guild
if data["guild_name"]:
guild = bot.client.find_guild(data["guild_name"])
else:
if len(bot.music_data) == 0:
raise NoneFoundError("No voice clients active")
if len(bot.music_data) > 1:
raise TooManyFoundError("Multiple guilds found")
guild = list(bot.music_data)[0]
# Ensure the guild has a PlayMode before adding the file to it
if not bot.music_data.get(guild):
# TODO: change Exception
raise Exception("No music_data for this guild")
# Start downloading
if data["url"].startswith("http://") or data["url"].startswith("https://"):
dfiles: typing.List[YtdlDiscord] = await asyncify(YtdlDiscord.create_and_ready_from_url, data["url"], **cls.ytdl_args)
else:
dfiles = await asyncify(YtdlDiscord.create_and_ready_from_url, f"ytsearch:{data['url']}", **cls.ytdl_args)
await bot.add_to_music_data(dfiles, guild)
# Create response dictionary
response = {
"videos": [{
"title": dfile.info.title,
"discord_embed_pickle": str(pickle.dumps(dfile.info.to_discord_embed()))
} for dfile in dfiles]
}
return ResponseSuccess(response)
class PlayCommand(Command):
name: str = "play"
description: str = "Aggiunge una canzone alla coda della chat vocale."
syntax = "[ [guild] ] (url)"
def __init__(self, interface: CommandInterface):
super().__init__(interface)
interface.register_net_handler(PlayNH.message_type, PlayNH)
async def run(self, args: CommandArgs, data: CommandData) -> None:
guild_name, url = args.match(r"(?:\[(.+)])?\s*<?(.+)>?")
await self.interface.net_request(Request("music_play", {"url": url, "guild_name": guild_name}), "discord")
for video in data["videos"]:
if self.interface.name == "discord":
# This is one of the unsafest things ever
embed = pickle.loads(eval(video["discord_embed_pickle"]))
await data.message.channel.send(content="▶️ Aggiunto alla coda:", embed=embed)
else:
await data.reply(f"▶️ Aggiunto alla coda: [i]{video['title']}[/i]")

View file

@ -1,10 +1,15 @@
import typing import typing
from ..utils import Command, Call, NetworkHandler import pickle
from ..network import Request, ResponseSuccess from ..command import Command
from ..error import NoneFoundError, TooManyFoundError from ..commandinterface import CommandInterface
from ..audio.playmodes import Playlist, Pool, Layers from ..commandargs import CommandArgs
from ..commanddata import CommandData
from ...utils import NetworkHandler
from ...network import Request, ResponseSuccess
from ...error import *
from ...audio.playmodes import Playlist, Pool, Layers
if typing.TYPE_CHECKING: if typing.TYPE_CHECKING:
from ..bots import DiscordBot from ...bots import DiscordBot
class PlaymodeNH(NetworkHandler): class PlaymodeNH(NetworkHandler):
@ -38,14 +43,19 @@ class PlaymodeNH(NetworkHandler):
class PlaymodeCommand(Command): class PlaymodeCommand(Command):
command_name = "playmode" name: str = "playmode"
command_description = "Cambia modalità di riproduzione per la chat vocale."
command_syntax = "[ [guild] ] (mode)"
network_handlers = [PlaymodeNH] description: str = "Cambia modalità di riproduzione per la chat vocale."
@classmethod syntax = "[ [guild] ] (mode)"
async def common(cls, call: Call):
guild_name, mode_name = call.args.match(r"(?:\[(.+)])?\s*(\S+)\s*") def __init__(self, interface: CommandInterface):
await call.net_request(Request("music_playmode", {"mode_name": mode_name, "guild_name": guild_name}), "discord") super().__init__(interface)
await call.reply(f"✅ Modalità di riproduzione [c]{mode_name}[/c].") interface.register_net_handler(PlaymodeNH.message_type, PlaymodeNH)
async def run(self, args: CommandArgs, data: CommandData) -> None:
guild_name, mode_name = args.match(r"(?:\[(.+)])?\s*(\S+)\s*")
await self.interface.net_request(Request(PlaymodeNH.message_type, {"mode_name": mode_name,
"guild_name": guild_name}),
"discord")
await data.reply(f"🔃 Impostata la modalità di riproduzione a: [c]{mode_name}[/c].")

View file

@ -1,10 +1,14 @@
import typing import typing
import pickle import pickle
from ..network import Request, ResponseSuccess from ..command import Command
from ..utils import Command, Call, NetworkHandler, numberemojiformat from ..commandinterface import CommandInterface
from ..error import TooManyFoundError, NoneFoundError from ..commandargs import CommandArgs
from ..commanddata import CommandData
from ...utils import NetworkHandler, numberemojiformat
from ...network import Request, ResponseSuccess
from ...error import *
if typing.TYPE_CHECKING: if typing.TYPE_CHECKING:
from ..bots import DiscordBot from ...bots import DiscordBot
class QueueNH(NetworkHandler): class QueueNH(NetworkHandler):
@ -44,22 +48,24 @@ class QueueNH(NetworkHandler):
class QueueCommand(Command): class QueueCommand(Command):
name: str = "queue"
command_name = "queue" description: str = "Visualizza la coda di riproduzione attuale."
command_description = "Visualizza un'anteprima della coda di riproduzione attuale."
command_syntax = "[ [guild] ]"
network_handlers = [QueueNH] syntax = "[ [guild] ]"
@classmethod def __init__(self, interface: CommandInterface):
async def common(cls, call: Call): super().__init__(interface)
guild, = call.args.match(r"(?:\[(.+)])?") interface.register_net_handler(QueueNH.message_type, QueueNH)
data = await call.net_request(Request("music_queue", {"guild_name": guild}), "discord")
async def run(self, args: CommandArgs, data: CommandData) -> None:
guild, = args.match(r"(?:\[(.+)])?")
data = await self.interface.net_request(Request(QueueNH.message_type, {"guild_name": guild}), "discord")
if data["type"] is None: if data["type"] is None:
await call.reply(" Non c'è nessuna coda di riproduzione attiva al momento.") await data.reply(" Non c'è nessuna coda di riproduzione attiva al momento.")
return return
elif "queue" not in data: elif "queue" not in data:
await call.reply(f" La coda di riproduzione attuale ([c]{data['type']}[/c]) non permette l'anteprima.") await data.reply(f" La coda di riproduzione attuale ([c]{data['type']}[/c]) non permette l'anteprima.")
return return
if data["type"] == "Playlist": if data["type"] == "Playlist":
if len(data["queue"]["strings"]) == 0: if len(data["queue"]["strings"]) == 0:
@ -81,10 +87,10 @@ class QueueCommand(Command):
message = f" Il PlayMode attuale, [c]{data['type']}[/c], è vuoto.\n" message = f" Il PlayMode attuale, [c]{data['type']}[/c], è vuoto.\n"
else: else:
message = f" Il PlayMode attuale, [c]{data['type']}[/c], contiene {len(data['queue']['strings'])} elementi:\n" message = f" Il PlayMode attuale, [c]{data['type']}[/c], contiene {len(data['queue']['strings'])} elementi:\n"
if call.interface_name == "discord": if self.interface.name == "discord":
await call.reply(message) await data.reply(message)
for embed in pickle.loads(eval(data["queue"]["pickled_embeds"]))[:5]: for embed in pickle.loads(eval(data["queue"]["pickled_embeds"]))[:5]:
await call.channel.send(embed=embed) await data.message.channel.send(embed=embed)
else: else:
message += numberemojiformat(data["queue"]["strings"][:10]) message += numberemojiformat(data["queue"]["strings"][:10])
await call.reply(message) await data.reply(message)

View file

@ -0,0 +1,20 @@
import typing
import random
from ..command import Command
from ..commandargs import CommandArgs
from ..commanddata import CommandData
class RageCommand(Command):
name: str = "ship"
description: str = "Arrabbiati per qualcosa, come una software house californiana."
MAD = ["MADDEN MADDEN MADDEN MADDEN",
"EA bad, praise Geraldo!",
"Stai sfogando la tua ira sul bot!",
"Basta, io cambio gilda!",
"Fondiamo la RRYG!"]
async def run(self, args: CommandArgs, data: CommandData) -> None:
await data.reply(f"😠 {random.sample(self.MAD, 1)[0]}")

View file

@ -0,0 +1,79 @@
import typing
import dateparser
import datetime
import pickle
import telegram
import discord
from sqlalchemy import and_
from ..command import Command
from ..commandargs import CommandArgs
from ..commandinterface import CommandInterface
from ..commanddata import CommandData
from ...utils import sleep_until, asyncify, telegram_escape, discord_escape
from ...database.tables import Reminder
from ...error import *
class ReminderCommand(Command):
name: str = "reminder"
description: str = "Ti ricorda di fare qualcosa dopo un po' di tempo."
syntax: str = "[ (data) ] (messaggio)"
require_alchemy_tables = {Reminder}
def __init__(self, interface: CommandInterface):
super().__init__(interface)
reminders = (
interface.session
.query(interface.alchemy.Reminder)
.filter(and_(
interface.alchemy.Reminder.datetime >= datetime.datetime.now(),
interface.alchemy.Reminder.interface_name == interface.name))
.all()
)
for reminder in reminders:
interface.loop.create_task(self.remind(reminder))
async def remind(self, reminder):
await sleep_until(reminder.datetime)
if self.interface.name == "telegram":
chat_id: int = pickle.loads(reminder.interface_data)
bot: telegram.Bot = self.interface.bot.client
await asyncify(bot.send_message,
chat_id=chat_id,
text=telegram_escape(f"❗️ {reminder.message}"),
parse_mode="HTML",
disable_web_page_preview=True)
elif self.interface.name == "discord":
channel_id: int = pickle.loads(reminder.interface_data)
bot: discord.Client = self.interface.bot.client
channel = bot.get_channel(channel_id)
await channel.send(discord_escape(f"❗️ {reminder.message}"))
async def run(self, args: CommandArgs, data: CommandData) -> None:
date_str, reminder_text = args.match(r"\[ *(.+?) *] *(.+?) *$")
try:
date: typing.Optional[datetime.datetime] = dateparser.parse(date_str)
except OverflowError:
date = None
if date is None:
await data.reply("⚠️ La data che hai inserito non è valida.")
return
await data.reply(f"✅ Promemoria impostato per [b]{date.strftime('%Y-%m-%d %H:%M:%S')}[/b]")
if self.interface.name == "telegram":
interface_data = pickle.dumps(data.update.effective_chat.id)
elif self.interface.name == "discord":
interface_data = pickle.dumps(data.message.channel.id)
else:
raise UnsupportedError("Interface not supported")
creator = await data.get_author()
reminder = self.interface.alchemy.Reminder(creator=creator,
interface_name=self.interface.name,
interface_data=interface_data,
datetime=date,
message=reminder_text)
self.interface.loop.create_task(self.remind(reminder))
self.interface.session.add(reminder)
await asyncify(self.interface.session.commit)

View file

@ -1,22 +1,23 @@
import typing
import re import re
from ..utils import Command, Call, safeformat from ..command import Command
from ..commandargs import CommandArgs
from ..commanddata import CommandData
SHIP_RESULT = "💕 {one} + {two} = [b]{result}[/b]" from ...utils import safeformat
class ShipCommand(Command): class ShipCommand(Command):
name: str = "ship"
command_name = "ship" description: str = "Crea una ship tra due nomi."
command_description = "Crea una ship tra due cose."
command_syntax = "(uno) (due)"
@classmethod syntax = "(nomeuno) (nomedue)"
async def common(cls, call: Call):
name_one = call.args[0] async def run(self, args: CommandArgs, data: CommandData) -> None:
name_two = call.args[1] name_one = args[0]
name_two = args[1]
if name_two == "+": if name_two == "+":
name_two = call.args[2] name_two = args[2]
name_one = name_one.lower() name_one = name_one.lower()
name_two = name_two.lower() name_two = name_two.lower()
# Get all letters until the first vowel, included # Get all letters until the first vowel, included
@ -33,7 +34,7 @@ class ShipCommand(Command):
part_two = match_two.group(0) part_two = match_two.group(0)
# Combine the two name parts # Combine the two name parts
mixed = part_one + part_two mixed = part_one + part_two
await call.reply(safeformat(SHIP_RESULT, await data.reply(safeformat("💕 {one} + {two} = [b]{result}[/b]",
one=name_one.capitalize(), one=name_one.capitalize(),
two=name_two.capitalize(), two=name_two.capitalize(),
result=mixed.capitalize())) result=mixed.capitalize()))

View file

@ -1,10 +1,16 @@
import typing import typing
import pickle
import discord import discord
from ..network import Request, ResponseSuccess from ..command import Command
from ..utils import Command, Call, NetworkHandler from ..commandinterface import CommandInterface
from ..error import TooManyFoundError, NoneFoundError from ..commandargs import CommandArgs
from ..commanddata import CommandData
from ...utils import NetworkHandler, asyncify
from ...network import Request, ResponseSuccess
from ...error import *
from ...audio import YtdlDiscord
if typing.TYPE_CHECKING: if typing.TYPE_CHECKING:
from ..bots import DiscordBot from ...bots import DiscordBot
class SkipNH(NetworkHandler): class SkipNH(NetworkHandler):
@ -31,15 +37,17 @@ class SkipNH(NetworkHandler):
class SkipCommand(Command): class SkipCommand(Command):
name: str = "skip"
command_name = "skip" description: str = "Salta la canzone attualmente in riproduzione in chat vocale."
command_description = "Salta la canzone attualmente in riproduzione in chat vocale."
command_syntax = "[ [guild] ]"
network_handlers = [SkipNH] syntax: str = "[ [guild] ]"
@classmethod def __init__(self, interface: CommandInterface):
async def common(cls, call: Call): super().__init__(interface)
guild, = call.args.match(r"(?:\[(.+)])?") interface.register_net_handler(SkipNH.message_type, SkipNH)
await call.net_request(Request("music_skip", {"guild_name": guild}), "discord")
await call.reply(f"✅ Richiesto lo skip della canzone attuale.") async def run(self, args: CommandArgs, data: CommandData) -> None:
guild, = args.match(r"(?:\[(.+)])?")
await self.interface.net_request(Request(SkipNH.message_type, {"guild_name": guild}), "discord")
await data.reply(f"⏩ Richiesto lo skip della canzone attuale.")

View file

@ -0,0 +1,79 @@
import typing
import random
from ..command import Command
from ..commandargs import CommandArgs
from ..commanddata import CommandData
from ...utils import safeformat
class SmecdsCommand(Command):
name: str = "smecds"
description: str = "Secondo me, è colpa dello stagista..."
syntax = ""
DS_LIST = ["della secca", "del seccatore", "del secchiello", "del secchio", "del secchione", "del secondino",
"del sedano", "del sedativo", "della sedia", "del sedicente", "del sedile", "della sega", "del segale",
"della segatura", "della seggiola", "del seggiolino", "della seggiovia", "della segheria",
"del seghetto",
"del segnalibro", "del segnaposto", "del segno", "del segretario", "della segreteria", "del seguace",
"del segugio", "della selce", "della sella", "della selz", "della selva", "della selvaggina",
"del semaforo",
"del seme", "del semifreddo", "del seminario", "della seminarista", "della semola", "del semolino",
"del semplicione", "della senape", "del senatore", "del seno", "del sensore", "della sentenza",
"della sentinella", "del sentore", "della seppia", "del sequestratore", "della serenata", "del sergente",
"del sermone", "della serpe", "del serpente", "della serpentina", "della serra", "del serraglio",
"del serramanico", "della serranda", "della serratura", "del servitore", "della servitù",
"del servizievole",
"del servo", "del set", "della seta", "della setola", "del sidecar", "del siderurgico", "del sidro",
"della siepe", "del sifone", "della sigaretta", "del sigaro", "del sigillo", "della signora",
"della signorina", "del silenziatore", "della silhouette", "del silicio", "del silicone", "del siluro",
"della sinagoga", "della sindacalista", "del sindacato", "del sindaco", "della sindrome",
"della sinfonia",
"del sipario", "del sire", "della sirena", "della siringa", "del sismografo", "del sobborgo",
"del sobillatore", "del sobrio", "del soccorritore", "del socio", "del sociologo", "della soda",
"del sofà",
"della soffitta", "del software", "dello sogghignare", "del soggiorno", "della sogliola",
"del sognatore",
"della soia", "del solaio", "del solco", "del soldato", "del soldo", "del sole", "della soletta",
"della solista", "del solitario", "del sollazzare", "del sollazzo", "del sollecito", "del solleone",
"del solletico", "del sollevare", "del sollievo", "del solstizio", "del solubile", "del solvente",
"della soluzione", "del somaro", "del sombrero", "del sommergibile", "del sommo", "della sommossa",
"del sommozzatore", "del sonar", "della sonda", "del sondaggio", "del sondare", "del sonnacchioso",
"del sonnambulo", "del sonnellino", "del sonnifero", "del sonno", "della sonnolenza", "del sontuoso",
"del soppalco", "del soprabito", "del sopracciglio", "del sopraffare", "del sopraffino",
"del sopraluogo",
"del sopramobile", "del soprannome", "del soprano", "del soprappensiero", "del soprassalto",
"del soprassedere", "del sopravvento", "del sopravvivere", "del soqquadro", "del sorbetto",
"del sordido",
"della sordina", "del sordo", "della sorella", "della sorgente", "del sornione", "del sorpasso",
"della sorpresa", "del sorreggere", "del sorridere", "della sorsata", "del sorteggio", "del sortilegio",
"del sorvegliante", "del sorvolare", "del sosia", "del sospettoso", "del sospirare", "della sosta",
"della sostanza", "del sostegno", "del sostenitore", "del sostituto", "del sottaceto", "della sottana",
"del sotterfugio", "del sotterraneo", "del sottile", "del sottilizzare", "del sottintendere",
"del sottobanco", "del sottobosco", "del sottomarino", "del sottopassaggio", "del sottoposto",
"del sottoscala", "della sottoscrizione", "del sottostare", "del sottosuolo", "del sottotetto",
"del sottotitolo", "del sottovalutare", "del sottovaso", "della sottoveste", "del sottovuoto",
"del sottufficiale", "della soubrette", "del souvenir", "del soverchiare", "del sovrano",
"del sovrapprezzo",
"della sovvenzione", "del sovversivo", "del sozzo", "dello suadente", "del sub", "del subalterno",
"del subbuglio", "del subdolo", "del sublime", "del suburbano", "del successore", "del succo",
"della succube", "del succulento", "della succursale", "del sudario", "della sudditanza", "del suddito",
"del sudicio", "del suffisso", "del suffragio", "del suffumigio", "del suggeritore", "del sughero",
"del sugo", "del suino", "della suite", "del sulfureo", "del sultano", "di Steffo", "di Spaggia",
"di Sabrina", "del sas", "del ses", "del sis", "del sos", "del sus", "della supremazia",
"del Santissimo",
"della scatola", "del supercalifragilistichespiralidoso", "del sale", "del salame", "di (Town of) Salem",
"di Stronghold", "di SOMA", "dei Saints", "di S.T.A.L.K.E.R.", "di Sanctum", "dei Sims", "di Sid",
"delle Skullgirls", "di Sonic", "di Spiral (Knights)", "di Spore", "di Starbound", "di SimCity",
"di Sensei",
"di Ssssssssssssss... Boom! E' esploso il dizionario", "della scala", "di Sakura", "di Suzie",
"di Shinji",
"del senpai", "del support", "di Superman", "di Sekiro", "dello Slime God", "del salassato",
"della salsa"]
SMECDS = "🤔 Secondo me, è colpa {ds}."
async def run(self, args: CommandArgs, data: CommandData) -> None:
ds = random.sample(self.DS_LIST, 1)[0]
await data.reply(safeformat(self.SMECDS, ds=ds))

View file

@ -0,0 +1,74 @@
import typing
import discord
from ..command import Command
from ..commandinterface import CommandInterface
from ..commandargs import CommandArgs
from ..commanddata import CommandData
from ...utils import NetworkHandler
from ...network import Request, ResponseSuccess
from ...error import NoneFoundError
if typing.TYPE_CHECKING:
from ...bots import DiscordBot
class SummonNH(NetworkHandler):
message_type = "music_summon"
@classmethod
async def discord(cls, bot: "DiscordBot", data: dict):
"""Handle a summon Royalnet request.
That is, join a voice channel, or move to a different one if that is not possible."""
channel = bot.client.find_channel_by_name(data["channel_name"])
if not isinstance(channel, discord.VoiceChannel):
raise NoneFoundError("Channel is not a voice channel")
bot.loop.create_task(bot.client.vc_connect_or_move(channel))
return ResponseSuccess()
class SummonCommand(Command):
name: str = "summon"
description: str = "Evoca il bot in un canale vocale."
syntax: str = "[nomecanale]"
def __init__(self, interface: CommandInterface):
super().__init__(interface)
interface.register_net_handler(SummonNH.message_type, SummonNH)
async def run(self, args: CommandArgs, data: CommandData) -> None:
if self.interface.name == "discord":
bot = self.interface.bot.client
message: discord.Message = data.message
channel_name: str = args.optional(0)
if channel_name:
guild: typing.Optional[discord.Guild] = message.guild
if guild is not None:
channels: typing.List[discord.abc.GuildChannel] = guild.channels
else:
channels = bot.get_all_channels()
matching_channels: typing.List[discord.VoiceChannel] = []
for channel in channels:
if isinstance(channel, discord.VoiceChannel):
if channel.name == channel_name:
matching_channels.append(channel)
if len(matching_channels) == 0:
await data.reply("⚠️ Non esiste alcun canale vocale con il nome specificato.")
return
elif len(matching_channels) > 1:
await data.reply("⚠️ Esiste più di un canale vocale con il nome specificato.")
return
channel = matching_channels[0]
else:
author: discord.Member = message.author
voice: typing.Optional[discord.VoiceState] = author.voice
if voice is None:
await data.reply("⚠️ Non sei connesso a nessun canale vocale!")
return
channel = voice.channel
await bot.vc_connect_or_move(channel)
await data.reply(f"✅ Mi sono connesso in [c]#{channel.name}[/c].")
else:
channel_name: str = args[0].lstrip("#")
await self.interface.net_request(Request(SummonNH.message_type, {"channel_name": channel_name}), "discord")
await data.reply(f"✅ Mi sono connesso in [c]#{channel_name}[/c].")

View file

@ -0,0 +1,51 @@
import typing
import discord
from ..command import Command
from ..commandargs import CommandArgs
from ..commanddata import CommandData
from ...error import *
class VideochannelCommand(Command):
name: str = "videochannel"
description: str = "Converti il canale vocale in un canale video."
syntax = "[channelname]"
async def run(self, args: CommandArgs, data: CommandData) -> None:
if self.interface.name == "discord":
bot: discord.Client = self.interface.bot
message: discord.Message = data.message
channel_name: str = args.optional(0)
if channel_name:
guild: typing.Optional[discord.Guild] = message.guild
if guild is not None:
channels: typing.List[discord.abc.GuildChannel] = guild.channels
else:
channels = bot.get_all_channels()
matching_channels: typing.List[discord.VoiceChannel] = []
for channel in channels:
if isinstance(channel, discord.VoiceChannel):
if channel.name == channel_name:
matching_channels.append(channel)
if len(matching_channels) == 0:
await data.reply("⚠️ Non esiste alcun canale vocale con il nome specificato.")
return
elif len(matching_channels) > 1:
await data.reply("⚠️ Esiste più di un canale vocale con il nome specificato.")
return
channel = matching_channels[0]
else:
author: discord.Member = message.author
voice: typing.Optional[discord.VoiceState] = author.voice
if voice is None:
await data.reply("⚠️ Non sei connesso a nessun canale vocale!")
return
channel = voice.channel
if author.is_on_mobile():
await data.reply(f"📹 Per entrare in modalità video, clicca qui: <https://discordapp.com/channels/{channel.guild.id}/{channel.id}>\n[b]Attenzione: la modalità video non funziona su Discord per Android e iOS![/b]")
return
await data.reply(f"📹 Per entrare in modalità video, clicca qui: <https://discordapp.com/channels/{channel.guild.id}/{channel.id}>")
else:
raise UnsupportedError(f"This command is not supported on {self.interface.name.capitalize()}.")

View file

@ -1,63 +0,0 @@
import random
from ..utils import Command, Call, safeformat
DS_LIST = ["della secca", "del seccatore", "del secchiello", "del secchio", "del secchione", "del secondino",
"del sedano", "del sedativo", "della sedia", "del sedicente", "del sedile", "della sega", "del segale",
"della segatura", "della seggiola", "del seggiolino", "della seggiovia", "della segheria", "del seghetto",
"del segnalibro", "del segnaposto", "del segno", "del segretario", "della segreteria", "del seguace",
"del segugio", "della selce", "della sella", "della selz", "della selva", "della selvaggina", "del semaforo",
"del seme", "del semifreddo", "del seminario", "della seminarista", "della semola", "del semolino",
"del semplicione", "della senape", "del senatore", "del seno", "del sensore", "della sentenza",
"della sentinella", "del sentore", "della seppia", "del sequestratore", "della serenata", "del sergente",
"del sermone", "della serpe", "del serpente", "della serpentina", "della serra", "del serraglio",
"del serramanico", "della serranda", "della serratura", "del servitore", "della servitù", "del servizievole",
"del servo", "del set", "della seta", "della setola", "del sidecar", "del siderurgico", "del sidro",
"della siepe", "del sifone", "della sigaretta", "del sigaro", "del sigillo", "della signora",
"della signorina", "del silenziatore", "della silhouette", "del silicio", "del silicone", "del siluro",
"della sinagoga", "della sindacalista", "del sindacato", "del sindaco", "della sindrome", "della sinfonia",
"del sipario", "del sire", "della sirena", "della siringa", "del sismografo", "del sobborgo",
"del sobillatore", "del sobrio", "del soccorritore", "del socio", "del sociologo", "della soda", "del sofà",
"della soffitta", "del software", "dello sogghignare", "del soggiorno", "della sogliola", "del sognatore",
"della soia", "del solaio", "del solco", "del soldato", "del soldo", "del sole", "della soletta",
"della solista", "del solitario", "del sollazzare", "del sollazzo", "del sollecito", "del solleone",
"del solletico", "del sollevare", "del sollievo", "del solstizio", "del solubile", "del solvente",
"della soluzione", "del somaro", "del sombrero", "del sommergibile", "del sommo", "della sommossa",
"del sommozzatore", "del sonar", "della sonda", "del sondaggio", "del sondare", "del sonnacchioso",
"del sonnambulo", "del sonnellino", "del sonnifero", "del sonno", "della sonnolenza", "del sontuoso",
"del soppalco", "del soprabito", "del sopracciglio", "del sopraffare", "del sopraffino", "del sopraluogo",
"del sopramobile", "del soprannome", "del soprano", "del soprappensiero", "del soprassalto",
"del soprassedere", "del sopravvento", "del sopravvivere", "del soqquadro", "del sorbetto", "del sordido",
"della sordina", "del sordo", "della sorella", "della sorgente", "del sornione", "del sorpasso",
"della sorpresa", "del sorreggere", "del sorridere", "della sorsata", "del sorteggio", "del sortilegio",
"del sorvegliante", "del sorvolare", "del sosia", "del sospettoso", "del sospirare", "della sosta",
"della sostanza", "del sostegno", "del sostenitore", "del sostituto", "del sottaceto", "della sottana",
"del sotterfugio", "del sotterraneo", "del sottile", "del sottilizzare", "del sottintendere",
"del sottobanco", "del sottobosco", "del sottomarino", "del sottopassaggio", "del sottoposto",
"del sottoscala", "della sottoscrizione", "del sottostare", "del sottosuolo", "del sottotetto",
"del sottotitolo", "del sottovalutare", "del sottovaso", "della sottoveste", "del sottovuoto",
"del sottufficiale", "della soubrette", "del souvenir", "del soverchiare", "del sovrano", "del sovrapprezzo",
"della sovvenzione", "del sovversivo", "del sozzo", "dello suadente", "del sub", "del subalterno",
"del subbuglio", "del subdolo", "del sublime", "del suburbano", "del successore", "del succo",
"della succube", "del succulento", "della succursale", "del sudario", "della sudditanza", "del suddito",
"del sudicio", "del suffisso", "del suffragio", "del suffumigio", "del suggeritore", "del sughero",
"del sugo", "del suino", "della suite", "del sulfureo", "del sultano", "di Steffo", "di Spaggia",
"di Sabrina", "del sas", "del ses", "del sis", "del sos", "del sus", "della supremazia", "del Santissimo",
"della scatola", "del supercalifragilistichespiralidoso", "del sale", "del salame", "di (Town of) Salem",
"di Stronghold", "di SOMA", "dei Saints", "di S.T.A.L.K.E.R.", "di Sanctum", "dei Sims", "di Sid",
"delle Skullgirls", "di Sonic", "di Spiral (Knights)", "di Spore", "di Starbound", "di SimCity", "di Sensei",
"di Ssssssssssssss... Boom! E' esploso il dizionario", "della scala", "di Sakura", "di Suzie", "di Shinji",
"del senpai", "del support", "di Superman", "di Sekiro", "dello Slime God", "del salassato", "della salsa"]
SMECDS = "🤔 Secondo me, è colpa {ds}."
class SmecdsCommand(Command):
command_name = "smecds"
command_description = "Secondo me, è colpa dello stagista..."
command_syntax = ""
@classmethod
async def common(cls, call: Call):
ds = random.sample(DS_LIST, 1)[0]
return await call.reply(safeformat(SMECDS, ds=ds))

View file

@ -1,68 +0,0 @@
import typing
import discord
from ..utils import Command, Call, NetworkHandler
from ..network import Request, ResponseSuccess
from ..error import NoneFoundError
if typing.TYPE_CHECKING:
from ..bots import DiscordBot
class SummonNH(NetworkHandler):
message_type = "music_summon"
@classmethod
async def discord(cls, bot: "DiscordBot", data: dict):
"""Handle a summon Royalnet request. That is, join a voice channel, or move to a different one if that is not possible."""
channel = bot.client.find_channel_by_name(data["channel_name"])
if not isinstance(channel, discord.VoiceChannel):
raise NoneFoundError("Channel is not a voice channel")
bot.loop.create_task(bot.client.vc_connect_or_move(channel))
return ResponseSuccess()
class SummonCommand(Command):
command_name = "summon"
command_description = "Evoca il bot in un canale vocale."
command_syntax = "[channelname]"
network_handlers = [SummonNH]
@classmethod
async def common(cls, call: Call):
channel_name: str = call.args[0].lstrip("#")
await call.net_request(Request("music_summon", {"channel_name": channel_name}), "discord")
await call.reply(f"✅ Mi sono connesso in [c]#{channel_name}[/c].")
@classmethod
async def discord(cls, call: Call):
bot = call.interface_obj.client
message: discord.Message = call.kwargs["message"]
channel_name: str = call.args.optional(0)
if channel_name:
guild: typing.Optional[discord.Guild] = message.guild
if guild is not None:
channels: typing.List[discord.abc.GuildChannel] = guild.channels
else:
channels = bot.get_all_channels()
matching_channels: typing.List[discord.VoiceChannel] = []
for channel in channels:
if isinstance(channel, discord.VoiceChannel):
if channel.name == channel_name:
matching_channels.append(channel)
if len(matching_channels) == 0:
await call.reply("⚠️ Non esiste alcun canale vocale con il nome specificato.")
return
elif len(matching_channels) > 1:
await call.reply("⚠️ Esiste più di un canale vocale con il nome specificato.")
return
channel = matching_channels[0]
else:
author: discord.Member = message.author
voice: typing.Optional[discord.VoiceState] = author.voice
if voice is None:
await call.reply("⚠️ Non sei connesso a nessun canale vocale!")
return
channel = voice.channel
await bot.vc_connect_or_move(channel)
await call.reply(f"✅ Mi sono connesso in [c]#{channel.name}[/c].")

View file

@ -1,45 +0,0 @@
import discord
import typing
from ..utils import Command, Call
class VideochannelCommand(Command):
command_name = "videochannel"
command_description = "Converti il canale vocale in un canale video."
command_syntax = "[channelname]"
@classmethod
async def discord(cls, call: Call):
bot = call.interface_obj.client
message: discord.Message = call.kwargs["message"]
channel_name: str = call.args.optional(0)
if channel_name:
guild: typing.Optional[discord.Guild] = message.guild
if guild is not None:
channels: typing.List[discord.abc.GuildChannel] = guild.channels
else:
channels = bot.get_all_channels()
matching_channels: typing.List[discord.VoiceChannel] = []
for channel in channels:
if isinstance(channel, discord.VoiceChannel):
if channel.name == channel_name:
matching_channels.append(channel)
if len(matching_channels) == 0:
await call.reply("⚠️ Non esiste alcun canale vocale con il nome specificato.")
return
elif len(matching_channels) > 1:
await call.reply("⚠️ Esiste più di un canale vocale con il nome specificato.")
return
channel = matching_channels[0]
else:
author: discord.Member = message.author
voice: typing.Optional[discord.VoiceState] = author.voice
if voice is None:
await call.reply("⚠️ Non sei connesso a nessun canale vocale!")
return
channel = voice.channel
if author.is_on_mobile():
await call.reply(f"📹 Per entrare in modalità video, clicca qui: <https://discordapp.com/channels/{channel.guild.id}/{channel.id}>\n[b]Attenzione: la modalità video non funziona su Discord per Android e iOS![/b]")
return
await call.reply(f"📹 Per entrare in modalità video, clicca qui: <https://discordapp.com/channels/{channel.guild.id}/{channel.id}>")

View file

@ -11,6 +11,7 @@ from .wikirevisions import WikiRevision
from .medals import Medal from .medals import Medal
from .medalawards import MedalAward from .medalawards import MedalAward
from .bios import Bio from .bios import Bio
from .reminders import Reminder
__all__ = ["Royal", "Telegram", "Diario", "Alias", "ActiveKvGroup", "Keyvalue", "Keygroup", "Discord", "WikiPage", __all__ = ["Royal", "Telegram", "Diario", "Alias", "ActiveKvGroup", "Keyvalue", "Keygroup", "Discord", "WikiPage",
"WikiRevision", "Medal", "MedalAward", "Bio"] "WikiRevision", "Medal", "MedalAward", "Bio", "Reminder"]

View file

@ -0,0 +1,48 @@
from sqlalchemy import Column, \
Integer, \
String, \
LargeBinary, \
DateTime, \
ForeignKey
from sqlalchemy.orm import relationship
from sqlalchemy.ext.declarative import declared_attr
# noinspection PyUnresolvedReferences
from .royals import Royal
class Reminder:
__tablename__ = "reminder"
@declared_attr
def reminder_id(self):
return Column(Integer, primary_key=True)
@declared_attr
def creator_id(self):
return Column(Integer, ForeignKey("royals.uid"))
@declared_attr
def creator(self):
return relationship("Royal", backref="reminders_created")
@declared_attr
def interface_name(self):
return Column(String)
@declared_attr
def interface_data(self):
return Column(LargeBinary)
@declared_attr
def datetime(self):
return Column(DateTime)
@declared_attr
def message(self):
return Column(String)
def __repr__(self):
return f"<Reminder for {self.datetime.isoformat()} about {self.message}>"
def __str__(self):
return self.message

View file

@ -4,10 +4,7 @@ import os
import asyncio import asyncio
import logging import logging
from royalnet.bots import DiscordBot, DiscordConfig, TelegramBot, TelegramConfig from royalnet.bots import DiscordBot, DiscordConfig, TelegramBot, TelegramConfig
from royalnet.commands import * from royalnet.commands.royalgames import *
# noinspection PyUnresolvedReferences
from royalnet.commands.debug_create import DebugCreateCommand
from royalnet.commands.error_handler import ErrorHandlerCommand
from royalnet.network import RoyalnetServer, RoyalnetConfig from royalnet.network import RoyalnetServer, RoyalnetConfig
from royalnet.database import DatabaseConfig from royalnet.database import DatabaseConfig
from royalnet.database.tables import Royal, Telegram, Discord from royalnet.database.tables import Royal, Telegram, Discord
@ -19,15 +16,21 @@ stream_handler = logging.StreamHandler()
stream_handler.formatter = logging.Formatter("{asctime}\t{name}\t{levelname}\t{message}", style="{") stream_handler.formatter = logging.Formatter("{asctime}\t{name}\t{levelname}\t{message}", style="{")
log.addHandler(stream_handler) log.addHandler(stream_handler)
commands = [PingCommand, ShipCommand, SmecdsCommand, ColorCommand, CiaoruoziCommand, SyncCommand, commands = [PingCommand,
DiarioCommand, RageCommand, ReminderCommand, KvactiveCommand, KvCommand, ColorCommand,
KvrollCommand, SummonCommand, PlayCommand, SkipCommand, PlaymodeCommand, CiaoruoziCommand,
VideochannelCommand, CvCommand, PauseCommand, QueueCommand, RoyalnetprofileCommand, VideoinfoCommand, CvCommand,
IdCommand, DlmusicCommand] DiarioCommand,
Mp3Command,
SummonCommand,
PauseCommand,
PlayCommand,
PlaymodeCommand,
QueueCommand,
ReminderCommand]
# noinspection PyUnreachableCode # noinspection PyUnreachableCode
if __debug__: if __debug__:
commands = [DebugCreateCommand, DateparserCommand, AuthorCommand, *commands]
log.setLevel(logging.DEBUG) log.setLevel(logging.DEBUG)
else: else:
log.setLevel(logging.INFO) log.setLevel(logging.INFO)
@ -42,15 +45,11 @@ print("Starting bots...")
ds_bot = DiscordBot(discord_config=DiscordConfig(os.environ["DS_AK"]), ds_bot = DiscordBot(discord_config=DiscordConfig(os.environ["DS_AK"]),
royalnet_config=RoyalnetConfig(f"ws://{address}:{port}", os.environ["MASTER_KEY"]), royalnet_config=RoyalnetConfig(f"ws://{address}:{port}", os.environ["MASTER_KEY"]),
database_config=DatabaseConfig(os.environ["DB_PATH"], Royal, Discord, "discord_id"), database_config=DatabaseConfig(os.environ["DB_PATH"], Royal, Discord, "discord_id"),
commands=commands, commands=commands)
error_command=ErrorHandlerCommand,
missing_command=MissingCommand)
tg_bot = TelegramBot(telegram_config=TelegramConfig(os.environ["TG_AK"]), tg_bot = TelegramBot(telegram_config=TelegramConfig(os.environ["TG_AK"]),
royalnet_config=RoyalnetConfig(f"ws://{address}:{port}", os.environ["MASTER_KEY"]), royalnet_config=RoyalnetConfig(f"ws://{address}:{port}", os.environ["MASTER_KEY"]),
database_config=DatabaseConfig(os.environ["DB_PATH"], Royal, Telegram, "tg_id"), database_config=DatabaseConfig(os.environ["DB_PATH"], Royal, Telegram, "tg_id"),
commands=commands, commands=commands)
error_command=ErrorHandlerCommand,
missing_command=MissingCommand)
loop.create_task(tg_bot.run()) loop.create_task(tg_bot.run())
loop.create_task(ds_bot.run()) loop.create_task(ds_bot.run())

View file

@ -1,48 +0,0 @@
"""The production Royalnet, active at @royalgamesbot on Telegram and Royalbot on Discord."""
import os
import asyncio
import logging
from royalnet.bots import DiscordBot, DiscordConfig, TelegramBot, TelegramConfig
from royalnet.commands import *
# noinspection PyUnresolvedReferences
from royalnet.commands.debug_create import DebugCreateCommand
from royalnet.commands.error_handler import ErrorHandlerCommand
from royalnet.network import RoyalnetServer, RoyalnetConfig
from royalnet.database import DatabaseConfig
from royalnet.database.tables import Royal, Telegram, Discord
loop = asyncio.get_event_loop()
log = logging.root
stream_handler = logging.StreamHandler()
stream_handler.formatter = logging.Formatter("{asctime}\t{name}\t{levelname}\t{message}", style="{")
log.addHandler(stream_handler)
log.setLevel(logging.INFO)
commands = [PingCommand, SummonCommand, PlayCommand, SkipCommand, PlaymodeCommand, PauseCommand, QueueCommand]
address, port = "127.0.0.1", 1234
print("Starting master...")
master = RoyalnetServer(address, port, os.environ["MASTER_KEY"])
loop.run_until_complete(master.start())
print("Starting bots...")
ds_bot = DiscordBot(discord_config=DiscordConfig(os.environ["DS_AK"]),
royalnet_config=RoyalnetConfig(f"ws://{address}:{port}", os.environ["MASTER_KEY"]),
database_config=None,
commands=commands,
error_command=ErrorHandlerCommand,
missing_command=MissingCommand)
tg_bot = TelegramBot(telegram_config=TelegramConfig(os.environ["TG_AK"]),
royalnet_config=RoyalnetConfig(f"ws://{address}:{port}", os.environ["MASTER_KEY"]),
database_config=None,
commands=commands,
error_command=ErrorHandlerCommand,
missing_command=MissingCommand)
loop.create_task(tg_bot.run())
loop.create_task(ds_bot.run())
print("Running loop...")
loop.run_forever()

View file

@ -2,15 +2,12 @@
from .asyncify import asyncify from .asyncify import asyncify
from .escaping import telegram_escape, discord_escape from .escaping import telegram_escape, discord_escape
from .call import Call
from .command import Command
from .commandargs import CommandArgs
from .safeformat import safeformat from .safeformat import safeformat
from .classdictjanitor import cdj from .classdictjanitor import cdj
from .sleepuntil import sleep_until from .sleepuntil import sleep_until
from .networkhandler import NetworkHandler from .networkhandler import NetworkHandler
from .formatters import andformat, plusformat, fileformat, ytdldateformat, numberemojiformat from .formatters import andformat, plusformat, fileformat, ytdldateformat, numberemojiformat
__all__ = ["asyncify", "Call", "Command", "safeformat", "cdj", "sleep_until", "plusformat", "CommandArgs", __all__ = ["asyncify", "safeformat", "cdj", "sleep_until", "plusformat",
"NetworkHandler", "andformat", "plusformat", "fileformat", "ytdldateformat", "numberemojiformat", "NetworkHandler", "andformat", "plusformat", "fileformat", "ytdldateformat", "numberemojiformat",
"telegram_escape", "discord_escape"] "telegram_escape", "discord_escape"]

View file

@ -1,100 +0,0 @@
import typing
import asyncio
from .command import Command
from .commandargs import CommandArgs
if typing.TYPE_CHECKING:
from ..database import Alchemy
class Call:
"""A command call. An abstract class, sub-bots should create a new call class from this.
Attributes:
interface_name: The name of the interface that is calling the command. For example, ``telegram``, or ``discord``.
interface_obj: The main object of the interface that is calling the command. For example, the :py:class:`royalnet.bots.TelegramBot` object.
interface_prefix: The command prefix used by the interface. For example, ``/``, or ``!``.
alchemy: The :py:class:`royalnet.database.Alchemy` object associated to this interface. May be None if the interface is not connected to any database."""
# These parameters / methods should be overridden
interface_name = NotImplemented
interface_obj = NotImplemented
interface_prefix = NotImplemented
alchemy: "Alchemy" = NotImplemented
async def reply(self, text: str) -> None:
"""Send a text message to the channel where the call was made.
Parameters:
text: The text to be sent, possibly formatted in the weird undescribed markup that I'm using."""
raise NotImplementedError()
async def net_request(self, message, destination: str) -> dict:
"""Send data through a :py:class:`royalnet.network.RoyalnetLink` and wait for a :py:class:`royalnet.network.Reply`.
Parameters:
message: The data to be sent. Must be :py:mod:`pickle`-able.
destination: The destination of the request, either in UUID format or node name."""
raise NotImplementedError()
async def get_author(self, error_if_none=False):
"""Try to find the universal identifier of the user that sent the message.
That probably means, the database row identifying the user.
Parameters:
error_if_none: Raise a :py:exc:`royalnet.error.UnregisteredError` if this is True and the call has no author.
Raises:
:py:exc:`royalnet.error.UnregisteredError` if ``error_if_none`` is set to True and no author is found."""
raise NotImplementedError()
# These parameters / methods should be left alone
def __init__(self,
channel,
command: typing.Type[Command],
command_args: typing.List[str] = None,
loop: asyncio.AbstractEventLoop = None,
**kwargs):
"""Create the call.
Parameters:
channel: The channel object this call was sent in.
command: The command to be called.
command_args: The arguments to be passed to the command
kwargs: Additional optional keyword arguments that may be passed to the command, possibly specific to the bot.
"""
if command_args is None:
command_args = []
if loop is None:
self.loop = asyncio.get_event_loop()
else:
self.loop = loop
self.channel = channel
self.command = command
self.args = CommandArgs(command_args)
self.kwargs = kwargs
self.session = None
async def _session_init(self):
"""If the command requires database access, create a :py:class:`royalnet.database.Alchemy` session for this call, otherwise, do nothing."""
if not self.command.require_alchemy_tables:
return
self.session = await self.loop.run_in_executor(None, self.alchemy.Session)
async def session_end(self):
"""Close the previously created :py:class:`royalnet.database.Alchemy` session for this call (if it was created)."""
if not self.session:
return
self.session.close()
async def run(self):
"""Execute the called command, and return the command result."""
await self._session_init()
try:
coroutine = getattr(self.command, self.interface_name)
except AttributeError:
coroutine = self.command.common
try:
result = await coroutine(self)
finally:
await self.session_end()
return result

View file

@ -1,35 +0,0 @@
import typing
from ..error import UnsupportedError
if typing.TYPE_CHECKING:
from .call import Call
from ..utils import NetworkHandler
class Command:
"""The base class from which all commands should inherit."""
command_name: typing.Optional[str] = NotImplemented
"""The name of the command. To have ``/example`` on Telegram, the name should be ``example``. If the name is None or empty, the command won't be registered."""
command_description: str = NotImplemented
"""A small description of the command, to be displayed when the command is being autocompleted."""
command_syntax: str = NotImplemented
"""The syntax of the command, to be displayed when a :py:exc:`royalnet.error.InvalidInputError` is raised, in the format ``(required_arg) [optional_arg]``."""
require_alchemy_tables: typing.Set = set()
"""A set of :py:class:`royalnet.database` tables, that must exist for this command to work."""
network_handlers: typing.List[typing.Type["NetworkHandler"]] = []
"""A set of :py:class:`royalnet.utils.NetworkHandler`s that must exist for this command to work."""
@classmethod
async def common(cls, call: "Call"):
raise UnsupportedError()
@classmethod
def network_handler_dict(cls):
d = {}
for network_handler in cls.network_handlers:
d[network_handler.message_type] = network_handler
return d