1
Fork 0
mirror of https://github.com/RYGhub/royalnet.git synced 2024-11-23 19:44:20 +00:00

Create abstract linkercommand

This commit is contained in:
Steffo 2020-07-11 02:14:38 +02:00
parent 2e09b0e30e
commit 395313f9cb
Signed by: steffo
GPG key ID: 896A80F55F7C97F0
11 changed files with 500 additions and 454 deletions

View file

@ -2,7 +2,7 @@
[tool.poetry]
name = "royalpack"
version = "5.11.1"
version = "5.12.0"
description = "A Royalnet command pack for the Royal Games community"
authors = ["Stefano Pigozzi <ste.pigozzi@gmail.com>"]
license = "AGPL-3.0+"

View file

@ -1,10 +1,198 @@
from typing import *
import royalnet
import royalnet.commands as rc
import royalnet.utils as ru
import royalnet.serf.telegram as rst
import royalnet.backpack.tables as rbt
import abc
import logging
import asyncio as aio
from ...types import Updatable
log = logging.getLogger(__name__)
class LinkerCommand(rc.Command, metaclass=abc.ABCMeta):
def __init__(self, interface: rc.CommandInterface):
super().__init__(interface)
self.updater_task = None
if self.enabled():
# Run updaters only on Telegram
if self.interface.name == "telegram":
self.updater_task = self.loop.create_task(self.run_updater())
async def run(self, args: rc.CommandArgs, data: rc.CommandData) -> None:
author = await data.get_author(error_if_none=True)
if len(args) == 0:
message = []
for obj in await self.get_updatables_of_user(session=data.session, user=author):
async def change(attribute: str, value: Any):
"""A shortcut for self.__change."""
await self._change(session=data.session,
obj=obj,
attribute=attribute,
new=value)
await self.update(session=data.session, obj=obj, change=change)
message.append(self.describe(obj))
if len(message) == 0:
raise rc.UserError("Nessun account connesso.")
await data.session_commit()
await data.reply("\n".join(message))
else:
message = ["🔗 Account collegato!\n"]
created = await self.create(session=data.session, user=author, args=args)
message.append(self.describe(created))
await data.session_commit()
await data.reply("\n".join(message))
def describe(self, obj: Updatable) -> str:
"""The text that should be appended to the report message for a given Updatable."""
return str(obj)
@abc.abstractmethod
async def get_updatables_of_user(self, session, user: rbt.User) -> List[Updatable]:
"""Get the updatables of a specific user."""
...
@abc.abstractmethod
async def get_updatables(self, session) -> List[Updatable]:
"""Return a list of all objects that should be updated at this updater cycle."""
...
@abc.abstractmethod
async def create(self, session, user: rbt.User, args) -> Updatable:
"""Create a new updatable object for a user."""
...
@abc.abstractmethod
async def update(self, session, obj, change: Callable[[str, Any], Awaitable[None]]):
"""Update a single updatable object. Use the change method to change values on the object!"""
...
@abc.abstractmethod
async def on_increase(self, session, obj: Updatable, attribute: str, old: Any, new: Any) -> None:
"""Called when the attribute has increased from the old value."""
...
@abc.abstractmethod
async def on_unchanged(self, session, obj: Updatable, attribute: str, old: Any, new: Any) -> None:
"""Called when the attribute stayed the same as the old value."""
...
@abc.abstractmethod
async def on_decrease(self, session, obj: Updatable, attribute: str, old: Any, new: Any) -> None:
"""Called when the attribute has decreased from the old value."""
...
@abc.abstractmethod
async def on_first(self, session, obj: Updatable, attribute: str, old: None, new: Any) -> None:
"""Called when the attribute changed from None."""
...
@abc.abstractmethod
async def on_reset(self, session, obj: Updatable, attribute: str, old: Any, new: None) -> None:
"""Called when the attribute changed to None."""
...
async def _change(self,
session,
obj,
attribute: str,
new) -> None:
"""Set the value of an attribute of an object to a value, and call the corresponding method."""
old = obj.__getattribute__(attribute)
if new == old:
await self.on_unchanged(session=session,
obj=obj,
attribute=attribute,
old=old,
new=new)
else:
if old is None:
await self.on_first(session=session,
obj=obj,
attribute=attribute,
old=old,
new=new)
elif new is None:
await self.on_reset(session=session,
obj=obj,
attribute=attribute,
old=old,
new=new)
elif new > old:
await self.on_increase(session=session,
obj=obj,
attribute=attribute,
old=old,
new=new)
else:
await self.on_decrease(session=session,
obj=obj,
attribute=attribute,
old=old,
new=new)
obj.__setattr__(attribute, new)
def enabled(self) -> bool:
"""Whether the updater is enabled or not."""
return self.config[self.name]["updater"]["enabled"]
def period(self) -> int:
"""The time between two updater cycles."""
return self.config[self.name]["updater"]["period"]
def delay(self) -> int:
"""The time between two object updates."""
return self.config[self.name]["updater"]["rate"]
def target(self) -> int:
"""The id of the Telegram chat where notifications should be sent."""
return self.config[self.name]["updater"]["target"]
async def run_updater(self):
log.info(f"Starting updater: {self.name}")
while True:
log.debug(f"Updater cycle: {self.name}")
session = self.alchemy.Session()
objects = await self.get_updatables(session)
for obj in objects:
log.debug(f"Updating: {obj} ({self.name})")
async def change(attribute: str, value: Any):
"""A shortcut for self.__change."""
await self._change(session=session,
obj=obj,
attribute=attribute,
new=value)
try:
await self.update(session=session,
obj=obj,
change=change)
except Exception as e:
ru.sentry_exc(e)
delay = self.delay()
log.debug(f"Waiting for: {delay} seconds (delay)")
await aio.sleep(delay)
log.debug(f"Committing updates: {self.name}")
await ru.asyncify(session.commit)
session.close()
period = self.period()
log.debug(f"Waiting for: {period} seconds (period)")
await aio.sleep(period)
async def notify(self, message):
await self.serf.api_call(self.serf.client.send_message,
chat_id=self.target(),
text=rst.escape(message),
parse_mode="HTML",
disable_webpage_preview=True)

View file

@ -3,41 +3,145 @@ from typing import *
import asyncio
import logging
import aiohttp
from royalnet.backpack import tables as rbt
from royalnet.commands import *
from royalnet.utils import *
from royalnet.serf.telegram.escape import escape as tg_escape
from sqlalchemy import or_, and_
from .abstract.linker import LinkerCommand
from ..tables import Steam, Brawlhalla, BrawlhallaDuo
from ..types import BrawlhallaRank, BrawlhallaMetal, BrawlhallaTier
from ..types import BrawlhallaRank, BrawlhallaMetal, BrawlhallaTier, Updatable
log = logging.getLogger(__name__)
class BrawlhallaCommand(Command):
class BrawlhallaCommand(LinkerCommand):
name: str = "brawlhalla"
aliases = ["bh", "bruhalla", "bruhlalla"]
description: str = "Visualizza le tue statistiche di Dota!"
description: str = "Visualizza le tue statistiche di Brawlhalla."
syntax: str = ""
def __init__(self, interface: CommandInterface):
super().__init__(interface)
if self.interface.name == "telegram" and self.config["Brawlhalla"]["updater"]["enabled"]:
self.loop.create_task(self._updater(int(self.config["Brawlhalla"]["updater"]["delay"])))
async def get_updatables_of_user(self, session, user: rbt.User) -> List[Brawlhalla]:
return user.steam
async def _send(self, message):
client = self.serf.client
await self.serf.api_call(client.send_message,
chat_id=self.config["Telegram"]["main_group_id"],
text=tg_escape(message),
parse_mode="HTML",
disable_webpage_preview=True)
async def get_updatables(self, session) -> List[Brawlhalla]:
return await asyncify(session.query(self.alchemy.get(Steam)).all)
async def create(self, session, user: rbt.User, args) -> Updatable:
raise InvalidInputError("Brawlhalla accounts are automatically linked from Steam.")
async def update(self, session, obj, change: Callable[[str, Any], Awaitable[None]]):
BrawlhallaT = self.alchemy.get(Brawlhalla)
DuoT = self.alchemy.get(BrawlhallaDuo)
log.info(f"Updating: {obj}")
async with aiohttp.ClientSession() as hcs:
bh: Brawlhalla = obj.brawlhalla
if bh is None:
log.debug(f"Checking if player has an account...")
async with hcs.get(f"https://api.brawlhalla.com/search?steamid={obj.steamid.as_64}&api_key={self.config['Brawlhalla']['api_key']}") as response:
if response.status != 200:
raise ExternalError(f"Brawlhalla API /search returned {response.status}!")
j = await response.json()
if j == {} or j == []:
log.debug("No account found.")
return
bh = BrawlhallaT(
steam=obj,
brawlhalla_id=j["brawlhalla_id"],
name=j["name"]
)
session.add(bh)
session.flush()
async with hcs.get(f"https://api.brawlhalla.com/player/{bh.brawlhalla_id}/ranked?api_key={self.config['Brawlhalla']['api_key']}") as response:
if response.status != 200:
raise ExternalError(f"Brawlhalla API /ranked returned {response.status}!")
j = await response.json()
if j == {} or j == []:
log.debug("No ranked info found.")
else:
await self._change(session=session, obj=bh, attribute="rating_1v1", new=j["rating"])
metal_name, tier_name = j["tier"].split(" ", 1)
metal = BrawlhallaMetal[metal_name.upper()]
tier = BrawlhallaTier(int(tier_name))
rank = BrawlhallaRank(metal=metal, tier=tier)
await self._change(session=session, obj=bh, attribute="rank_1v1", new=rank)
for jduo in j.get("2v2", []):
bhduo: Optional[BrawlhallaDuo] = await asyncify(
session.query(DuoT)
.filter(
or_(
and_(
DuoT.id_one == jduo["brawlhalla_id_one"],
DuoT.id_two == jduo["brawlhalla_id_two"]
),
and_(
DuoT.id_one == jduo["brawlhalla_id_two"],
DuoT.id_two == jduo["brawlhalla_id_one"]
)
)
)
.one_or_none
)
if bhduo is None:
if bh.brawlhalla_id == jduo["brawlhalla_id_one"]:
otherbh: Optional[Brawlhalla] = await asyncify(
session.query(BrawlhallaT).get, jduo["brawlhalla_id_two"]
)
else:
otherbh: Optional[Brawlhalla] = await asyncify(
session.query(BrawlhallaT).get, jduo["brawlhalla_id_one"]
)
if otherbh is None:
continue
bhduo = DuoT(
one=bh,
two=otherbh,
)
session.add(bhduo)
await self._change(session=session, obj=bhduo, attribute="rating_2v2", new=jduo["rating"])
metal_name, tier_name = jduo["tier"].split(" ", 1)
metal = BrawlhallaMetal[metal_name.upper()]
tier = BrawlhallaTier(int(tier_name))
rank = BrawlhallaRank(metal=metal, tier=tier)
await self._change(session=session, obj=bhduo, attribute="rank_2v2", new=rank)
async def on_increase(self, session, obj: Union[Brawlhalla, BrawlhallaDuo], attribute: str, old: Any, new: Any) -> None:
if attribute == "rank_1v1":
await self.notify(f"📈 [b]{obj.steam.user}[/b] è salito a [b]{new}[/b] ({obj.rating_1v1} MMR) in 1v1 su Brawlhalla! Congratulazioni!")
elif attribute == "rank_2v2":
await self.notify(f"📈 [b]{obj.one.steam.user}[/b] e [b]{obj.two.steam.user}[/b] sono saliti a [b]{new}[/b] ({obj.rating_2v2} MMR) in 2v2 su Brawlhalla! Congratulazioni!")
async def on_unchanged(self, session, obj: Union[Brawlhalla, BrawlhallaDuo], attribute: str, old: Any, new: Any) -> None:
pass
async def on_decrease(self, session, obj: Union[Brawlhalla, BrawlhallaDuo], attribute: str, old: Any, new: Any) -> None:
if attribute == "rank_1v1":
await self.notify(f"📉 [b]{obj.steam.user}[/b] è sceso a [b]{new}[/b] ({obj.rating_1v1} MMR) in 1v1 su Brawlhalla.")
elif attribute == "rank_2v2":
await self.notify(f"📉 [b]{obj.one.steam.user}[/b] e [b]{obj.two.steam.user}[/b] sono scesi a [b]{new}[/b] ({obj.rating_2v2} MMR) in 2v2 su Brawlhalla.")
async def on_first(self, session, obj: Union[Brawlhalla, BrawlhallaDuo], attribute: str, old: None, new: Any) -> None:
if attribute == "rank_1v1":
await self.notify(f"🌟 [b]{obj.steam.user}[/b] si è classificato a [b]{new}[/b] ({obj.rating_1v1} MMR) in 1v1 su Brawlhalla!")
elif attribute == "rank_2v2":
await self.notify(f"🌟 [b]{obj.one.steam.user}[/b] e [b]{obj.two.steam.user}[/b] si sono classificati a [b]{new}[/b] ({obj.rating_2v2} MMR) in 2v2 su Brawlhalla!")
async def on_reset(self, session, obj: Union[Brawlhalla, BrawlhallaDuo], attribute: str, old: Any, new: None) -> None:
if attribute == "rank_1v1":
await self.notify(f"⬜️ [b]{obj.steam.user}[/b] non ha più un rank su Brawlhalla.")
elif attribute == "rank_2v2":
await self.notify(f"⬜️ [b]{obj.one.steam.user}[/b] e [b]{obj.two.steam.user}[/b] non hanno più un rank su Brawlhalla.")
def describe(self, obj: Steam) -> str:
bh = obj.brawlhalla
@staticmethod
def _display(bh: Brawlhalla) -> str:
string = [f" [b]{bh.name}[/b]", ""]
if bh.rank_1v1:
@ -56,150 +160,3 @@ class BrawlhallaCommand(Command):
string.append("")
return "\n".join(string)
async def _notify(self,
obj: Union[Brawlhalla, BrawlhallaDuo],
attribute_name: str,
old_value: Any,
new_value: Any):
if attribute_name == "rank_1v1":
old_rank: Optional[BrawlhallaRank] = old_value
new_rank: Optional[BrawlhallaRank] = new_value
if new_rank > old_rank:
message = f"📈 [b]{obj.steam.user}[/b] è salito a [b]{new_value}[/b] ({obj.rating_1v1} MMR) in 1v1 su Brawlhalla! Congratulazioni!"
else:
message = f"📉 [b]{obj.steam.user}[/b] è sceso a [b]{new_value}[/b] ({obj.rating_1v1} MMR) in 1v1 su Brawlhalla."
await self._send(message)
elif attribute_name == "rank_2v2":
old_rank: Optional[BrawlhallaRank] = old_value
new_rank: Optional[BrawlhallaRank] = new_value
if new_rank > old_rank:
message = f"📈 [b]{obj.one.steam.user}[/b] e [b]{obj.two.steam.user}[/b] sono saliti a [b]{new_value}[/b] ({obj.rating_2v2} MMR) in 2v2 su Brawlhalla! Congratulazioni!"
else:
message = f"📉 [b]{obj.one.steam.user}[/b] e [b]{obj.two.steam.user}[/b] sono scesi a [b]{new_value}[/b] ({obj.rating_2v2} MMR) in 2v2 su Brawlhalla."
await self._send(message)
@staticmethod
async def _change(obj: Union[Brawlhalla, BrawlhallaDuo],
attribute_name: str,
new_value: Any,
callback: Callable[[Union[Brawlhalla, BrawlhallaDuo], str, Any, Any], Awaitable[None]]):
old_value = obj.__getattribute__(attribute_name)
if old_value != new_value:
await callback(obj, attribute_name, old_value, new_value)
obj.__setattr__(attribute_name, new_value)
async def _update(self, steam: Steam, db_session):
BrawlhallaT = self.alchemy.get(Brawlhalla)
DuoT = self.alchemy.get(BrawlhallaDuo)
log.info(f"Updating: {steam}")
async with aiohttp.ClientSession() as session:
bh: Brawlhalla = steam.brawlhalla
if bh is None:
log.debug(f"Checking if player has an account...")
async with session.get(f"https://api.brawlhalla.com/search?steamid={steam.steamid.as_64}&api_key={self.config['Brawlhalla']['api_key']}") as response:
if response.status != 200:
raise ExternalError(f"Brawlhalla API /search returned {response.status}!")
j = await response.json()
if j == {} or j == []:
log.debug("No account found.")
return
bh = BrawlhallaT(
steam=steam,
brawlhalla_id=j["brawlhalla_id"],
name=j["name"]
)
db_session.add(bh)
message = f"↔️ Account {bh} connesso a {bh.steam.user}!"
await self._send(message)
async with session.get(f"https://api.brawlhalla.com/player/{bh.brawlhalla_id}/ranked?api_key={self.config['Brawlhalla']['api_key']}") as response:
if response.status != 200:
raise ExternalError(f"Brawlhalla API /ranked returned {response.status}!")
j = await response.json()
if j == {} or j == []:
log.debug("No ranked info found.")
else:
await self._change(bh, "rating_1v1", j["rating"], self._notify)
metal_name, tier_name = j["tier"].split(" ", 1)
metal = BrawlhallaMetal[metal_name.upper()]
tier = BrawlhallaTier(int(tier_name))
rank = BrawlhallaRank(metal=metal, tier=tier)
await self._change(bh, "rank_1v1", rank, self._notify)
for jduo in j.get("2v2", []):
bhduo: Optional[BrawlhallaDuo] = await asyncify(
db_session.query(DuoT)
.filter(
or_(
and_(
DuoT.id_one == jduo["brawlhalla_id_one"],
DuoT.id_two == jduo["brawlhalla_id_two"]
),
and_(
DuoT.id_one == jduo["brawlhalla_id_two"],
DuoT.id_two == jduo["brawlhalla_id_one"]
)
)
)
.one_or_none
)
if bhduo is None:
if bh.brawlhalla_id == jduo["brawlhalla_id_one"]:
otherbh: Optional[Brawlhalla] = await asyncify(
db_session.query(BrawlhallaT).get, jduo["brawlhalla_id_two"]
)
else:
otherbh: Optional[Brawlhalla] = await asyncify(
db_session.query(BrawlhallaT).get, jduo["brawlhalla_id_one"]
)
if otherbh is None:
continue
bhduo = DuoT(
one=bh,
two=otherbh,
)
db_session.add(bhduo)
await self._change(bhduo, "rating_2v2", jduo["rating"], self._notify)
metal_name, tier_name = jduo["tier"].split(" ", 1)
metal = BrawlhallaMetal[metal_name.upper()]
tier = BrawlhallaTier(int(tier_name))
rank = BrawlhallaRank(metal=metal, tier=tier)
await self._change(bhduo, "rank_2v2", rank, self._notify)
await asyncify(db_session.commit)
async def _updater(self, period: int):
log.info(f"Started updater with {period}s period")
while True:
log.info(f"Updating...")
session = self.alchemy.Session()
log.info("")
steams = session.query(self.alchemy.get(Steam)).all()
for steam in steams:
try:
await self._update(steam, session)
except Exception as e:
sentry_exc(e)
await asyncio.sleep(1)
await asyncify(session.commit)
session.close()
log.info(f"Sleeping for {period}s")
await asyncio.sleep(period)
async def run(self, args: CommandArgs, data: CommandData) -> None:
author = await data.get_author(error_if_none=True)
found_something = False
message = ""
for steam in author.steam:
await self._update(steam, data.session)
if steam.brawlhalla is None:
continue
found_something = True
message += self._display(steam.brawlhalla)
message += "\n"
if not found_something:
raise UserError("Nessun account di Brawlhalla trovato.")
await data.reply(message)

View file

@ -1,11 +1,10 @@
from typing import *
import asyncio
import logging
import sentry_sdk
import aiohttp
import royalnet.commands as rc
import royalnet.utils as ru
import royalnet.serf.telegram as rst
from royalnet.backpack import tables as rbt
from .abstract.linker import LinkerCommand
from ..tables import Steam, Dota
from ..types import DotaRank
@ -13,139 +12,91 @@ from ..types import DotaRank
log = logging.getLogger(__name__)
class DotaCommand(rc.Command):
class DotaCommand(LinkerCommand):
name: str = "dota"
aliases = ["dota2", "doto", "doto2", "dotka", "dotka2"]
description: str = "Visualizza le tue statistiche di Dota!"
description: str = "Visualizza le tue statistiche di Dota."
syntax: str = ""
def __init__(self, interface: rc.CommandInterface):
super().__init__(interface)
if self.interface.name == "telegram" and self.config["Dota"]["updater"]["enabled"]:
self.loop.create_task(self._updater(int(self.config["Dota"]["updater"]["delay"])))
async def _send(self, message):
client = self.serf.client
await self.serf.api_call(client.send_message,
chat_id=self.config["Telegram"]["main_group_id"],
text=rst.escape(message),
parse_mode="HTML",
disable_webpage_preview=True)
@staticmethod
def _display(dota: Dota) -> str:
string = f" [b]{dota.steam.persona_name}[/b]\n"
if dota.rank:
string += f"{dota.rank}\n"
def describe(self, obj: Steam) -> str:
string = f" [b]{obj.persona_name}[/b]\n"
if obj.dota.rank:
string += f"{obj.dota.rank}\n"
string += f"\n" \
f"Wins: [b]{dota.wins}[/b]\n" \
f"Losses: [b]{dota.losses}[/b]\n" \
f"Wins: [b]{obj.dota.wins}[/b]\n" \
f"Losses: [b]{obj.dota.losses}[/b]\n" \
f"\n"
return string
async def _notify(self,
obj: Dota,
attribute_name: str,
old_value: Any,
new_value: Any):
if attribute_name == "wins":
if old_value is None:
message = f"↔️ Account {obj} connesso a {obj.steam.user}!"
await self._send(message)
elif attribute_name == "rank":
old_rank: Optional[DotaRank] = old_value
new_rank: Optional[DotaRank] = new_value
if new_rank > old_rank:
message = f"📈 [b]{obj.steam.user}[/b] è salito a [b]{new_value}[/b] su Dota 2! Congratulazioni!"
elif new_rank < old_rank:
message = f"📉 [b]{obj.steam.user}[/b] è sceso a [b]{new_value}[/b] su Dota 2."
else:
return
await self._send(message)
async def get_updatables_of_user(self, session, user: rbt.User) -> List[Dota]:
return user.steam
@staticmethod
async def _change(obj: Dota,
attribute_name: str,
new_value: Any,
callback: Callable[[Dota, str, Any, Any], Awaitable[None]]):
old_value = obj.__getattribute__(attribute_name)
if old_value != new_value:
await callback(obj, attribute_name, old_value, new_value)
obj.__setattr__(attribute_name, new_value)
async def get_updatables(self, session) -> List[Dota]:
return await ru.asyncify(session.query(self.alchemy.get(Steam)).all)
async def _update(self, steam: Steam, db_session):
log.info(f"Updating: {steam}")
async def create(self, session, user: rbt.User, args):
raise rc.InvalidInputError("Dota accounts are automatically linked from Steam.")
async def update(self, session, obj: Steam, change: Callable[[str, Any], Awaitable[None]]):
log.debug(f"Getting player data from OpenDota...")
async with aiohttp.ClientSession() as session:
async with aiohttp.ClientSession() as hcs:
# Get profile data
async with session.get(f"https://api.opendota.com/api/players/{steam.steamid.as_32}/") as response:
async with hcs.get(f"https://api.opendota.com/api/players/{obj.steamid.as_32}/") as response:
if response.status != 200:
raise rc.ExternalError(f"OpenDota / returned {response.status}!")
p = await response.json()
# No such user
if "profile" not in p:
log.debug(f"Not found: {steam}")
log.debug(f"Not found: {obj}")
return
# Get win/loss data
async with session.get(f"https://api.opendota.com/api/players/{steam.steamid.as_32}/wl") as response:
async with hcs.get(f"https://api.opendota.com/api/players/{obj.steamid.as_32}/wl") as response:
if response.status != 200:
raise rc.ExternalError(f"OpenDota /wl returned {response.status}!")
wl = await response.json()
# No such user
if wl["win"] == 0 and wl["lose"] == 0:
log.debug(f"Not found: {steam}")
log.debug(f"Not found: {obj}")
return
# Find the Dota record, if it exists
dota: Dota = steam.dota
dota: Dota = obj.dota
if dota is None:
dota = self.alchemy.get(Dota)(steam=steam)
db_session.add(dota)
db_session.flush()
await self._change(dota, "wins", wl["win"], self._notify)
await self._change(dota, "losses", wl["lose"], self._notify)
# Autocreate the Dota record
dota = self.alchemy.get(Dota)(steam=obj)
session.add(dota)
session.flush()
# Make a custom change function
async def change(attribute: str, new: Any):
await self._change(session=session, obj=dota, attribute=attribute, new=new)
await change("wins", wl["win"])
await change("losses", wl["lose"])
if p["rank_tier"]:
await self._change(dota, "rank", DotaRank(rank_tier=p["rank_tier"]), self._notify)
await change("rank", DotaRank(rank_tier=p["rank_tier"]))
else:
await self._change(dota, "rank", None, self._notify)
await change("rank", None)
async def _updater(self, period: int):
log.info(f"Started updater with {period}s period")
while True:
log.info(f"Updating...")
session = self.alchemy.Session()
log.info("")
steams = session.query(self.alchemy.get(Steam)).all()
for steam in steams:
try:
await self._update(steam, session)
except Exception as e:
sentry_sdk.capture_exception(e)
log.error(f"Error while updating {steam.user.username}: {e}")
await asyncio.sleep(1)
await ru.asyncify(session.commit)
session.close()
log.info(f"Sleeping for {period}s")
await asyncio.sleep(period)
async def on_increase(self, session, obj: Dota, attribute: str, old: Any, new: Any) -> None:
if attribute == "rank":
await self.notify(f"📈 [b]{obj.steam.user}[/b] è salito a [b]{new}[/b] su Dota 2! Congratulazioni!")
async def run(self, args: rc.CommandArgs, data: rc.CommandData) -> None:
author = await data.get_author(error_if_none=True)
async def on_unchanged(self, session, obj: Dota, attribute: str, old: Any, new: Any) -> None:
pass
found_something = False
async def on_decrease(self, session, obj: Dota, attribute: str, old: Any, new: Any) -> None:
if attribute == "rank":
await self.notify(f"📉 [b]{obj.steam.user}[/b] è sceso a [b]{new}[/b] su Dota 2.")
message = ""
for steam in author.steam:
await self._update(steam, data.session)
if steam.dota is None:
continue
found_something = True
message += self._display(steam.dota)
message += "\n"
if not found_something:
raise rc.UserError("Nessun account di Dota 2 trovato.")
await data.reply(message)
async def on_first(self, session, obj: Dota, attribute: str, old: None, new: Any) -> None:
if attribute == "wins":
await self.notify(f"↔️ Account {obj} connesso a {obj.steam.user}!")
elif attribute == "rank":
await self.notify(f"🌟 [b]{obj.steam.user}[/b] si è classificato [b]{new}[/b] su Dota 2!")
async def on_reset(self, session, obj: Dota, attribute: str, old: Any, new: None) -> None:
if attribute == "rank":
await self.notify(f"⬜️ [b]{obj.steam.user}[/b] non ha più un rank su Dota 2.")

View file

@ -6,168 +6,78 @@ import sentry_sdk
import royalnet.commands as rc
import royalnet.utils as ru
import royalnet.serf.telegram as rst
from royalnet.backpack import tables as rbt
from .abstract.linker import LinkerCommand
from ..tables import LeagueOfLegends, FiorygiTransaction
from ..types import LeagueLeague
from ..types import LeagueLeague, Updatable
log = logging.getLogger(__name__)
class LeagueoflegendsCommand(rc.Command):
class LeagueoflegendsCommand(LinkerCommand):
name: str = "leagueoflegends"
aliases = ["lol", "league"]
description: str = "Connetti un account di League of Legends a un account Royalnet, e visualizzane le statistiche."
description: str = "Connetti un account di League of Legends a un account Royalnet, o visualizzane le statistiche."
syntax = "[nomeevocatore]"
def __init__(self, interface: rc.CommandInterface):
super().__init__(interface)
self._riotwatcher: Optional[riotwatcher.RiotWatcher] = None
if self.interface.name == "telegram" and self.config["Lol"]["updater"]["enabled"]:
self._riotwatcher = riotwatcher.RiotWatcher(api_key=self.config["Lol"]["token"])
self.loop.create_task(self._updater(int(self.config["Lol"]["updater"]["delay"])))
async def _send(self, message):
client = self.serf.client
await self.serf.api_call(client.send_message,
chat_id=self.config["Telegram"]["main_group_id"],
text=rst.escape(message),
parse_mode="HTML",
disable_webpage_preview=True)
async def _notify(self,
obj: LeagueOfLegends,
attribute_name: str,
old_value: Any,
new_value: Any):
if isinstance(old_value, LeagueLeague):
# This is a rank change!
# Don't send messages for every rank change, send messages just if the TIER or RANK changes!
if old_value.tier == new_value.tier and old_value.rank == new_value.rank:
return
# Find the queue
queue_names = {
"rank_soloq": "Solo/Duo",
"rank_flexq": "Flex",
"rank_twtrq": "3v3",
"rank_tftq": "TFT"
}
# Prepare the message
if new_value > old_value:
message = f"📈 [b]{obj.user}[/b] è salito a {new_value} su League of Legends " \
f"({queue_names[attribute_name]})! Congratulazioni!"
else:
message = f"📉 [b]{obj.user}[/b] è sceso a {new_value} su League of Legends " \
f"({queue_names[attribute_name]})."
# Send the message
await self._send(message)
# Level up!
elif attribute_name == "summoner_level":
if new_value == 30 or (new_value >= 50 and (new_value % 25 == 0)):
await self._send(f"🆙 [b]{obj.user}[/b] è salito al livello [b]{new_value}[/b] su League of Legends!")
@staticmethod
async def _change(obj: LeagueOfLegends,
attribute_name: str,
new_value: Any,
callback: Callable[
[LeagueOfLegends, str, Any, Any], Awaitable[None]]):
old_value = obj.__getattribute__(attribute_name)
if old_value != new_value:
await callback(obj, attribute_name, old_value, new_value)
obj.__setattr__(attribute_name, new_value)
def __init__(self, interface: rc.CommandInterface):
super().__init__(interface)
self._riotwatcher: Optional[riotwatcher.RiotWatcher] = None
if self.interface.name == "telegram" and self.enabled():
self._riotwatcher = riotwatcher.RiotWatcher(api_key=self.token())
async def _update(self, lol: LeagueOfLegends):
log.info(f"Updating: {lol}")
log.debug(f"Getting summoner data: {lol}")
summoner = await ru.asyncify(self._riotwatcher.summoner.by_id, region=self.config["Lol"]["region"],
encrypted_summoner_id=lol.summoner_id)
await self._change(lol, "profile_icon_id", summoner["profileIconId"], self._notify)
await self._change(lol, "summoner_name", summoner["name"], self._notify)
await self._change(lol, "puuid", summoner["puuid"], self._notify)
await self._change(lol, "summoner_level", summoner["summonerLevel"], self._notify)
await self._change(lol, "summoner_id", summoner["id"], self._notify)
await self._change(lol, "account_id", summoner["accountId"], self._notify)
log.debug(f"Getting leagues data: {lol}")
leagues = await ru.asyncify(self._riotwatcher.league.by_summoner, region=self.config["Lol"]["region"],
encrypted_summoner_id=lol.summoner_id)
soloq = LeagueLeague()
flexq = LeagueLeague()
twtrq = LeagueLeague()
tftq = LeagueLeague()
for league in leagues:
if league["queueType"] == "RANKED_SOLO_5x5":
soloq = LeagueLeague.from_dict(league)
if league["queueType"] == "RANKED_FLEX_SR":
flexq = LeagueLeague.from_dict(league)
if league["queueType"] == "RANKED_FLEX_TT":
twtrq = LeagueLeague.from_dict(league)
if league["queueType"] == "RANKED_TFT":
tftq = LeagueLeague.from_dict(league)
await self._change(lol, "rank_soloq", soloq, self._notify)
await self._change(lol, "rank_flexq", flexq, self._notify)
await self._change(lol, "rank_twtrq", twtrq, self._notify)
await self._change(lol, "rank_tftq", tftq, self._notify)
log.debug(f"Getting mastery data: {lol}")
mastery = await ru.asyncify(self._riotwatcher.champion_mastery.scores_by_summoner,
region=self.config["Lol"]["region"],
encrypted_summoner_id=lol.summoner_id)
await self._change(lol, "mastery_score", mastery, self._notify)
def token(self):
return self.config["leagueoflegends"]["token"]
async def _updater(self, period: int):
log.info(f"Started updater with {period}s period")
while True:
log.info(f"Updating...")
session = self.alchemy.Session()
log.info("")
lols = session.query(self.alchemy.get(LeagueOfLegends)).all()
for lol in lols:
try:
await self._update(lol)
except Exception as e:
sentry_sdk.capture_exception(e)
log.error(f"Error while updating {lol.user.username}: {e}")
await asyncio.sleep(1)
await ru.asyncify(session.commit)
session.close()
log.info(f"Sleeping for {period}s")
await asyncio.sleep(period)
def region(self):
return self.config["leagueoflegends"]["region"]
@staticmethod
def _display(lol: LeagueOfLegends) -> str:
string = f" [b]{lol.summoner_name}[/b]\n" \
f"Lv. {lol.summoner_level}\n" \
f"Mastery score: {lol.mastery_score}\n" \
def describe(self, obj: LeagueOfLegends) -> str:
string = f" [b]{obj.summoner_name}[/b]\n" \
f"Lv. {obj.summoner_level}\n" \
f"Mastery score: {obj.mastery_score}\n" \
f"\n"
if lol.rank_soloq:
string += f"Solo: {lol.rank_soloq}\n"
if lol.rank_flexq:
string += f"Flex: {lol.rank_flexq}\n"
if lol.rank_twtrq:
string += f"3v3: {lol.rank_twtrq}\n"
if lol.rank_tftq:
string += f"TFT: {lol.rank_tftq}\n"
if obj.rank_soloq:
string += f"Solo: {obj.rank_soloq}\n"
if obj.rank_flexq:
string += f"Flex: {obj.rank_flexq}\n"
if obj.rank_twtrq:
string += f"3v3: {obj.rank_twtrq}\n"
if obj.rank_tftq:
string += f"TFT: {obj.rank_tftq}\n"
return string
async def run(self, args: rc.CommandArgs, data: rc.CommandData) -> None:
author = await data.get_author(error_if_none=True)
async def get_updatables_of_user(self, session, user: rbt.User) -> List[LeagueOfLegends]:
return await ru.asyncify(session.query(self.alchemy.get(LeagueOfLegends)).filter_by(user=user).all)
async def get_updatables(self, session) -> List[LeagueOfLegends]:
return await ru.asyncify(session.query(self.alchemy.get(LeagueOfLegends)).all)
async def create(self, session, user: rbt.User, args) -> LeagueOfLegends:
name = args.joined()
if name:
# Connect a new League of Legends account to Royalnet
log.debug(f"Searching for: {name}")
summoner = self._riotwatcher.summoner.by_name(region=self.config["Lol"]["region"], summoner_name=name)
summoner = self._riotwatcher.summoner.by_name(region=self.region(), summoner_name=name)
# Ensure the account isn't already connected to something else
leagueoflegends = await ru.asyncify(
data.session.query(self.alchemy.get(LeagueOfLegends)).filter_by(summoner_id=summoner["id"]).one_or_none)
session.query(self.alchemy.get(LeagueOfLegends)).filter_by(summoner_id=summoner["id"]).one_or_none)
if leagueoflegends:
raise rc.CommandError(f"L'account {leagueoflegends} è già registrato su Royalnet.")
# Get rank information
log.debug(f"Getting leagues data: {name}")
leagues = self._riotwatcher.league.by_summoner(region=self.config["Lol"]["region"],
leagues = self._riotwatcher.league.by_summoner(region=self.region(),
encrypted_summoner_id=summoner["id"])
soloq = LeagueLeague()
flexq = LeagueLeague()
@ -184,12 +94,12 @@ class LeagueoflegendsCommand(rc.Command):
tftq = LeagueLeague.from_dict(league)
# Get mastery score
log.debug(f"Getting mastery data: {name}")
mastery = self._riotwatcher.champion_mastery.scores_by_summoner(region=self.config["Lol"]["region"],
mastery = self._riotwatcher.champion_mastery.scores_by_summoner(region=self.region(),
encrypted_summoner_id=summoner["id"])
# Create database row
leagueoflegends = self.alchemy.get(LeagueOfLegends)(
region=self.config["Lol"]["region"],
user=author,
region=self.region(),
user=user,
profile_icon_id=summoner["profileIconId"],
summoner_name=summoner["name"],
puuid=summoner["puuid"],
@ -202,24 +112,60 @@ class LeagueoflegendsCommand(rc.Command):
rank_tftq=tftq,
mastery_score=mastery
)
log.debug(f"Saving to the DB: {name}")
data.session.add(leagueoflegends)
await data.session_commit()
await data.reply(f"↔️ Account {leagueoflegends} connesso a {author}!")
await FiorygiTransaction.spawn_fiorygi(data, author, 1,
"aver connesso il proprio account di League of Legends a Royalnet")
else:
# Update and display the League of Legends stats for the current account
if len(author.leagueoflegends) == 0:
raise rc.UserError("Nessun account di League of Legends trovato.")
message = ""
for account in author.leagueoflegends:
try:
await self._update(account)
message += self._display(account)
except riotwatcher.ApiError as e:
message += f"⚠️ [b]{account.summoner_name}[/b]\n" \
f"{e}"
message += "\n"
await data.session_commit()
await data.reply(message)
session.add(leagueoflegends)
return leagueoflegends
async def update(self, session, obj: LeagueOfLegends, change: Callable[[str, Any], Awaitable[None]]):
log.debug(f"Getting summoner data: {obj}")
summoner = await ru.asyncify(self._riotwatcher.summoner.by_id, region=self.region(),
encrypted_summoner_id=obj.summoner_id)
await change("profile_icon_id", summoner["profileIconId"])
await change("summoner_name", summoner["name"])
await change("puuid", summoner["puuid"])
await change("summoner_level", summoner["summonerLevel"])
await change("summoner_id", summoner["id"])
await change("account_id", summoner["accountId"])
log.debug(f"Getting leagues data: {obj}")
leagues = await ru.asyncify(self._riotwatcher.league.by_summoner, region=self.region(),
encrypted_summoner_id=obj.summoner_id)
soloq = LeagueLeague()
flexq = LeagueLeague()
twtrq = LeagueLeague()
tftq = LeagueLeague()
for league in leagues:
if league["queueType"] == "RANKED_SOLO_5x5":
soloq = LeagueLeague.from_dict(league)
if league["queueType"] == "RANKED_FLEX_SR":
flexq = LeagueLeague.from_dict(league)
if league["queueType"] == "RANKED_FLEX_TT":
twtrq = LeagueLeague.from_dict(league)
if league["queueType"] == "RANKED_TFT":
tftq = LeagueLeague.from_dict(league)
await change("rank_soloq", soloq)
await change("rank_flexq", flexq)
await change("rank_twtrq", twtrq)
await change("rank_tftq", tftq)
log.debug(f"Getting mastery data: {obj}")
mastery = await ru.asyncify(self._riotwatcher.champion_mastery.scores_by_summoner,
region=self.region(),
encrypted_summoner_id=obj.summoner_id)
await change("mastery_score", mastery)
async def on_increase(self, session, obj: LeagueOfLegends, attribute: str, old: Any, new: Any) -> None:
if attribute in self.queue_names.keys():
await self.notify(f"📈 [b]{obj.user}[/b] è salito a {new} su League of Legends ({self.queue_names[attribute]})! Congratulazioni!")
async def on_unchanged(self, session, obj: LeagueOfLegends, attribute: str, old: Any, new: Any) -> None:
pass
async def on_decrease(self, session, obj: LeagueOfLegends, attribute: str, old: Any, new: Any) -> None:
if attribute in self.queue_names.keys():
await self.notify(f"📉 [b]{obj.user}[/b] è sceso a {new} su League of Legends ({self.queue_names[attribute]}).")
async def on_first(self, session, obj: LeagueOfLegends, attribute: str, old: None, new: Any) -> None:
if attribute in self.queue_names.keys():
await self.notify(f"🌟 [b]{obj.user}[/b] si è classificato {new} su League of Legends ({self.queue_names[attribute]}!")
async def on_reset(self, session, obj: LeagueOfLegends, attribute: str, old: Any, new: None) -> None:
if attribute in self.queue_names.keys():
await self.notify(f"⬜️ [b]{obj.user}[/b] non ha più un rank su League of Legends ({self.queue_names[attribute]}).")

View file

@ -2,11 +2,11 @@ from sqlalchemy import *
from sqlalchemy.orm import *
from sqlalchemy.ext.declarative import declared_attr
import steam.steamid
from ..types import BrawlhallaRank, BrawlhallaTier, BrawlhallaMetal
from ..types import BrawlhallaRank, BrawlhallaTier, BrawlhallaMetal, Updatable
# noinspection PyAttributeOutsideInit
class Brawlhalla:
class Brawlhalla(Updatable):
__tablename__ = "brawlhalla"
@declared_attr

View file

@ -2,11 +2,11 @@ from typing import *
from sqlalchemy import *
from sqlalchemy.orm import relationship, backref
from sqlalchemy.ext.declarative import declared_attr
from ..types import DotaMedal, DotaStars, DotaRank
from ..types import DotaMedal, DotaStars, DotaRank, Updatable
import steam.steamid
class Dota:
class Dota(Updatable):
__tablename__ = "dota"
@declared_attr

View file

@ -1,10 +1,10 @@
from sqlalchemy import *
from sqlalchemy.orm import relationship, composite
from sqlalchemy.ext.declarative import declared_attr
from ..types import LeagueRank, LeagueTier, LeagueLeague
from ..types import LeagueRank, LeagueTier, LeagueLeague, Updatable
class LeagueOfLegends:
class LeagueOfLegends(Updatable):
__tablename__ = "leagueoflegends"
@declared_attr

View file

@ -10,6 +10,7 @@ from .brawlhallatier import BrawlhallaTier
from .brawlhallametal import BrawlhallaMetal
from .brawlhallarank import BrawlhallaRank
from .pollmood import PollMood
from .updatable import Updatable
__all__ = [
@ -26,4 +27,5 @@ __all__ = [
"BrawlhallaRank",
"BrawlhallaTier",
"PollMood",
"Updatable",
]

View file

@ -0,0 +1,2 @@
class Updatable:
pass

View file

@ -1 +1 @@
semantic = "5.11.1"
semantic = "5.12.0"