1
Fork 0
mirror of https://github.com/pds-nest/nest.git synced 2024-11-30 00:14:19 +00:00
pds-2021-g2-nest/code/backend/nest_backend/gestione.py
2021-05-10 17:03:10 +02:00

112 lines
3.4 KiB
Python

"""
Gestione adds many fancy thingamajigs to the flask application, such as a login system and such.
"""
from .database import *
import bcrypt
import functools
from flask_jwt_extended import get_jwt_identity
from flask import jsonify
def authenticate(username, password):
"""
Authentication method. It checks if the combination of username+password is a valid match. If not, it returns None.
:param username: the user's email
:param password: the user's password
:return: if the credentials are correct, it returns the user. Else, it returns None.
"""
user = User.query.filter_by(email=username).first()
try:
if bcrypt.checkpw(bytes(password, encoding="utf-8"), user.password):
return user
except AttributeError:
# Se non esiste l'Utente
return None
def identity(payload):
"""
Authentication verification method. It checks if the user is in fact registered on the server.
It is required by Flask-JWT, and shouldnt be used alone.
:param payload: the reqest payload.
:return: an User or None. It depends whether the user is actually registered on the platform.
"""
user_id = payload['identity']
user = User.query.filter_by(id=user_id).first()
if user:
return user.id
return None
def gen_password(password):
"""
It generates an hashed password.
:param password: the password that needs to be hashed.
:return: the password's hash.
"""
return bcrypt.hashpw(bytes(password, "utf-8"), bcrypt.gensalt())
def find_user(email):
return User.query.filter_by(email=email).first()
def admin_or_403(f):
@functools.wraps(f)
def func(*args, **kwargs):
current_user = get_jwt_identity()
return f(*args, **kwargs)
return func
def repository_auth(f):
@functools.wraps(f)
def func(*args, **kwargs):
user = find_user(get_jwt_identity())
repository_id = kwargs["rid"]
if not repository_id:
return json_error("Missing one or more parameters."), 400
repository = Repository.query.filter_by(id=repository_id).first()
if not repository:
return json_error("Cant't find the repository."), 404
if repository.owner_id != user.email and user.email not in [a.email for a in
repository.authorizations] and not user.isAdmin:
return json_error("Stop right there, criminal scum! Nobody accesses protected data under MY watch!"), 403
return f(*args, **kwargs)
return func
def json_error(msg):
"""
Returns an error in json format
:param msg: the error message.
:return: a json formatted string.
"""
return jsonify({"result": "failure", 'msg': msg})
def json_success(data):
"""
An happy little function. Its happy because the operation was successful.
:param data: the thing you want to be returned
:return: a json formatted string
"""
return jsonify({"result": "success", "data": data})
def error_handler(e):
try:
print(f"{e.description} - {e.code}")
return json_error(f"{e.description} - {e.code}"), 500
except Exception:
print(e)
return json_error(f"{e.__repr__()}"), 500
def json_request_authorizer(json, serializable):
json_keys = json.keys()
serializable_keys = serializable.to_json().keys()
return all(key in json_keys for key in serializable_keys)