mirror of
https://github.com/RYGhub/royalnet.git
synced 2024-11-23 11:34:18 +00:00
unfinished
This commit is contained in:
parent
5399c81e55
commit
eeba4d8e3f
5 changed files with 179 additions and 88 deletions
147
database.py
147
database.py
|
@ -1,8 +1,9 @@
|
|||
import datetime
|
||||
import sqlalchemy.exc
|
||||
from sqlalchemy import create_engine, Column, Integer, String, Boolean, DateTime, ForeignKey
|
||||
from sqlalchemy.orm import sessionmaker, relationship
|
||||
from sqlalchemy.ext.declarative import declarative_base
|
||||
import bcrypt
|
||||
import lol
|
||||
|
||||
|
||||
class NoUsersMatchingError(Exception):
|
||||
|
@ -19,95 +20,49 @@ Base = declarative_base()
|
|||
Session = sessionmaker(bind=engine)
|
||||
|
||||
|
||||
class User(Base):
|
||||
__tablename__ = "members"
|
||||
|
||||
id = Column(Integer, primary_key=True)
|
||||
username = Column(String, unique=True, nullable=False)
|
||||
password = Column(String, nullable=False)
|
||||
telegram_id = Column(Integer, unique=True)
|
||||
discord_id = Column(Integer, unique=True)
|
||||
|
||||
def __str__(self):
|
||||
return self.username
|
||||
|
||||
def __repr__(self):
|
||||
return f"<User {self.id}: {self.username}>"
|
||||
|
||||
|
||||
class Diario(Base):
|
||||
__tablename__ = "diario"
|
||||
|
||||
id = Column(Integer, primary_key=True)
|
||||
|
||||
text = Column(String, nullable=False)
|
||||
date = Column(DateTime, nullable=False)
|
||||
|
||||
def __repr__(self):
|
||||
return f"<Diario {self.date} {self.text}>"
|
||||
|
||||
|
||||
class Account(Base):
|
||||
__tablename__ = "account"
|
||||
|
||||
id = Column(Integer, primary_key=True)
|
||||
|
||||
lol = relationship("LoL")
|
||||
|
||||
|
||||
class LoL(Base):
|
||||
__tablename__ = "lol"
|
||||
|
||||
id = Column(Integer, primary_key=True)
|
||||
parentid = Column(Integer, ForeignKey("account.id"))
|
||||
|
||||
last_updated = Column(DateTime)
|
||||
summoner_name = Column(String, nullable=False)
|
||||
level = Column(Integer)
|
||||
soloq_division = Column(Integer)
|
||||
soloq_tier = Column(Integer)
|
||||
flexq_division = Column(Integer)
|
||||
flexq_tier = Column(Integer)
|
||||
ttq_division = Column(Integer)
|
||||
ttq_tier = Column(Integer)
|
||||
|
||||
def __repr__(self):
|
||||
return f"<LoL {self.id} {self.summoner_name}>"
|
||||
|
||||
|
||||
Base.metadata.create_all(engine)
|
||||
|
||||
|
||||
def create_user(username, password):
|
||||
"""Create a new user and add it to the database."""
|
||||
# Create a new session
|
||||
session = Session()
|
||||
# Hash the password with bcrypt
|
||||
hashed_password = bcrypt.hashpw(password.encode("utf8"), bcrypt.gensalt())
|
||||
# Create a new user
|
||||
new_member = User(username=username, password=hashed_password)
|
||||
# Add the newly created member to the session
|
||||
session.add(new_member)
|
||||
# Commit the changes
|
||||
session.commit()
|
||||
|
||||
|
||||
# TODO: check for vulnerabilities
|
||||
def change_password(username, newpassword):
|
||||
# Create a new session
|
||||
session = Session()
|
||||
# Hash the new password using bcrypt
|
||||
hashed_password = bcrypt.hashpw(newpassword.encode("utf8"), bcrypt.gensalt())
|
||||
# Find the user entry
|
||||
users = session.query(User).filter_by(username=username).all()
|
||||
if len(users) == 0:
|
||||
raise NoUsersMatchingError("No users with the specified username found.")
|
||||
db_user = users[0]
|
||||
# Change the password and commit
|
||||
db_user.password = hashed_password
|
||||
session.commit()
|
||||
|
||||
|
||||
def login(username, password, enable_exceptions=False):
|
||||
"""Try to login using the database password.
|
||||
The session is always returned, while the user object is returned if the login is successful."""
|
||||
# Create a new session
|
||||
session = Session()
|
||||
# Find the matching user
|
||||
db_user = session.query(User).filter_by(username=username).first()
|
||||
# Verify that the user exists
|
||||
if db_user is not None:
|
||||
return session, None
|
||||
# Test the password and return the session and the user if successful
|
||||
if bcrypt.hashpw(password.encode("utf8"), db_user.password) == db_user.password:
|
||||
return session, db_user
|
||||
else:
|
||||
if enable_exceptions:
|
||||
raise InvalidPasswordError("The specified password doesn't match the user's.")
|
||||
else:
|
||||
return session, None
|
||||
|
||||
|
||||
def find_user(username):
|
||||
"""Find the user with the specified username and return the session and the user object."""
|
||||
# Create a new session
|
||||
session = Session()
|
||||
# Find the matching user
|
||||
db_user = session.query(User).filter_by(username=username).first()
|
||||
# Return the session and the user
|
||||
return session, db_user
|
||||
|
||||
|
||||
def migrate_diario():
|
||||
import datetime
|
||||
session = Session()
|
||||
|
@ -133,7 +88,37 @@ def new_diario_entry(dt, text):
|
|||
# Commit the change
|
||||
session.commit()
|
||||
|
||||
session = Session()
|
||||
if len(session.query(User).all()) < 1:
|
||||
# Look! A plaintext password!
|
||||
create_user("steffo", "v3n0m-sn4k3")
|
||||
|
||||
async def update_lol(lid):
|
||||
# Create a new database session
|
||||
session = Session()
|
||||
# Find the user
|
||||
user = session.query(Account).join(LoL).filter_by(id=lid).first()
|
||||
# Poll the League API for more information
|
||||
data = await lol.get_summoner_data("euw", summoner_id=user.lol.id)
|
||||
# Update the user data
|
||||
user.lol.summoner_name = data["name"]
|
||||
user.lol.level = data["level"]
|
||||
# Poll the League API for ranked data
|
||||
soloq, flexq, ttq = await lol.get_rank_data("euw", lid)
|
||||
# Update the user data
|
||||
if soloq is not None:
|
||||
user.lol.soloq_tier = lol.tiers[soloq["tier"]]
|
||||
user.lol.soloq_division = lol.divisions[soloq["entries"][0]["division"]]
|
||||
else:
|
||||
user.lol.soloq_tier = None
|
||||
user.lol.soloq_division = None
|
||||
if flexq is not None:
|
||||
user.lol.flexq_tier = lol.tiers[flexq["tier"]]
|
||||
user.lol.flexq_division = lol.divisions[flexq["entries"][0]["division"]]
|
||||
else:
|
||||
user.lol.flexq_tier = None
|
||||
user.lol.flexq_division = None
|
||||
if ttq is not None:
|
||||
user.lol.ttq_tier = lol.tiers[ttq["tier"]]
|
||||
user.lol.ttq_division = lol.divisions[ttq["entries"][0]["division"]]
|
||||
else:
|
||||
user.lol.ttq_tier = None
|
||||
user.lol.ttq_division = None
|
||||
# Mark the user as updated
|
||||
user.lol.last_updated = datetime.datetime.now()
|
68
lol.py
Normal file
68
lol.py
Normal file
|
@ -0,0 +1,68 @@
|
|||
import asyncio
|
||||
import aiohttp
|
||||
import royalbotconfig
|
||||
import enum
|
||||
|
||||
# https://euw.api.riotgames.com/api/lol/EUW/v1.4/summoner/52348350?api_key=RGAPI-1008c33d-b0a4-4091-8600-27022d570964
|
||||
|
||||
class LoLAPIError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
tiers = {
|
||||
"BRONZE": 0,
|
||||
"SILVER": 1,
|
||||
"GOLD": 2,
|
||||
"PLATINUM": 3,
|
||||
"DIAMOND": 4,
|
||||
"MASTER": 5,
|
||||
"CHALLENGER": 6
|
||||
}
|
||||
|
||||
divisions = {
|
||||
"I": 0,
|
||||
"II": 1,
|
||||
"III": 2,
|
||||
"IV": 3,
|
||||
"V": 4
|
||||
}
|
||||
|
||||
|
||||
async def get_json(url, **kwargs):
|
||||
async with aiohttp.ClientSession() as session:
|
||||
async with session.get(url, **kwargs) as response:
|
||||
json = await session.json()
|
||||
if response.status != 200:
|
||||
raise LoLAPIError(f"Riot API returned {response.status}")
|
||||
return json
|
||||
|
||||
|
||||
async def get_summoner_data(region: str, summoner_id=None, summoner_name=None):
|
||||
# Check for the number of arguments
|
||||
if bool(summoner_id) == bool(summoner_name):
|
||||
# TODO: use the correct exception
|
||||
raise Exception("Invalid number of arguments specified")
|
||||
params = {
|
||||
"api_key": royalbotconfig.lol_token
|
||||
}
|
||||
if summoner_id is not None:
|
||||
data = await get_json(f"https://{region.lower()}.api.riotgames.com/api/lol/{region.upper()}/v1.4/summoner/{summoner_id}")
|
||||
return data[summoner_id]
|
||||
elif summoner_name is not None:
|
||||
data = await get_json(f"https://{region.lower()}.api.riotgames.com/api/lol/{region.upper()}/v1.4/summoner/by-name/{summoner_name}")
|
||||
return data[summoner_name]
|
||||
|
||||
|
||||
async def get_rank_data(region: str, summoner_id: int):
|
||||
data = await get_json(f"https://{region.lower()}.api.riotgames.com/api/lol/{region.upper()}/v2.5/league/by-summoner/{summoner_id}/entry")
|
||||
soloq = None
|
||||
flexq = None
|
||||
ttq = None
|
||||
for entry in data:
|
||||
if data["queue"] == "RANKED_SOLO_5x5":
|
||||
soloq = entry
|
||||
elif data["queue"] == "RANKED_FLEX_SR":
|
||||
flexq = entry
|
||||
elif data["queue"] == "RANKED_FLEX_TT":
|
||||
ttq = entry
|
||||
return soloq, flexq, ttq
|
|
@ -2,5 +2,4 @@ aiohttp
|
|||
async_timeout
|
||||
markovify
|
||||
discord.py[voice]
|
||||
sqlalchemy
|
||||
bcrypt
|
||||
sqlalchemy
|
43
royalbot.py
43
royalbot.py
|
@ -137,7 +137,7 @@ Sintassi: `{symbol}cv`"""
|
|||
await status_typing(bot, thing)
|
||||
# Check command syntax
|
||||
if len(arguments) != 0:
|
||||
await answer(bot, thing, cv.__doc__)
|
||||
await display_help(bot, thing, cv)
|
||||
return
|
||||
# Wait for the Discord bot to login
|
||||
while not d.client.is_logged_in:
|
||||
|
@ -220,12 +220,50 @@ Sintassi: `{symbol}roll <max>`"""
|
|||
await status_typing(bot, thing)
|
||||
# Check the command syntax
|
||||
if len(arguments) != 1:
|
||||
await answer(bot, thing, "⚠ Sintassi del comando non valida.\n`/roll <max>`",)
|
||||
await display_help(bot, thing, roll)
|
||||
return
|
||||
# Roll the dice!
|
||||
await answer(bot, thing, f"*Numero generato:* {random.randrange(0, int(arguments[0])) + 1}")
|
||||
|
||||
|
||||
# DISCORD ONLY!
|
||||
async def syncdiscord(bot, thing: extradiscord.discord.Message, arguments):
|
||||
"""Crea un nuovo account usando il tuo ID di Discord.
|
||||
|
||||
Sintassi: `{symbol}syncdiscord`"""
|
||||
# Set status to typing
|
||||
await status_typing(bot, thing)
|
||||
# Check the command syntax
|
||||
if len(arguments) != 0:
|
||||
await display_help(bot, thing, syncdiscord)
|
||||
return
|
||||
# Open a new database session
|
||||
session = database.Session()
|
||||
# Check if the user already exists
|
||||
user = session.query(database.Account).filter_by(id=thing.author.id).first()
|
||||
if user is not None:
|
||||
await answer(bot, thing, "⚠ L'account è già stato registrato.")
|
||||
return
|
||||
# Create a new user
|
||||
user = database.Account(id=thing.author.id)
|
||||
session.add(user)
|
||||
session.commit()
|
||||
# Notify the sender
|
||||
await answer(bot, thing, "✅ Account registrato con successo!")
|
||||
|
||||
|
||||
async def synclol(bot, thing, arguments):
|
||||
"""Connetti il tuo account di LoL all'account Royal Games!
|
||||
|
||||
Sintassi: `{symbol}synclol <nome evocatore>`"""
|
||||
# Set status to typing
|
||||
await status_typing(bot, thing)
|
||||
# Check the command syntax
|
||||
if len(arguments) != 0:
|
||||
await display_help(bot, thing, synclol)
|
||||
return
|
||||
# Create a new lol account and connect it to the user
|
||||
|
||||
if __name__ == "__main__":
|
||||
# Init universal bot commands
|
||||
b.commands["diario"] = diario
|
||||
|
@ -236,6 +274,7 @@ if __name__ == "__main__":
|
|||
d.commands["help"] = helpme
|
||||
d.commands["helpme"] = helpme
|
||||
b.commands["cv"] = cv
|
||||
d.commands["syncdiscord"] = syncdiscord
|
||||
# Init Telegram bot
|
||||
loop.create_task(b.run())
|
||||
print("Telegram bot start scheduled!")
|
||||
|
|
|
@ -13,7 +13,7 @@ if "discord_token" in os.environ:
|
|||
else:
|
||||
raise MissingTokenError("discord_token")
|
||||
|
||||
if "discord_webhook" in os.environ:
|
||||
discord_webhook = os.environ["discord_webhook"]
|
||||
if "lol_token" in os.environ:
|
||||
lol_token = os.environ["lol_token"]
|
||||
else:
|
||||
raise MissingTokenError("discord_webhook")
|
||||
raise MissingTokenError("lol_token")
|
Loading…
Reference in a new issue