mirror of
https://github.com/RYGhub/royalnet.git
synced 2024-11-23 19:44:20 +00:00
197 lines
7.1 KiB
Python
197 lines
7.1 KiB
Python
|
"""
|
||
|
The PDA ("main" class) for the :mod:`royalnet_telethon` frontend.
|
||
|
"""
|
||
|
|
||
|
from __future__ import annotations
|
||
|
import royalnet.royaltyping as t
|
||
|
|
||
|
import logging
|
||
|
import asyncio
|
||
|
import royalnet.engineer as engi
|
||
|
import telethon as tt
|
||
|
import telethon.tl.custom as tlc
|
||
|
import enum
|
||
|
|
||
|
from .bullet.projectiles.message import TelegramMessageReceived, TelegramMessageEdited, TelegramMessageDeleted
|
||
|
|
||
|
log = logging.getLogger(__name__)
|
||
|
|
||
|
|
||
|
class TelethonPDAMode(enum.Enum):
|
||
|
GLOBAL = enum.auto()
|
||
|
CHAT = enum.auto()
|
||
|
USER = enum.auto()
|
||
|
CHAT_USER = enum.auto()
|
||
|
|
||
|
|
||
|
class TelethonPDA:
|
||
|
"""
|
||
|
A PDA which handles :mod:`royalnet` input and output using a Telegram bot as a source.
|
||
|
"""
|
||
|
|
||
|
def __init__(self,
|
||
|
tg_api_id: int,
|
||
|
tg_api_hash: str,
|
||
|
bot_username: str,
|
||
|
mode: TelethonPDAMode = TelethonPDAMode.CHAT_USER,
|
||
|
):
|
||
|
"""
|
||
|
Create a new :class:`.TelethonPDA` .
|
||
|
|
||
|
Get API properties `here <https://my.telegram.org/apps>`_.
|
||
|
|
||
|
:param tg_api_id: The Telegram ``api_id``.
|
||
|
:param tg_api_hash: The Telegram ``api_hash``.
|
||
|
:param mode: The mode to use for mapping dispensers.
|
||
|
"""
|
||
|
|
||
|
log.debug(f"Creating new TelethonPDA...")
|
||
|
|
||
|
self.dispensers: dict[t.Any, engi.Dispenser] = {}
|
||
|
"""
|
||
|
The :class:`royalnet.engineer.dispenser.Dispenser`\\ s of this PDA.
|
||
|
"""
|
||
|
|
||
|
self.conversations: t.List[engi.Conversation] = []
|
||
|
"""
|
||
|
A :class:`list` of conversations to run before a new event is :meth:`.put` in a
|
||
|
:class:`~royalnet.engineer.dispenser.Dispenser`.
|
||
|
"""
|
||
|
|
||
|
self.client: tt.TelegramClient = tt.TelegramClient("bot", api_id=tg_api_id, api_hash=tg_api_hash)
|
||
|
"""
|
||
|
The :mod:`telethon` Telegram client that this PDA will use to interface with Telegram.
|
||
|
"""
|
||
|
|
||
|
self._register_events()
|
||
|
|
||
|
self.mode: TelethonPDAMode = mode
|
||
|
"""
|
||
|
The mode to use for mapping dispensers.
|
||
|
"""
|
||
|
|
||
|
self.bot_username: str = bot_username
|
||
|
|
||
|
def _register_events(self):
|
||
|
self.client.add_event_handler(callback=self._message_new, event=tt.events.NewMessage())
|
||
|
self.client.add_event_handler(callback=self._message_edit, event=tt.events.MessageEdited())
|
||
|
self.client.add_event_handler(callback=self._message_delete, event=tt.events.MessageDeleted())
|
||
|
# self.client.add_event_handler(callback=self._message_read, event=tt.events.MessageRead())
|
||
|
# self.client.add_event_handler(callback=self._chat_action, event=tt.events.ChatAction())
|
||
|
# self.client.add_event_handler(callback=self._user_update, event=tt.events.UserUpdate())
|
||
|
# self.client.add_event_handler(callback=self._callback_query, event=tt.events.CallbackQuery())
|
||
|
# self.client.add_event_handler(callback=self._inline_query, event=tt.events.InlineQuery())
|
||
|
# self.client.add_event_handler(callback=self._album, event=tt.events.Album())
|
||
|
|
||
|
def _determine_key(self, event: tlc.message.Message):
|
||
|
if self.mode == TelethonPDAMode.GLOBAL:
|
||
|
return None
|
||
|
elif self.mode == TelethonPDAMode.USER:
|
||
|
return event.from_id.user_id
|
||
|
elif self.mode == TelethonPDAMode.CHAT:
|
||
|
return event.chat_id
|
||
|
elif self.mode == TelethonPDAMode.CHAT_USER:
|
||
|
return event.chat_id, event.from_id.user_id
|
||
|
else:
|
||
|
raise TypeError("Invalid mode")
|
||
|
|
||
|
async def _message_new(self, event: tlc.message.Message):
|
||
|
await self.put_projectile(
|
||
|
key=self._determine_key(event),
|
||
|
proj=TelegramMessageReceived(event=event),
|
||
|
)
|
||
|
|
||
|
async def _message_edit(self, event: tlc.message.Message):
|
||
|
await self.put_projectile(
|
||
|
key=self._determine_key(event),
|
||
|
proj=TelegramMessageEdited(event=event),
|
||
|
)
|
||
|
|
||
|
async def _message_delete(self, event: tlc.message.Message):
|
||
|
await self.put_projectile(
|
||
|
key=self._determine_key(event),
|
||
|
proj=TelegramMessageDeleted(event=event),
|
||
|
)
|
||
|
|
||
|
async def run(self, bot_token: str) -> t.NoReturn:
|
||
|
"""
|
||
|
Run the main loop of the :class:`.ConsolePDA` for ``cycles`` cycles, or unlimited cycles if the parameter is
|
||
|
:data:`True`.
|
||
|
"""
|
||
|
# Login to the Telegram API
|
||
|
self.client: tt.TelegramClient = await self.client.start(bot_token=bot_token)
|
||
|
await self.client.connect()
|
||
|
await self.client.get_me()
|
||
|
await self.client.catch_up()
|
||
|
await self.client.run_until_disconnected()
|
||
|
|
||
|
def register_conversation(self, conv: engi.Conversation) -> None:
|
||
|
"""
|
||
|
Register a new conversation in the PDA.
|
||
|
|
||
|
:param conv: The conversation to register.
|
||
|
"""
|
||
|
log.info(f"Registering conversation: {conv!r}")
|
||
|
self.conversations.append(conv)
|
||
|
|
||
|
def unregister_conversation(self, conv: engi.Conversation) -> None:
|
||
|
"""
|
||
|
Unregister a conversation from the PDA.
|
||
|
|
||
|
:param conv: The conversation to unregister.
|
||
|
"""
|
||
|
log.info(f"Unregistering conversation: {conv!r}")
|
||
|
self.conversations.remove(conv)
|
||
|
|
||
|
def register_partial(self, part: engi.PartialCommand, names: t.List[str]) -> engi.Command:
|
||
|
"""
|
||
|
Register a new :class:`~royalnet.engineer.command.PartialCommand` in the PDA, converting it to a
|
||
|
:class:`royalnet.engineer.Command` in the process.
|
||
|
|
||
|
:param part: The :class:`~royalnet.engineer.command.PartialCommand` to register.
|
||
|
:param names: The :attr:`~royalnet.engineer.command.Command.names` to register the command with.
|
||
|
:return: The resulting :class:`~royalnet.engineer.command.Command`.
|
||
|
"""
|
||
|
log.debug(f"Completing partial: {part!r}")
|
||
|
if part.syntax:
|
||
|
command = part.complete(pattern=rf"^/{{name}}(?:@{self.bot_username})?\s+{{syntax}}$", names=names)
|
||
|
else:
|
||
|
command = part.complete(pattern=rf"^/{{name}}(?:@{self.bot_username})?$", names=names)
|
||
|
self.register_conversation(command)
|
||
|
return command
|
||
|
|
||
|
async def put_projectile(self, key: t.Any, proj: engi.Projectile) -> None:
|
||
|
"""
|
||
|
Insert a new projectile into the dispenser.
|
||
|
|
||
|
:param key: The key of the dispenser to interact with.
|
||
|
:param proj: The projectile to put in the dispenser.
|
||
|
"""
|
||
|
if key not in self.dispensers:
|
||
|
log.debug(f"Dispenser not found, creating one...")
|
||
|
self.dispensers[key] = engi.Dispenser()
|
||
|
|
||
|
dispenser = self.dispensers[key]
|
||
|
|
||
|
log.debug("Getting running loop...")
|
||
|
loop = asyncio.get_running_loop()
|
||
|
|
||
|
for conversation in self.conversations:
|
||
|
log.debug(f"Creating run task for: {conversation!r}")
|
||
|
loop.create_task(dispenser.run(conversation, _pda=self), name=f"{repr(conversation)}")
|
||
|
|
||
|
log.debug("Running a event loop cycle...")
|
||
|
await asyncio.sleep(0)
|
||
|
|
||
|
log.debug(f"Putting projectile {proj!r} in dispenser {dispenser!r}...")
|
||
|
await dispenser.put(proj)
|
||
|
|
||
|
log.debug("Awaiting another event loop cycle...")
|
||
|
await asyncio.sleep(0)
|
||
|
|
||
|
|
||
|
# Objects exported by this module
|
||
|
__all__ = (
|
||
|
"TelethonPDA",
|
||
|
)
|