1
Fork 0
mirror of https://github.com/RYGhub/royalnet.git synced 2024-11-27 21:44:21 +00:00
royalnet/royalpack/commands/dota.py

103 lines
4.1 KiB
Python
Raw Normal View History

2020-05-10 22:46:12 +00:00
from typing import *
2020-01-24 00:07:11 +00:00
import logging
import aiohttp
2020-05-10 22:46:12 +00:00
import royalnet.commands as rc
import royalnet.utils as ru
2020-07-11 00:14:38 +00:00
from royalnet.backpack import tables as rbt
from .abstract.linker import LinkerCommand
2020-05-10 22:46:12 +00:00
2020-01-24 00:07:11 +00:00
from ..tables import Steam, Dota
2020-03-19 15:26:11 +00:00
from ..types import DotaRank
2020-01-24 00:07:11 +00:00
# Module-level logger, named after this module so records can be filtered per module.
log = logging.getLogger(__name__)
2020-07-11 00:14:38 +00:00
class DotaCommand(LinkerCommand):
    """Show Dota 2 statistics for Steam accounts and refresh them from OpenDota.

    The records handled by this command are Steam rows; the tracked values
    (wins, losses, rank) are stored on the Dota row reachable through
    ``Steam.dota``, which is created on demand by :meth:`update`.
    """

    name: str = "dota"

    aliases = ["dota2", "doto", "doto2", "dotka", "dotka2"]

    description: str = "Visualizza le tue statistiche di Dota."

    syntax: str = ""

    def describe(self, obj: Steam) -> str:
        """Build the text summary of the Dota statistics of a Steam account.

        :param obj: The Steam record to describe.
        :return: A string with the persona name and, when a Dota record exists,
                 the rank (if any) followed by the wins and losses counters.
        """
        header = f" [b]{obj.persona_name}[/b]\n"
        dota = obj.dota
        if dota is None:
            # update() leaves obj.dota unset when OpenDota does not know the player,
            # so there may be nothing to show besides the name.
            return f"{header}\n"
        rank_line = f"{dota.rank}\n" if dota.rank else ""
        return (
            f"{header}"
            f"{rank_line}"
            "\n"
            f"Wins: [b]{dota.wins}[/b]\n"
            f"Losses: [b]{dota.losses}[/b]\n"
            "\n"
        )

    async def get_updatables_of_user(self, session, user: rbt.User) -> List[Steam]:
        """Return the Steam records attached to ``user``."""
        return user.steam

    async def get_updatables(self, session) -> List[Steam]:
        """Return every Steam record stored in the database."""
        return await ru.asyncify(session.query(self.alchemy.get(Steam)).all)

    async def create(self, session, user: rbt.User, args):
        """Reject manual linking.

        :raises rc.InvalidInputError: Always; Dota records are created by :meth:`update`.
        """
        raise rc.InvalidInputError("Dota accounts are automatically linked from Steam.")

    @staticmethod
    async def _opendota_player_json(hcs, steam_id32, endpoint: str):
        """Fetch ``/api/players/<steam_id32>/<endpoint>`` from OpenDota and decode it as JSON.

        :param hcs: An open :class:`aiohttp.ClientSession`.
        :param steam_id32: The 32-bit Steam id of the player.
        :param endpoint: Path suffix after the player id (``""`` for the profile, ``"wl"`` for win/loss).
        :raises rc.ExternalError: If OpenDota answers with a status other than 200.
        """
        async with hcs.get(f"https://api.opendota.com/api/players/{steam_id32}/{endpoint}") as response:
            if response.status != 200:
                raise rc.ExternalError(f"OpenDota /{endpoint} returned {response.status}!")
            return await response.json()

    async def update(self, session, obj: Steam, change: Callable[[str, Any], Awaitable[None]]):
        """Refresh wins, losses and rank of a Steam account from OpenDota.

        Nothing is changed if OpenDota has no profile for the account or if the
        account has neither wins nor losses.

        :param session: The database session used to create and flush the Dota record.
        :param obj: The Steam record whose Dota data should be refreshed.
        :param change: Unused. The tracked attributes live on the Dota record rather
                       than on ``obj``, so a callback targeting that record is built locally.
        :raises rc.ExternalError: If an OpenDota request does not return status 200.
        """
        log.debug("Getting player data from OpenDota...")
        steam_id32 = obj.steamid.as_32
        async with aiohttp.ClientSession() as hcs:
            # Get profile data
            p = await self._opendota_player_json(hcs, steam_id32, "")
            # No such user
            if "profile" not in p:
                log.debug(f"Not found: {obj}")
                return
            # Get win/loss data (requested only once the profile is known to exist)
            wl = await self._opendota_player_json(hcs, steam_id32, "wl")
            # No such user
            if wl["win"] == 0 and wl["lose"] == 0:
                log.debug(f"Not found: {obj}")
                return

        # Find the Dota record, if it exists
        dota: Optional[Dota] = obj.dota
        if dota is None:
            # Autocreate the Dota record
            dota = self.alchemy.get(Dota)(steam=obj)
            session.add(dota)
            # Flush off the event loop, like the query in get_updatables does.
            await ru.asyncify(session.flush)

        # Changes must be applied to the Dota record, not to the Steam one.
        async def change_dota(attribute: str, new: Any) -> None:
            await self._change(session=session, obj=dota, attribute=attribute, new=new)

        await change_dota("wins", wl["win"])
        await change_dota("losses", wl["lose"])
        # A missing or falsy rank_tier means the player currently has no rank.
        rank_tier = p.get("rank_tier")
        await change_dota("rank", DotaRank(rank_tier=rank_tier) if rank_tier else None)

    async def on_increase(self, session, obj: Dota, attribute: str, old: Any, new: Any) -> None:
        """Announce a rank increase; increases of other attributes are ignored."""
        # NOTE(review): presumably invoked through self._change when new > old — confirm in LinkerCommand.
        if attribute == "rank":
            await self.notify(f"📈 [b]{obj.steam.user}[/b] è salito a [b]{new}[/b] su Dota 2! Congratulazioni!")

    async def on_unchanged(self, session, obj: Dota, attribute: str, old: Any, new: Any) -> None:
        """Do nothing for a value that did not change."""
        pass

    async def on_decrease(self, session, obj: Dota, attribute: str, old: Any, new: Any) -> None:
        """Announce a rank decrease; decreases of other attributes are ignored."""
        if attribute == "rank":
            await self.notify(f"📉 [b]{obj.steam.user}[/b] è sceso a [b]{new}[/b] su Dota 2.")

    async def on_first(self, session, obj: Dota, attribute: str, old: None, new: Any) -> None:
        """Announce the first value of ``wins`` (account connected) or of ``rank`` (first ranking)."""
        if attribute == "wins":
            await self.notify(f"↔️ Account {obj} connesso a {obj.steam.user}!")
        elif attribute == "rank":
            await self.notify(f"🌟 [b]{obj.steam.user}[/b] si è classificato [b]{new}[/b] su Dota 2!")

    async def on_reset(self, session, obj: Dota, attribute: str, old: Any, new: None) -> None:
        """Announce that the rank was cleared; resets of other attributes are ignored."""
        if attribute == "rank":
            await self.notify(f"⬜️ [b]{obj.steam.user}[/b] non ha più un rank su Dota 2.")