1
Fork 0
mirror of https://github.com/RYGhub/royalnet.git synced 2024-11-23 11:34:18 +00:00

Add /whoami

This commit is contained in:
Steffo 2021-04-19 16:56:36 +02:00
parent 093618bc39
commit 9d590c711c
Signed by: steffo
GPG key ID: 6965406171929D01
7 changed files with 160 additions and 69 deletions

View file

@ -2,7 +2,10 @@
<module type="PYTHON_MODULE" version="4"> <module type="PYTHON_MODULE" version="4">
<component name="NewModuleRootManager" inherit-compiler-output="true"> <component name="NewModuleRootManager" inherit-compiler-output="true">
<exclude-output /> <exclude-output />
<content url="file://$MODULE_DIR$" /> <content url="file://$MODULE_DIR$">
<sourceFolder url="file://$MODULE_DIR$/royalpack" isTestSource="false" />
<excludeFolder url="file://$MODULE_DIR$/dist" />
</content>
<orderEntry type="jdk" jdkName="Poetry (royalpack)" jdkType="Python SDK" /> <orderEntry type="jdk" jdkName="Poetry (royalpack)" jdkType="Python SDK" />
<orderEntry type="sourceFolder" forTests="false" /> <orderEntry type="sourceFolder" forTests="false" />
</component> </component>

View file

@ -56,6 +56,7 @@ register_telegram(commands.spell, ["spell"], "(?P<spellname>.+)")
register_telegram(commands.smecds, ["smecds"]) register_telegram(commands.smecds, ["smecds"])
register_telegram(commands.man, ["man", "help"], "(?P<commandname>[A-Za-z]+)") register_telegram(commands.man, ["man", "help"], "(?P<commandname>[A-Za-z]+)")
register_telegram(commands.login, ["login"]) register_telegram(commands.login, ["login"])
register_telegram(commands.whoami, ["whoami"])
pda.implementations["telethon.1"].register_conversation(r) pda.implementations["telethon.1"].register_conversation(r)

View file

@ -0,0 +1 @@
from .login import *

65
royalpack/bolts/login.py Normal file
View file

@ -0,0 +1,65 @@
"""
"""
from __future__ import annotations
import typing as t
import functools
import sqlalchemy.orm as so
import sqlalchemy.sql as ss
import royalnet.engineer as engi
import logging
import royalnet_telethon
import royalpack.database as db
log = logging.getLogger(__name__)
def use_ryglogin(allow_anonymous=False):
"""
Decorator factory which allows a :class:`~royalnet.engineer.conversation.Conversation` to find out what RYGaccount
user called a certain function.
It requires the :func:`~royalnet.engineer.bolts.use_database` decorator.
:param allow_anonymous: If users who are not logged in should be allowed to use the command, or should be displayed
an error instead.
:return: The decorator to use to decorate the function.
"""
def decorator(f):
@functools.wraps(f)
async def decorated(_session: so.Session, _imp: engi.PDAImplementation, _msg: engi.Message, **f_kwargs):
if isinstance(_imp, royalnet_telethon.TelethonPDAImplementation):
_sender = await _msg.sender
supported = True
account = _session.execute(
ss.select(db.TelegramAccount).where(db.TelegramAccount.id == _sender._user.id)
).scalar()
else:
supported = False
account = None
if account:
user = account.user
else:
if allow_anonymous:
user = None
else:
if supported:
await _msg.reply(text=f"⚠️ Non sei loggato al tuo RYGaccount! "
f"Usa il comando login per effettuare la connessione.")
else:
await _msg.reply(text=f"⚠️ Il login non è disponibile su questa implementazione di Royalnet, "
f"e questo comando non consente l'esecuzione anonima.")
return
return await f(_session=_session, _imp=_imp, _msg=_msg, **f_kwargs, _account=account, _user=user)
return decorated
return decorator
__all__ = (
"use_ryglogin",
)

View file

@ -13,3 +13,4 @@ from .spell import *
from .smecds import * from .smecds import *
from .man import * from .man import *
from .login import * from .login import *
from .whoami import *

View file

@ -1,6 +1,5 @@
import royalnet.royaltyping as t import royalnet.royaltyping as t
import royalnet.engineer as engi import royalnet.engineer as engi
import sqlalchemy.sql as ss
import sqlalchemy.orm as so import sqlalchemy.orm as so
import royalpack.database as db import royalpack.database as db
import royalpack.config as cfg import royalpack.config as cfg
@ -17,6 +16,73 @@ log = logging.getLogger(__name__)
# FIXME: Properly handle errors in this function! # FIXME: Properly handle errors in this function!
@engi.use_database(db.lazy_session_class)
@engi.TeleportingConversation
async def login(*, _msg: engi.Message, _session: so.Session, _imp, **__):
"""
Fai il login al tuo account Royalnet.
"""
log.debug("Evaluating config...")
config = cfg.lazy_config.evaluate()
private = await enforce_private_message(msg=_msg)
async with aiohttp.ClientSession() as http_session:
dc = await device_code_request(
http_session=http_session,
client_id=config["auth.client.id"],
device_url=config["auth.url.device"],
scopes=["profile", "email", "openid"],
)
await prompt_login(
channel=private,
verification_url=dc['verification_uri_complete'],
user_code=dc['user_code']
)
try:
async with async_timeout.timeout(dc["expires_in"]):
at = await device_code_exchange(
http_session=http_session,
client_id=config["auth.client.id"],
token_url=config["auth.url.token"],
device_code=dc["device_code"],
sleep_time=9
)
except asyncio.TimeoutError:
await notify_expiration(
channel=private
)
return
ui = await get_user_info(
http_session=http_session,
userinfo_url=config["auth.url.userinfo"],
token_type=at["token_type"],
access_token=at["access_token"],
)
user = await register_user_generic(session=_session, user_info=ui)
log.debug(f"Committing session...")
_session.commit()
log.debug(f"Done, notifying the user...")
await private.send_message(text=f"✅ Login riuscito! Sei loggato come {user.name}!")
if isinstance(_imp, royalnet_telethon.TelethonPDAImplementation):
sender = await _msg.sender
tg = await register_user_telethon(session=_session, user_info=ui, telethon_user=sender._user)
log.debug(f"Committing session...")
_session.commit()
log.debug(f"Done, notifying the user...")
await private.send_message(text=f"↔️ Sincronizzazione con Telegram riuscita! Sei loggato come {tg.mention()}!")
async def enforce_private_message(msg: engi.Message) -> engi.Channel: async def enforce_private_message(msg: engi.Message) -> engi.Channel:
""" """
Get the private chat for an user and notify them of the switch. Get the private chat for an user and notify them of the switch.
@ -210,71 +276,4 @@ async def register_user_telethon(
return tg return tg
@engi.use_database(db.lazy_session_class)
@engi.TeleportingConversation
async def login(*, _msg: engi.Message, _session: so.Session, _imp, **__):
"""
Fai il login al tuo account Royalnet.
"""
log.debug("Evaluating config...")
config = cfg.lazy_config.evaluate()
private = await enforce_private_message(msg=_msg)
async with aiohttp.ClientSession() as http_session:
dc = await device_code_request(
http_session=http_session,
client_id=config["auth.client.id"],
device_url=config["auth.url.device"],
scopes=["profile", "email", "openid"],
)
await prompt_login(
channel=private,
verification_url=dc['verification_uri_complete'],
user_code=dc['user_code']
)
try:
async with async_timeout.timeout(dc["expires_in"]):
at = await device_code_exchange(
http_session=http_session,
client_id=config["auth.client.id"],
token_url=config["auth.url.token"],
device_code=dc["device_code"],
sleep_time=9
)
except asyncio.TimeoutError:
await notify_expiration(
channel=private
)
return
ui = await get_user_info(
http_session=http_session,
userinfo_url=config["auth.url.userinfo"],
token_type=at["token_type"],
access_token=at["access_token"],
)
user = await register_user_generic(session=_session, user_info=ui)
log.debug(f"Committing session...")
_session.commit()
log.debug(f"Done, notifying the user...")
await private.send_message(text=f"✅ Login riuscito! Sei loggato come {user.name}!")
if isinstance(_imp, royalnet_telethon.TelethonPDAImplementation):
sender = await _msg.sender
tg = await register_user_telethon(session=_session, user_info=ui, telethon_user=sender._user)
log.debug(f"Committing session...")
_session.commit()
log.debug(f"Done, notifying the user...")
await private.send_message(text=f"↔️ Sincronizzazione con Telegram riuscita! Sei loggato come {tg.mention()}!")
__all__ = ("login",) __all__ = ("login",)

View file

@ -0,0 +1,21 @@
import royalnet.engineer as engi
import royalpack.database as db
import royalpack.bolts as rb
@engi.use_database(db.lazy_session_class)
@rb.use_ryglogin(allow_anonymous=True)
@engi.TeleportingConversation
async def whoami(*, _msg: engi.Message, _account, **__):
"""
Scopri con che account sei loggato.
"""
# TODO: improve output
if _account:
await _msg.reply(text=f"☀️ {_account}")
else:
await _msg.reply(text="☁️ Non sei loggato.")
__all__ = ("whoami",)