1
Fork 0
mirror of https://github.com/RYGhub/royalnet.git synced 2024-11-30 15:04:18 +00:00
royalnet/royalpack/commands/steampowered.py

128 lines
5.4 KiB
Python
Raw Normal View History

2020-01-20 21:47:45 +00:00
from typing import *
2020-05-28 14:54:43 +00:00
import steam.steamid
import steam.webapi
2020-01-20 21:47:45 +00:00
import datetime
2020-05-10 22:46:12 +00:00
import royalnet.commands as rc
import royalnet.utils as ru
2020-07-18 00:50:00 +00:00
import logging
from royalnet.backpack import tables as rbt
from .abstract.linker import LinkerCommand
2020-05-10 22:46:12 +00:00
from ..tables import Steam, FiorygiTransaction
2020-07-18 00:50:00 +00:00
from ..types import Updatable
log = logging.getLogger(__name__)
2020-01-20 21:47:45 +00:00
2020-07-18 00:50:00 +00:00
class SteampoweredCommand(LinkerCommand):
2020-01-20 21:47:45 +00:00
name: str = "steampowered"
2020-07-18 00:50:00 +00:00
description: str = "Connetti e visualizza informazioni sul tuo account di Steam!"
2020-01-20 21:47:45 +00:00
2020-07-18 00:50:00 +00:00
syntax: str = "{url_profilo}"
2020-01-20 21:47:45 +00:00
2020-08-20 01:20:53 +00:00
def __init__(self, serf, config):
super().__init__(serf, config)
2020-07-18 00:50:00 +00:00
self._api = steam.webapi.WebAPI(self.token())
2020-01-20 21:47:45 +00:00
2020-07-18 00:50:00 +00:00
def token(self):
return self.config["steampowered"]["token"]
2020-04-02 14:56:52 +00:00
2020-07-18 00:50:00 +00:00
async def get_updatables_of_user(self, session, user: rbt.User) -> List[Steam]:
return user.steam
async def get_updatables(self, session) -> List[Steam]:
return await ru.asyncify(session.query(self.alchemy.get(Steam)).all)
2020-01-20 21:47:45 +00:00
2020-07-21 21:12:14 +00:00
async def create(self,
session,
user: rbt.User,
args: rc.CommandArgs,
data: Optional[rc.CommandData] = None) -> Optional[Steam]:
2020-07-18 00:50:00 +00:00
url = args.joined()
steamid64 = await self._call(steam.steamid.steam64_from_url, url)
if steamid64 is None:
raise rc.InvalidInputError("Quel link non è associato ad alcun account Steam.")
response = await self._call(self._api.ISteamUser.GetPlayerSummaries_v2, steamids=steamid64)
r = response["response"]["players"][0]
steam_account = self.alchemy.get(Steam)(
user=user,
_steamid=int(steamid64),
persona_name=r["personaname"],
profile_url=r["profileurl"],
avatar=r["avatarfull"],
primary_clan_id=r["primaryclanid"],
account_creation_date=datetime.datetime.fromtimestamp(r["timecreated"])
)
2020-07-21 21:12:14 +00:00
await FiorygiTransaction.spawn_fiorygi(
user=user,
qty=1,
2020-08-23 22:13:38 +00:00
reason="aver collegato a Royalnet il proprio account di Steam",
data=data,
session=session,
2020-07-21 21:12:14 +00:00
)
2020-07-18 00:50:00 +00:00
session.add(steam_account)
return steam_account
2020-07-09 02:13:54 +00:00
2020-07-18 00:50:00 +00:00
async def update(self, session, obj: Steam, change: Callable[[str, Any], Awaitable[None]]):
response = await self._call(self._api.ISteamUser.GetPlayerSummaries_v2, steamids=obj.steamid.as_64)
r = response["response"]["players"][0]
obj.persona_name = r["personaname"]
obj.profile_url = r["profileurl"]
obj.avatar = r["avatar"]
obj.primary_clan_id = r["primaryclanid"]
obj.account_creation_date = datetime.datetime.fromtimestamp(r["timecreated"])
response = await self._call(self._api.IPlayerService.GetSteamLevel_v1, steamid=obj.steamid.as_64)
obj.account_level = response["response"]["player_level"]
2020-07-09 02:13:54 +00:00
response = await self._call(self._api.IPlayerService.GetOwnedGames_v1,
2020-07-18 00:50:00 +00:00
steamid=obj.steamid.as_64,
2020-07-09 02:13:54 +00:00
include_appinfo=False,
include_played_free_games=True,
include_free_sub=False,
appids_filter=None)
2020-07-18 00:50:00 +00:00
obj.owned_games_count = response["response"]["game_count"]
2020-07-09 02:13:54 +00:00
if response["response"]["game_count"] >= 0:
2020-07-18 00:50:00 +00:00
obj.most_played_game_2weeks = sorted(response["response"]["games"], key=lambda g: -g.get("playtime_2weeks", 0))[0]["appid"]
obj.most_played_game_forever = sorted(response["response"]["games"], key=lambda g: -g.get("playtime_forever", 0))[0]["appid"]
async def on_increase(self, session, obj: Updatable, attribute: str, old: Any, new: Any) -> None:
pass
async def on_unchanged(self, session, obj: Updatable, attribute: str, old: Any, new: Any) -> None:
pass
async def on_decrease(self, session, obj: Updatable, attribute: str, old: Any, new: Any) -> None:
pass
async def on_first(self, session, obj: Updatable, attribute: str, old: None, new: Any) -> None:
pass
async def on_reset(self, session, obj: Updatable, attribute: str, old: Any, new: None) -> None:
pass
def describe(self, obj: Steam):
return f" [url={obj.profile_url}]{obj.persona_name}[/url]\n" \
f"[b]Level {obj.account_level}[/b]\n" \
f"\n" \
f"Owned games: [b]{obj.owned_games_count}[/b]\n" \
f"Most played 2 weeks: [url=https://store.steampowered.com/app/{obj.most_played_game_2weeks}]{obj.most_played_game_2weeks}[/url]\n" \
f"Most played forever: [url=https://store.steampowered.com/app/{obj.most_played_game_forever}]{obj.most_played_game_forever}[/url]\n" \
f"\n" \
f"SteamID32: [c]{obj.steamid.as_32}[/c]\n" \
f"SteamID64: [c]{obj.steamid.as_64}[/c]\n" \
f"SteamID2: [c]{obj.steamid.as_steam2}[/c]\n" \
f"SteamID3: [c]{obj.steamid.as_steam3}[/c]\n" \
f"\n" \
f"Created on: [b]{obj.account_creation_date}[/b]\n"
async def _call(self, method, *args, **kwargs):
log.debug(f"Calling {method}")
try:
return await ru.asyncify(method, *args, **kwargs)
except Exception as e:
raise rc.ExternalError("\n".join(e.args).replace(self.token(), "HIDDEN"))