1
Fork 0
mirror of https://github.com/RYGhub/royalnet.git synced 2024-11-27 13:34:28 +00:00

MUCH STUFF

VERY DOGE
This commit is contained in:
Steffo 2019-08-25 17:30:05 +02:00
parent 51e7ef80be
commit 444002a2c2
35 changed files with 829 additions and 781 deletions

View file

@ -5,6 +5,6 @@ from .ytdlinfo import YtdlInfo
from .ytdlfile import YtdlFile
from .fileaudiosource import FileAudioSource
from .ytdldiscord import YtdlDiscord
from .ytdlvorbis import YtdlVorbis
from .ytdlmp3 import YtdlMp3
__all__ = ["playmodes", "YtdlInfo", "YtdlFile", "FileAudioSource", "YtdlDiscord", "YtdlVorbis"]
__all__ = ["playmodes", "YtdlInfo", "YtdlFile", "FileAudioSource", "YtdlDiscord", "YtdlMp3"]

View file

@ -7,16 +7,16 @@ from .ytdlfile import YtdlFile
from .fileaudiosource import FileAudioSource
class YtdlVorbis:
class YtdlMp3:
def __init__(self, ytdl_file: YtdlFile):
self.ytdl_file: YtdlFile = ytdl_file
self.vorbis_filename: typing.Optional[str] = None
self.mp3_filename: typing.Optional[str] = None
self._fas_spawned: typing.List[FileAudioSource] = []
def pcm_available(self):
return self.vorbis_filename is not None and os.path.exists(self.vorbis_filename)
return self.mp3_filename is not None and os.path.exists(self.mp3_filename)
def convert_to_vorbis(self) -> None:
def convert_to_mp3(self) -> None:
if not self.ytdl_file.is_downloaded():
raise FileNotFoundError("File hasn't been downloaded yet")
destination_filename = re.sub(r"\.[^.]+$", ".mp3", self.ytdl_file.filename)
@ -26,7 +26,7 @@ class YtdlVorbis:
.overwrite_output()
.run()
)
self.vorbis_filename = destination_filename
self.mp3_filename = destination_filename
def ready_up(self):
if not self.ytdl_file.has_info():
@ -34,28 +34,28 @@ class YtdlVorbis:
if not self.ytdl_file.is_downloaded():
self.ytdl_file.download_file()
if not self.pcm_available():
self.convert_to_vorbis()
self.convert_to_mp3()
def delete(self) -> None:
if self.pcm_available():
for source in self._fas_spawned:
if not source.file.closed:
source.file.close()
os.remove(self.vorbis_filename)
self.vorbis_filename = None
os.remove(self.mp3_filename)
self.mp3_filename = None
self.ytdl_file.delete()
@classmethod
def create_from_url(cls, url, **ytdl_args) -> typing.List["YtdlVorbis"]:
def create_from_url(cls, url, **ytdl_args) -> typing.List["YtdlMp3"]:
files = YtdlFile.download_from_url(url, **ytdl_args)
dfiles = []
for file in files:
dfile = YtdlVorbis(file)
dfile = YtdlMp3(file)
dfiles.append(dfile)
return dfiles
@classmethod
def create_and_ready_from_url(cls, url, **ytdl_args) -> typing.List["YtdlVorbis"]:
def create_and_ready_from_url(cls, url, **ytdl_args) -> typing.List["YtdlMp3"]:
dfiles = cls.create_from_url(url, **ytdl_args)
for dfile in dfiles:
dfile.ready_up()

View file

@ -35,6 +35,7 @@ class GenericBot:
class GenericInterface(CommandInterface):
alchemy = self.alchemy
bot = self
loop = self.loop
def register_net_handler(ci, message_type: str, network_handler: typing.Callable):
self.network_handlers[message_type] = network_handler

View file

@ -38,7 +38,7 @@ class CommandArgs(list):
raise InvalidInputError("Not enough arguments")
return " ".join(self)
def match(self, pattern: typing.Pattern) -> typing.Sequence[typing.AnyStr]:
def match(self, pattern: typing.Union[str, typing.Pattern]) -> typing.Sequence[typing.AnyStr]:
"""Match the :py:func:`royalnet.utils.commandargs.joined` to a regex pattern.
Parameters:

View file

@ -1,4 +1,5 @@
import typing
import asyncio
if typing.TYPE_CHECKING:
from ..database import Alchemy
from ..bots import GenericBot
@ -9,6 +10,7 @@ class CommandInterface:
prefix: str = NotImplemented
alchemy: "Alchemy" = NotImplemented
bot: "GenericBot" = NotImplemented
loop: asyncio.AbstractEventLoop = NotImplemented
def __init__(self):
self.session = self.alchemy.Session()

View file

@ -7,8 +7,24 @@ from .ping import PingCommand
from .ciaoruozi import CiaoruoziCommand
from .color import ColorCommand
from .cv import CvCommand
from .diario import DiarioCommand
from .mp3 import Mp3Command
from .summon import SummonCommand
from .pause import PauseCommand
from .play import PlayCommand
from .playmode import PlaymodeCommand
from .queue import QueueCommand
from .reminder import ReminderCommand
__all__ = ["PingCommand",
"CiaoruoziCommand",
"ColorCommand",
"CvCommand"]
"CvCommand",
"DiarioCommand",
"Mp3Command",
"SummonCommand",
"PauseCommand",
"PlayCommand",
"PlaymodeCommand",
"QueueCommand",
"ReminderCommand"]

View file

@ -1,7 +1,6 @@
import typing
import telegram
from ..command import Command
from ..commandinterface import CommandInterface
from ..commandargs import CommandArgs
from ..commanddata import CommandData

View file

@ -1,6 +1,5 @@
import typing
from ..command import Command
from ..commandinterface import CommandInterface
from ..commandargs import CommandArgs
from ..commanddata import CommandData

View file

@ -0,0 +1,212 @@
import typing
import re
import datetime
import telegram
import os
import aiohttp
from ..command import Command
from ..commandargs import CommandArgs
from ..commanddata import CommandData
from ...database.tables import Royal, Diario, Alias
from ...utils import asyncify
from ...error import *
async def to_imgur(photosizes: typing.List[telegram.PhotoSize], caption="") -> str:
# Select the largest photo
largest_photo = sorted(photosizes, key=lambda p: p.width * p.height)[-1]
# Get the photo url
photo_file: telegram.File = await asyncify(largest_photo.get_file)
# Forward the url to imgur, as an upload
try:
imgur_api_key = os.environ["IMGUR_CLIENT_ID"]
except KeyError:
raise InvalidConfigError("Missing IMGUR_CLIENT_ID envvar, can't upload images to imgur.")
async with aiohttp.request("post", "https://api.imgur.com/3/upload", data={
"image": photo_file.file_path,
"type": "URL",
"title": "Diario image",
"description": caption
}, headers={
"Authorization": f"Client-ID {imgur_api_key}"
}) as request:
response = await request.json()
if not response["success"]:
raise ExternalError("imgur returned an error in the image upload.")
return response["data"]["link"]
class DiarioCommand(Command):
name: str = "diario"
description: str = "Aggiungi una citazione al Diario."
syntax = "[!] \"(testo)\" --[autore], [contesto]"
require_alchemy_tables = {Royal, Diario, Alias}
async def run(self, args: CommandArgs, data: CommandData) -> None:
if self.interface.name == "telegram":
update: telegram.Update = data.update
message: telegram.Message = update.message
reply: telegram.Message = message.reply_to_message
creator = await data.get_author()
# noinspection PyUnusedLocal
quoted: typing.Optional[str]
# noinspection PyUnusedLocal
text: typing.Optional[str]
# noinspection PyUnusedLocal
context: typing.Optional[str]
# noinspection PyUnusedLocal
timestamp: datetime.datetime
# noinspection PyUnusedLocal
media_url: typing.Optional[str]
# noinspection PyUnusedLocal
spoiler: bool
if creator is None:
await data.reply("⚠️ Devi essere registrato a Royalnet per usare questo comando!")
return
if reply is not None:
# Get the message text
text = reply.text
# Check if there's an image associated with the reply
photosizes: typing.Optional[typing.List[telegram.PhotoSize]] = reply.photo
if photosizes:
# Text is a caption
text = reply.caption
media_url = await to_imgur(photosizes, text if text is not None else "")
else:
media_url = None
# Ensure there is a text or an image
if not (text or media_url):
raise InvalidInputError("Missing text.")
# Find the Royalnet account associated with the sender
quoted_tg = await asyncify(self.interface.session.query(self.interface.alchemy.Telegram)
.filter_by(tg_id=reply.from_user.id)
.one_or_none)
quoted_account = quoted_tg.royal if quoted_tg is not None else None
# Find the quoted name to assign
quoted_user: telegram.User = reply.from_user
quoted = quoted_user.full_name
# Get the timestamp
timestamp = reply.date
# Set the other properties
spoiler = False
context = None
else:
# Get the current timestamp
timestamp = datetime.datetime.now()
# Get the message text
raw_text = " ".join(args)
# Check if there's an image associated with the reply
photosizes: typing.Optional[typing.List[telegram.PhotoSize]] = message.photo
if photosizes:
media_url = await to_imgur(photosizes, raw_text if raw_text is not None else "")
else:
media_url = None
# Parse the text, if it exists
if raw_text:
# Pass the sentence through the diario regex
match = re.match(
r'(!)? *["«‘“‛‟❛❝〝"`]([^"]+)["»’”❜❞〞"`] *(?:(?:-{1,2}|—) *([\w ]+))?(?:, *([^ ].*))?',
raw_text)
# Find the corresponding matches
if match is not None:
spoiler = bool(match.group(1))
text = match.group(2)
quoted = match.group(3)
context = match.group(4)
# Otherwise, consider everything part of the text
else:
spoiler = False
text = raw_text
quoted = None
context = None
# Ensure there's a quoted
if not quoted:
quoted = None
if not context:
context = None
# Find if there's a Royalnet account associated with the quoted name
if quoted is not None:
quoted_alias = await asyncify(
self.interface.session.query(self.interface.alchemy.Alias)
.filter_by(alias=quoted.lower()).one_or_none)
else:
quoted_alias = None
quoted_account = quoted_alias.royal if quoted_alias is not None else None
else:
text = None
quoted = None
quoted_account = None
spoiler = False
context = None
# Ensure there is a text or an image
if not (text or media_url):
raise InvalidInputError("Missing text.")
# Create the diario quote
diario = self.interface.alchemy.Diario(creator=creator,
quoted_account=quoted_account,
quoted=quoted,
text=text,
context=context,
timestamp=timestamp,
media_url=media_url,
spoiler=spoiler)
self.interface.session.add(diario)
await asyncify(self.interface.session.commit)
await data.reply(f"{str(diario)}")
else:
# Find the creator of the quotes
creator = await data.get_author(error_if_none=True)
# Recreate the full sentence
raw_text = " ".join(args)
# Pass the sentence through the diario regex
match = re.match(r'(!)? *["«‘“‛‟❛❝〝"`]([^"]+)["»’”❜❞〞"`] *(?:(?:-{1,2}|—) *([\w ]+))?(?:, *([^ ].*))?',
raw_text)
# Find the corresponding matches
if match is not None:
spoiler = bool(match.group(1))
text = match.group(2)
quoted = match.group(3)
context = match.group(4)
# Otherwise, consider everything part of the text
else:
spoiler = False
text = raw_text
quoted = None
context = None
timestamp = datetime.datetime.now()
# Ensure there is some text
if not text:
raise InvalidInputError("Missing text.")
# Or a quoted
if not quoted:
quoted = None
if not context:
context = None
# Find if there's a Royalnet account associated with the quoted name
if quoted is not None:
quoted_alias = await asyncify(
self.interface.session.query(self.interface.alchemy.Alias)
.filter_by(alias=quoted.lower())
.one_or_none)
else:
quoted_alias = None
quoted_account = quoted_alias.royal if quoted_alias is not None else None
if quoted_alias is not None and quoted_account is None:
await data.reply("⚠️ Il nome dell'autore è ambiguo, quindi la riga non è stata aggiunta.\n"
"Per piacere, ripeti il comando con un nome più specifico!")
return
# Create the diario quote
diario = self.interface.alchemy.Diario(creator=creator,
quoted_account=quoted_account,
quoted=quoted,
text=text,
context=context,
timestamp=timestamp,
media_url=None,
spoiler=spoiler)
self.interface.session.add(diario)
await asyncify(self.interface.session.commit)
await data.reply(f"{str(diario)}")

View file

@ -0,0 +1,40 @@
import typing
import urllib.parse
import asyncio
from ..command import Command
from ..commandargs import CommandArgs
from ..commanddata import CommandData
from ...utils import asyncify
from ...audio import YtdlMp3
class Mp3Command(Command):
name: str = "mp3"
description: str = "Scarica un video con youtube-dl e invialo in chat."
syntax = "(ytdlstring)"
ytdl_args = {
"format": "bestaudio",
"outtmpl": f"./downloads/%(title)s.%(ext)s"
}
seconds_before_deletion = 15 * 60
async def run(self, args: CommandArgs, data: CommandData) -> None:
url = args.joined()
if url.startswith("http://") or url.startswith("https://"):
vfiles: typing.List[YtdlMp3] = await asyncify(YtdlMp3.create_and_ready_from_url,
url,
**self.ytdl_args)
else:
vfiles = await asyncify(YtdlMp3.create_and_ready_from_url, f"ytsearch:{url}", **self.ytdl_args)
for vfile in vfiles:
await data.reply(f"⬇️ Il file richiesto può essere scaricato a:\n"
f"https://scaleway.steffo.eu/{urllib.parse.quote(vfile.mp3_filename.replace('./downloads/', './musicbot_cache/'))}\n"
f"Verrà eliminato tra {self.seconds_before_deletion} secondi.")
await asyncio.sleep(self.seconds_before_deletion)
for vfile in vfiles:
vfile.delete()
await data.reply(f"⏹ Il file {vfile.info.title} è scaduto ed è stato eliminato.")

View file

@ -1,22 +0,0 @@
from ..utils import Command, Call
from telegram import Update, User
class CiaoruoziCommand(Command):
command_name = "ciaoruozi"
command_description = "Saluta Ruozi, anche se non è più in RYG."
command_syntax = ""
@classmethod
async def common(cls, call: "Call"):
await call.reply("👋 Ciao Ruozi!")
@classmethod
async def telegram(cls, call: Call):
update: Update = call.kwargs["update"]
user: User = update.effective_user
if user.id == 112437036:
await call.reply("👋 Ciao me!")
else:
await call.reply("👋 Ciao Ruozi!")

View file

@ -1,14 +0,0 @@
from ..utils import Command, Call
class ColorCommand(Command):
command_name = "color"
command_description = "Invia un colore in chat...?"
command_syntax = ""
@classmethod
async def common(cls, call: Call):
await call.reply("""
[i]I am sorry, unknown error occured during working with your request, Admin were notified[/i]
""")

View file

@ -1,127 +0,0 @@
import typing
import discord
import asyncio
from ..utils import Command, Call, NetworkHandler, andformat
from ..network import Request, ResponseSuccess
from ..error import NoneFoundError, TooManyFoundError
if typing.TYPE_CHECKING:
from ..bots import DiscordBot
class CvNH(NetworkHandler):
message_type = "discord_cv"
@classmethod
async def discord(cls, bot: "DiscordBot", data: dict):
# Find the matching guild
if data["guild_name"]:
guild: discord.Guild = bot.client.find_guild_by_name(data["guild_name"])
else:
if len(bot.client.guilds) == 0:
raise NoneFoundError("No guilds found")
if len(bot.client.guilds) > 1:
raise TooManyFoundError("Multiple guilds found")
guild = list(bot.client.guilds)[0]
# Edit the message, sorted by channel
discord_members = list(guild.members)
channels = {0: None}
members_in_channels = {0: []}
message = ""
# Find all the channels
for member in discord_members:
if member.voice is not None:
channel = members_in_channels.get(member.voice.channel.id)
if channel is None:
members_in_channels[member.voice.channel.id] = list()
channel = members_in_channels[member.voice.channel.id]
channels[member.voice.channel.id] = member.voice.channel
channel.append(member)
else:
members_in_channels[0].append(member)
# Edit the message, sorted by channel
for channel in sorted(channels, key=lambda c: -c):
members_in_channels[channel].sort(key=lambda x: x.nick if x.nick is not None else x.name)
if channel == 0:
message += "[b]Non in chat vocale:[/b]\n"
else:
message += f"[b]In #{channels[channel].name}:[/b]\n"
for member in members_in_channels[channel]:
member: typing.Union[discord.User, discord.Member]
# Ignore not-connected non-notable members
if not data["full"] and channel == 0 and len(member.roles) < 2:
continue
# Ignore offline members
if member.status == discord.Status.offline and member.voice is None:
continue
# Online status emoji
if member.bot:
message += "🤖 "
elif member.status == discord.Status.online:
message += "🔵 "
elif member.status == discord.Status.idle:
message += "⚫️ "
elif member.status == discord.Status.dnd:
message += "🔴 "
elif member.status == discord.Status.offline:
message += "⚪️ "
# Voice
if channel != 0:
# Voice status
if member.voice.afk:
message += "💤 "
elif member.voice.self_deaf or member.voice.deaf:
message += "🔇 "
elif member.voice.self_mute or member.voice.mute:
message += "🔈 "
elif member.voice.self_video:
message += "📺 "
else:
message += "🔊 "
# Nickname
if member.nick is not None:
message += f"[i]{member.nick}[/i]"
else:
message += member.name
# Game or stream
if member.activity is not None:
if member.activity.type == discord.ActivityType.playing:
message += f" | 🎮 {member.activity.name}"
# Rich presence
try:
if member.activity.state is not None:
message += f" ({member.activity.state}" \
f" | {member.activity.details})"
except AttributeError:
pass
elif member.activity.type == discord.ActivityType.streaming:
message += f" | 📡 {member.activity.url}"
elif member.activity.type == discord.ActivityType.listening:
if isinstance(member.activity, discord.Spotify):
if member.activity.title == member.activity.album:
message += f" | 🎧 {member.activity.title} ({andformat(member.activity.artists, final=' e ')})"
else:
message += f" | 🎧 {member.activity.title} ({member.activity.album} | {andformat(member.activity.artists, final=' e ')})"
else:
message += f" | 🎧 {member.activity.name}"
elif member.activity.type == discord.ActivityType.watching:
message += f" | 📺 {member.activity.name}"
else:
message += f" | ❓ Unknown activity"
message += "\n"
message += "\n"
return ResponseSuccess({"response": message})
class CvCommand(Command):
command_name = "cv"
command_description = "Elenca le persone attualmente connesse alla chat vocale."
command_syntax = "[guildname]"
network_handlers = [CvNH]
@classmethod
async def common(cls, call: Call):
guild_name = call.args.optional(0)
response = await call.net_request(Request("discord_cv", {"guild_name": guild_name, "full": False}), "discord")
await call.reply(response["response"])

View file

@ -1,209 +0,0 @@
import re
import datetime
import telegram
import typing
import os
import aiohttp
from ..utils import Command, Call
from ..error import InvalidInputError, InvalidConfigError, ExternalError
from ..database.tables import Royal, Diario, Alias
from ..utils import asyncify
# NOTE: Requires imgur api key for image upload, get one at https://apidocs.imgur.com
class DiarioCommand(Command):
command_name = "diario"
command_description = "Aggiungi una citazione al Diario."
command_syntax = "[!] \"(testo)\" --[autore], [contesto]"
require_alchemy_tables = {Royal, Diario, Alias}
@classmethod
async def _telegram_to_imgur(cls, photosizes: typing.List[telegram.PhotoSize], caption="") -> str:
# Select the largest photo
largest_photo = sorted(photosizes, key=lambda p: p.width * p.height)[-1]
# Get the photo url
photo_file: telegram.File = await asyncify(largest_photo.get_file)
# Forward the url to imgur, as an upload
try:
imgur_api_key = os.environ["IMGUR_CLIENT_ID"]
except KeyError:
raise InvalidConfigError("Missing IMGUR_CLIENT_ID envvar, can't upload images to imgur.")
async with aiohttp.request("post", "https://api.imgur.com/3/upload", data={
"image": photo_file.file_path,
"type": "URL",
"title": "Diario image",
"description": caption
}, headers={
"Authorization": f"Client-ID {imgur_api_key}"
}) as request:
response = await request.json()
if not response["success"]:
raise ExternalError("imgur returned an error in the image upload.")
return response["data"]["link"]
@classmethod
async def common(cls, call: Call):
# Find the creator of the quotes
creator = await call.get_author(error_if_none=True)
# Recreate the full sentence
raw_text = " ".join(call.args)
# Pass the sentence through the diario regex
match = re.match(r'(!)? *["«‘“‛‟❛❝〝"`]([^"]+)["»’”❜❞〞"`] *(?:(?:-{1,2}|—) *([\w ]+))?(?:, *([^ ].*))?', raw_text)
# Find the corresponding matches
if match is not None:
spoiler = bool(match.group(1))
text = match.group(2)
quoted = match.group(3)
context = match.group(4)
# Otherwise, consider everything part of the text
else:
spoiler = False
text = raw_text
quoted = None
context = None
timestamp = datetime.datetime.now()
# Ensure there is some text
if not text:
raise InvalidInputError("Missing text.")
# Or a quoted
if not quoted:
quoted = None
if not context:
context = None
# Find if there's a Royalnet account associated with the quoted name
if quoted is not None:
quoted_alias = await asyncify(call.session.query(call.alchemy.Alias).filter_by(alias=quoted.lower()).one_or_none)
else:
quoted_alias = None
quoted_account = quoted_alias.royal if quoted_alias is not None else None
if quoted_alias is not None and quoted_account is None:
await call.reply("⚠️ Il nome dell'autore è ambiguo, quindi la riga non è stata aggiunta.\n"
"Per piacere, ripeti il comando con un nome più specifico!")
return
# Create the diario quote
diario = call.alchemy.Diario(creator=creator,
quoted_account=quoted_account,
quoted=quoted,
text=text,
context=context,
timestamp=timestamp,
media_url=None,
spoiler=spoiler)
call.session.add(diario)
await asyncify(call.session.commit)
await call.reply(f"{str(diario)}")
@classmethod
async def telegram(cls, call: Call):
update: telegram.Update = call.kwargs["update"]
message: telegram.Message = update.message
reply: telegram.Message = message.reply_to_message
creator = await call.get_author()
# noinspection PyUnusedLocal
quoted_account: typing.Optional[call.alchemy.Telegram]
# noinspection PyUnusedLocal
quoted: typing.Optional[str]
# noinspection PyUnusedLocal
text: typing.Optional[str]
# noinspection PyUnusedLocal
context: typing.Optional[str]
# noinspection PyUnusedLocal
timestamp: datetime.datetime
# noinspection PyUnusedLocal
media_url: typing.Optional[str]
# noinspection PyUnusedLocal
spoiler: bool
if creator is None:
await call.reply("⚠️ Devi essere registrato a Royalnet per usare questo comando!")
return
if reply is not None:
# Get the message text
text = reply.text
# Check if there's an image associated with the reply
photosizes: typing.Optional[typing.List[telegram.PhotoSize]] = reply.photo
if photosizes:
# Text is a caption
text = reply.caption
media_url = await cls._telegram_to_imgur(photosizes, text if text is not None else "")
else:
media_url = None
# Ensure there is a text or an image
if not (text or media_url):
raise InvalidInputError("Missing text.")
# Find the Royalnet account associated with the sender
quoted_tg = await asyncify(call.session.query(call.alchemy.Telegram)
.filter_by(tg_id=reply.from_user.id)
.one_or_none)
quoted_account = quoted_tg.royal if quoted_tg is not None else None
# Find the quoted name to assign
quoted_user: telegram.User = reply.from_user
quoted = quoted_user.full_name
# Get the timestamp
timestamp = reply.date
# Set the other properties
spoiler = False
context = None
else:
# Get the current timestamp
timestamp = datetime.datetime.now()
# Get the message text
raw_text = " ".join(call.args)
# Check if there's an image associated with the reply
photosizes: typing.Optional[typing.List[telegram.PhotoSize]] = message.photo
if photosizes:
media_url = await cls._telegram_to_imgur(photosizes, raw_text if raw_text is not None else "")
else:
media_url = None
# Parse the text, if it exists
if raw_text:
# Pass the sentence through the diario regex
match = re.match(r'(!)? *["«‘“‛‟❛❝〝"`]([^"]+)["»’”❜❞〞"`] *(?:(?:-{1,2}|—) *([\w ]+))?(?:, *([^ ].*))?',
raw_text)
# Find the corresponding matches
if match is not None:
spoiler = bool(match.group(1))
text = match.group(2)
quoted = match.group(3)
context = match.group(4)
# Otherwise, consider everything part of the text
else:
spoiler = False
text = raw_text
quoted = None
context = None
# Ensure there's a quoted
if not quoted:
quoted = None
if not context:
context = None
# Find if there's a Royalnet account associated with the quoted name
if quoted is not None:
quoted_alias = await asyncify(
call.session.query(call.alchemy.Alias)
.filter_by(alias=quoted.lower()).one_or_none)
else:
quoted_alias = None
quoted_account = quoted_alias.royal if quoted_alias is not None else None
else:
text = None
quoted = None
quoted_account = None
spoiler = False
context = None
# Ensure there is a text or an image
if not (text or media_url):
raise InvalidInputError("Missing text.")
# Create the diario quote
diario = call.alchemy.Diario(creator=creator,
quoted_account=quoted_account,
quoted=quoted,
text=text,
context=context,
timestamp=timestamp,
media_url=media_url,
spoiler=spoiler)
call.session.add(diario)
await asyncify(call.session.commit)
await call.reply(f"{str(diario)}")

View file

@ -1,34 +0,0 @@
import asyncio
import typing
import urllib.parse
from ..utils import Command, Call, asyncify
from ..audio import YtdlVorbis
ytdl_args = {
"format": "bestaudio",
"outtmpl": f"./downloads/%(title)s.%(ext)s"
}
seconds_before_deletion = 15*60
class DlmusicCommand(Command):
command_name = "dlmusic"
command_description = "Scarica un video."
command_syntax = "(url)"
@classmethod
async def common(cls, call: Call):
url = call.args.joined()
if url.startswith("http://") or url.startswith("https://"):
vfiles: typing.List[YtdlVorbis] = await asyncify(YtdlVorbis.create_and_ready_from_url, url, **ytdl_args)
else:
vfiles = await asyncify(YtdlVorbis.create_and_ready_from_url, f"ytsearch:{url}", **ytdl_args)
for vfile in vfiles:
await call.reply(f"⬇️ https://scaleway.steffo.eu/{urllib.parse.quote(vfile.vorbis_filename.replace('./downloads/', './musicbot_cache/'))}")
await asyncio.sleep(seconds_before_deletion)
for vfile in vfiles:
vfile.delete()

View file

@ -1,84 +0,0 @@
import typing
import asyncio
import pickle
from ..utils import Command, Call, NetworkHandler, asyncify
from ..network import Request, ResponseSuccess
from ..error import TooManyFoundError, NoneFoundError
from ..audio import YtdlDiscord
if typing.TYPE_CHECKING:
from ..bots import DiscordBot
ytdl_args = {
"format": "bestaudio",
"outtmpl": f"./downloads/%(title)s.%(ext)s"
}
class PlayNH(NetworkHandler):
message_type = "music_play"
@classmethod
async def discord(cls, bot: "DiscordBot", data: dict):
"""Handle a play Royalnet request. That is, add audio to a PlayMode."""
# Find the matching guild
if data["guild_name"]:
guild = bot.client.find_guild(data["guild_name"])
else:
if len(bot.music_data) == 0:
raise NoneFoundError("No voice clients active")
if len(bot.music_data) > 1:
raise TooManyFoundError("Multiple guilds found")
guild = list(bot.music_data)[0]
# Ensure the guild has a PlayMode before adding the file to it
if not bot.music_data.get(guild):
# TODO: change Exception
raise Exception("No music_data for this guild")
# Start downloading
if data["url"].startswith("http://") or data["url"].startswith("https://"):
dfiles: typing.List[YtdlDiscord] = await asyncify(YtdlDiscord.create_and_ready_from_url, data["url"], **ytdl_args)
else:
dfiles = await asyncify(YtdlDiscord.create_and_ready_from_url, f"ytsearch:{data['url']}", **ytdl_args)
await bot.add_to_music_data(dfiles, guild)
# Create response dictionary
response = {
"videos": [{
"title": dfile.info.title,
"discord_embed_pickle": str(pickle.dumps(dfile.info.to_discord_embed()))
} for dfile in dfiles]
}
return ResponseSuccess(response)
async def notify_on_timeout(call: Call, url: str, time: float, repeat: bool = False):
"""Send a message after a while to let the user know that the bot is still downloading the files and hasn't crashed."""
while True:
await asyncio.sleep(time)
await call.reply(f" Il download di [c]{url}[/c] sta richiedendo più tempo del solito, ma è ancora in corso!")
if not repeat:
break
class PlayCommand(Command):
command_name = "play"
command_description = "Riproduce una canzone in chat vocale."
command_syntax = "[ [guild] ] (url)"
network_handlers = [PlayNH]
@classmethod
async def common(cls, call: Call):
guild_name, url = call.args.match(r"(?:\[(.+)])?\s*<?(.+)>?")
download_task = call.loop.create_task(call.net_request(Request("music_play", {"url": url, "guild_name": guild_name}), "discord"))
notify_task = call.loop.create_task(notify_on_timeout(call, url, time=30, repeat=True))
try:
data: dict = await download_task
finally:
notify_task.cancel()
for video in data["videos"]:
if call.interface_name == "discord":
# This is one of the unsafest things ever
embed = pickle.loads(eval(video["discord_embed_pickle"]))
await call.channel.send(content="✅ Aggiunto alla coda:", embed=embed)
else:
await call.reply(f"✅ [i]{video['title']}[/i] scaricato e aggiunto alla coda.")

View file

@ -1,20 +0,0 @@
import random
from ..utils import Command, Call
MAD = ["MADDEN MADDEN MADDEN MADDEN",
"EA bad, praise Geraldo!",
"Stai sfogando la tua ira sul bot!",
"Basta, io cambio gilda!",
"Fondiamo la RRYG!"]
class RageCommand(Command):
command_name = "rage"
command_description = "Arrabbiati con qualcosa, possibilmente una software house."
command_syntax = ""
@classmethod
async def common(cls, call: Call):
await call.reply(f"😠 {random.sample(MAD, 1)[0]}")

View file

@ -1,63 +0,0 @@
import random
from ..utils import Command, Call, safeformat
DS_LIST = ["della secca", "del seccatore", "del secchiello", "del secchio", "del secchione", "del secondino",
"del sedano", "del sedativo", "della sedia", "del sedicente", "del sedile", "della sega", "del segale",
"della segatura", "della seggiola", "del seggiolino", "della seggiovia", "della segheria", "del seghetto",
"del segnalibro", "del segnaposto", "del segno", "del segretario", "della segreteria", "del seguace",
"del segugio", "della selce", "della sella", "della selz", "della selva", "della selvaggina", "del semaforo",
"del seme", "del semifreddo", "del seminario", "della seminarista", "della semola", "del semolino",
"del semplicione", "della senape", "del senatore", "del seno", "del sensore", "della sentenza",
"della sentinella", "del sentore", "della seppia", "del sequestratore", "della serenata", "del sergente",
"del sermone", "della serpe", "del serpente", "della serpentina", "della serra", "del serraglio",
"del serramanico", "della serranda", "della serratura", "del servitore", "della servitù", "del servizievole",
"del servo", "del set", "della seta", "della setola", "del sidecar", "del siderurgico", "del sidro",
"della siepe", "del sifone", "della sigaretta", "del sigaro", "del sigillo", "della signora",
"della signorina", "del silenziatore", "della silhouette", "del silicio", "del silicone", "del siluro",
"della sinagoga", "della sindacalista", "del sindacato", "del sindaco", "della sindrome", "della sinfonia",
"del sipario", "del sire", "della sirena", "della siringa", "del sismografo", "del sobborgo",
"del sobillatore", "del sobrio", "del soccorritore", "del socio", "del sociologo", "della soda", "del sofà",
"della soffitta", "del software", "dello sogghignare", "del soggiorno", "della sogliola", "del sognatore",
"della soia", "del solaio", "del solco", "del soldato", "del soldo", "del sole", "della soletta",
"della solista", "del solitario", "del sollazzare", "del sollazzo", "del sollecito", "del solleone",
"del solletico", "del sollevare", "del sollievo", "del solstizio", "del solubile", "del solvente",
"della soluzione", "del somaro", "del sombrero", "del sommergibile", "del sommo", "della sommossa",
"del sommozzatore", "del sonar", "della sonda", "del sondaggio", "del sondare", "del sonnacchioso",
"del sonnambulo", "del sonnellino", "del sonnifero", "del sonno", "della sonnolenza", "del sontuoso",
"del soppalco", "del soprabito", "del sopracciglio", "del sopraffare", "del sopraffino", "del sopraluogo",
"del sopramobile", "del soprannome", "del soprano", "del soprappensiero", "del soprassalto",
"del soprassedere", "del sopravvento", "del sopravvivere", "del soqquadro", "del sorbetto", "del sordido",
"della sordina", "del sordo", "della sorella", "della sorgente", "del sornione", "del sorpasso",
"della sorpresa", "del sorreggere", "del sorridere", "della sorsata", "del sorteggio", "del sortilegio",
"del sorvegliante", "del sorvolare", "del sosia", "del sospettoso", "del sospirare", "della sosta",
"della sostanza", "del sostegno", "del sostenitore", "del sostituto", "del sottaceto", "della sottana",
"del sotterfugio", "del sotterraneo", "del sottile", "del sottilizzare", "del sottintendere",
"del sottobanco", "del sottobosco", "del sottomarino", "del sottopassaggio", "del sottoposto",
"del sottoscala", "della sottoscrizione", "del sottostare", "del sottosuolo", "del sottotetto",
"del sottotitolo", "del sottovalutare", "del sottovaso", "della sottoveste", "del sottovuoto",
"del sottufficiale", "della soubrette", "del souvenir", "del soverchiare", "del sovrano", "del sovrapprezzo",
"della sovvenzione", "del sovversivo", "del sozzo", "dello suadente", "del sub", "del subalterno",
"del subbuglio", "del subdolo", "del sublime", "del suburbano", "del successore", "del succo",
"della succube", "del succulento", "della succursale", "del sudario", "della sudditanza", "del suddito",
"del sudicio", "del suffisso", "del suffragio", "del suffumigio", "del suggeritore", "del sughero",
"del sugo", "del suino", "della suite", "del sulfureo", "del sultano", "di Steffo", "di Spaggia",
"di Sabrina", "del sas", "del ses", "del sis", "del sos", "del sus", "della supremazia", "del Santissimo",
"della scatola", "del supercalifragilistichespiralidoso", "del sale", "del salame", "di (Town of) Salem",
"di Stronghold", "di SOMA", "dei Saints", "di S.T.A.L.K.E.R.", "di Sanctum", "dei Sims", "di Sid",
"delle Skullgirls", "di Sonic", "di Spiral (Knights)", "di Spore", "di Starbound", "di SimCity", "di Sensei",
"di Ssssssssssssss... Boom! E' esploso il dizionario", "della scala", "di Sakura", "di Suzie", "di Shinji",
"del senpai", "del support", "di Superman", "di Sekiro", "dello Slime God", "del salassato", "della salsa"]
SMECDS = "🤔 Secondo me, è colpa {ds}."
class SmecdsCommand(Command):
command_name = "smecds"
command_description = "Secondo me, è colpa dello stagista..."
command_syntax = ""
@classmethod
async def common(cls, call: Call):
ds = random.sample(DS_LIST, 1)[0]
return await call.reply(safeformat(SMECDS, ds=ds))

View file

@ -1,68 +0,0 @@
import typing
import discord
from ..utils import Command, Call, NetworkHandler
from ..network import Request, ResponseSuccess
from ..error import NoneFoundError
if typing.TYPE_CHECKING:
from ..bots import DiscordBot
class SummonNH(NetworkHandler):
message_type = "music_summon"
@classmethod
async def discord(cls, bot: "DiscordBot", data: dict):
"""Handle a summon Royalnet request. That is, join a voice channel, or move to a different one if that is not possible."""
channel = bot.client.find_channel_by_name(data["channel_name"])
if not isinstance(channel, discord.VoiceChannel):
raise NoneFoundError("Channel is not a voice channel")
bot.loop.create_task(bot.client.vc_connect_or_move(channel))
return ResponseSuccess()
class SummonCommand(Command):
command_name = "summon"
command_description = "Evoca il bot in un canale vocale."
command_syntax = "[channelname]"
network_handlers = [SummonNH]
@classmethod
async def common(cls, call: Call):
channel_name: str = call.args[0].lstrip("#")
await call.net_request(Request("music_summon", {"channel_name": channel_name}), "discord")
await call.reply(f"✅ Mi sono connesso in [c]#{channel_name}[/c].")
@classmethod
async def discord(cls, call: Call):
bot = call.interface_obj.client
message: discord.Message = call.kwargs["message"]
channel_name: str = call.args.optional(0)
if channel_name:
guild: typing.Optional[discord.Guild] = message.guild
if guild is not None:
channels: typing.List[discord.abc.GuildChannel] = guild.channels
else:
channels = bot.get_all_channels()
matching_channels: typing.List[discord.VoiceChannel] = []
for channel in channels:
if isinstance(channel, discord.VoiceChannel):
if channel.name == channel_name:
matching_channels.append(channel)
if len(matching_channels) == 0:
await call.reply("⚠️ Non esiste alcun canale vocale con il nome specificato.")
return
elif len(matching_channels) > 1:
await call.reply("⚠️ Esiste più di un canale vocale con il nome specificato.")
return
channel = matching_channels[0]
else:
author: discord.Member = message.author
voice: typing.Optional[discord.VoiceState] = author.voice
if voice is None:
await call.reply("⚠️ Non sei connesso a nessun canale vocale!")
return
channel = voice.channel
await bot.vc_connect_or_move(channel)
await call.reply(f"✅ Mi sono connesso in [c]#{channel.name}[/c].")

View file

@ -1,45 +0,0 @@
import discord
import typing
from ..utils import Command, Call
class VideochannelCommand(Command):
command_name = "videochannel"
command_description = "Converti il canale vocale in un canale video."
command_syntax = "[channelname]"
@classmethod
async def discord(cls, call: Call):
bot = call.interface_obj.client
message: discord.Message = call.kwargs["message"]
channel_name: str = call.args.optional(0)
if channel_name:
guild: typing.Optional[discord.Guild] = message.guild
if guild is not None:
channels: typing.List[discord.abc.GuildChannel] = guild.channels
else:
channels = bot.get_all_channels()
matching_channels: typing.List[discord.VoiceChannel] = []
for channel in channels:
if isinstance(channel, discord.VoiceChannel):
if channel.name == channel_name:
matching_channels.append(channel)
if len(matching_channels) == 0:
await call.reply("⚠️ Non esiste alcun canale vocale con il nome specificato.")
return
elif len(matching_channels) > 1:
await call.reply("⚠️ Esiste più di un canale vocale con il nome specificato.")
return
channel = matching_channels[0]
else:
author: discord.Member = message.author
voice: typing.Optional[discord.VoiceState] = author.voice
if voice is None:
await call.reply("⚠️ Non sei connesso a nessun canale vocale!")
return
channel = voice.channel
if author.is_on_mobile():
await call.reply(f"📹 Per entrare in modalità video, clicca qui: <https://discordapp.com/channels/{channel.guild.id}/{channel.id}>\n[b]Attenzione: la modalità video non funziona su Discord per Android e iOS![/b]")
return
await call.reply(f"📹 Per entrare in modalità video, clicca qui: <https://discordapp.com/channels/{channel.guild.id}/{channel.id}>")

View file

@ -1,10 +1,14 @@
import typing
import discord
from ..network import Request, ResponseSuccess
from ..utils import Command, Call, NetworkHandler
from ..error import TooManyFoundError, NoneFoundError
from ..command import Command
from ..commandinterface import CommandInterface
from ..commandargs import CommandArgs
from ..commanddata import CommandData
from ...utils import NetworkHandler
from ...network import Request, ResponseSuccess
from ...error import NoneFoundError
if typing.TYPE_CHECKING:
from ..bots import DiscordBot
from ...bots import DiscordBot
class PauseNH(NetworkHandler):
@ -32,22 +36,24 @@ class PauseNH(NetworkHandler):
voice_client._player.resume()
else:
voice_client._player.pause()
return ResponseSuccess({"resume": resume})
return ResponseSuccess({"resumed": resume})
class PauseCommand(Command):
name: str = "pause"
command_name = "pause"
command_description = "Mette in pausa o riprende la riproduzione della canzone attuale."
command_syntax = "[ [guild] ]"
description: str = "Mette in pausa o riprende la riproduzione della canzone attuale."
network_handlers = [PauseNH]
syntax = "[ [guild] ]"
@classmethod
async def common(cls, call: Call):
guild, = call.args.match(r"(?:\[(.+)])?")
response = await call.net_request(Request("music_pause", {"guild_name": guild}), "discord")
if response["resume"]:
await call.reply(f"▶️ Riproduzione ripresa.")
def __init__(self, interface: CommandInterface):
super().__init__(interface)
interface.register_net_handler(PauseNH.message_type, PauseNH)
async def run(self, args: CommandArgs, data: CommandData) -> None:
guild, = args.match(r"(?:\[(.+)])?")
response = await self.interface.net_request(Request("music_pause", {"guild_name": guild}), "discord")
if response["resumed"]:
await data.reply(f"▶️ Riproduzione ripresa.")
else:
await call.reply(f"⏸ Riproduzione messa in pausa.")
await data.reply(f"⏸ Riproduzione messa in pausa.")

View file

@ -1,6 +1,5 @@
import typing
from ..command import Command
from ..commandinterface import CommandInterface
from ..commandargs import CommandArgs
from ..commanddata import CommandData
@ -8,7 +7,7 @@ from ..commanddata import CommandData
class PingCommand(Command):
name: str = "ping"
description: str = "Replies with a Pong!"
description: str = "Gioca a ping-pong con il bot."
async def run(self, args: CommandArgs, data: CommandData) -> None:
await data.reply("🏓 Pong!")

View file

@ -0,0 +1,75 @@
import typing
import pickle
from ..command import Command
from ..commandinterface import CommandInterface
from ..commandargs import CommandArgs
from ..commanddata import CommandData
from ...utils import NetworkHandler, asyncify
from ...network import Request, ResponseSuccess
from ...error import *
from ...audio import YtdlDiscord
if typing.TYPE_CHECKING:
from ...bots import DiscordBot
class PlayNH(NetworkHandler):
message_type = "music_play"
ytdl_args = {
"format": "bestaudio",
"outtmpl": f"./downloads/%(title)s.%(ext)s"
}
@classmethod
async def discord(cls, bot: "DiscordBot", data: dict):
"""Handle a play Royalnet request. That is, add audio to a PlayMode."""
# Find the matching guild
if data["guild_name"]:
guild = bot.client.find_guild(data["guild_name"])
else:
if len(bot.music_data) == 0:
raise NoneFoundError("No voice clients active")
if len(bot.music_data) > 1:
raise TooManyFoundError("Multiple guilds found")
guild = list(bot.music_data)[0]
# Ensure the guild has a PlayMode before adding the file to it
if not bot.music_data.get(guild):
# TODO: change Exception
raise Exception("No music_data for this guild")
# Start downloading
if data["url"].startswith("http://") or data["url"].startswith("https://"):
dfiles: typing.List[YtdlDiscord] = await asyncify(YtdlDiscord.create_and_ready_from_url, data["url"], **cls.ytdl_args)
else:
dfiles = await asyncify(YtdlDiscord.create_and_ready_from_url, f"ytsearch:{data['url']}", **cls.ytdl_args)
await bot.add_to_music_data(dfiles, guild)
# Create response dictionary
response = {
"videos": [{
"title": dfile.info.title,
"discord_embed_pickle": str(pickle.dumps(dfile.info.to_discord_embed()))
} for dfile in dfiles]
}
return ResponseSuccess(response)
class PlayCommand(Command):
name: str = "play"
description: str = "Aggiunge una canzone alla coda della chat vocale."
syntax = "[ [guild] ] (url)"
def __init__(self, interface: CommandInterface):
super().__init__(interface)
interface.register_net_handler(PlayNH.message_type, PlayNH)
async def run(self, args: CommandArgs, data: CommandData) -> None:
guild_name, url = args.match(r"(?:\[(.+)])?\s*<?(.+)>?")
await self.interface.net_request(Request("music_play", {"url": url, "guild_name": guild_name}), "discord")
for video in data["videos"]:
if self.interface.name == "discord":
# This is one of the unsafest things ever
embed = pickle.loads(eval(video["discord_embed_pickle"]))
await data.message.channel.send(content="▶️ Aggiunto alla coda:", embed=embed)
else:
await data.reply(f"▶️ Aggiunto alla coda: [i]{video['title']}[/i]")

View file

@ -1,10 +1,15 @@
import typing
from ..utils import Command, Call, NetworkHandler
from ..network import Request, ResponseSuccess
from ..error import NoneFoundError, TooManyFoundError
from ..audio.playmodes import Playlist, Pool, Layers
import pickle
from ..command import Command
from ..commandinterface import CommandInterface
from ..commandargs import CommandArgs
from ..commanddata import CommandData
from ...utils import NetworkHandler
from ...network import Request, ResponseSuccess
from ...error import *
from ...audio.playmodes import Playlist, Pool, Layers
if typing.TYPE_CHECKING:
from ..bots import DiscordBot
from ...bots import DiscordBot
class PlaymodeNH(NetworkHandler):
@ -38,14 +43,19 @@ class PlaymodeNH(NetworkHandler):
class PlaymodeCommand(Command):
command_name = "playmode"
command_description = "Cambia modalità di riproduzione per la chat vocale."
command_syntax = "[ [guild] ] (mode)"
name: str = "playmode"
network_handlers = [PlaymodeNH]
description: str = "Cambia modalità di riproduzione per la chat vocale."
@classmethod
async def common(cls, call: Call):
guild_name, mode_name = call.args.match(r"(?:\[(.+)])?\s*(\S+)\s*")
await call.net_request(Request("music_playmode", {"mode_name": mode_name, "guild_name": guild_name}), "discord")
await call.reply(f"✅ Modalità di riproduzione [c]{mode_name}[/c].")
syntax = "[ [guild] ] (mode)"
def __init__(self, interface: CommandInterface):
super().__init__(interface)
interface.register_net_handler(PlaymodeNH.message_type, PlaymodeNH)
async def run(self, args: CommandArgs, data: CommandData) -> None:
guild_name, mode_name = args.match(r"(?:\[(.+)])?\s*(\S+)\s*")
await self.interface.net_request(Request(PlaymodeNH.message_type, {"mode_name": mode_name,
"guild_name": guild_name}),
"discord")
await data.reply(f"🔃 Impostata la modalità di riproduzione a: [c]{mode_name}[/c].")

View file

@ -1,10 +1,14 @@
import typing
import pickle
from ..network import Request, ResponseSuccess
from ..utils import Command, Call, NetworkHandler, numberemojiformat
from ..error import TooManyFoundError, NoneFoundError
from ..command import Command
from ..commandinterface import CommandInterface
from ..commandargs import CommandArgs
from ..commanddata import CommandData
from ...utils import NetworkHandler, numberemojiformat
from ...network import Request, ResponseSuccess
from ...error import *
if typing.TYPE_CHECKING:
from ..bots import DiscordBot
from ...bots import DiscordBot
class QueueNH(NetworkHandler):
@ -44,22 +48,24 @@ class QueueNH(NetworkHandler):
class QueueCommand(Command):
name: str = "queue"
command_name = "queue"
command_description = "Visualizza un'anteprima della coda di riproduzione attuale."
command_syntax = "[ [guild] ]"
description: str = "Visualizza la coda di riproduzione attuale."
network_handlers = [QueueNH]
syntax = "[ [guild] ]"
@classmethod
async def common(cls, call: Call):
guild, = call.args.match(r"(?:\[(.+)])?")
data = await call.net_request(Request("music_queue", {"guild_name": guild}), "discord")
def __init__(self, interface: CommandInterface):
super().__init__(interface)
interface.register_net_handler(QueueNH.message_type, QueueNH)
async def run(self, args: CommandArgs, data: CommandData) -> None:
guild, = args.match(r"(?:\[(.+)])?")
data = await self.interface.net_request(Request(QueueNH.message_type, {"guild_name": guild}), "discord")
if data["type"] is None:
await call.reply(" Non c'è nessuna coda di riproduzione attiva al momento.")
await data.reply(" Non c'è nessuna coda di riproduzione attiva al momento.")
return
elif "queue" not in data:
await call.reply(f" La coda di riproduzione attuale ([c]{data['type']}[/c]) non permette l'anteprima.")
await data.reply(f" La coda di riproduzione attuale ([c]{data['type']}[/c]) non permette l'anteprima.")
return
if data["type"] == "Playlist":
if len(data["queue"]["strings"]) == 0:
@ -81,10 +87,10 @@ class QueueCommand(Command):
message = f" Il PlayMode attuale, [c]{data['type']}[/c], è vuoto.\n"
else:
message = f" Il PlayMode attuale, [c]{data['type']}[/c], contiene {len(data['queue']['strings'])} elementi:\n"
if call.interface_name == "discord":
await call.reply(message)
if self.interface.name == "discord":
await data.reply(message)
for embed in pickle.loads(eval(data["queue"]["pickled_embeds"]))[:5]:
await call.channel.send(embed=embed)
await data.message.channel.send(embed=embed)
else:
message += numberemojiformat(data["queue"]["strings"][:10])
await call.reply(message)
await data.reply(message)

View file

@ -0,0 +1,20 @@
import typing
import random
from ..command import Command
from ..commandargs import CommandArgs
from ..commanddata import CommandData
class RageCommand(Command):
name: str = "ship"
description: str = "Arrabbiati per qualcosa, come una software house californiana."
MAD = ["MADDEN MADDEN MADDEN MADDEN",
"EA bad, praise Geraldo!",
"Stai sfogando la tua ira sul bot!",
"Basta, io cambio gilda!",
"Fondiamo la RRYG!"]
async def run(self, args: CommandArgs, data: CommandData) -> None:
await data.reply(f"😠 {random.sample(self.MAD, 1)[0]}")

View file

@ -0,0 +1,79 @@
import typing
import dateparser
import datetime
import pickle
import telegram
import discord
from sqlalchemy import and_
from ..command import Command
from ..commandargs import CommandArgs
from ..commandinterface import CommandInterface
from ..commanddata import CommandData
from ...utils import sleep_until, asyncify, telegram_escape, discord_escape
from ...database.tables import Reminder
from ...error import *
class ReminderCommand(Command):
name: str = "reminder"
description: str = "Ti ricorda di fare qualcosa dopo un po' di tempo."
syntax: str = "[ (data) ] (messaggio)"
require_alchemy_tables = {Reminder}
def __init__(self, interface: CommandInterface):
super().__init__(interface)
reminders = (
interface.session
.query(interface.alchemy.Reminder)
.filter(and_(
interface.alchemy.Reminder.datetime >= datetime.datetime.now(),
interface.alchemy.Reminder.interface_name == interface.name))
.all()
)
for reminder in reminders:
interface.loop.create_task(self.remind(reminder))
async def remind(self, reminder):
await sleep_until(reminder.datetime)
if self.interface.name == "telegram":
chat_id: int = pickle.loads(reminder.interface_data)
bot: telegram.Bot = self.interface.bot.client
await asyncify(bot.send_message,
chat_id=chat_id,
text=telegram_escape(f"❗️ {reminder.message}"),
parse_mode="HTML",
disable_web_page_preview=True)
elif self.interface.name == "discord":
channel_id: int = pickle.loads(reminder.interface_data)
bot: discord.Client = self.interface.bot.client
channel = bot.get_channel(channel_id)
await channel.send(discord_escape(f"❗️ {reminder.message}"))
async def run(self, args: CommandArgs, data: CommandData) -> None:
date_str, reminder_text = args.match(r"\[ *(.+?) *] *(.+?) *$")
try:
date: typing.Optional[datetime.datetime] = dateparser.parse(date_str)
except OverflowError:
date = None
if date is None:
await data.reply("⚠️ La data che hai inserito non è valida.")
return
await data.reply(f"✅ Promemoria impostato per [b]{date.strftime('%Y-%m-%d %H:%M:%S')}[/b]")
if self.interface.name == "telegram":
interface_data = pickle.dumps(data.update.effective_chat.id)
elif self.interface.name == "discord":
interface_data = pickle.dumps(data.message.channel.id)
else:
raise UnsupportedError("Interface not supported")
creator = await data.get_author()
reminder = self.interface.alchemy.Reminder(creator=creator,
interface_name=self.interface.name,
interface_data=interface_data,
datetime=date,
message=reminder_text)
self.interface.loop.create_task(self.remind(reminder))
self.interface.session.add(reminder)
await asyncify(self.interface.session.commit)

View file

@ -1,22 +1,23 @@
import typing
import re
from ..utils import Command, Call, safeformat
SHIP_RESULT = "💕 {one} + {two} = [b]{result}[/b]"
from ..command import Command
from ..commandargs import CommandArgs
from ..commanddata import CommandData
from ...utils import safeformat
class ShipCommand(Command):
name: str = "ship"
command_name = "ship"
command_description = "Crea una ship tra due cose."
command_syntax = "(uno) (due)"
description: str = "Crea una ship tra due nomi."
@classmethod
async def common(cls, call: Call):
name_one = call.args[0]
name_two = call.args[1]
syntax = "(nomeuno) (nomedue)"
async def run(self, args: CommandArgs, data: CommandData) -> None:
name_one = args[0]
name_two = args[1]
if name_two == "+":
name_two = call.args[2]
name_two = args[2]
name_one = name_one.lower()
name_two = name_two.lower()
# Get all letters until the first vowel, included
@ -33,7 +34,7 @@ class ShipCommand(Command):
part_two = match_two.group(0)
# Combine the two name parts
mixed = part_one + part_two
await call.reply(safeformat(SHIP_RESULT,
await data.reply(safeformat("💕 {one} + {two} = [b]{result}[/b]",
one=name_one.capitalize(),
two=name_two.capitalize(),
result=mixed.capitalize()))

View file

@ -1,10 +1,16 @@
import typing
import pickle
import discord
from ..network import Request, ResponseSuccess
from ..utils import Command, Call, NetworkHandler
from ..error import TooManyFoundError, NoneFoundError
from ..command import Command
from ..commandinterface import CommandInterface
from ..commandargs import CommandArgs
from ..commanddata import CommandData
from ...utils import NetworkHandler, asyncify
from ...network import Request, ResponseSuccess
from ...error import *
from ...audio import YtdlDiscord
if typing.TYPE_CHECKING:
from ..bots import DiscordBot
from ...bots import DiscordBot
class SkipNH(NetworkHandler):
@ -31,15 +37,17 @@ class SkipNH(NetworkHandler):
class SkipCommand(Command):
name: str = "skip"
command_name = "skip"
command_description = "Salta la canzone attualmente in riproduzione in chat vocale."
command_syntax = "[ [guild] ]"
description: str = "Salta la canzone attualmente in riproduzione in chat vocale."
network_handlers = [SkipNH]
syntax: str = "[ [guild] ]"
@classmethod
async def common(cls, call: Call):
guild, = call.args.match(r"(?:\[(.+)])?")
await call.net_request(Request("music_skip", {"guild_name": guild}), "discord")
await call.reply(f"✅ Richiesto lo skip della canzone attuale.")
def __init__(self, interface: CommandInterface):
super().__init__(interface)
interface.register_net_handler(SkipNH.message_type, SkipNH)
async def run(self, args: CommandArgs, data: CommandData) -> None:
guild, = args.match(r"(?:\[(.+)])?")
await self.interface.net_request(Request(SkipNH.message_type, {"guild_name": guild}), "discord")
await data.reply(f"⏩ Richiesto lo skip della canzone attuale.")

View file

@ -0,0 +1,79 @@
import typing
import random
from ..command import Command
from ..commandargs import CommandArgs
from ..commanddata import CommandData
from ...utils import safeformat
class SmecdsCommand(Command):
name: str = "smecds"
description: str = "Secondo me, è colpa dello stagista..."
syntax = ""
DS_LIST = ["della secca", "del seccatore", "del secchiello", "del secchio", "del secchione", "del secondino",
"del sedano", "del sedativo", "della sedia", "del sedicente", "del sedile", "della sega", "del segale",
"della segatura", "della seggiola", "del seggiolino", "della seggiovia", "della segheria",
"del seghetto",
"del segnalibro", "del segnaposto", "del segno", "del segretario", "della segreteria", "del seguace",
"del segugio", "della selce", "della sella", "della selz", "della selva", "della selvaggina",
"del semaforo",
"del seme", "del semifreddo", "del seminario", "della seminarista", "della semola", "del semolino",
"del semplicione", "della senape", "del senatore", "del seno", "del sensore", "della sentenza",
"della sentinella", "del sentore", "della seppia", "del sequestratore", "della serenata", "del sergente",
"del sermone", "della serpe", "del serpente", "della serpentina", "della serra", "del serraglio",
"del serramanico", "della serranda", "della serratura", "del servitore", "della servitù",
"del servizievole",
"del servo", "del set", "della seta", "della setola", "del sidecar", "del siderurgico", "del sidro",
"della siepe", "del sifone", "della sigaretta", "del sigaro", "del sigillo", "della signora",
"della signorina", "del silenziatore", "della silhouette", "del silicio", "del silicone", "del siluro",
"della sinagoga", "della sindacalista", "del sindacato", "del sindaco", "della sindrome",
"della sinfonia",
"del sipario", "del sire", "della sirena", "della siringa", "del sismografo", "del sobborgo",
"del sobillatore", "del sobrio", "del soccorritore", "del socio", "del sociologo", "della soda",
"del sofà",
"della soffitta", "del software", "dello sogghignare", "del soggiorno", "della sogliola",
"del sognatore",
"della soia", "del solaio", "del solco", "del soldato", "del soldo", "del sole", "della soletta",
"della solista", "del solitario", "del sollazzare", "del sollazzo", "del sollecito", "del solleone",
"del solletico", "del sollevare", "del sollievo", "del solstizio", "del solubile", "del solvente",
"della soluzione", "del somaro", "del sombrero", "del sommergibile", "del sommo", "della sommossa",
"del sommozzatore", "del sonar", "della sonda", "del sondaggio", "del sondare", "del sonnacchioso",
"del sonnambulo", "del sonnellino", "del sonnifero", "del sonno", "della sonnolenza", "del sontuoso",
"del soppalco", "del soprabito", "del sopracciglio", "del sopraffare", "del sopraffino",
"del sopraluogo",
"del sopramobile", "del soprannome", "del soprano", "del soprappensiero", "del soprassalto",
"del soprassedere", "del sopravvento", "del sopravvivere", "del soqquadro", "del sorbetto",
"del sordido",
"della sordina", "del sordo", "della sorella", "della sorgente", "del sornione", "del sorpasso",
"della sorpresa", "del sorreggere", "del sorridere", "della sorsata", "del sorteggio", "del sortilegio",
"del sorvegliante", "del sorvolare", "del sosia", "del sospettoso", "del sospirare", "della sosta",
"della sostanza", "del sostegno", "del sostenitore", "del sostituto", "del sottaceto", "della sottana",
"del sotterfugio", "del sotterraneo", "del sottile", "del sottilizzare", "del sottintendere",
"del sottobanco", "del sottobosco", "del sottomarino", "del sottopassaggio", "del sottoposto",
"del sottoscala", "della sottoscrizione", "del sottostare", "del sottosuolo", "del sottotetto",
"del sottotitolo", "del sottovalutare", "del sottovaso", "della sottoveste", "del sottovuoto",
"del sottufficiale", "della soubrette", "del souvenir", "del soverchiare", "del sovrano",
"del sovrapprezzo",
"della sovvenzione", "del sovversivo", "del sozzo", "dello suadente", "del sub", "del subalterno",
"del subbuglio", "del subdolo", "del sublime", "del suburbano", "del successore", "del succo",
"della succube", "del succulento", "della succursale", "del sudario", "della sudditanza", "del suddito",
"del sudicio", "del suffisso", "del suffragio", "del suffumigio", "del suggeritore", "del sughero",
"del sugo", "del suino", "della suite", "del sulfureo", "del sultano", "di Steffo", "di Spaggia",
"di Sabrina", "del sas", "del ses", "del sis", "del sos", "del sus", "della supremazia",
"del Santissimo",
"della scatola", "del supercalifragilistichespiralidoso", "del sale", "del salame", "di (Town of) Salem",
"di Stronghold", "di SOMA", "dei Saints", "di S.T.A.L.K.E.R.", "di Sanctum", "dei Sims", "di Sid",
"delle Skullgirls", "di Sonic", "di Spiral (Knights)", "di Spore", "di Starbound", "di SimCity",
"di Sensei",
"di Ssssssssssssss... Boom! E' esploso il dizionario", "della scala", "di Sakura", "di Suzie",
"di Shinji",
"del senpai", "del support", "di Superman", "di Sekiro", "dello Slime God", "del salassato",
"della salsa"]
SMECDS = "🤔 Secondo me, è colpa {ds}."
async def run(self, args: CommandArgs, data: CommandData) -> None:
ds = random.sample(self.DS_LIST, 1)[0]
await data.reply(safeformat(self.SMECDS, ds=ds))

View file

@ -0,0 +1,74 @@
import typing
import discord
from ..command import Command
from ..commandinterface import CommandInterface
from ..commandargs import CommandArgs
from ..commanddata import CommandData
from ...utils import NetworkHandler
from ...network import Request, ResponseSuccess
from ...error import NoneFoundError
if typing.TYPE_CHECKING:
from ...bots import DiscordBot
class SummonNH(NetworkHandler):
message_type = "music_summon"
@classmethod
async def discord(cls, bot: "DiscordBot", data: dict):
"""Handle a summon Royalnet request.
That is, join a voice channel, or move to a different one if that is not possible."""
channel = bot.client.find_channel_by_name(data["channel_name"])
if not isinstance(channel, discord.VoiceChannel):
raise NoneFoundError("Channel is not a voice channel")
bot.loop.create_task(bot.client.vc_connect_or_move(channel))
return ResponseSuccess()
class SummonCommand(Command):
name: str = "summon"
description: str = "Evoca il bot in un canale vocale."
syntax: str = "[nomecanale]"
def __init__(self, interface: CommandInterface):
super().__init__(interface)
interface.register_net_handler(SummonNH.message_type, SummonNH)
async def run(self, args: CommandArgs, data: CommandData) -> None:
if self.interface.name == "discord":
bot = self.interface.bot.client
message: discord.Message = data.message
channel_name: str = args.optional(0)
if channel_name:
guild: typing.Optional[discord.Guild] = message.guild
if guild is not None:
channels: typing.List[discord.abc.GuildChannel] = guild.channels
else:
channels = bot.get_all_channels()
matching_channels: typing.List[discord.VoiceChannel] = []
for channel in channels:
if isinstance(channel, discord.VoiceChannel):
if channel.name == channel_name:
matching_channels.append(channel)
if len(matching_channels) == 0:
await data.reply("⚠️ Non esiste alcun canale vocale con il nome specificato.")
return
elif len(matching_channels) > 1:
await data.reply("⚠️ Esiste più di un canale vocale con il nome specificato.")
return
channel = matching_channels[0]
else:
author: discord.Member = message.author
voice: typing.Optional[discord.VoiceState] = author.voice
if voice is None:
await data.reply("⚠️ Non sei connesso a nessun canale vocale!")
return
channel = voice.channel
await bot.vc_connect_or_move(channel)
await data.reply(f"✅ Mi sono connesso in [c]#{channel.name}[/c].")
else:
channel_name: str = args[0].lstrip("#")
await self.interface.net_request(Request(SummonNH.message_type, {"channel_name": channel_name}), "discord")
await data.reply(f"✅ Mi sono connesso in [c]#{channel_name}[/c].")

View file

@ -0,0 +1,51 @@
import typing
import discord
from ..command import Command
from ..commandargs import CommandArgs
from ..commanddata import CommandData
from ...error import *
class VideochannelCommand(Command):
name: str = "videochannel"
description: str = "Converti il canale vocale in un canale video."
syntax = "[channelname]"
async def run(self, args: CommandArgs, data: CommandData) -> None:
if self.interface.name == "discord":
bot: discord.Client = self.interface.bot
message: discord.Message = data.message
channel_name: str = args.optional(0)
if channel_name:
guild: typing.Optional[discord.Guild] = message.guild
if guild is not None:
channels: typing.List[discord.abc.GuildChannel] = guild.channels
else:
channels = bot.get_all_channels()
matching_channels: typing.List[discord.VoiceChannel] = []
for channel in channels:
if isinstance(channel, discord.VoiceChannel):
if channel.name == channel_name:
matching_channels.append(channel)
if len(matching_channels) == 0:
await data.reply("⚠️ Non esiste alcun canale vocale con il nome specificato.")
return
elif len(matching_channels) > 1:
await data.reply("⚠️ Esiste più di un canale vocale con il nome specificato.")
return
channel = matching_channels[0]
else:
author: discord.Member = message.author
voice: typing.Optional[discord.VoiceState] = author.voice
if voice is None:
await data.reply("⚠️ Non sei connesso a nessun canale vocale!")
return
channel = voice.channel
if author.is_on_mobile():
await data.reply(f"📹 Per entrare in modalità video, clicca qui: <https://discordapp.com/channels/{channel.guild.id}/{channel.id}>\n[b]Attenzione: la modalità video non funziona su Discord per Android e iOS![/b]")
return
await data.reply(f"📹 Per entrare in modalità video, clicca qui: <https://discordapp.com/channels/{channel.guild.id}/{channel.id}>")
else:
raise UnsupportedError(f"This command is not supported on {self.interface.name.capitalize()}.")

View file

@ -11,6 +11,7 @@ from .wikirevisions import WikiRevision
from .medals import Medal
from .medalawards import MedalAward
from .bios import Bio
from .reminders import Reminder
__all__ = ["Royal", "Telegram", "Diario", "Alias", "ActiveKvGroup", "Keyvalue", "Keygroup", "Discord", "WikiPage",
"WikiRevision", "Medal", "MedalAward", "Bio"]
"WikiRevision", "Medal", "MedalAward", "Bio", "Reminder"]

View file

@ -0,0 +1,48 @@
from sqlalchemy import Column, \
Integer, \
String, \
LargeBinary, \
DateTime, \
ForeignKey
from sqlalchemy.orm import relationship
from sqlalchemy.ext.declarative import declared_attr
# noinspection PyUnresolvedReferences
from .royals import Royal
class Reminder:
__tablename__ = "reminder"
@declared_attr
def reminder_id(self):
return Column(Integer, primary_key=True)
@declared_attr
def creator_id(self):
return Column(Integer, ForeignKey("royals.uid"))
@declared_attr
def creator(self):
return relationship("Royal", backref="reminders_created")
@declared_attr
def interface_name(self):
return Column(String)
@declared_attr
def interface_data(self):
return Column(LargeBinary)
@declared_attr
def datetime(self):
return Column(DateTime)
@declared_attr
def message(self):
return Column(String)
def __repr__(self):
return f"<Reminder for {self.datetime.isoformat()} about {self.message}>"
def __str__(self):
return self.message

View file

@ -19,7 +19,15 @@ log.addHandler(stream_handler)
commands = [PingCommand,
ColorCommand,
CiaoruoziCommand,
CvCommand]
CvCommand,
DiarioCommand,
Mp3Command,
SummonCommand,
PauseCommand,
PlayCommand,
PlaymodeCommand,
QueueCommand,
ReminderCommand]
# noinspection PyUnreachableCode
if __debug__: