1
Fork 0
mirror of https://github.com/RYGhub/royalnet.git synced 2024-11-23 19:44:20 +00:00
royalnet/telegrambot.py

881 lines
36 KiB
Python
Raw Normal View History

2018-01-25 14:24:17 +00:00
import datetime
2017-11-11 17:55:13 +00:00
import random
2018-07-23 17:34:41 +00:00
import typing
2017-10-30 09:46:37 +00:00
import db
2019-02-26 15:24:28 +00:00
from utils import smecds, cast, errors, emojify, reply_msg
2019-01-02 20:10:54 +00:00
# python-telegram-bot has a different name
# noinspection PyPackageRequirements
import telegram
2019-01-02 20:10:54 +00:00
# noinspection PyPackageRequirements
2018-03-14 10:53:26 +00:00
from telegram.ext import Updater, CommandHandler, CallbackQueryHandler
2019-02-08 22:17:44 +00:00
# noinspection PyPackageRequirements
2019-02-01 19:05:21 +00:00
from telegram.error import TimedOut, Unauthorized, BadRequest, TelegramError
2018-09-11 00:02:09 +00:00
import dice
2018-09-13 16:38:21 +00:00
import sys
2018-04-09 17:56:43 +00:00
import os
2018-05-31 20:14:50 +00:00
import re
2018-08-08 19:59:47 +00:00
import logging
import configparser
import markovify
2018-09-13 16:38:21 +00:00
import raven
2018-09-17 22:26:01 +00:00
import coloredlogs
2019-01-25 14:28:47 +00:00
import strings
import time
import requests
2019-02-08 22:17:44 +00:00
# Short aliases for the inline keyboard classes used by the vote and matchmaking messages
IKMarkup = telegram.InlineKeyboardMarkup
IKButton = telegram.InlineKeyboardButton


def _load_markov_model(path: str):
    """Load a markovify text model from the JSON file at ``path``.

    Returns None when the file cannot be read or parsed, so that the bot can start without the model.
    """
    # noinspection PyBroadException
    try:
        with open(path, encoding="utf-8") as model_file:
            return markovify.Text.from_json(model_file.read())
    except Exception:
        # Deliberate best-effort: a missing or broken model only disables the related command
        return None


# Markov models
default_model = _load_markov_model("markovmodels/default.json")
dnd4_model = _load_markov_model("markovmodels/dnd4.json")

# Silence the root logger and log only through this module's colored logger
logging.getLogger().disabled = True
logger = logging.getLogger(__name__)
os.environ["COLOREDLOGS_LOG_FORMAT"] = "%(asctime)s %(levelname)s %(name)s %(message)s"
coloredlogs.install(level="DEBUG", logger=logger)

# Init the config reader
config = configparser.ConfigParser()
config.read("config.ini")
main_group_id = int(config["Telegram"]["main_group"])

# Connection to the Discord side of the bridge; assigned by process() when one is provided
discord_connection = None

# Init the Sentry client
sentry = raven.Client(config["Sentry"]["token"],
                      release=raven.fetch_git_sha(os.path.dirname(__file__)),
                      install_logging_hook=False,
                      hook_libraries=[])
def reply(bot: telegram.Bot, update: telegram.Update, string: str, ignore_escaping=False, disable_web_page_preview=True, **kwargs) -> telegram.Message:
    """Send ``string`` to the chat the update came from, retrying for as long as Telegram times out.

    If the bot is not authorized to write in that chat, a notice is sent to the main group instead
    (a different notice for private chats and for groups) and that message is returned.
    Extra keyword arguments are forwarded to reply_msg.
    """
    message = update.message
    while True:
        try:
            return reply_msg(bot, message.chat.id, string,
                             ignore_escaping=ignore_escaping,
                             disable_web_page_preview=disable_web_page_preview,
                             **kwargs)
        except TimedOut:
            # Wait a moment, then try again
            time.sleep(1)
        except Unauthorized:
            if message.chat.type != telegram.Chat.PRIVATE:
                return reply_msg(bot, main_group_id, strings.TELEGRAM.ERRORS.UNAUTHORIZED_GROUP,
                                 group=(message.chat.title or message.chat.id))
            return reply_msg(bot, main_group_id, strings.TELEGRAM.ERRORS.UNAUTHORIZED_USER,
                             mention=message.from_user.mention_html())
# noinspection PyUnresolvedReferences
def command(func: "function"):
    """Decorate a command handler with a "typing" chat action and centralized error reporting.

    The wrapped handler is called as ``func(bot, update)``. Telegram timeouts are only logged.
    Any other exception is logged, announced in the main group and sent to Sentry; it is never re-raised.
    """
    import functools

    @functools.wraps(func)
    def new_func(bot: telegram.Bot, update: telegram.Update):
        # noinspection PyBroadException
        try:
            bot.send_chat_action(update.message.chat.id, telegram.ChatAction.TYPING)
            return func(bot, update)
        except TimedOut:
            logger.warning(f"Telegram timed out in {update}")
        except Exception:
            logger.error(f"Critical error: {sys.exc_info()}")
            # noinspection PyBroadException
            try:
                reply_msg(bot, main_group_id, strings.TELEGRAM.ERRORS.CRITICAL_ERROR,
                          exc_info=repr(sys.exc_info()))
            except Exception:
                logger.error(f"Double critical error: {sys.exc_info()}")
            # Some updates have no user attached: skip the user context rather than
            # raising here and losing the Sentry report
            user = update.effective_user
            if user is not None:
                sentry.user_context({
                    "id": user.id,
                    "telegram": {
                        "username": user.username,
                        "first_name": user.first_name,
                        "last_name": user.last_name
                    }
                })
            sentry.extra_context({
                "update": update.to_dict()
            })
            sentry.captureException()
    return new_func
2019-02-10 01:37:34 +00:00
# noinspection PyUnresolvedReferences
def database_access(func: "function"):
    """Decorate a handler so that it receives a database session as third argument.

    The wrapper opens a ``db.Session``, calls ``func(bot, update, session)`` and always closes the session
    afterwards. Exceptions raised by the handler are logged and sent to Sentry, then swallowed
    (the wrapper returns None in that case).
    """
    import functools

    @functools.wraps(func)
    def new_func(bot: telegram.Bot, update: telegram.Update):
        # Stays None if the session could not even be created, so the cleanup below can tell
        session = None
        try:
            session = db.Session()
            return func(bot, update, session)
        except Exception:
            logger.error(f"Database error: {sys.exc_info()}")
            sentry.captureException()
        finally:
            if session is not None:
                # noinspection PyBroadException
                try:
                    session.close()
                except Exception:
                    # Best-effort cleanup: never let a failed close hide the handler's outcome
                    logger.warning(f"Could not close the database session: {sys.exc_info()}")
    return new_func
@command
def cmd_ping(bot: telegram.Bot, update: telegram.Update):
    """Answer with the fixed "pong" string."""
    reply(bot, update, strings.PONG)
2019-01-02 21:19:25 +00:00
@command
@database_access
def cmd_link(bot: telegram.Bot, update: telegram.Update, session: db.Session):
    """Link the sender's Telegram account to the Royalnet username given as argument.

    Replies with an error if the argument is missing, if the username is not found
    or if the link already exists.
    """
    try:
        username = update.message.text.split(" ", 1)[1]
    except IndexError:
        reply(bot, update, strings.LINK.ERRORS.INVALID_SYNTAX)
        return
    try:
        t = db.Telegram.create(session,
                               royal_username=username,
                               telegram_user=update.message.from_user)
    except errors.NotFoundError:
        reply(bot, update, strings.LINK.ERRORS.NOT_FOUND)
        return
    except errors.AlreadyExistingError:
        reply(bot, update, strings.LINK.ERRORS.ALREADY_EXISTING)
        return
    # The session is closed by the database_access decorator, as in the other commands
    session.add(t)
    session.commit()
    reply(bot, update, strings.LINK.SUCCESS)
2017-10-30 09:46:37 +00:00
2017-10-30 12:45:38 +00:00
@command
def cmd_cv(bot: telegram.Bot, update: telegram.Update):
    """Ask the Discord side of the bridge for the "cv" report and relay its answer."""
    if discord_connection is None:
        reply(bot, update, strings.BRIDGE.ERRORS.INACTIVE_BRIDGE)
        return
    # dirty hack as usual: a message ending in "full" requests the full report
    request = "get cv full" if update.message.text.endswith("full") else "get cv"
    discord_connection.send(request)
    answer = discord_connection.recv()
    reply(bot, update, answer)
2017-10-30 12:45:38 +00:00
@command
def cmd_cast(bot: telegram.Bot, update: telegram.Update):
    """Tell the user that the cast command is not available yet."""
    reply(bot, update, strings.CAST.ERRORS.NOT_YET_AVAILABLE)
2018-01-25 14:24:17 +00:00
@command
def cmd_color(bot: telegram.Bot, update: telegram.Update):
    """Answer with the fixed COLOR string."""
    reply(bot, update, strings.COLOR)
2018-01-25 14:24:17 +00:00
@command
def cmd_smecds(bot: telegram.Bot, update: telegram.Update):
    """Answer with the SMECDS string filled with one randomly picked entry of ``smecds``."""
    picked = random.sample(smecds, 1)
    reply(bot, update, strings.SMECDS, ds=picked[0])
2018-01-25 14:24:17 +00:00
@command
def cmd_ciaoruozi(bot: telegram.Bot, update: telegram.Update):
    """Answer with a special string if the sender's username is "MeStakes", with the generic one otherwise."""
    # Telegram users may have no username at all: treat that as "somebody else" instead of crashing
    username = update.message.from_user.username or ""
    if username.lstrip("@") == "MeStakes":
        reply(bot, update, strings.CIAORUOZI.THE_LEGEND_HIMSELF)
    else:
        reply(bot, update, strings.CIAORUOZI.SOMEBODY_ELSE)
2018-01-25 14:24:17 +00:00
@command
def cmd_ahnonlosoio(bot: telegram.Bot, update: telegram.Update):
    """Answer with the ONCE string, or with the AGAIN string when replying to a previous round of the joke."""
    quoted = update.message.reply_to_message
    # Texts that count as a previous round: the command itself or one of the bot's own answers
    previous_rounds = [
        "/ahnonlosoio", "/ahnonlosoio@royalgamesbot", strings.AHNONLOSOIO.ONCE, strings.AHNONLOSOIO.AGAIN
    ]
    is_chained = quoted is not None and quoted.text in previous_rounds
    reply(bot, update, strings.AHNONLOSOIO.AGAIN if is_chained else strings.AHNONLOSOIO.ONCE)
2018-01-25 14:24:17 +00:00
@command
@database_access
def cmd_balurage(bot: telegram.Bot, update: telegram.Update, session: db.Session):
    """Store a BaluRage row for the sender, with the optional reason given after the command."""
    sender = session.query(db.Telegram).filter_by(telegram_id=update.message.from_user.id).one_or_none()
    if sender is None:
        reply(bot, update, strings.LINK.ERRORS.ROYALNET_NOT_LINKED)
        return
    # Everything after the first space is the reason; it is optional
    pieces = update.message.text.split(" ", 1)
    reason = pieces[1] if len(pieces) > 1 else None
    session.add(db.BaluRage(royal_id=sender.royal_id, reason=reason))
    session.commit()
    bot.send_message(update.message.chat.id, "😡 Stai sfogando la tua ira sul bot!")
2018-01-25 14:24:17 +00:00
2019-03-05 09:18:22 +00:00
def parse_diario(session: db.Session, text: str):
    """Split a diario line written as ``"quote" - author`` into its author and its text.

    The author name is matched case-insensitively, first against Royal usernames and then against
    Telegram usernames.

    Returns a tuple ``(author, text)``: ``author`` is a db.Telegram row or None if nobody was recognized;
    ``text`` is the quote alone when the line matched the pattern, the unchanged input otherwise.
    """
    match = re.match(r'"?([^"]+)"? (?:—|-{1,2}) ?@?([A-Za-z0-9_]+)$', text)
    if match is None:
        return None, text
    text_string = match.group(1)
    author_string = match.group(2).lower()
    royal = session.query(db.Royal).filter(db.func.lower(db.Royal.username) == author_string).first()
    # A Royal may have no Telegram account linked: in that case fall through to the
    # Telegram username lookup instead of raising IndexError
    if royal is not None and royal.telegram:
        return royal.telegram[0], text_string
    author = session.query(db.Telegram).filter(db.func.lower(db.Telegram.username) == author_string).first()
    return author, text_string
@command
@database_access
def cmd_diario(bot: telegram.Bot, update: telegram.Update, session: db.Session):
    """Save a new diario entry.

    The entry text is taken from the command argument (parsed by parse_diario to find the author) or,
    when there is no argument, from the message the command replies to. In the latter case the author is
    the user the replied message was forwarded from, or its sender if it is not a forward.
    """
    saver = session.query(db.Telegram).filter_by(telegram_id=update.message.from_user.id).one_or_none()
    if saver is None:
        reply(bot, update, strings.LINK.ERRORS.ROYALNET_NOT_LINKED)
        return
    try:
        text = update.message.text.split(" ", 1)[1]
    except IndexError:
        # No argument: the entry is the message being replied to
        quoted = update.message.reply_to_message
        if quoted is None:
            reply(bot, update, strings.DIARIO.ERRORS.INVALID_SYNTAX)
            return
        actual_text = quoted.text
        if actual_text is None:
            reply(bot, update, strings.DIARIO.ERRORS.NO_TEXT)
            return
        # The forward information belongs to the replied message, not to the command message
        original_sender = quoted.forward_from or quoted.from_user
        author = session.query(db.Telegram) \
            .filter_by(telegram_id=original_sender.id) \
            .one_or_none()
    else:
        # Kept outside the try so an IndexError raised while parsing is not mistaken for a missing argument
        author, actual_text = parse_diario(session, text)
    diario = db.Diario(timestamp=datetime.datetime.now(),
                       saver=saver,
                       author=author,
                       text=actual_text)
    session.add(diario)
    session.commit()
    reply(bot, update, strings.DIARIO.SUCCESS, ignore_escaping=True, diario=diario.to_telegram())
2017-11-11 17:55:13 +00:00
@command
@database_access
def cmd_vote(bot: telegram.Bot, update: telegram.Update, session: db.Session):
    """Create a public or secret vote and post it with the yes / no / abstain buttons.

    Syntax: ``/vote <public|secret> <question>``.
    """
    user = session.query(db.Telegram).filter_by(telegram_id=update.message.from_user.id).one_or_none()
    if user is None:
        bot.send_message(update.message.chat.id,
                         "⚠ Il tuo account Telegram non è registrato al RYGdb!"
                         " Registrati con `/register@royalgamesbot <nomeutenteryg>`.", parse_mode="Markdown")
        return
    try:
        _, mode, question = update.message.text.split(" ", 2)
    except ValueError:
        # Unpacking fewer than three parts raises ValueError, not IndexError
        bot.send_message(update.message.chat.id,
                         "⚠ Non hai specificato tutti i parametri necessari!"
                         "Sintassi: `/vote@royalgamesbot <public|secret> <domanda>`", parse_mode="Markdown")
        return
    if mode == "public":
        vote = db.VoteQuestion(question=question, anonymous=False)
    elif mode == "secret":
        vote = db.VoteQuestion(question=question, anonymous=True)
    else:
        bot.send_message(update.message.chat.id,
                         "⚠ Non hai specificato una modalità valida!"
                         "Sintassi: `/vote@royalgamesbot <public|secret> <domanda>`", parse_mode="Markdown")
        return
    session.add(vote)
    # Flush before generating the message text, which receives the session
    session.flush()
    inline_keyboard = IKMarkup([[IKButton("🔵 Sì", callback_data="vote_yes")],
                                [IKButton("🔴 No", callback_data="vote_no")],
                                [IKButton("⚫️ Astieniti", callback_data="vote_abstain")]])
    message = bot.send_message(update.message.chat.id, vote.generate_text(session=session),
                               reply_markup=inline_keyboard,
                               parse_mode="HTML")
    # The message id is how on_callback_query finds the question again
    vote.message_id = message.message_id
    session.commit()
2019-01-23 22:11:17 +00:00
2019-01-28 22:37:01 +00:00
2019-02-01 18:46:00 +00:00
def generate_search_message(term, entries):
    """Build the HTML message listing the diario entries found for ``term``.

    With fewer than 100 results the first 5 are shown in full and the others as links;
    with 100 or more results the first 100 are shown as links and the others as plain ids.
    """
    def link(entry):
        return f'<a href="https://ryg.steffo.eu/diario#entry-{entry.id}">#{entry.id}</a> '

    parts = [strings.DIARIOSEARCH.HEADER.format(term=term)]
    if len(entries) < 100:
        parts.extend(
            f'<a href="https://ryg.steffo.eu/diario#entry-{entry.id}">#{entry.id}</a>'
            f' di <i>{entry.author or "Anonimo"}</i>\n{entry.text}\n\n'
            for entry in entries[:5]
        )
        if len(entries) > 5:
            parts.append("I termini compaiono anche nelle righe:\n")
            parts.extend(link(entry) for entry in entries[5:])
    else:
        parts.extend(link(entry) for entry in entries[:100])
        parts.extend(f"#{entry.id} " for entry in entries[100:])
    return "".join(parts)
@command
@database_access
def cmd_search(bot: telegram.Bot, update: telegram.Update, session: db.Session):
    """Search the diario for entries containing the given term as a whole word (case-insensitive).

    The term is embedded in a regular expression between word-boundary groups and matched
    with the ``~*`` database operator.
    """
    try:
        query = update.message.text.split(" ", 1)[1]
    except IndexError:
        reply(bot, update, strings.DIARIOSEARCH.ERRORS.INVALID_SYNTAX, command="search")
        return
    query = query.replace('%', '\\%').replace('_', '\\_')
    pattern = r"(?:[\s\.,:;!?\"'<{([]+|^)" + query + r"(?:[\s\.,:;!?\"'>\})\]]+|$)"
    entries = session.query(db.Diario)\
        .filter(db.Diario.text.op("~*")(pattern))\
        .order_by(db.Diario.id.desc())\
        .all()
    try:
        bot.send_message(update.message.chat.id, generate_search_message(f"<b>{query}</b>", entries), parse_mode="HTML")
    except BadRequest:
        # Same fallback as cmd_regex: tell the user instead of failing without an answer
        reply(bot, update, strings.DIARIOSEARCH.ERRORS.RESULTS_TOO_LONG)
@command
@database_access
def cmd_regex(bot: telegram.Bot, update: telegram.Update, session: db.Session):
    """Search the diario with the regular expression given as argument (``~*`` operator, case-insensitive)."""
    pieces = update.message.text.split(" ", 1)
    if len(pieces) < 2:
        reply(bot, update, strings.DIARIOSEARCH.ERRORS.INVALID_SYNTAX, command="regex")
        return
    query = pieces[1].replace('%', '\\%').replace('_', '\\_')
    matching = session.query(db.Diario).filter(db.Diario.text.op("~*")(query))
    entries = matching.order_by(db.Diario.id.desc()).all()
    try:
        text = generate_search_message(f"<code>{query}</code>", entries)
        bot.send_message(update.message.chat.id, text, parse_mode="HTML")
    except (BadRequest, TelegramError):
        # Any Telegram error while sending is reported to the user as "results too long"
        reply(bot, update, strings.DIARIOSEARCH.ERRORS.RESULTS_TOO_LONG)
2019-01-28 22:37:01 +00:00
@command
@database_access
def cmd_mm(bot: telegram.Bot, update: telegram.Update, session: db.Session):
    """Create a new match and announce it in the announcement group with the matchmaking buttons.

    The command text carries an optional ``[min-]max`` player count, the match title and,
    on a following line, an optional description.
    """
    creator = session.query(db.Telegram).filter_by(telegram_id=update.message.from_user.id).one_or_none()
    if creator is None:
        reply(bot, update, strings.LINK.ERRORS.ROYALNET_NOT_LINKED)
        return
    parsed = re.match(r"/(?:mm|matchmaking)(?:@royalgamesbot)?(?: (?:([0-9]+)-)?([0-9]+))? (?:per )?([A-Za-z0-9!\-_. ]+)(?:.*\n(.+))?",
                      update.message.text)
    if parsed is None:
        reply(bot, update, strings.MATCHMAKING.ERRORS.INVALID_SYNTAX)
        return
    # The pattern has exactly four groups, in this order
    min_players, max_players, match_name, match_desc = parsed.groups()
    db_match = db.Match(timestamp=datetime.datetime.now(),
                        match_title=match_name,
                        match_desc=match_desc,
                        min_players=min_players,
                        max_players=max_players,
                        creator=creator)
    session.add(db_match)
    session.flush()
    # One button per row, one row per matchmaking action
    button_rows = []
    for key in strings.MATCHMAKING.BUTTONS:
        button_rows.append([IKButton(strings.MATCHMAKING.BUTTONS[key], callback_data=key)])
    message = bot.send_message(config["Telegram"]["announcement_group"], db_match.generate_text(session=session),
                               parse_mode="HTML",
                               reply_markup=IKMarkup(button_rows))
    # The message id is how on_callback_query finds the match again
    db_match.message_id = message.message_id
    session.commit()
2018-03-14 10:53:26 +00:00
2019-02-10 13:46:06 +00:00
def on_callback_query(bot: telegram.Bot, update: telegram.Update):
    """Handle the presses of the inline buttons attached to vote and matchmaking messages.

    ``vote_*`` callbacks add, change or retract the user's answer and refresh the vote message.
    ``match_*`` callbacks update the user's participation, or close / cancel the match,
    and refresh the match message. Any error is answered to the user, logged and sent to Sentry.
    """
    query = update.callback_query
    try:
        session = db.Session()
        data = query.data
        if data.startswith("vote_"):
            if data == "vote_yes":
                status, emoji = db.VoteChoices.YES, "🔵"
            elif data == "vote_no":
                status, emoji = db.VoteChoices.NO, "🔴"
            elif data == "vote_abstain":
                status, emoji = db.VoteChoices.ABSTAIN, "⚫️"
            else:
                raise NotImplementedError()
            user = session.query(db.Telegram).filter_by(telegram_id=query.from_user.id).one_or_none()
            if user is None:
                bot.answer_callback_query(query.id, show_alert=True,
                                          text=strings.LINK.ERRORS.ROYALNET_NOT_LINKED,
                                          parse_mode="Markdown")
                return
            question = session.query(db.VoteQuestion)\
                .filter_by(message_id=query.message.message_id)\
                .one()
            answer = session.query(db.VoteAnswer).filter_by(question=question, user=user).one_or_none()
            if answer is None:
                # First vote of this user on this question
                answer = db.VoteAnswer(question=question, choice=status, user=user)
                session.add(answer)
                bot.answer_callback_query(query.id, text=f"Hai votato {emoji}.", cache_time=1)
            elif answer.choice == status:
                # Pressing the same choice again retracts the vote
                session.delete(answer)
                bot.answer_callback_query(query.id, text="Hai ritratto il tuo voto.", cache_time=1)
            else:
                answer.choice = status
                bot.answer_callback_query(query.id, text=f"Hai cambiato il tuo voto in {emoji}.",
                                          cache_time=1)
            session.commit()
            inline_keyboard = IKMarkup([[IKButton("🔵 Sì", callback_data="vote_yes")],
                                        [IKButton("🔴 No", callback_data="vote_no")],
                                        [IKButton("⚫️ Astieniti", callback_data="vote_abstain")]])
            bot.edit_message_text(message_id=query.message.message_id,
                                  chat_id=query.message.chat.id,
                                  text=question.generate_text(session),
                                  reply_markup=inline_keyboard,
                                  parse_mode="HTML")
        elif data.startswith("match_"):
            user = session.query(db.Telegram).filter_by(telegram_id=query.from_user.id).one_or_none()
            if user is None:
                bot.answer_callback_query(query.id,
                                          show_alert=True,
                                          text=strings.LINK.ERRORS.ROYALNET_NOT_LINKED,
                                          parse_mode="Markdown")
                return
            match = session.query(db.Match).filter_by(message_id=query.message.message_id).one()
            if data == "match_close":
                # Only the creator of the match or the hard-coded admin id may close it
                if not (match.creator == user or user.telegram_id == 25167391):
                    bot.answer_callback_query(query.id,
                                              show_alert=True,
                                              text=strings.MATCHMAKING.ERRORS.NOT_ADMIN)
                    return
                match.closed = True
                # Notify every player whose status is at least 1 that the game is starting
                for partecipation in match.players:
                    if int(partecipation.status) >= 1:
                        try:
                            reply_msg(bot, partecipation.user.telegram_id,
                                      strings.MATCHMAKING.GAME_START[int(partecipation.status)],
                                      **match.format_dict())
                        except Unauthorized:
                            reply_msg(bot, main_group_id, strings.TELEGRAM.ERRORS.UNAUTHORIZED_USER,
                                      mention=partecipation.user.mention())
            elif data == "match_cancel":
                if not (match.creator == user or user.telegram_id == 25167391):
                    bot.answer_callback_query(query.id,
                                              show_alert=True,
                                              text=strings.MATCHMAKING.ERRORS.NOT_ADMIN)
                    return
                match.closed = True
            # Participation status requested by the button; None for close / cancel and unknown data
            status_by_button = {
                "match_ready": db.MatchmakingStatus.READY,
                "match_wait_for_me": db.MatchmakingStatus.WAIT_FOR_ME,
                "match_maybe": db.MatchmakingStatus.MAYBE,
                "match_ignore": db.MatchmakingStatus.IGNORED,
                "match_close": None,
                "match_cancel": None,
            }
            status = status_by_button.get(data)
            if status:
                if match.closed:
                    bot.answer_callback_query(query.id,
                                              show_alert=True,
                                              text=strings.MATCHMAKING.ERRORS.MATCH_CLOSED)
                    return
                partecipation = session.query(db.MatchPartecipation).filter_by(match=match, user=user).one_or_none()
                if partecipation is None:
                    partecipation = db.MatchPartecipation(match=match, status=status.value, user=user)
                    session.add(partecipation)
                else:
                    partecipation.status = status.value
            session.commit()
            bot.answer_callback_query(query.id,
                                      text=strings.MATCHMAKING.TICKER_TEXT[data],
                                      cache_time=1)
            # A closed match loses its buttons
            if match.closed:
                inline_keyboard = None
            else:
                inline_keyboard = IKMarkup([[IKButton(strings.MATCHMAKING.BUTTONS[key], callback_data=key)]
                                            for key in strings.MATCHMAKING.BUTTONS])
            # Retry the edit for as long as Telegram times out; give up on a bad request
            while True:
                try:
                    bot.edit_message_text(message_id=query.message.message_id,
                                          chat_id=config["Telegram"]["announcement_group"],
                                          text=match.generate_text(session),
                                          reply_markup=inline_keyboard,
                                          parse_mode="HTML")
                    break
                except BadRequest:
                    break
                except TimedOut:
                    time.sleep(1)
    except Exception:
        # noinspection PyBroadException
        try:
            bot.answer_callback_query(query.id,
                                      show_alert=True,
                                      text=strings.TELEGRAM.ERRORS.CRITICAL_ERROR_QUERY)
        except Exception:
            pass
        logger.error(f"Critical error: {sys.exc_info()}")
        sentry.user_context({
            "id": update.effective_user.id,
            "telegram": {
                "username": update.effective_user.username,
                "first_name": update.effective_user.first_name,
                "last_name": update.effective_user.last_name
            }
        })
        sentry.extra_context({
            "update": update.to_dict()
        })
        sentry.captureException()
    finally:
        # noinspection PyBroadException
        try:
            # noinspection PyUnboundLocalVariable
            session.close()
        except Exception:
            pass
2018-03-14 10:53:26 +00:00
@command
def cmd_eat(bot: telegram.Bot, update: telegram.Update):
    """Answer with the string associated to the given food, or with the default one for unknown foods."""
    pieces = update.message.text.split(" ", 1)
    if len(pieces) < 2:
        reply(bot, update, strings.EAT.ERRORS.INVALID_SYNTAX)
        return
    food: str = pieces[1].capitalize()
    # Foods are looked up by their lowercase name; "_default" is the fallback entry
    template = strings.EAT.FOODS.get(food.lower(), strings.EAT.FOODS["_default"])
    reply(bot, update, template, food=food)
@command
def cmd_ship(bot: telegram.Bot, update: telegram.Update):
    """Merge the two names given as arguments into a "ship" name.

    The result joins the start of the first name (up to and including its first vowel) with the end
    of the second one; when a pattern does not match, that name is cut in half instead.
    """
    names = update.message.text.split(" ")
    try:
        name_one = names[1]
        name_two = names[2]
    except IndexError:
        # Fewer than two names were given (list indexing raises IndexError, not ValueError)
        reply(bot, update, strings.SHIP.ERRORS.INVALID_SYNTAX)
        return
    name_one = name_one.lower()
    name_two = name_two.lower()
    match_one = re.search(r"^[A-Za-z][^aeiouAEIOU]*[aeiouAEIOU]?", name_one)
    if match_one is None:
        part_one = name_one[:len(name_one) // 2]
    else:
        part_one = match_one.group(0)
    match_two = re.search(r"[^aeiouAEIOU]*[aeiouAEIOU]?[A-Za-z]$", name_two)
    if match_two is None:
        part_two = name_two[len(name_two) // 2:]
    else:
        part_two = match_two.group(0)
    mixed = part_one + part_two
    reply(bot, update, strings.SHIP.RESULT,
          one=name_one.capitalize(),
          two=name_two.capitalize(),
          result=mixed.capitalize())
2018-05-31 20:14:50 +00:00
@command
def cmd_bridge(bot: telegram.Bot, update: telegram.Update):
    """Forward the command argument, prefixed with "!", to the Discord side and report the outcome.

    Nothing is replied if the answer is neither "error" nor "success".
    """
    if discord_connection is None:
        reply(bot, update, strings.BRIDGE.ERRORS.INACTIVE_BRIDGE)
        return
    pieces = update.message.text.split(" ", 1)
    if len(pieces) < 2:
        reply(bot, update, strings.BRIDGE.ERRORS.INVALID_SYNTAX)
        return
    discord_connection.send(f"!{pieces[1]}")
    outcome = discord_connection.recv()
    if outcome == "error":
        reply(bot, update, strings.BRIDGE.FAILURE)
    elif outcome == "success":
        reply(bot, update, strings.BRIDGE.SUCCESS)
2018-06-13 22:10:57 +00:00
2018-07-23 17:34:41 +00:00
def parse_timestring(timestring: str) -> typing.Union[datetime.timedelta, datetime.datetime]:
    """Parse a user-supplied time specification.

    Three formats are tried, in this order:

    - a Unix timestamp (any string accepted by ``float``), returned as a local ``datetime``;
    - a dashed date ``[[[yyyy-]mm-]dd-]hh-mm``; fields that are left out are taken from the current time,
      and unless the year is given the current seconds and microseconds are kept too;
    - an offset from now, a number followed by ``w``, ``d``, ``h`` or ``m``, returned as a ``timedelta``.

    Raises:
        ValueError: if the string matches none of the formats, or its values are out of range.
    """
    # Unix time
    try:
        return datetime.datetime.fromtimestamp(float(timestring))
    except (ValueError, OverflowError, OSError):
        # Not a number, or a timestamp the platform cannot represent
        pass
    # Dashed time
    now = datetime.datetime.now()
    try:
        fields = [int(piece) for piece in timestring.split("-")]
        if len(fields) == 5:
            # yyyy-mm-dd-hh-mm
            year, month, day, hour, minute = fields
            return datetime.datetime(year=year, month=month, day=day, hour=hour, minute=minute)
        if len(fields) == 4:
            month, day, hour, minute = fields
            return now.replace(month=month, day=day, hour=hour, minute=minute)
        if len(fields) == 3:
            day, hour, minute = fields
            return now.replace(day=day, hour=hour, minute=minute)
        if len(fields) == 2:
            hour, minute = fields
            return now.replace(hour=hour, minute=minute)
    except (ValueError, OverflowError):
        # Non-numeric fields or values outside the valid calendar ranges
        pass
    # Simple time from now
    units = {"w": "weeks", "d": "days", "h": "hours", "m": "minutes"}
    unit = units.get(timestring[-1:])
    if unit is not None:
        try:
            return datetime.timedelta(**{unit: float(timestring[:-1])})
        except (ValueError, OverflowError):
            pass
    # Nothing found
    raise ValueError("Nothing was found.")
@command
@database_access
def cmd_newevent(bot: telegram.Bot, update: telegram.Update, session: db.Session):
    """Add an event to the calendar.

    Syntax: ``/newevent <time> <name>`` with an optional description on the following lines.
    ``<time>`` is anything accepted by parse_timestring; times in the past are refused.
    """
    try:
        _, timestring, name_desc = update.message.text.split(" ", 2)
    except ValueError:
        bot.send_message(update.message.chat.id, "⚠️ Sintassi del comando non valida.\n"
                                                 "Sintassi corretta:\n"
                                                 "```/newevent <timestamp|[[[anno-]mese-]giorno-]ore-minuti"
                                                 "|{numero}{w|d|h|m}> <nome>\n"
                                                 "[descrizione]```", parse_mode="Markdown")
        return
    try:
        name, description = name_desc.split("\n", 1)
    except ValueError:
        name = name_desc
        description = None
    # Parse the timestring
    try:
        parsed_time = parse_timestring(timestring)
    except ValueError:
        bot.send_message(update.message.chat.id, "⚠ Non è stato possibile leggere la data.\n"
                                                 "Sintassi corretta:\n"
                                                 "```/newevent <timestamp|[[[anno-]mese-]giorno-]ore-minuti"
                                                 "|{numero}{w|d|h|m}> <nome>\n"
                                                 "[descrizione]```", parse_mode="Markdown")
        return
    # parse_timestring returns either an absolute datetime or an offset from now:
    # the two cannot be compared with each other, so check each against its own zero
    if isinstance(parsed_time, datetime.datetime):
        in_the_past = parsed_time < datetime.datetime.now()
    else:
        in_the_past = parsed_time < datetime.timedelta()
    if in_the_past:
        bot.send_message(update.message.chat.id, "⚠ La data inserita è una data passata.\n"
                                                 "per favore inserisci una data futura.\n", parse_mode="Markdown")
        return
    # Create the event
    telegram_user = session.query(db.Telegram)\
        .filter_by(telegram_id=update.message.from_user.id)\
        .join(db.Royal)\
        .one_or_none()
    if telegram_user is None:
        # The event author is a Royal, so the sender must have a linked account
        reply(bot, update, strings.LINK.ERRORS.ROYALNET_NOT_LINKED)
        return
    event = db.Event(author=telegram_user.royal,
                     name=name,
                     description=description,
                     time=datetime.datetime.fromtimestamp(0))
    # Set the time
    if isinstance(parsed_time, datetime.datetime):
        event.time = parsed_time
    else:
        event.time_left = parsed_time
    # Save the event
    session.add(event)
    session.commit()
    bot.send_message(update.message.chat.id, "✅ Evento aggiunto al Calendario Royal Games!")
@command
@database_access
def cmd_calendar(bot: telegram.Bot, update: telegram.Update, session: db.Session):
    """List the upcoming events, soonest first.

    Events at least one day away are shown with their date; the others with the hours and minutes left.
    """
    upcoming = session.query(db.Event)\
        .filter(db.Event.time > datetime.datetime.now())\
        .order_by(db.Event.time)\
        .all()
    chunks = ["📆 Prossimi eventi\n"]
    for event in upcoming:
        if event.time_left.days >= 1:
            when = event.time.strftime('%Y-%m-%d %H:%M')
        else:
            hours = int(event.time_left.total_seconds() // 3600)
            minutes = int((event.time_left.total_seconds() % 3600) // 60)
            when = f"{hours}h {minutes}m"
        chunks.append(f"{when} <b>{event.name}</b>\n")
    chunks.append('\nPer ulteriori dettagli, visita <a href="https://ryg.steffo.eu">Royalnet</a>')
    bot.send_message(update.message.chat.id, "".join(chunks), parse_mode="HTML", disable_web_page_preview=True)
@command
def cmd_markov(bot: telegram.Bot, update: telegram.Update):
    """Reply with a sentence generated by the default Markov model, optionally starting with a given word."""
    if default_model is None:
        reply(bot, update, strings.MARKOV.ERRORS.NO_MODEL)
        return
    words = update.message.text.split(" ")
    if len(words) < 2:
        # Any word
        sentence = default_model.make_sentence(tries=1000)
        failure = strings.MARKOV.ERRORS.GENERATION_FAILED
    else:
        # Specific word
        try:
            sentence = default_model.make_sentence_with_start(words[1], tries=1000)
        except KeyError:
            reply(bot, update, strings.MARKOV.ERRORS.MISSING_WORD)
            return
        failure = strings.MARKOV.ERRORS.SPECIFIC_WORD_FAILED
    if sentence is None:
        reply(bot, update, failure)
        return
    reply(bot, update, sentence)
@command
def cmd_dndmarkov(bot: telegram.Bot, update: telegram.Update):
    """Reply with a sentence generated by the dnd4 Markov model, optionally starting with a given word."""
    if dnd4_model is None:
        reply(bot, update, strings.MARKOV.ERRORS.NO_MODEL)
        return
    words = update.message.text.split(" ")
    if len(words) < 2:
        # Any word
        sentence = dnd4_model.make_sentence(tries=1000)
        failure = strings.MARKOV.ERRORS.GENERATION_FAILED
    else:
        # Specific word
        try:
            sentence = dnd4_model.make_sentence_with_start(words[1], tries=1000)
        except KeyError:
            reply(bot, update, strings.MARKOV.ERRORS.MISSING_WORD)
            return
        failure = strings.MARKOV.ERRORS.SPECIFIC_WORD_FAILED
    if sentence is None:
        reply(bot, update, failure)
        return
    reply(bot, update, sentence)
2018-08-08 19:59:47 +00:00
2019-02-10 15:40:29 +00:00
def exec_roll(roll) -> str:
    """Evaluate a parsed dice expression and render it as an HTML string.

    A plain dice roll is rendered as its bold result. Any other expression is rendered as its operands
    joined by the operator symbol, followed by ``=`` and the bold result; operands that differ from
    the original ones are shown in italics.
    """
    result = int(roll.evaluate())
    if isinstance(roll, dice.elements.Dice):
        return f"<b>{result}</b>"
    pieces = []
    last_index = len(roll.original_operands) - 1
    for index, operand in enumerate(roll.original_operands):
        evaluated = roll.operands[index]
        if operand != evaluated:
            pieces.append(f"<i>{evaluated}</i>")
        else:
            pieces.append(f"{operand}")
        # The operator symbol goes between operands, not after the last one
        if index != last_index:
            pieces.append(strings.ROLL.SYMBOLS[roll.__class__])
    pieces.append(f"=<b>{result}</b>")
    return "".join(pieces)
@command
def cmd_roll(bot: telegram.Bot, update: telegram.Update):
    """Roll the dice expression given as argument and reply with the rendered result."""
    try:
        dice_string = update.message.text.split(" ", 1)[1]
    except IndexError:
        # No expression given: answer with the syntax error instead of raising
        reply(bot, update, strings.ROLL.ERRORS.INVALID_SYNTAX)
        return
    try:
        roll = dice.roll(f"{dice_string}", raw=True)
    except dice.DiceBaseException:
        reply(bot, update, strings.ROLL.ERRORS.INVALID_SYNTAX)
        return
    try:
        result = exec_roll(roll)
    except dice.DiceFatalException:
        reply(bot, update, strings.ROLL.ERRORS.DICE_ERROR)
        return
    reply(bot, update, strings.ROLL.SUCCESS, result=result, ignore_escaping=True)
2018-09-11 00:02:09 +00:00
@command
def cmd_start(bot: telegram.Bot, update: telegram.Update):
    """Answer the /start command with the "bot started" string."""
    reply(bot, update, strings.TELEGRAM.BOT_STARTED)
2019-02-01 18:21:46 +00:00
2019-02-15 18:42:02 +00:00
@command
def cmd_spell(bot: telegram.Bot, update: telegram.Update):
    """Build the spell named by the command argument and reply with its description."""
    pieces = update.message.text.split(" ", 1)
    if len(pieces) < 2:
        reply(bot, update, strings.SPELL.ERRORS.INVALID_SYNTAX)
        return
    spell_name: str = pieces[1]
    reply(bot, update, cast.Spell(spell_name).stringify())
2019-02-19 18:37:17 +00:00
@command
def cmd_emojify(bot: telegram.Bot, update: telegram.Update):
    """Reply with the command argument passed through ``emojify``."""
    pieces = update.message.text.split(" ", 1)
    if len(pieces) < 2:
        reply(bot, update, strings.EMOJIFY.ERRORS.INVALID_SYNTAX)
        return
    string: str = pieces[1]
    reply(bot, update, strings.EMOJIFY.RESPONSE, emojified=emojify(string))
@command
def cmd_pug(bot: telegram.Bot, update: telegram.Update):
    """Reply with a random pug picture from the dog.ceo API; available in private chats only."""
    if update.effective_chat.type != telegram.Chat.PRIVATE:
        reply(bot, update, strings.PUG.ERRORS.PRIVATE_CHAT_ONLY)
        return
    # Without a timeout a stalled connection would block this handler forever
    response = requests.get("https://dog.ceo/api/breed/pug/images/random", timeout=10)
    # Fail on HTTP errors here rather than on a missing "message" key below
    response.raise_for_status()
    j = response.json()
    reply(bot, update, strings.PUG.HERE_HAVE_A_PUG, disable_web_page_preview=False, image_url=j["message"])
2017-11-11 17:55:13 +00:00
def process(arg_discord_connection):
    """Start the Telegram bot: register every handler, start polling and block until the bot is stopped.

    ``arg_discord_connection`` is the connection to the Discord side of the bridge;
    when it is None the module-level ``discord_connection`` is left unchanged.
    """
    global discord_connection
    if arg_discord_connection is not None:
        discord_connection = arg_discord_connection
    logger.info("Creating updater...")
    u = Updater(config["Telegram"]["bot_token"])
    logger.info("Registering handlers...")
    # (command name, handler) pairs, registered in this order; several names share one handler
    commands = (
        ("ping", cmd_ping),
        ("pong", cmd_ping),
        ("link", cmd_link),
        ("discord", cmd_cv),
        ("cv", cmd_cv),
        ("cast", cmd_cast),
        ("color", cmd_color),
        ("error", cmd_color),
        ("smecds", cmd_smecds),
        ("ciaoruozi", cmd_ciaoruozi),
        ("ahnonlosoio", cmd_ahnonlosoio),
        ("balurage", cmd_balurage),
        ("diario", cmd_diario),
        ("spaggia", cmd_diario),
        ("spaggio", cmd_diario),
        ("vote", cmd_vote),
        ("eat", cmd_eat),
        ("ship", cmd_ship),
        ("bridge", cmd_bridge),
        ("newevent", cmd_newevent),
        ("calendar", cmd_calendar),
        ("markov", cmd_markov),
        ("dndmarkov", cmd_dndmarkov),
        ("roll", cmd_roll),
        ("r", cmd_roll),
        ("mm", cmd_mm),
        ("matchmaking", cmd_mm),
        ("search", cmd_search),
        ("regex", cmd_regex),
        ("start", cmd_start),
        ("spell", cmd_spell),
        ("emojify", cmd_emojify),
        ("pug", cmd_pug),
        ("carlino", cmd_pug),
        ("carlini", cmd_pug),
    )
    for name, handler in commands:
        u.dispatcher.add_handler(CommandHandler(name, handler))
    u.dispatcher.add_handler(CallbackQueryHandler(on_callback_query))
    logger.info("Handlers registered.")
    u.start_polling()
    logger.info("Polling started.")
    u.idle()
2018-01-20 14:28:04 +00:00
2018-01-20 14:28:04 +00:00
if __name__ == "__main__":
    # Run standalone, without a connection to the Discord side of the bridge
    process(None)