1
Fork 0
mirror of https://github.com/RYGhub/royalnet.git synced 2024-11-27 13:34:28 +00:00
royalnet/database.py

143 lines
4.2 KiB
Python
Raw Normal View History

2017-03-28 15:43:07 +00:00
import sqlalchemy.exc
2017-03-30 10:13:10 +00:00
from sqlalchemy import create_engine, Column, Integer, String, Boolean, DateTime, ForeignKey
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy.ext.declarative import declarative_base
2017-03-09 14:52:02 +00:00
import bcrypt
2017-03-10 10:59:00 +00:00
2017-03-10 09:34:27 +00:00
class NoUsersMatchingError(Exception):
pass
class InvalidPasswordError(Exception):
pass
# Initialize the database
2017-03-10 09:11:06 +00:00
engine = create_engine("sqlite:///db.sqlite")
Base = declarative_base()
Session = sessionmaker(bind=engine)
2017-03-30 10:13:10 +00:00
2017-03-10 09:11:06 +00:00
class User(Base):
__tablename__ = "members"
id = Column(Integer, primary_key=True)
username = Column(String, unique=True, nullable=False)
password = Column(String, nullable=False)
royal = Column(Boolean, nullable=False)
2017-03-10 09:11:06 +00:00
telegram_id = Column(Integer, unique=True)
2017-03-22 18:12:51 +00:00
discord_id = Column(Integer, unique=True)
2017-03-30 10:13:10 +00:00
diario_entries = relationship("Diario")
2017-03-09 14:52:02 +00:00
def __str__(self):
2017-03-22 17:00:12 +00:00
return self.username
def __repr__(self):
return f"<User {self.id}: {self.username}>"
2017-03-09 14:52:02 +00:00
2017-03-30 10:13:10 +00:00
class Diario(Base):
__tablename__ = "diario"
id = Column(Integer, primary_key=True)
text = Column(String, nullable=False)
date = Column(DateTime, nullable=False)
author = Column(Integer, ForeignKey("members.id"))
def __repr__(self):
return f"<Diario {self.date} {self.text}>"
Base.metadata.create_all(engine)
2017-03-09 14:52:02 +00:00
2017-03-30 10:13:10 +00:00
2017-03-10 09:11:06 +00:00
def create_user(username, password, royal=False):
"""Create a new user and add it to the database."""
2017-03-09 14:52:02 +00:00
# Create a new session
session = Session()
# Hash the password with bcrypt
hashed_password = bcrypt.hashpw(password.encode("utf8"), bcrypt.gensalt())
2017-03-10 09:11:06 +00:00
# Create a new user
new_member = User(username=username, password=hashed_password, royal=royal)
2017-03-09 14:52:02 +00:00
# Add the newly created member to the session
session.add(new_member)
# Commit the changes
session.commit()
2017-03-10 09:34:27 +00:00
# TODO: check for vulnerabilities
def change_password(username, newpassword):
# Create a new session
session = Session()
# Hash the new password using bcrypt
hashed_password = bcrypt.hashpw(newpassword.encode("utf8"), bcrypt.gensalt())
# Find the user entry
users = session.query(User).filter_by(username=username).all()
if len(users) == 0:
raise NoUsersMatchingError("No users with the specified username found.")
db_user = users[0]
# Change the password and commit
db_user.password = hashed_password
session.commit()
def login(username, password, enable_exceptions=False):
2017-03-30 10:13:10 +00:00
"""Try to login using the database password.
The session is always returned, while the user object is returned if the login is successful."""
2017-03-09 14:52:02 +00:00
# Create a new session
session = Session()
# Find the matching user
2017-03-29 07:44:32 +00:00
db_user = session.query(User).filter_by(username=username).first()
# Verify that the user exists
if db_user is not None:
return session, None
2017-03-10 09:11:06 +00:00
# Test the password and return the session and the user if successful
2017-03-09 14:52:02 +00:00
if bcrypt.hashpw(password.encode("utf8"), db_user.password) == db_user.password:
2017-03-10 09:11:06 +00:00
return session, db_user
2017-03-09 14:52:02 +00:00
else:
2017-03-10 09:34:27 +00:00
if enable_exceptions:
raise InvalidPasswordError("The specified password doesn't match the user's.")
else:
return session, None
2017-03-10 09:11:06 +00:00
def init_royal_db():
2017-03-28 15:43:07 +00:00
create_user("test", "test", True)
2017-03-29 07:44:32 +00:00
def find_user(username):
"""Find the user with the specified username and return the session and the user object."""
# Create a new session
session = Session()
# Find the matching user
db_user = session.query(User).filter_by(username=username).first()
# Return the session and the user
return session, db_user
2017-03-30 10:13:10 +00:00
def migrate_diario():
import datetime
session = Session()
file = open("diario.txt", encoding="utf8")
for row in file:
entry = row.split("|", 1)
new = Diario()
new.date = datetime.datetime.fromtimestamp(int(entry[0]))
new.text = entry[1]
session.add(new)
session.commit()
def new_diario_entry(dt, text, author):
# Create a new session
session = Session()
# Create a new diario entry
entry = Diario()
entry.date = dt
entry.text = text
entry.author = author.id
# Add the entry to the database
session.add(entry)
# Commit the change
session.commit()