1
Fork 0
mirror of https://github.com/RYGhub/royalnet.git synced 2024-11-30 15:04:18 +00:00
royalnet/royalpack/commands/steammatch.py

103 lines
3.8 KiB
Python
Raw Normal View History

2020-01-20 23:54:55 +00:00
from typing import *
from royalnet.commands import *
from royalnet.utils import *
2020-02-03 23:54:39 +00:00
from royalnet.backpack.tables import Alias
from ..tables import Steam
2020-01-20 23:54:55 +00:00
import steam
import requests.exceptions
class SteamGame:
def __init__(self,
appid=None,
name=None,
playtime_forever=None,
img_icon_url=None,
img_logo_url=None,
has_community_visible_stats=None,
playtime_windows_forever=None,
playtime_mac_forever=None,
playtime_linux_forever=None,
playtime_2weeks=None):
self.appid = appid
self.name = name
self.playtime_forever = playtime_forever
self.img_icon_url = img_icon_url
self.img_logo_url = img_logo_url
self.has_community_visible_stats = has_community_visible_stats
self.playtime_windows_forever = playtime_windows_forever
self.playtime_mac_forever = playtime_mac_forever
self.playtime_linux_forever = playtime_linux_forever
self.playtime_2weeks = playtime_2weeks
def __hash__(self):
return self.appid
def __eq__(self, other):
if isinstance(other, SteamGame):
return self.appid == other.appid
return False
def __str__(self):
return self.name
def __repr__(self):
return f"<{self.__class__.__qualname__} {self.appid} ({self.name})>"
class SteammatchCommand(Command):
name: str = "steammatch"
description: str = "Vedi quali giochi hai in comune con uno o più membri!"
syntax: str = "{royalnet_username}+"
def __init__(self, interface: CommandInterface):
super().__init__(interface)
if "Steam" not in self.config or "web_api_key" not in self.config["Steam"]:
raise ConfigurationError("[c]Steam.web_api_key[/c] config option is missing!")
self._api = steam.WebAPI(self.config["Steam"]["web_api_key"])
async def run(self, args: CommandArgs, data: CommandData) -> None:
users = []
author = await data.get_author(error_if_none=True)
users.append(author)
for arg in args:
2020-02-11 22:41:24 +00:00
user = await Alias.find_user(self.alchemy, data.session, arg)
2020-01-20 23:54:55 +00:00
users.append(user)
if len(users) < 2:
raise InvalidInputError("Devi specificare almeno un altro utente!")
shared_games: Optional[set] = None
for user in users:
user_games = set()
if len(user.steam) == 0:
raise UserError(f"{user} non ha un account Steam registrato!")
for steam_account in user.steam:
steam_account: Steam
try:
response = await asyncify(self._api.IPlayerService.GetOwnedGames,
steamid=steam_account._steamid,
include_appinfo=True,
include_played_free_games=True,
appids_filter=0)
except requests.exceptions.HTTPError:
raise ExternalError(f"L'account Steam di {user} è privato!")
games = response["response"]["games"]
for game in games:
user_games.add(SteamGame(**game))
if shared_games is None:
shared_games = user_games
else:
shared_games = shared_games.intersection(user_games)
message_rows = [f"🎮 Giochi in comune tra {andformat([str(user) for user in users], final=' e ')}:"]
for game in sorted(list(shared_games), key=lambda g: g.name):
message_rows.append(f"- {game}")
message = "\n".join(message_rows)
await data.reply(message)