mirror of
https://github.com/RYGhub/royalnet.git
synced 2024-11-24 03:54:20 +00:00
102 lines
3.7 KiB
Python
102 lines
3.7 KiB
Python
|
from typing import *
|
||
|
from royalnet.commands import *
|
||
|
from royalnet.utils import *
|
||
|
from ..tables import Alias, Steam
|
||
|
import steam
|
||
|
import requests.exceptions
|
||
|
|
||
|
|
||
|
class SteamGame:
    """A single game entry, as found in the ``games`` list of a Steam ``GetOwnedGames`` response.

    Identity is defined by ``appid`` alone: two instances with the same ``appid`` compare equal and hash alike,
    so libraries can be deduplicated and intersected through plain sets.
    """

    def __init__(self,
                 appid=None,
                 name=None,
                 playtime_forever=None,
                 img_icon_url=None,
                 img_logo_url=None,
                 has_community_visible_stats=None,
                 playtime_windows_forever=None,
                 playtime_mac_forever=None,
                 playtime_linux_forever=None,
                 playtime_2weeks=None,
                 **_unknown_fields):
        """Store the known fields of a game entry.

        Every parameter mirrors a key of the game dict and defaults to ``None`` when the key is absent.
        Any other keyword argument is accepted and discarded, so that building an instance with
        ``SteamGame(**game)`` does not raise :class:`TypeError` when the payload carries keys this class
        does not know about.
        """
        self.appid = appid
        self.name = name
        self.playtime_forever = playtime_forever
        self.img_icon_url = img_icon_url
        self.img_logo_url = img_logo_url
        self.has_community_visible_stats = has_community_visible_stats
        self.playtime_windows_forever = playtime_windows_forever
        self.playtime_mac_forever = playtime_mac_forever
        self.playtime_linux_forever = playtime_linux_forever
        self.playtime_2weeks = playtime_2weeks

    def __hash__(self):
        """Hash on ``appid`` only, consistently with :meth:`__eq__`."""
        # __hash__ must return an int: hashing the appid (instead of returning it as-is) keeps this valid
        # even when the appid is None or not an integer.
        return hash(self.appid)

    def __eq__(self, other):
        """Return ``True`` only for another :class:`SteamGame` with the same ``appid``."""
        if isinstance(other, SteamGame):
            return self.appid == other.appid
        return False

    def __str__(self):
        """Return the game name, or the appid as text when the name is missing."""
        # __str__ must return a str: returning a None name directly would raise TypeError.
        if self.name is None:
            return str(self.appid)
        return str(self.name)

    def __repr__(self):
        """Return a debug representation containing the class name, the appid and the name."""
        return f"<{self.__class__.__qualname__} {self.appid} ({self.name})>"
|
||
|
|
||
|
|
||
|
class SteammatchCommand(Command):
    """Command that lists the Steam games owned by the caller and by every user passed as argument."""

    # Name the command is registered under.
    name: str = "steammatch"

    # User-facing description (Italian, like every other message of this command).
    description: str = "Vedi quali giochi hai in comune con uno o più membri!"

    # One or more usernames are expected as arguments.
    syntax: str = "{royalnet_username}+"

    def __init__(self, interface: CommandInterface):
        """Create the command and the Steam Web API client it uses.

        Raises:
            ConfigurationError: if the ``Steam.web_api_key`` config option is missing.
        """
        super().__init__(interface)
        if "Steam" not in self.config or "web_api_key" not in self.config["Steam"]:
            raise ConfigurationError("[c]Steam.web_api_key[/c] config option is missing!")
        self._api = steam.WebAPI(self.config["Steam"]["web_api_key"])

    async def _get_owned_games(self, user) -> set:
        """Return the set of :class:`SteamGame` owned by any of the Steam accounts linked to ``user``.

        Raises:
            UserError: if ``user`` has no linked Steam account.
            ExternalError: if the owned games of one of the accounts cannot be read.
        """
        if not user.steam:
            raise UserError(f"{user} non ha un account Steam registrato!")

        owned = set()
        for steam_account in user.steam:
            steam_account: Steam
            try:
                response = await asyncify(self._api.IPlayerService.GetOwnedGames,
                                          steamid=steam_account._steamid,
                                          include_appinfo=True,
                                          include_played_free_games=True,
                                          appids_filter=0)
            except requests.exceptions.HTTPError as exc:
                # Keep the original HTTP failure attached to the user-facing error.
                raise ExternalError(f"L'account Steam di {user} è privato!") from exc

            payload = response["response"]
            # NOTE(review): a private profile presumably answers with an empty payload rather than an HTTP
            # error - confirm against the Steam Web API. It is reported like the HTTP failure instead of
            # crashing with KeyError on the missing "games" key.
            if not payload:
                raise ExternalError(f"L'account Steam di {user} è privato!")
            # A non-empty payload without "games" is treated as an empty library.
            owned.update(SteamGame(**game) for game in payload.get("games", []))
        return owned

    async def run(self, args: CommandArgs, data: CommandData) -> None:
        """Reply with the games shared by the author of the command and all the users named in ``args``.

        Raises:
            InvalidInputError: if no other user is specified, or if an alias does not match any user.
            UserError: if one of the users has no linked Steam account.
            ExternalError: if the games of one of the users cannot be retrieved.
        """
        author = await data.get_author(error_if_none=True)
        users = [author]

        for arg in args:
            user = await asyncify(Alias.find_by_alias, self.alchemy, data.session, arg)
            # NOTE(review): assumes the lookup returns None for an unknown alias - confirm. Without this
            # check an unknown alias would surface later as an AttributeError on ``user.steam``.
            if user is None:
                raise InvalidInputError(f"Utente {arg} non trovato!")
            users.append(user)

        if len(users) < 2:
            raise InvalidInputError("Devi specificare almeno un altro utente!")

        # Intersect the libraries one user at a time; None marks "no library seen yet".
        shared_games: Optional[set] = None
        for user in users:
            user_games = await self._get_owned_games(user)
            if shared_games is None:
                shared_games = user_games
            else:
                shared_games = shared_games.intersection(user_games)

        message_rows = [f"🎮 Giochi in comune tra {andformat([str(user) for user in users], final=' e ')}:"]
        # Games without a name sort first instead of breaking the comparison between None and str.
        for game in sorted(shared_games, key=lambda g: g.name or ""):
            message_rows.append(f"- {game}")

        message = "\n".join(message_rows)
        await data.reply(message)
|