1
Fork 0
mirror of https://github.com/RYGhub/royalnet.git synced 2024-11-23 19:44:20 +00:00

Add dota command

This commit is contained in:
Steffo 2020-01-24 01:07:11 +01:00
parent 0b6139ac90
commit 7b4a3b8fb3
11 changed files with 365 additions and 32 deletions

View file

@ -33,6 +33,7 @@ from .exec import ExecCommand
from .trivia import TriviaCommand
from .steampowered import SteampoweredCommand
from .steammatch import SteammatchCommand
from .dota import DotaCommand
# Enter the commands of your Pack here!
available_commands = [
@ -69,7 +70,8 @@ available_commands = [
FunkwhaleCommand,
TriviaCommand,
SteampoweredCommand,
SteammatchCommand
SteammatchCommand,
DotaCommand,
]
# Don't change this, it should automatically generate __all__

152
royalpack/commands/dota.py Normal file
View file

@ -0,0 +1,152 @@
import asyncio
import logging
import sentry_sdk
import aiohttp
from typing import *
from royalnet.commands import *
from royalnet.utils import *
from royalnet.serf.telegram.escape import escape as tg_escape
from ..tables import Steam, Dota
from ..utils import DotaRank
log = logging.getLogger(__name__)
class DotaCommand(Command):
name: str = "dota"
aliases = ["dota2", "doto", "doto2"]
description: str = "Visualizza le tue statistiche di Dota!"
syntax: str = ""
def __init__(self, interface: CommandInterface):
super().__init__(interface)
if self.interface.name == "telegram":
self.loop.create_task(self._updater(900))
async def _send(self, message):
client = self.serf.client
await self.serf.api_call(client.send_message,
chat_id=self.config["Telegram"]["main_group_id"],
text=tg_escape(message),
parse_mode="HTML",
disable_webpage_preview=True)
@staticmethod
def _display(dota: Dota) -> str:
string = f" [b]{dota.steam}[/b]\n"
if dota.rank:
string += f"{dota.rank}\n"
string += f"\n" \
f"Wins: [b]{dota.wins}[/b]\n" \
f"Losses: [b]{dota.losses}[/b]\n" \
f"\n"
return string
async def _notify(self,
obj: Dota,
attribute_name: str,
old_value: Any,
new_value: Any):
if attribute_name == "wins":
if old_value is None:
message = f"↔️ Account {obj} connesso a {obj.steam.user}!"
await self._send(message)
elif attribute_name == "rank":
old_rank: Optional[DotaRank] = old_value
new_rank: Optional[DotaRank] = new_value
if new_rank > old_rank:
message = f"📈 [b]{obj.steam.user}[/b] è salito a [b]{new_value}[/b] su Dota 2! Congratulazioni!"
elif new_rank < old_rank:
message = f"📉 [b]{obj.steam.user}[/b] è sceso a [b]{new_value}[/b] su Dota 2."
else:
return
await self._send(message)
@staticmethod
async def _change(obj: Dota,
attribute_name: str,
new_value: Any,
callback: Callable[[Dota, str, Any, Any], Awaitable[None]]):
old_value = obj.__getattribute__(attribute_name)
if old_value != new_value:
await callback(obj, attribute_name, old_value, new_value)
obj.__setattr__(attribute_name, new_value)
async def _update(self, steam: Steam, db_session):
log.info(f"Updating: {steam}")
log.debug(f"Getting player data from OpenDota...")
async with aiohttp.ClientSession() as session:
# Get profile data
async with session.get(f"https://api.opendota.com/api/players/{steam.steamid.as_32}/") as response:
if response.status != 200:
raise ExternalError(f"OpenDota / returned {response.status}!")
p = await response.json()
# No such user
if "profile" not in p:
log.debug(f"Not found: {steam}")
return
# Get win/loss data
async with session.get(f"https://api.opendota.com/api/players/{steam.steamid.as_32}/wl") as response:
if response.status != 200:
raise ExternalError(f"OpenDota /wl returned {response.status}!")
wl = await response.json()
# No such user
if wl["win"] == 0 and wl["lose"] == 0:
log.debug(f"Not found: {steam}")
return
# Find the Dota record, if it exists
dota: Dota = steam.dota
if dota is None:
dota = self.alchemy.get(Dota)(steam=steam)
db_session.add(dota)
db_session.flush()
await self._change(dota, "wins", wl["win"], self._notify)
await self._change(dota, "losses", wl["lose"], self._notify)
if p["rank_tier"]:
await self._change(dota, "rank", DotaRank(rank_tier=p["rank_tier"]), self._notify)
else:
await self._change(dota, "rank", None, self._notify)
async def _updater(self, period: int):
log.info(f"Started updater with {period}s period")
while True:
log.info(f"Updating...")
session = self.alchemy.Session()
log.info("")
steams = session.query(self.alchemy.get(Steam)).all()
for steam in steams:
try:
await self._update(steam, session)
except Exception as e:
sentry_sdk.capture_exception(e)
log.error(f"Error while updating {steam.user.username}: {e}")
await asyncio.sleep(1)
await asyncify(session.commit)
session.close()
log.info(f"Sleeping for {period}s")
await asyncio.sleep(period)
async def run(self, args: CommandArgs, data: CommandData) -> None:
author = await data.get_author(error_if_none=True)
found_something = False
message = ""
for steam in author.steam:
dota = steam.dota
if dota is None:
continue
found_something = True
await self._update(steam)
message += self._display(steam)
message += "\n"
if not found_something:
if len(author.leagueoflegends) == 0:
raise UserError("Nessun account di Dota 2 trovato.")
await data.reply(message)

View file

@ -40,32 +40,31 @@ class LeagueoflegendsCommand(Command):
attribute_name: str,
old_value: typing.Any,
new_value: typing.Any):
if self.interface.name == "telegram":
if isinstance(old_value, LeagueLeague):
# This is a rank change!
# Don't send messages for every rank change, send messages just if the TIER or RANK changes!
if old_value.tier == new_value.tier and old_value.rank == new_value.rank:
return
# Find the queue
queue_names = {
"rank_soloq": "Solo/Duo",
"rank_flexq": "Flex",
"rank_twtrq": "3v3",
"rank_tftq": "TFT"
}
# Prepare the message
if new_value > old_value:
message = f"📈 [b]{obj.user}[/b] è salito a {new_value} su League of Legends " \
f"({queue_names[attribute_name]})! Congratulazioni!"
else:
message = f"📉 [b]{obj.user}[/b] è sceso a {new_value} su League of Legends " \
f"({queue_names[attribute_name]})."
# Send the message
await self._send(message)
# Level up!
elif attribute_name == "summoner_level":
if new_value == 30 or (new_value >= 50 and (new_value % 25 == 0)):
await self._send(f"🆙 [b]{obj.user}[/b] è salito al livello [b]{new_value}[/b] su League of Legends!")
if isinstance(old_value, LeagueLeague):
# This is a rank change!
# Don't send messages for every rank change, send messages just if the TIER or RANK changes!
if old_value.tier == new_value.tier and old_value.rank == new_value.rank:
return
# Find the queue
queue_names = {
"rank_soloq": "Solo/Duo",
"rank_flexq": "Flex",
"rank_twtrq": "3v3",
"rank_tftq": "TFT"
}
# Prepare the message
if new_value > old_value:
message = f"📈 [b]{obj.user}[/b] è salito a {new_value} su League of Legends " \
f"({queue_names[attribute_name]})! Congratulazioni!"
else:
message = f"📉 [b]{obj.user}[/b] è sceso a {new_value} su League of Legends " \
f"({queue_names[attribute_name]})."
# Send the message
await self._send(message)
# Level up!
elif attribute_name == "summoner_level":
if new_value == 30 or (new_value >= 50 and (new_value % 25 == 0)):
await self._send(f"🆙 [b]{obj.user}[/b] è salito al livello [b]{new_value}[/b] su League of Legends!")
@staticmethod
async def _change(obj: LeagueOfLegends,

View file

@ -10,11 +10,13 @@ class RageCommand(Command):
description: str = "Arrabbiati per qualcosa, come una software house californiana."
_MAD = ["MADDEN MADDEN MADDEN MADDEN",
"EA bad, praise Geraldo!",
"Stai sfogando la tua ira sul bot!",
"Basta, io cambio gilda!",
"Fondiamo la RRYG!"]
_MAD = [
"MADDEN MADDEN MADDEN MADDEN",
"EA bad, praise Geraldo!",
"Stai sfogando la tua ira sul bot!",
"Basta, io cambio gilda!",
"Fondiamo la RRYG!"
]
async def run(self, args: CommandArgs, data: CommandData) -> None:
await data.reply(f"😠 {random.sample(self._MAD, 1)[0]}")

View file

@ -44,6 +44,8 @@ class UserinfoCommand(Command):
for account in user.steam:
r.append(f"{account}")
if account.dota is not None:
r.append(f"{account.dota}")
for account in user.leagueoflegends:
r.append(f"{account}")

View file

@ -11,6 +11,7 @@ from .mmresponse import MMResponse
from .leagueoflegends import LeagueOfLegends
from .fiorygi import Fiorygi
from .steam import Steam
from .dota import Dota
# Enter the tables of your Pack here!
available_tables = [
@ -26,6 +27,7 @@ available_tables = [
LeagueOfLegends,
Fiorygi,
Steam,
Dota,
]
# Don't change this, it should automatically generate __all__

81
royalpack/tables/dota.py Normal file
View file

@ -0,0 +1,81 @@
from typing import *
from sqlalchemy import *
from sqlalchemy.orm import relationship, backref
from sqlalchemy.ext.declarative import declared_attr
from ..utils.dotamedal import DotaMedal
from ..utils.dotastars import DotaStars
from ..utils.dotarank import DotaRank
import steam
class Dota:
__tablename__ = "dota"
@declared_attr
def _steamid(self):
return Column(BigInteger, ForeignKey("steam._steamid"), primary_key=True)
@declared_attr
def steam(self):
return relationship("Steam", backref=backref("dota", uselist=False))
@property
def steamid(self):
return steam.SteamID(self._steamid)
@declared_attr
def _rank_tier(self):
return Column(Integer)
@property
def medal(self) -> Optional[DotaMedal]:
if self._rank_tier is None:
return None
return DotaMedal(self._rank_tier // 10)
@medal.setter
def medal(self, value: DotaMedal):
if not isinstance(value, DotaMedal):
raise AttributeError("medal can only be set to DotaMedal objects.")
self._rank_tier = value.value * 10 + self.stars.value
@property
def stars(self) -> Optional[DotaStars]:
if self._rank_tier is None:
return None
return DotaStars(self._rank_tier % 10)
@stars.setter
def stars(self, value: DotaStars):
if not isinstance(value, DotaStars):
raise AttributeError("stars can only be set to DotaStars objects.")
self._rank_tier = self.medal.value * 10 + value.value
@property
def rank(self) -> Optional[DotaRank]:
if self._rank_tier is None:
return None
return DotaRank(self.medal, self.stars)
@rank.setter
def rank(self, value: Optional[DotaRank]):
if value is None:
self._rank_tier = None
return
if not isinstance(value, DotaRank):
raise AttributeError("rank can only be set to DotaRank objects (or None).")
self._rank_tier = value.rank_tier
@declared_attr
def wins(self):
return Column(Integer)
@declared_attr
def losses(self):
return Column(Integer)
def __repr__(self):
return f"<Dota account {self._steamid}>"
def __str__(self):
return f"[c]dota:{self._steamid}[/c]"

View file

@ -4,6 +4,9 @@ from .leaguetier import LeagueTier
from .leaguerank import LeagueRank
from .leagueleague import LeagueLeague
from .royalqueue import RoyalQueue
from .dotamedal import DotaMedal
from .dotastars import DotaStars
from .dotarank import DotaRank
__all__ = [
"MMChoice",
@ -13,4 +16,7 @@ __all__ = [
"LeagueRank",
"LeagueLeague",
"RoyalQueue",
"DotaMedal",
"DotaStars",
"DotaRank",
]

View file

@ -0,0 +1,21 @@
import enum
class DotaMedal(enum.Enum):
HERALD = 1
GUARDIAN = 2
CRUSADER = 3
ARCHON = 4
LEGEND = 5
ANCIENT = 6
DIVINE = 7
IMMORTAL = 8
def __str__(self):
return self.name.capitalize()
def __repr__(self):
return f"{self.__class__.__qualname__}.{self.name}"
def __gt__(self, other):
return self.value > other.value

View file

@ -0,0 +1,46 @@
from .dotamedal import DotaMedal
from .dotastars import DotaStars
class DotaRank:
__slots__ = "medal", "stars"
def __init__(self, medal: DotaMedal = None, stars: DotaStars = None, *, rank_tier: int = None):
if rank_tier is not None:
self.medal: DotaMedal = DotaMedal(rank_tier // 10)
self.stars: DotaStars = DotaStars(rank_tier % 10)
else:
if medal is None or stars is None:
raise AttributeError("Missing medal, stars or rank_tier.")
self.medal = medal
self.stars = stars
def __gt__(self, other):
if other is None:
return True
if not isinstance(other, DotaRank):
raise TypeError(f"Can't compare {self.__class__.__qualname__} with {other.__class__.__qualname__}")
if self.medal > other.medal:
return True
elif self.medal < other.medal:
return False
elif self.stars > other.stars:
return True
return False
def __eq__(self, other):
if other is None:
return False
if not isinstance(other, DotaRank):
raise TypeError(f"Can't compare {self.__class__.__qualname__} with {other.__class__.__qualname__}")
return self.medal == other.medal and self.stars == other.stars
def __repr__(self):
return f"<{self.__class__.__qualname__}: {self.medal.name} {self.stars.name}>"
def __str__(self):
return f"{self.medal} {self.stars}"
@property
def rank_tier(self) -> int:
return (self.medal.value * 10 + self.stars.value)

View file

@ -0,0 +1,20 @@
import enum
class DotaStars(enum.Enum):
I = 1
II = 2
III = 3
IV = 4
V = 5
VI = 6
VII = 7
def __str__(self):
return self.name.upper()
def __repr__(self):
return f"{self.__class__.__qualname__}.{self.name}"
def __gt__(self, other):
return self.value > other.value