1
Fork 0
mirror of https://github.com/RYGhub/royalnet.git synced 2024-11-23 19:44:20 +00:00

Added more commands to the bot

This commit is contained in:
Steffo 2017-03-05 18:14:45 +01:00
parent 6a75d760e0
commit b24d4d7350
2 changed files with 77 additions and 20 deletions

View file

@ -1,22 +1,74 @@
import asyncio import asyncio
loop = asyncio.get_event_loop() loop = asyncio.get_event_loop()
import telegram import telegram
import random
import datetime
b = telegram.Bot("lul")
b = telegram.Bot("hidden")
async def diario(bot, update, arguments): async def diario(bot, update, arguments):
# Sì, ho copiato la funzione dal bot vecchio """Aggiungi una frase al diario Royal Games.
if len(arguments) > 0:
entry = " ".join(arguments)
if entry.isprintable():
entry = entry.replace("\n", " ")
time = update.message.date.timestamp()
# TODO: add better file handling
fdiario = open("diario.txt", "a")
fdiario.write(f"{int(time)}|{entry}\n")
fdiario.close()
del fdiario
await update.message.chat.send_message(bot, "Aggiunto al diario!")
Sintassi: `/diario <frase>`"""
# Sì, ho copiato la funzione dal bot vecchio
if len(arguments) == 0:
await update.message.chat.send_message(bot, "⚠ Sintassi del comando non valida.\n`/diario <random | numerofrase>`")
return
entry = " ".join(arguments)
if not entry.isprintable():
await update.message.chat.send_message(bot, "⚠ La frase che stai provando ad aggiungere contiene caratteri non ASCII, quindi non è stata aggiunta.\nToglili e riprova!")
return
entry = entry.replace("\n", " ")
time = update.message.date.timestamp()
# TODO: add better file handling, maybe use GET requests?
file = open("diario.txt", "a")
file.write(f"{int(time)}|{entry}\n")
file.close()
del file
await update.message.chat.send_message(bot, "Aggiunto al diario!")
async def leggi(bot, update, arguments):
"""Leggi una frase dal diario Royal Games.
Puoi visualizzare il diario [qui](https://royal.steffo.me/diario.htm), leggere una frase casuale scrivendo `/leggi random` o leggere una frase specifica scrivendo `/leggi <numero>`.
Sintassi: `/leggi <random | numerofrase>`"""
if len(arguments) == 0 or len(arguments) > 1:
await update.message.chat.send_message(bot, "⚠ Sintassi del comando non valida.\n`/leggi <random | numerofrase>`")
return
# TODO: add better file handling, maybe use GET requests?
file = open("diario.txt", "r")
entries = file.read().split("\n")
file.close()
if arguments[0] == "random":
entry_number = random.randrange(len(entries))
else:
entry_number = arguments[0]
entry = entries[entry_number].split("|", 1)
date = datetime.datetime.fromtimestamp(entry[0]).isoformat()
text = entry[1]
await update.message.chat.send_message(bot, f"Frase #{entry_number} | {date}\n{text}")
async def help(bot, update, arguments):
"""Visualizza la descrizione di un comando.
Sintassi: `/help [comando]`"""
if len(arguments) == 0:
await update.message.chat.send_message(bot, help.__doc__)
elif len(arguments) > 1:
await update.message.chat.send_message(bot, "⚠ Sintassi del comando non valida.\n`/help [comando]`")
else:
if arguments[0] in b.commands:
await update.message.chat.send_message(bot, b.commands[arguments[0]].__doc__)
else:
await update.message.chat.send_message(bot, "⚠ Il comando specificato non esiste.")
b.commands["leggi"] = leggi
b.commands["diario"] = diario b.commands["diario"] = diario
b.commands["help"] = help
b.run() b.run()

View file

@ -1,16 +1,19 @@
import asyncio import asyncio
loop = asyncio.get_event_loop() loop = asyncio.get_event_loop()
import aiohttp import aiohttp
import async_timeout import async_timeout
import datetime import datetime
import os
class TelegramAPIError(Exception): class TelegramAPIError(Exception):
pass pass
class UpdateError(Exception): class UpdateError(Exception):
pass pass
class Bot: class Bot:
def __init__(self, token): def __init__(self, token):
self.token = token self.token = token
@ -44,7 +47,7 @@ class Bot:
async def get_updates(self): async def get_updates(self):
"""Get the latest updates from the Telegram API with /getUpdates.""" """Get the latest updates from the Telegram API with /getUpdates."""
try: try:
data = await self.api_request("getUpdates", offset=self.offset, timeout=300) data = await self.api_request("getUpdates", 300, offset=self.offset, timeout=300)
except asyncio.TimeoutError: except asyncio.TimeoutError:
return return
for update in data: for update in data:
@ -83,7 +86,8 @@ class Bot:
# TODO: use message entities? # TODO: use message entities?
if isinstance(update.message.content, str) and update.message.content.startswith("/"): if isinstance(update.message.content, str) and update.message.content.startswith("/"):
split_msg = update.message.content.split(" ") split_msg = update.message.content.split(" ")
command = split_msg[0].lstrip("/") # Ignore the left slash and the right @botname
command = split_msg[0].lstrip("/").split("@")[0]
if command in self.commands: if command in self.commands:
arguments = split_msg[1:] arguments = split_msg[1:]
loop.create_task(self.commands[command](bot=self, update=update, arguments=arguments)) loop.create_task(self.commands[command](bot=self, update=update, arguments=arguments))
@ -111,7 +115,7 @@ class Bot:
# New pinned message # New pinned message
elif update.message.content.type == "pinned_message": elif update.message.content.type == "pinned_message":
chat.pinned_msg = update.message.content.content chat.pinned_msg = update.message.content.content
# TODO: handle group -> supergroup migrations # TODO: handle group -> supergroup migrations
def find_update(self, upd_id): def find_update(self, upd_id):
for update in self.updates: for update in self.updates:
@ -123,15 +127,16 @@ class Bot:
if chat.chat_id == chat_id: if chat.chat_id == chat_id:
return chat return chat
async def api_request(self, endpoint, **params): async def api_request(self, endpoint, t=10, **params):
"""Send a request to the Telegram API at the specified endpoint.""" """Send a request to the Telegram API at the specified endpoint."""
# Request timeout is 10 seconds. # Request timeout is 10 seconds.
with async_timeout.timeout(10): with async_timeout.timeout(t):
# Create a new session for each request. # Create a new session for each request.
async with aiohttp.ClientSession() as session: async with aiohttp.ClientSession() as session:
# Send the request to the Telegram API # Send the request to the Telegram API
token = self.token token = self.token
async with session.request("GET", f"https://api.telegram.org/bot{token}/{endpoint}", params=params) as response: async with session.request("GET", f"https://api.telegram.org/bot{token}/{endpoint}",
params=params) as response:
# Check for errors in the request # Check for errors in the request
if response.status != 200: if response.status != 200:
raise TelegramAPIError(f"Request returned {response.status} {response.reason}") raise TelegramAPIError(f"Request returned {response.status} {response.reason}")