diff --git a/bots.py b/bots.py index b3bdd464..99b03571 100644 --- a/bots.py +++ b/bots.py @@ -21,4 +21,4 @@ if __name__ == "__main__": del telegram telegram = multiprocessing.Process(target=telegrambot.process, args=(discord_telegram_pipe[1],), daemon=True) telegram.start() - time.sleep(60) \ No newline at end of file + time.sleep(60) diff --git a/db.py b/db.py index ef17bcd3..4034b9d0 100644 --- a/db.py +++ b/db.py @@ -1,7 +1,7 @@ import datetime from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import sessionmaker, relationship -from sqlalchemy import Column, BigInteger, Integer, String, DateTime, ForeignKey, Float, Enum, create_engine, UniqueConstraint +from sqlalchemy import Column, BigInteger, Integer, String, DateTime, ForeignKey, Float, Enum, create_engine, UniqueConstraint, PrimaryKeyConstraint, Boolean, or_ import requests from errors import RequestError, NotFoundError, AlreadyExistingError import re @@ -66,11 +66,17 @@ class Telegram(Base): username=telegram_user.username) def __repr__(self): - return f"" + return f"" + + def mention(self): + if self.username is not None: + return f"@{self.username}" + else: + return self.first_name def __str__(self): if self.username is not None: - return f"@{self.username}" + return self.username elif self.last_name is not None: return f"{self.first_name} {self.last_name}" else: @@ -672,8 +678,75 @@ class PlayedMusic(Base): def __repr__(self): return f"" + +class VoteQuestion(Base): + __tablename__ = "votequestion" + + id = Column(Integer, primary_key=True) + message_id = Column(BigInteger) + question = Column(String, nullable=False) + anonymous = Column(Boolean, nullable=False) + open = Column(Boolean, default=True) + + def __repr__(self): + return f"" + + def generate_text(self, session: Session): + query = session.execute("SELECT * FROM telegram LEFT JOIN (SELECT voteanswer.question_id, voteanswer.user_id, voteanswer.choice FROM votequestion JOIN voteanswer ON votequestion.id = voteanswer.question_id WHERE votequestion.id = 8) answer ON telegram.telegram_id = answer.user_id ORDER BY answer.choice;") + text = f"{self.question}\n\n" + none, yes, no, abstain = 0, 0, 0, 0 + for record in query: + if record["username"] == "royalgamesbot": + continue + elif record["question_id"] is None: + text += "⚪️" + none += 1 + elif record["choice"] == "YES": + text += "🔵" + yes += 1 + elif record["choice"] == "NO": + text += "🔴" + no += 1 + elif record["choice"] == "ABSTAIN": + text += "⚫️" + abstain += 1 + if not self.anonymous: + text += f" {str(record['username'])}\n" + if self.anonymous: + text += "\n" + text += f"\n" \ + f"⚪ {none}\n" \ + f"🔵 {yes}\n" \ + f"🔴 {no}\n" \ + f"⚫️ {abstain}" + return text + + +class VoteChoices(enum.Enum): + ABSTAIN = 1 + YES = 2 + NO = 3 + + +class VoteAnswer(Base): + __tablename__ = "voteanswer" + + question_id = Column(Integer, ForeignKey("votequestion.id")) + question = relationship("VoteQuestion") + user_id = Column(BigInteger, ForeignKey("telegram.telegram_id")) + user = relationship("Telegram") + choice = Column(Enum(VoteChoices), nullable=False) + + __table_args__ = (PrimaryKeyConstraint("question_id", "user_id"),) + + def __repr__(self): + return f"" + + # If run as script, create all the tables in the db if __name__ == "__main__": - print("Creating new tables...") - Base.metadata.create_all(bind=engine) - print("Done!") \ No newline at end of file + session = Session() + session.query(VoteQuestion).first().generate_text(session) + #print("Creating new tables...") + #Base.metadata.create_all(bind=engine) + #print("Done!") \ No newline at end of file diff --git a/telegrambot.py b/telegrambot.py index fdd77508..83bb2413 100644 --- a/telegrambot.py +++ b/telegrambot.py @@ -4,8 +4,8 @@ import math import db import errors import stagismo -from telegram import Bot, Update -from telegram.ext import Updater, CommandHandler +from telegram import Bot, Update, InlineKeyboardMarkup, InlineKeyboardButton +from telegram.ext import Updater, CommandHandler, CallbackQueryHandler from discord import Status as DiscordStatus # Init the config reader @@ -186,7 +186,7 @@ def cmd_balurage(bot: Bot, update: Update): session.add(br) session.commit() bot.send_message(update.message.chat.id, f"😡 Stai sfogando la tua ira sul bot!") - except Exception as e: + except Exception: raise finally: session.close() @@ -227,6 +227,86 @@ def cmd_diario(bot: Bot, update: Update): session.close() +def cmd_vote(bot: Bot, update: Update): + session = db.Session() + try: + user = session.query(db.Telegram).filter_by(telegram_id=update.message.from_user.id).one_or_none() + if user is None: + bot.send_message(update.message.chat.id, + "⚠ Il tuo account Telegram non è registrato al RYGdb! Registrati con `/register@royalgamesbot `.") + return + try: + _, mode, question = update.message.text.split(" ", 2) + except IndexError: + bot.send_message(update.message.chat.id, + "⚠ Non hai specificato tutti i parametri necessari!" + "Sintassi: `/vote@royalgamesbot `") + return + if mode == "public": + vote = db.VoteQuestion(question=question, anonymous=False) + elif mode == "secret": + vote = db.VoteQuestion(question=question, anonymous=True) + else: + bot.send_message(update.message.chat.id, + "⚠ Non hai specificato una modalità valida!" + "Sintassi: `/vote@royalgamesbot `") + return + session.add(vote) + session.flush() + inline_keyboard = InlineKeyboardMarkup([[InlineKeyboardButton("🔵 Sì", callback_data="vote_yes")], + [InlineKeyboardButton("🔴 No", callback_data="vote_no")], + [InlineKeyboardButton("⚫️ Astieniti", callback_data="vote_abstain")]]) + message = bot.send_message(update.message.chat.id, vote.generate_text(session=session), reply_markup=inline_keyboard, + parse_mode="HTML") + vote.message_id = message.message_id + session.commit() + except Exception: + raise + finally: + session.close() + + +def on_callback_query(bot: Bot, update: Update): + if update.callback_query.data == "vote_yes": + choice = db.VoteChoices.YES + emoji = "🔵" + elif update.callback_query.data == "vote_no": + choice = db.VoteChoices.NO + emoji = "🔴" + elif update.callback_query.data == "vote_abstain": + choice = db.VoteChoices.ABSTAIN + emoji = "⚫️" + else: + raise NotImplementedError() + if update.callback_query.data.startswith("vote_"): + session = db.Session() + try: + user = session.query(db.Telegram).filter_by(telegram_id=update.callback_query.from_user.id).one_or_none() + if user is None: + bot.answer_callback_query(update.callback_query.id, show_alert=True, + text="⚠ Il tuo account Telegram non è registrato al RYGdb! Registrati con `/register@royalgamesbot `.") + return + question = session.query(db.VoteQuestion).filter_by(message_id=update.callback_query.message.message_id).one() + answer = session.query(db.VoteAnswer).filter_by(question=question, user=user).one_or_none() + if answer is None: + answer = db.VoteAnswer(question=question, choice=choice, user=user) + session.add(answer) + else: + answer.choice = choice + session.commit() + bot.answer_callback_query(update.callback_query.id, text=f"Hai votato {emoji}.", cache_time=1) + inline_keyboard = InlineKeyboardMarkup([[InlineKeyboardButton("🔵 Sì", callback_data="vote_yes")], + [InlineKeyboardButton("🔴 No", callback_data="vote_no")], + [InlineKeyboardButton("⚫️ Astieniti", callback_data="vote_abstain")]]) + bot.edit_message_text(message_id=update.callback_query.message.message_id, chat_id=update.callback_query.message.chat.id, + text=question.generate_text(session), reply_markup=inline_keyboard, + parse_mode="HTML") + except Exception as e: + raise + finally: + session.close() + + def process(arg_discord_connection): print("Telegrambot starting...") if arg_discord_connection is not None: @@ -243,6 +323,8 @@ def process(arg_discord_connection): u.dispatcher.add_handler(CommandHandler("ahnonlosoio", cmd_ahnonlosoio)) u.dispatcher.add_handler(CommandHandler("balurage", cmd_balurage)) u.dispatcher.add_handler(CommandHandler("diario", cmd_diario)) + u.dispatcher.add_handler(CommandHandler("vote", cmd_vote)) + u.dispatcher.add_handler(CallbackQueryHandler(on_callback_query)) u.start_polling() u.idle()