From 9d590c711c57414dbbe9ffe2434cdfc1fc1953dd Mon Sep 17 00:00:00 2001 From: Stefano Pigozzi Date: Mon, 19 Apr 2021 16:56:36 +0200 Subject: [PATCH] =?UTF-8?q?=E2=9C=A8=20Add=20/whoami?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- royalpack.iml | 5 +- royalpack/__main__.py | 1 + royalpack/bolts/__init__.py | 1 + royalpack/bolts/login.py | 65 ++++++++++++++++ royalpack/commands/__init__.py | 1 + royalpack/commands/login.py | 135 ++++++++++++++++----------------- royalpack/commands/whoami.py | 21 +++++ 7 files changed, 160 insertions(+), 69 deletions(-) create mode 100644 royalpack/bolts/__init__.py create mode 100644 royalpack/bolts/login.py create mode 100644 royalpack/commands/whoami.py diff --git a/royalpack.iml b/royalpack.iml index 9b1475a0..580393d1 100644 --- a/royalpack.iml +++ b/royalpack.iml @@ -2,7 +2,10 @@ - + + + + diff --git a/royalpack/__main__.py b/royalpack/__main__.py index da8d29b1..e334763e 100644 --- a/royalpack/__main__.py +++ b/royalpack/__main__.py @@ -56,6 +56,7 @@ register_telegram(commands.spell, ["spell"], "(?P.+)") register_telegram(commands.smecds, ["smecds"]) register_telegram(commands.man, ["man", "help"], "(?P[A-Za-z]+)") register_telegram(commands.login, ["login"]) +register_telegram(commands.whoami, ["whoami"]) pda.implementations["telethon.1"].register_conversation(r) diff --git a/royalpack/bolts/__init__.py b/royalpack/bolts/__init__.py new file mode 100644 index 00000000..b7ed19b9 --- /dev/null +++ b/royalpack/bolts/__init__.py @@ -0,0 +1 @@ +from .login import * diff --git a/royalpack/bolts/login.py b/royalpack/bolts/login.py new file mode 100644 index 00000000..4d1d8d64 --- /dev/null +++ b/royalpack/bolts/login.py @@ -0,0 +1,65 @@ +""" + +""" + +from __future__ import annotations +import typing as t +import functools +import sqlalchemy.orm as so +import sqlalchemy.sql as ss +import royalnet.engineer as engi +import logging +import royalnet_telethon +import royalpack.database as db + +log = logging.getLogger(__name__) + + +def use_ryglogin(allow_anonymous=False): + """ + Decorator factory which allows a :class:`~royalnet.engineer.conversation.Conversation` to find out what RYGaccount + user called a certain function. + + It requires the :func:`~royalnet.engineer.bolts.use_database` decorator. + + :param allow_anonymous: If users who are not logged in should be allowed to use the command, or should be displayed + an error instead. + :return: The decorator to use to decorate the function. + """ + def decorator(f): + @functools.wraps(f) + async def decorated(_session: so.Session, _imp: engi.PDAImplementation, _msg: engi.Message, **f_kwargs): + + if isinstance(_imp, royalnet_telethon.TelethonPDAImplementation): + _sender = await _msg.sender + supported = True + account = _session.execute( + ss.select(db.TelegramAccount).where(db.TelegramAccount.id == _sender._user.id) + ).scalar() + + else: + supported = False + account = None + + if account: + user = account.user + else: + if allow_anonymous: + user = None + else: + if supported: + await _msg.reply(text=f"⚠️ Non sei loggato al tuo RYGaccount! " + f"Usa il comando login per effettuare la connessione.") + else: + await _msg.reply(text=f"⚠️ Il login non è disponibile su questa implementazione di Royalnet, " + f"e questo comando non consente l'esecuzione anonima.") + return + + return await f(_session=_session, _imp=_imp, _msg=_msg, **f_kwargs, _account=account, _user=user) + return decorated + return decorator + + +__all__ = ( + "use_ryglogin", +) diff --git a/royalpack/commands/__init__.py b/royalpack/commands/__init__.py index 60f1b3b0..502d49ed 100644 --- a/royalpack/commands/__init__.py +++ b/royalpack/commands/__init__.py @@ -13,3 +13,4 @@ from .spell import * from .smecds import * from .man import * from .login import * +from .whoami import * diff --git a/royalpack/commands/login.py b/royalpack/commands/login.py index eb822235..18d7c92b 100644 --- a/royalpack/commands/login.py +++ b/royalpack/commands/login.py @@ -1,6 +1,5 @@ import royalnet.royaltyping as t import royalnet.engineer as engi -import sqlalchemy.sql as ss import sqlalchemy.orm as so import royalpack.database as db import royalpack.config as cfg @@ -17,6 +16,73 @@ log = logging.getLogger(__name__) # FIXME: Properly handle errors in this function! +@engi.use_database(db.lazy_session_class) +@engi.TeleportingConversation +async def login(*, _msg: engi.Message, _session: so.Session, _imp, **__): + """ + Fai il login al tuo account Royalnet. + """ + log.debug("Evaluating config...") + config = cfg.lazy_config.evaluate() + + private = await enforce_private_message(msg=_msg) + + async with aiohttp.ClientSession() as http_session: + + dc = await device_code_request( + http_session=http_session, + client_id=config["auth.client.id"], + device_url=config["auth.url.device"], + scopes=["profile", "email", "openid"], + ) + + await prompt_login( + channel=private, + verification_url=dc['verification_uri_complete'], + user_code=dc['user_code'] + ) + + try: + async with async_timeout.timeout(dc["expires_in"]): + at = await device_code_exchange( + http_session=http_session, + client_id=config["auth.client.id"], + token_url=config["auth.url.token"], + device_code=dc["device_code"], + sleep_time=9 + ) + except asyncio.TimeoutError: + await notify_expiration( + channel=private + ) + return + + ui = await get_user_info( + http_session=http_session, + userinfo_url=config["auth.url.userinfo"], + token_type=at["token_type"], + access_token=at["access_token"], + ) + + user = await register_user_generic(session=_session, user_info=ui) + + log.debug(f"Committing session...") + _session.commit() + + log.debug(f"Done, notifying the user...") + await private.send_message(text=f"✅ Login riuscito! Sei loggato come {user.name}!") + + if isinstance(_imp, royalnet_telethon.TelethonPDAImplementation): + sender = await _msg.sender + tg = await register_user_telethon(session=_session, user_info=ui, telethon_user=sender._user) + + log.debug(f"Committing session...") + _session.commit() + + log.debug(f"Done, notifying the user...") + await private.send_message(text=f"↔️ Sincronizzazione con Telegram riuscita! Sei loggato come {tg.mention()}!") + + async def enforce_private_message(msg: engi.Message) -> engi.Channel: """ Get the private chat for an user and notify them of the switch. @@ -210,71 +276,4 @@ async def register_user_telethon( return tg -@engi.use_database(db.lazy_session_class) -@engi.TeleportingConversation -async def login(*, _msg: engi.Message, _session: so.Session, _imp, **__): - """ - Fai il login al tuo account Royalnet. - """ - log.debug("Evaluating config...") - config = cfg.lazy_config.evaluate() - - private = await enforce_private_message(msg=_msg) - - async with aiohttp.ClientSession() as http_session: - - dc = await device_code_request( - http_session=http_session, - client_id=config["auth.client.id"], - device_url=config["auth.url.device"], - scopes=["profile", "email", "openid"], - ) - - await prompt_login( - channel=private, - verification_url=dc['verification_uri_complete'], - user_code=dc['user_code'] - ) - - try: - async with async_timeout.timeout(dc["expires_in"]): - at = await device_code_exchange( - http_session=http_session, - client_id=config["auth.client.id"], - token_url=config["auth.url.token"], - device_code=dc["device_code"], - sleep_time=9 - ) - except asyncio.TimeoutError: - await notify_expiration( - channel=private - ) - return - - ui = await get_user_info( - http_session=http_session, - userinfo_url=config["auth.url.userinfo"], - token_type=at["token_type"], - access_token=at["access_token"], - ) - - user = await register_user_generic(session=_session, user_info=ui) - - log.debug(f"Committing session...") - _session.commit() - - log.debug(f"Done, notifying the user...") - await private.send_message(text=f"✅ Login riuscito! Sei loggato come {user.name}!") - - if isinstance(_imp, royalnet_telethon.TelethonPDAImplementation): - sender = await _msg.sender - tg = await register_user_telethon(session=_session, user_info=ui, telethon_user=sender._user) - - log.debug(f"Committing session...") - _session.commit() - - log.debug(f"Done, notifying the user...") - await private.send_message(text=f"↔️ Sincronizzazione con Telegram riuscita! Sei loggato come {tg.mention()}!") - - __all__ = ("login",) diff --git a/royalpack/commands/whoami.py b/royalpack/commands/whoami.py new file mode 100644 index 00000000..cef27165 --- /dev/null +++ b/royalpack/commands/whoami.py @@ -0,0 +1,21 @@ +import royalnet.engineer as engi +import royalpack.database as db +import royalpack.bolts as rb + + +@engi.use_database(db.lazy_session_class) +@rb.use_ryglogin(allow_anonymous=True) +@engi.TeleportingConversation +async def whoami(*, _msg: engi.Message, _account, **__): + """ + Scopri con che account sei loggato. + """ + + # TODO: improve output + if _account: + await _msg.reply(text=f"☀️ {_account}") + else: + await _msg.reply(text="☁️ Non sei loggato.") + + +__all__ = ("whoami",)