mirror of
https://github.com/RYGhub/royalnet.git
synced 2024-11-23 19:44:20 +00:00
871 lines
36 KiB
Python
871 lines
36 KiB
Python
import datetime
|
|
import random
|
|
import typing
|
|
import db
|
|
from utils import smecds, cast, errors, emojify
|
|
# python-telegram-bot has a different name
|
|
# noinspection PyPackageRequirements
|
|
import telegram
|
|
# noinspection PyPackageRequirements
|
|
from telegram.ext import Updater, CommandHandler, CallbackQueryHandler
|
|
# noinspection PyPackageRequirements
|
|
from telegram.error import TimedOut, Unauthorized, BadRequest, TelegramError
|
|
import dice
|
|
import sys
|
|
import os
|
|
import re
|
|
import logging
|
|
import configparser
|
|
import markovify
|
|
import raven
|
|
import coloredlogs
|
|
import strings
|
|
import time
|
|
import requests
|
|
IKMarkup = telegram.InlineKeyboardMarkup
|
|
IKButton = telegram.InlineKeyboardButton
|
|
|
|
# Markov models
|
|
try:
|
|
with open("markovmodels/default.json") as file:
|
|
default_model = markovify.Text.from_json(file.read())
|
|
except Exception:
|
|
default_model = None
|
|
try:
|
|
with open("markovmodels/dnd4.json") as file:
|
|
dnd4_model = markovify.Text.from_json(file.read())
|
|
except Exception:
|
|
dnd4_model = None
|
|
|
|
logging.getLogger().disabled = True
|
|
logger = logging.getLogger(__name__)
|
|
os.environ["COLOREDLOGS_LOG_FORMAT"] = "%(asctime)s %(levelname)s %(name)s %(message)s"
|
|
coloredlogs.install(level="DEBUG", logger=logger)
|
|
|
|
# Init the config reader
|
|
config = configparser.ConfigParser()
|
|
config.read("config.ini")
|
|
main_group_id = int(config["Telegram"]["main_group"])
|
|
|
|
discord_connection = None
|
|
|
|
# Init the Sentry client
|
|
sentry = raven.Client(config["Sentry"]["token"],
|
|
release=raven.fetch_git_sha(os.path.dirname(__file__)),
|
|
install_logging_hook=False,
|
|
hook_libraries=[])
|
|
|
|
|
|
def reply_msg(bot: telegram.Bot, chat_id: int, string: str, ignore_escaping=False, disable_web_page_preview=True, **kwargs) -> telegram.Message:
|
|
string = strings.safely_format_string(string, ignore_escaping=ignore_escaping, words=kwargs)
|
|
return bot.send_message(chat_id, string,
|
|
parse_mode="HTML",
|
|
disable_web_page_preview=disable_web_page_preview)
|
|
|
|
|
|
def reply(bot: telegram.Bot, update: telegram.Update, string: str, ignore_escaping=False, disable_web_page_preview=True, **kwargs) -> telegram.Message:
|
|
while True:
|
|
try:
|
|
return reply_msg(bot, update.message.chat.id, string, ignore_escaping=ignore_escaping, disable_web_page_preview=disable_web_page_preview, **kwargs)
|
|
except Unauthorized:
|
|
if update.message.chat.type == telegram.Chat.PRIVATE:
|
|
return reply_msg(bot, main_group_id, strings.TELEGRAM.ERRORS.UNAUTHORIZED_USER,
|
|
mention=update.message.from_user.mention_html())
|
|
else:
|
|
return reply_msg(bot, main_group_id, strings.TELEGRAM.ERRORS.UNAUTHORIZED_GROUP,
|
|
group=(update.message.chat.title or update.message.chat.id))
|
|
except TimedOut:
|
|
time.sleep(1)
|
|
pass
|
|
|
|
|
|
# noinspection PyUnresolvedReferences
|
|
def command(func: "function"):
|
|
def new_func(bot: telegram.Bot, update: telegram.Update):
|
|
# noinspection PyBroadException
|
|
try:
|
|
bot.send_chat_action(update.message.chat.id, telegram.ChatAction.TYPING)
|
|
return func(bot, update)
|
|
except TimedOut:
|
|
logger.warning(f"Telegram timed out in {update}")
|
|
except Exception:
|
|
logger.error(f"Critical error: {sys.exc_info()}")
|
|
# noinspection PyBroadException
|
|
try:
|
|
reply_msg(bot, main_group_id, strings.TELEGRAM.ERRORS.CRITICAL_ERROR,
|
|
exc_info=repr(sys.exc_info()))
|
|
except Exception:
|
|
logger.error(f"Double critical error: {sys.exc_info()}")
|
|
sentry.user_context({
|
|
"id": update.effective_user.id,
|
|
"telegram": {
|
|
"username": update.effective_user.username,
|
|
"first_name": update.effective_user.first_name,
|
|
"last_name": update.effective_user.last_name
|
|
}
|
|
})
|
|
sentry.extra_context({
|
|
"update": update.to_dict()
|
|
})
|
|
sentry.captureException()
|
|
return new_func
|
|
|
|
|
|
# noinspection PyUnresolvedReferences
|
|
def database_access(func: "function"):
|
|
def new_func(bot: telegram.Bot, update: telegram.Update):
|
|
try:
|
|
session = db.Session()
|
|
return func(bot, update, session)
|
|
except Exception:
|
|
logger.error(f"Database error: {sys.exc_info()}")
|
|
sentry.captureException()
|
|
finally:
|
|
try:
|
|
session.close()
|
|
except Exception:
|
|
pass
|
|
return new_func
|
|
|
|
|
|
@command
|
|
def cmd_ping(bot: telegram.Bot, update: telegram.Update):
|
|
reply(bot, update, strings.PONG)
|
|
|
|
|
|
@command
|
|
@database_access
|
|
def cmd_link(bot: telegram.Bot, update: telegram.Update, session: db.Session):
|
|
try:
|
|
username = update.message.text.split(" ", 1)[1]
|
|
except IndexError:
|
|
reply(bot, update, strings.LINK.ERRORS.INVALID_SYNTAX)
|
|
session.close()
|
|
return
|
|
try:
|
|
t = db.Telegram.create(session,
|
|
royal_username=username,
|
|
telegram_user=update.message.from_user)
|
|
except errors.NotFoundError:
|
|
reply(bot, update, strings.LINK.ERRORS.NOT_FOUND)
|
|
session.close()
|
|
return
|
|
except errors.AlreadyExistingError:
|
|
reply(bot, update, strings.LINK.ERRORS.ALREADY_EXISTING)
|
|
session.close()
|
|
return
|
|
session.add(t)
|
|
session.commit()
|
|
reply(bot, update, strings.LINK.SUCCESS)
|
|
|
|
|
|
@command
|
|
def cmd_cv(bot: telegram.Bot, update: telegram.Update):
|
|
if discord_connection is None:
|
|
reply(bot, update, strings.BRIDGE.ERRORS.INACTIVE_BRIDGE)
|
|
return
|
|
# dirty hack as usual
|
|
if update.message.text.endswith("full"):
|
|
discord_connection.send("get cv full")
|
|
else:
|
|
discord_connection.send("get cv")
|
|
message = discord_connection.recv()
|
|
reply(bot, update, message)
|
|
|
|
|
|
@command
|
|
def cmd_cast(bot: telegram.Bot, update: telegram.Update):
|
|
reply(bot, update, strings.CAST.ERRORS.NOT_YET_AVAILABLE)
|
|
|
|
|
|
@command
|
|
def cmd_color(bot: telegram.Bot, update: telegram.Update):
|
|
reply(bot, update, strings.COLOR)
|
|
|
|
|
|
@command
|
|
def cmd_smecds(bot: telegram.Bot, update: telegram.Update):
|
|
ds = random.sample(smecds, 1)[0]
|
|
reply(bot, update, strings.SMECDS, ds=ds)
|
|
|
|
|
|
@command
|
|
def cmd_ciaoruozi(bot: telegram.Bot, update: telegram.Update):
|
|
if update.message.from_user.username.lstrip("@") == "MeStakes":
|
|
reply(bot, update, strings.CIAORUOZI.THE_LEGEND_HIMSELF)
|
|
else:
|
|
reply(bot, update, strings.CIAORUOZI.SOMEBODY_ELSE)
|
|
|
|
|
|
@command
|
|
def cmd_ahnonlosoio(bot: telegram.Bot, update: telegram.Update):
|
|
if update.message.reply_to_message is not None and update.message.reply_to_message.text in [
|
|
"/ahnonlosoio", "/ahnonlosoio@royalgamesbot", strings.AHNONLOSOIO.ONCE, strings.AHNONLOSOIO.AGAIN
|
|
]:
|
|
reply(bot, update, strings.AHNONLOSOIO.AGAIN)
|
|
else:
|
|
reply(bot, update, strings.AHNONLOSOIO.ONCE)
|
|
|
|
|
|
@command
|
|
@database_access
|
|
def cmd_balurage(bot: telegram.Bot, update: telegram.Update, session: db.Session):
|
|
user = session.query(db.Telegram).filter_by(telegram_id=update.message.from_user.id).one_or_none()
|
|
if user is None:
|
|
reply(bot, update, strings.LINK.ERRORS.ROYALNET_NOT_LINKED)
|
|
return
|
|
try:
|
|
reason = update.message.text.split(" ", 1)[1]
|
|
except IndexError:
|
|
reason = None
|
|
br = db.BaluRage(royal_id=user.royal_id, reason=reason)
|
|
session.add(br)
|
|
session.commit()
|
|
bot.send_message(update.message.chat.id, f"😡 Stai sfogando la tua ira sul bot!")
|
|
|
|
|
|
def find_author(session: db.Session, text: str):
|
|
author_match = re.match(r".*(?:—|-{1,2}) ?@?([A-Za-z0-9_]+)$", text)
|
|
if author_match is None:
|
|
return None
|
|
author_string = author_match.group(1).lower()
|
|
author = session.query(db.Royal).filter(db.func.lower(db.Royal.username) == author_string).first().telegram[0]
|
|
if author is not None:
|
|
return author
|
|
author = session.query(db.Telegram).filter(db.func.lower(db.Telegram.username) == author_string).first()
|
|
if author is not None:
|
|
return author
|
|
return None
|
|
|
|
|
|
@command
|
|
@database_access
|
|
def cmd_diario(bot: telegram.Bot, update: telegram.Update, session: db.Session):
|
|
user = session.query(db.Telegram).filter_by(telegram_id=update.message.from_user.id).one_or_none()
|
|
if user is None:
|
|
reply(bot, update, strings.LINK.ERRORS.ROYALNET_NOT_LINKED)
|
|
return
|
|
try:
|
|
text = update.message.text.split(" ", 1)[1]
|
|
saver = session.query(db.Telegram).filter_by(telegram_id=update.message.from_user.id).one_or_none()
|
|
author = find_author(session, text)
|
|
except IndexError:
|
|
if update.message.reply_to_message is None:
|
|
reply(bot, update, strings.DIARIO.ERRORS.INVALID_SYNTAX)
|
|
return
|
|
text = update.message.reply_to_message.text
|
|
author = session.query(db.Telegram)\
|
|
.filter_by(telegram_id=update.message.reply_to_message.from_user.id)\
|
|
.one_or_none()
|
|
saver = session.query(db.Telegram).filter_by(telegram_id=update.message.from_user.id).one_or_none()
|
|
if text is None:
|
|
reply(bot, update, strings.DIARIO.ERRORS.NO_TEXT)
|
|
return
|
|
diario = db.Diario(timestamp=datetime.datetime.now(),
|
|
saver=saver,
|
|
author=author,
|
|
text=text)
|
|
session.add(diario)
|
|
session.commit()
|
|
reply(bot, update, strings.DIARIO.SUCCESS, ignore_escaping=True, diario=diario.to_telegram())
|
|
|
|
|
|
@command
|
|
@database_access
|
|
def cmd_vote(bot: telegram.Bot, update: telegram.Update, session: db.Session):
|
|
user = session.query(db.Telegram).filter_by(telegram_id=update.message.from_user.id).one_or_none()
|
|
if user is None:
|
|
bot.send_message(update.message.chat.id,
|
|
"⚠ Il tuo account Telegram non è registrato al RYGdb!"
|
|
" Registrati con `/register@royalgamesbot <nomeutenteryg>`.", parse_mode="Markdown")
|
|
return
|
|
try:
|
|
_, mode, question = update.message.text.split(" ", 2)
|
|
except IndexError:
|
|
bot.send_message(update.message.chat.id,
|
|
"⚠ Non hai specificato tutti i parametri necessari!"
|
|
"Sintassi: `/vote@royalgamesbot <public|secret> <domanda>`", parse_mode="Markdown")
|
|
return
|
|
if mode == "public":
|
|
vote = db.VoteQuestion(question=question, anonymous=False)
|
|
elif mode == "secret":
|
|
vote = db.VoteQuestion(question=question, anonymous=True)
|
|
else:
|
|
bot.send_message(update.message.chat.id,
|
|
"⚠ Non hai specificato una modalità valida!"
|
|
"Sintassi: `/vote@royalgamesbot <public|secret> <domanda>`", parse_mode="Markdown")
|
|
return
|
|
session.add(vote)
|
|
session.flush()
|
|
inline_keyboard = IKMarkup([[IKButton("🔵 Sì", callback_data="vote_yes")],
|
|
[IKButton("🔴 No", callback_data="vote_no")],
|
|
[IKButton("⚫️ Astieniti", callback_data="vote_abstain")]])
|
|
message = bot.send_message(update.message.chat.id, vote.generate_text(session=session),
|
|
reply_markup=inline_keyboard,
|
|
parse_mode="HTML")
|
|
vote.message_id = message.message_id
|
|
session.commit()
|
|
|
|
|
|
def generate_search_message(term, entries):
|
|
msg = strings.DIARIOSEARCH.HEADER.format(term=term)
|
|
if len(entries) < 100:
|
|
for entry in entries[:5]:
|
|
msg += f'<a href="https://ryg.steffo.eu/diario#entry-{entry.id}">#{entry.id}</a> di <i>{entry.author or "Anonimo"}</i>\n{entry.text}\n\n'
|
|
if len(entries) > 5:
|
|
msg += "I termini comapiono anche nelle righe:\n"
|
|
for entry in entries[5:]:
|
|
msg += f'<a href="https://ryg.steffo.eu/diario#entry-{entry.id}">#{entry.id}</a> '
|
|
else:
|
|
for entry in entries[:100]:
|
|
msg += f'<a href="https://ryg.steffo.eu/diario#entry-{entry.id}">#{entry.id}</a> '
|
|
for entry in entries[100:]:
|
|
msg += f"#{entry.id} "
|
|
return msg
|
|
|
|
|
|
@command
|
|
@database_access
|
|
def cmd_search(bot: telegram.Bot, update: telegram.Update, session: db.Session):
|
|
try:
|
|
query = update.message.text.split(" ", 1)[1]
|
|
except IndexError:
|
|
reply(bot, update, strings.DIARIOSEARCH.ERRORS.INVALID_SYNTAX, command="search")
|
|
return
|
|
query = query.replace('%', '\\%').replace('_', '\\_')
|
|
entries = session.query(db.Diario)\
|
|
.filter(db.Diario.text.op("~*")(r"(?:[\s\.,:;!?\"'<{([]+|^)"
|
|
+ query +
|
|
r"(?:[\s\.,:;!?\"'>\})\]]+|$)"))\
|
|
.order_by(db.Diario.id.desc())\
|
|
.all()
|
|
bot.send_message(update.message.chat.id, generate_search_message(f"<b>{query}</b>", entries), parse_mode="HTML")
|
|
|
|
|
|
@command
|
|
@database_access
|
|
def cmd_regex(bot: telegram.Bot, update: telegram.Update, session: db.Session):
|
|
try:
|
|
query = update.message.text.split(" ", 1)[1]
|
|
except IndexError:
|
|
reply(bot, update, strings.DIARIOSEARCH.ERRORS.INVALID_SYNTAX, command="regex")
|
|
return
|
|
query = query.replace('%', '\\%').replace('_', '\\_')
|
|
entries = session.query(db.Diario).filter(db.Diario.text.op("~*")(query)).order_by(db.Diario.id.desc()).all()
|
|
try:
|
|
bot.send_message(update.message.chat.id, generate_search_message(f"<code>{query}</code>", entries), parse_mode="HTML")
|
|
except (BadRequest, TelegramError):
|
|
reply(bot, update, strings.DIARIOSEARCH.ERRORS.RESULTS_TOO_LONG)
|
|
|
|
|
|
@command
|
|
@database_access
|
|
def cmd_mm(bot: telegram.Bot, update: telegram.Update, session: db.Session):
|
|
user = session.query(db.Telegram).filter_by(telegram_id=update.message.from_user.id).one_or_none()
|
|
if user is None:
|
|
reply(bot, update, strings.LINK.ERRORS.ROYALNET_NOT_LINKED)
|
|
return
|
|
match = re.match(r"/(?:mm|matchmaking)(?:@royalgamesbot)?(?: (?:([0-9]+)-)?([0-9]+))? (?:per )?([A-Za-z0-9!\-_. ]+)(?:.*\n(.+))?",
|
|
update.message.text)
|
|
if match is None:
|
|
reply(bot, update, strings.MATCHMAKING.ERRORS.INVALID_SYNTAX)
|
|
return
|
|
min_players, max_players, match_name, match_desc = match.group(1, 2, 3, 4)
|
|
db_match = db.Match(timestamp=datetime.datetime.now(),
|
|
match_title=match_name,
|
|
match_desc=match_desc,
|
|
min_players=min_players,
|
|
max_players=max_players,
|
|
creator=user)
|
|
session.add(db_match)
|
|
session.flush()
|
|
inline_keyboard = IKMarkup([([IKButton(strings.MATCHMAKING.BUTTONS[key], callback_data=key)]) for key in strings.MATCHMAKING.BUTTONS])
|
|
message = bot.send_message(config["Telegram"]["announcement_group"], db_match.generate_text(session=session),
|
|
parse_mode="HTML",
|
|
reply_markup=inline_keyboard)
|
|
db_match.message_id = message.message_id
|
|
session.commit()
|
|
|
|
|
|
def on_callback_query(bot: telegram.Bot, update: telegram.Update):
|
|
try:
|
|
session = db.Session()
|
|
if update.callback_query.data.startswith("vote_"):
|
|
if update.callback_query.data == "vote_yes":
|
|
status = db.VoteChoices.YES
|
|
emoji = "🔵"
|
|
elif update.callback_query.data == "vote_no":
|
|
status = db.VoteChoices.NO
|
|
emoji = "🔴"
|
|
elif update.callback_query.data == "vote_abstain":
|
|
status = db.VoteChoices.ABSTAIN
|
|
emoji = "⚫️"
|
|
else:
|
|
raise NotImplementedError()
|
|
user = session.query(db.Telegram).filter_by(telegram_id=update.callback_query.from_user.id).one_or_none()
|
|
if user is None:
|
|
bot.answer_callback_query(update.callback_query.id, show_alert=True,
|
|
text=strings.LINK.ERRORS.ROYALNET_NOT_LINKED,
|
|
parse_mode="Markdown")
|
|
return
|
|
question = session.query(db.VoteQuestion)\
|
|
.filter_by(message_id=update.callback_query.message.message_id)\
|
|
.one()
|
|
answer = session.query(db.VoteAnswer).filter_by(question=question, user=user).one_or_none()
|
|
if answer is None:
|
|
answer = db.VoteAnswer(question=question, choice=status, user=user)
|
|
session.add(answer)
|
|
bot.answer_callback_query(update.callback_query.id, text=f"Hai votato {emoji}.", cache_time=1)
|
|
elif answer.choice == status:
|
|
session.delete(answer)
|
|
bot.answer_callback_query(update.callback_query.id, text=f"Hai ritratto il tuo voto.", cache_time=1)
|
|
else:
|
|
answer.choice = status
|
|
bot.answer_callback_query(update.callback_query.id, text=f"Hai cambiato il tuo voto in {emoji}.",
|
|
cache_time=1)
|
|
session.commit()
|
|
inline_keyboard = IKMarkup([[IKButton("🔵 Sì", callback_data="vote_yes")],
|
|
[IKButton("🔴 No", callback_data="vote_no")],
|
|
[IKButton("⚫️ Astieniti", callback_data="vote_abstain")]])
|
|
bot.edit_message_text(message_id=update.callback_query.message.message_id,
|
|
chat_id=update.callback_query.message.chat.id,
|
|
text=question.generate_text(session),
|
|
reply_markup=inline_keyboard,
|
|
parse_mode="HTML")
|
|
elif update.callback_query.data.startswith("match_"):
|
|
user = session.query(db.Telegram).filter_by(telegram_id=update.callback_query.from_user.id).one_or_none()
|
|
if user is None:
|
|
bot.answer_callback_query(update.callback_query.id,
|
|
show_alert=True,
|
|
text=strings.LINK.ERRORS.ROYALNET_NOT_LINKED,
|
|
parse_mode="Markdown")
|
|
return
|
|
match = session.query(db.Match).filter_by(message_id=update.callback_query.message.message_id).one()
|
|
if update.callback_query.data == "match_close":
|
|
if not (match.creator == user or user.telegram_id == 25167391):
|
|
bot.answer_callback_query(update.callback_query.id,
|
|
show_alert=True,
|
|
text=strings.MATCHMAKING.ERRORS.NOT_ADMIN)
|
|
return
|
|
match.closed = True
|
|
for partecipation in match.players:
|
|
if int(partecipation.status) >= 1:
|
|
try:
|
|
reply_msg(bot, partecipation.user.telegram_id, strings.MATCHMAKING.GAME_START[int(partecipation.status)], **match.format_dict())
|
|
except Unauthorized:
|
|
reply_msg(bot, main_group_id, strings.TELEGRAM.ERRORS.UNAUTHORIZED_USER,
|
|
mention=partecipation.user.mention())
|
|
elif update.callback_query.data == "match_cancel":
|
|
if not (match.creator == user or user.telegram_id == 25167391):
|
|
bot.answer_callback_query(update.callback_query.id,
|
|
show_alert=True,
|
|
text=strings.MATCHMAKING.ERRORS.NOT_ADMIN)
|
|
return
|
|
match.closed = True
|
|
status = {
|
|
"match_ready": db.MatchmakingStatus.READY,
|
|
"match_wait_for_me": db.MatchmakingStatus.WAIT_FOR_ME,
|
|
"match_maybe": db.MatchmakingStatus.MAYBE,
|
|
"match_ignore": db.MatchmakingStatus.IGNORED,
|
|
"match_close": None,
|
|
"match_cancel": None,
|
|
}.get(update.callback_query.data)
|
|
if status:
|
|
if match.closed:
|
|
bot.answer_callback_query(update.callback_query.id,
|
|
show_alert=True,
|
|
text=strings.MATCHMAKING.ERRORS.MATCH_CLOSED)
|
|
return
|
|
partecipation = session.query(db.MatchPartecipation).filter_by(match=match, user=user).one_or_none()
|
|
if partecipation is None:
|
|
partecipation = db.MatchPartecipation(match=match, status=status.value, user=user)
|
|
session.add(partecipation)
|
|
else:
|
|
partecipation.status = status.value
|
|
session.commit()
|
|
bot.answer_callback_query(update.callback_query.id,
|
|
text=strings.MATCHMAKING.TICKER_TEXT[update.callback_query.data],
|
|
cache_time=1)
|
|
if not match.closed:
|
|
inline_keyboard = IKMarkup([([IKButton(strings.MATCHMAKING.BUTTONS[key], callback_data=key)]) for key in strings.MATCHMAKING.BUTTONS])
|
|
else:
|
|
inline_keyboard = None
|
|
while True:
|
|
try:
|
|
bot.edit_message_text(message_id=update.callback_query.message.message_id,
|
|
chat_id=config["Telegram"]["announcement_group"],
|
|
text=match.generate_text(session),
|
|
reply_markup=inline_keyboard,
|
|
parse_mode="HTML")
|
|
break
|
|
except BadRequest:
|
|
break
|
|
except TimedOut:
|
|
pass
|
|
except Exception:
|
|
try:
|
|
bot.answer_callback_query(update.callback_query.id,
|
|
show_alert=True,
|
|
text=strings.TELEGRAM.ERRORS.CRITICAL_ERROR_QUERY)
|
|
except Exception:
|
|
pass
|
|
logger.error(f"Critical error: {sys.exc_info()}")
|
|
sentry.user_context({
|
|
"id": update.effective_user.id,
|
|
"telegram": {
|
|
"username": update.effective_user.username,
|
|
"first_name": update.effective_user.first_name,
|
|
"last_name": update.effective_user.last_name
|
|
}
|
|
})
|
|
sentry.extra_context({
|
|
"update": update.to_dict()
|
|
})
|
|
sentry.captureException()
|
|
finally:
|
|
try:
|
|
# noinspection PyUnboundLocalVariable
|
|
session.close()
|
|
except Exception:
|
|
pass
|
|
|
|
|
|
@command
|
|
def cmd_eat(bot: telegram.Bot, update: telegram.Update):
|
|
try:
|
|
food: str = update.message.text.split(" ", 1)[1].capitalize()
|
|
except IndexError:
|
|
reply(bot, update, strings.EAT.ERRORS.INVALID_SYNTAX)
|
|
return
|
|
reply(bot, update, strings.EAT.FOODS.get(food.lower(), strings.EAT.FOODS["_default"]), food=food)
|
|
|
|
|
|
@command
|
|
def cmd_ship(bot: telegram.Bot, update: telegram.Update):
|
|
try:
|
|
names = update.message.text.split(" ")
|
|
name_one = names[1]
|
|
name_two = names[2]
|
|
except ValueError:
|
|
reply(bot, update, strings.SHIP.ERRORS.INVALID_SYNTAX)
|
|
return
|
|
name_one = name_one.lower()
|
|
name_two = name_two.lower()
|
|
match_one = re.search(r"^[A-Za-z][^aeiouAEIOU]*[aeiouAEIOU]?", name_one)
|
|
if match_one is None:
|
|
part_one = name_one[:int(len(name_one) / 2)]
|
|
else:
|
|
part_one = match_one.group(0)
|
|
match_two = re.search(r"[^aeiouAEIOU]*[aeiouAEIOU]?[A-Za-z]$", name_two)
|
|
if match_two is None:
|
|
part_two = name_two[int(len(name_two) / 2):]
|
|
else:
|
|
part_two = match_two.group(0)
|
|
mixed = part_one + part_two # TODO: find out what exceptions this could possibly raise
|
|
reply(bot, update, strings.SHIP.RESULT,
|
|
one=name_one.capitalize(),
|
|
two=name_two.capitalize(),
|
|
result=mixed.capitalize())
|
|
|
|
|
|
@command
|
|
def cmd_bridge(bot: telegram.Bot, update: telegram.Update):
|
|
if discord_connection is None:
|
|
reply(bot, update, strings.BRIDGE.ERRORS.INACTIVE_BRIDGE)
|
|
return
|
|
try:
|
|
data = update.message.text.split(" ", 1)[1]
|
|
except IndexError:
|
|
reply(bot, update, strings.BRIDGE.ERRORS.INVALID_SYNTAX)
|
|
return
|
|
discord_connection.send(f"!{data}")
|
|
result = discord_connection.recv()
|
|
if result == "error":
|
|
reply(bot, update, strings.BRIDGE.FAILURE)
|
|
if result == "success":
|
|
reply(bot, update, strings.BRIDGE.SUCCESS)
|
|
|
|
|
|
def parse_timestring(timestring: str) -> typing.Union[datetime.timedelta, datetime.datetime]:
|
|
# Unix time
|
|
try:
|
|
unix_timestamp = float(timestring)
|
|
return datetime.datetime.fromtimestamp(unix_timestamp)
|
|
except ValueError:
|
|
pass
|
|
# Dashed time
|
|
try:
|
|
split_date = timestring.split("-")
|
|
now = datetime.datetime.now()
|
|
if len(split_date) == 5:
|
|
# yyyy-mm-dd-hh-mm
|
|
return datetime.datetime(year=int(split_date[0]),
|
|
month=int(split_date[1]),
|
|
day=int(split_date[2]),
|
|
hour=int(split_date[3]),
|
|
minute=int(split_date[4]))
|
|
elif len(split_date) == 4:
|
|
return now.replace(month=int(split_date[0]),
|
|
day=int(split_date[1]),
|
|
hour=int(split_date[2]),
|
|
minute=int(split_date[3]))
|
|
elif len(split_date) == 3:
|
|
return now.replace(day=int(split_date[0]),
|
|
hour=int(split_date[1]),
|
|
minute=int(split_date[2]))
|
|
elif len(split_date) == 2:
|
|
return now.replace(hour=int(split_date[0]),
|
|
minute=int(split_date[1]))
|
|
except (IndexError, ValueError):
|
|
pass
|
|
# Simple time from now
|
|
try:
|
|
if timestring.endswith("w"):
|
|
return datetime.timedelta(weeks=float(timestring[:-1]))
|
|
elif timestring.endswith("d"):
|
|
return datetime.timedelta(days=float(timestring[:-1]))
|
|
elif timestring.endswith("h"):
|
|
return datetime.timedelta(hours=float(timestring[:-1]))
|
|
elif timestring.endswith("m"):
|
|
return datetime.timedelta(minutes=float(timestring[:-1]))
|
|
except Exception:
|
|
pass
|
|
# Nothing found
|
|
raise ValueError("Nothing was found.")
|
|
|
|
|
|
@command
|
|
def cmd_newevent(bot: telegram.Bot, update: telegram.Update, session: db.Session):
|
|
try:
|
|
_, timestring, name_desc = update.message.text.split(" ", 2)
|
|
except ValueError:
|
|
bot.send_message(update.message.chat.id, "⚠️ Sintassi del comando non valida.\n"
|
|
"Sintassi corretta:\n"
|
|
"```/newevent <timestamp|[[[anno-]mese-]giorno-]ore-minuti"
|
|
"|{numero}{w|d|h|m}> <nome>\n"
|
|
"[descrizione]```", parse_mode="Markdown")
|
|
return
|
|
try:
|
|
name, description = name_desc.split("\n", 1)
|
|
except ValueError:
|
|
name = name_desc
|
|
description = None
|
|
# Parse the timestring
|
|
try:
|
|
parsed_time = parse_timestring(timestring)
|
|
if parsed_time < datetime.datetime.now():
|
|
raise errors.PastDateError()
|
|
except ValueError:
|
|
bot.send_message(update.message.chat.id, "⚠ Non è stato possibile leggere la data.\n"
|
|
"Sintassi corretta:\n"
|
|
"```/newevent <timestamp|[[[anno-]mese-]giorno-]ore-minuti"
|
|
"|{numero}{w|d|h|m}> <nome>\n"
|
|
"[descrizione]```", parse_mode="Markdown")
|
|
return
|
|
except errors.PastDateError:
|
|
bot.send_message(update.message.chat.id, "⚠ La data inserita è una data passata.\n"
|
|
"per favore inserisci una data futura.\n", parse_mode="Markdown")
|
|
return
|
|
# Create the event
|
|
telegram_user = session.query(db.Telegram)\
|
|
.filter_by(telegram_id=update.message.from_user.id)\
|
|
.join(db.Royal)\
|
|
.one_or_none()
|
|
event = db.Event(author=telegram_user.royal,
|
|
name=name,
|
|
description=description,
|
|
time=datetime.datetime.fromtimestamp(0))
|
|
# Set the time
|
|
if isinstance(parsed_time, datetime.datetime):
|
|
event.time = parsed_time
|
|
else:
|
|
event.time_left = parsed_time
|
|
# Save the event
|
|
session.add(event)
|
|
session.commit()
|
|
bot.send_message(update.message.chat.id, "✅ Evento aggiunto al Calendario Royal Games!")
|
|
|
|
|
|
@command
|
|
@database_access
|
|
def cmd_calendar(bot: telegram.Bot, update: telegram.Update, session: db.Session):
|
|
next_events = session.query(db.Event).filter(db.Event.time > datetime.datetime.now()).order_by(db.Event.time).all()
|
|
msg = "📆 Prossimi eventi\n"
|
|
for event in next_events:
|
|
if event.time_left.days >= 1:
|
|
msg += event.time.strftime('%Y-%m-%d %H:%M')
|
|
else:
|
|
msg += f"{int(event.time_left.total_seconds() // 3600)}h" \
|
|
f" {int((event.time_left.total_seconds() % 3600) // 60)}m"
|
|
msg += f" <b>{event.name}</b>\n"
|
|
msg += '\nPer ulteriori dettagli, visita <a href="https://ryg.steffo.eu">Royalnet</a>'
|
|
bot.send_message(update.message.chat.id, msg, parse_mode="HTML", disable_web_page_preview=True)
|
|
|
|
|
|
@command
|
|
def cmd_markov(bot: telegram.Bot, update: telegram.Update):
|
|
if default_model is None:
|
|
reply(bot, update, strings.MARKOV.ERRORS.NO_MODEL)
|
|
return
|
|
try:
|
|
first_word = update.message.text.split(" ")[1]
|
|
except IndexError:
|
|
# Any word
|
|
sentence = default_model.make_sentence(tries=1000)
|
|
if sentence is None:
|
|
reply(bot, update, strings.MARKOV.ERRORS.GENERATION_FAILED)
|
|
return
|
|
reply(bot, update, sentence)
|
|
return
|
|
# Specific word
|
|
try:
|
|
sentence = default_model.make_sentence_with_start(first_word, tries=1000)
|
|
except KeyError:
|
|
reply(bot, update, strings.MARKOV.ERRORS.MISSING_WORD)
|
|
return
|
|
if sentence is None:
|
|
reply(bot, update, strings.MARKOV.ERRORS.SPECIFIC_WORD_FAILED)
|
|
return
|
|
reply(bot, update, sentence)
|
|
|
|
|
|
@command
|
|
def cmd_dndmarkov(bot: telegram.Bot, update: telegram.Update):
|
|
if dnd4_model is None:
|
|
reply(bot, update, strings.MARKOV.ERRORS.NO_MODEL)
|
|
return
|
|
try:
|
|
first_word = update.message.text.split(" ")[1]
|
|
except IndexError:
|
|
# Any word
|
|
sentence = dnd4_model.make_sentence(tries=1000)
|
|
if sentence is None:
|
|
reply(bot, update, strings.MARKOV.ERRORS.GENERATION_FAILED)
|
|
return
|
|
reply(bot, update, sentence)
|
|
return
|
|
# Specific word
|
|
try:
|
|
sentence = dnd4_model.make_sentence_with_start(first_word, tries=1000)
|
|
except KeyError:
|
|
reply(bot, update, strings.MARKOV.ERRORS.MISSING_WORD)
|
|
return
|
|
if sentence is None:
|
|
reply(bot, update, strings.MARKOV.ERRORS.SPECIFIC_WORD_FAILED)
|
|
return
|
|
reply(bot, update, sentence)
|
|
|
|
|
|
def exec_roll(roll) -> str:
|
|
result = int(roll.evaluate())
|
|
string = ""
|
|
if isinstance(roll, dice.elements.Dice):
|
|
string += f"<b>{result}</b>"
|
|
else:
|
|
for index, operand in enumerate(roll.original_operands):
|
|
if operand != roll.operands[index]:
|
|
string += f"<i>{roll.operands[index]}</i>"
|
|
else:
|
|
string += f"{operand}"
|
|
if index + 1 != len(roll.original_operands):
|
|
|
|
string += strings.ROLL.SYMBOLS[roll.__class__]
|
|
string += f"=<b>{result}</b>"
|
|
return string
|
|
|
|
|
|
@command
|
|
def cmd_roll(bot: telegram.Bot, update: telegram.Update):
|
|
dice_string = update.message.text.split(" ", 1)[1]
|
|
try:
|
|
roll = dice.roll(f"{dice_string}", raw=True)
|
|
except dice.DiceBaseException:
|
|
reply(bot, update, strings.ROLL.ERRORS.INVALID_SYNTAX)
|
|
return
|
|
try:
|
|
result = exec_roll(roll)
|
|
except dice.DiceFatalException:
|
|
reply(bot, update, strings.ROLL.ERRORS.DICE_ERROR)
|
|
return
|
|
reply(bot, update, strings.ROLL.SUCCESS, result=result, ignore_escaping=True)
|
|
|
|
|
|
@command
|
|
def cmd_start(bot: telegram.Bot, update: telegram.Update):
|
|
reply(bot, update, strings.TELEGRAM.BOT_STARTED)
|
|
|
|
|
|
@command
|
|
def cmd_spell(bot: telegram.Bot, update: telegram.Update):
|
|
try:
|
|
spell_name: str = update.message.text.split(" ", 1)[1]
|
|
except IndexError:
|
|
reply(bot, update, strings.SPELL.ERRORS.INVALID_SYNTAX)
|
|
return
|
|
spell = cast.Spell(spell_name)
|
|
reply(bot, update, spell.stringify())
|
|
|
|
|
|
@command
|
|
def cmd_emojify(bot: telegram.Bot, update: telegram.Update):
|
|
try:
|
|
string: str = update.message.text.split(" ", 1)[1]
|
|
except IndexError:
|
|
reply(bot, update, strings.EMOJIFY.ERRORS.INVALID_SYNTAX)
|
|
return
|
|
msg = emojify(string)
|
|
reply(bot, update, strings.EMOJIFY.RESPONSE, emojified=msg)
|
|
|
|
|
|
@command
|
|
def cmd_pug(bot: telegram.Bot, update: telegram.Update):
|
|
j = requests.get("https://dog.ceo/api/breed/pug/images/random").json()
|
|
reply(bot, update, strings.PUG.HERE_HAVE_A_PUG, disable_web_page_preview=False, image_url=j["message"])
|
|
|
|
|
|
def process(arg_discord_connection):
|
|
if arg_discord_connection is not None:
|
|
global discord_connection
|
|
discord_connection = arg_discord_connection
|
|
logger.info("Creating updater...")
|
|
u = Updater(config["Telegram"]["bot_token"])
|
|
logger.info("Registering handlers...")
|
|
u.dispatcher.add_handler(CommandHandler("ping", cmd_ping))
|
|
u.dispatcher.add_handler(CommandHandler("link", cmd_link))
|
|
u.dispatcher.add_handler(CommandHandler("discord", cmd_cv))
|
|
u.dispatcher.add_handler(CommandHandler("cv", cmd_cv))
|
|
u.dispatcher.add_handler(CommandHandler("cast", cmd_cast))
|
|
u.dispatcher.add_handler(CommandHandler("color", cmd_color))
|
|
u.dispatcher.add_handler(CommandHandler("smecds", cmd_smecds))
|
|
u.dispatcher.add_handler(CommandHandler("ciaoruozi", cmd_ciaoruozi))
|
|
u.dispatcher.add_handler(CommandHandler("ahnonlosoio", cmd_ahnonlosoio))
|
|
u.dispatcher.add_handler(CommandHandler("balurage", cmd_balurage))
|
|
u.dispatcher.add_handler(CommandHandler("diario", cmd_diario))
|
|
u.dispatcher.add_handler(CommandHandler("spaggia", cmd_diario))
|
|
u.dispatcher.add_handler(CommandHandler("vote", cmd_vote))
|
|
u.dispatcher.add_handler(CommandHandler("eat", cmd_eat))
|
|
u.dispatcher.add_handler(CommandHandler("ship", cmd_ship))
|
|
u.dispatcher.add_handler(CommandHandler("bridge", cmd_bridge))
|
|
u.dispatcher.add_handler(CommandHandler("newevent", cmd_newevent))
|
|
u.dispatcher.add_handler(CommandHandler("calendar", cmd_calendar))
|
|
u.dispatcher.add_handler(CommandHandler("markov", cmd_markov))
|
|
u.dispatcher.add_handler(CommandHandler("dndmarkov", cmd_dndmarkov))
|
|
u.dispatcher.add_handler(CommandHandler("roll", cmd_roll))
|
|
u.dispatcher.add_handler(CommandHandler("r", cmd_roll))
|
|
u.dispatcher.add_handler(CommandHandler("mm", cmd_mm))
|
|
u.dispatcher.add_handler(CommandHandler("matchmaking", cmd_mm))
|
|
u.dispatcher.add_handler(CommandHandler("search", cmd_search))
|
|
u.dispatcher.add_handler(CommandHandler("regex", cmd_regex))
|
|
u.dispatcher.add_handler(CommandHandler("start", cmd_start))
|
|
u.dispatcher.add_handler(CommandHandler("spell", cmd_spell))
|
|
u.dispatcher.add_handler(CommandHandler("emojify", cmd_emojify))
|
|
u.dispatcher.add_handler(CommandHandler("pug", cmd_pug))
|
|
u.dispatcher.add_handler(CallbackQueryHandler(on_callback_query))
|
|
logger.info("Handlers registered.")
|
|
u.start_polling()
|
|
logger.info("Polling started.")
|
|
u.idle()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
process(None)
|