import os from flask import Flask, session, url_for, redirect, request, render_template, abort from flask_sqlalchemy import SQLAlchemy from sqlalchemy.exc import IntegrityError import bcrypt app = Flask(__name__) app.secret_key = "pepsecret" # SQL app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///data.db' app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False db = SQLAlchemy(app) class User(db.Model): """Utente per il login sul sito dell'inventario.""" __tablename__ = "website_users" uid = db.Column(db.Integer, primary_key=True) username = db.Column(db.String, unique=True) passwd = db.Column(db.LargeBinary) def __init__(self, username, passwd): self.username = username self.passwd = passwd def __repr__(self): return "".format(self.username, self.passwd) class Ente(db.Model): """Ente (Unione Terre di Castelli, Comune di Vignola...).""" __tablename__ = "enti" eid = db.Column(db.Integer, primary_key=True) nomeente = db.Column(db.String) nomebreveente = db.Column(db.String) servizi = db.relationship("Servizio", backref='ente', lazy='dynamic') def __init__(self, nomeente, nomebreveente): self.nomeente = nomeente self.nomebreveente = nomebreveente def __repr__(self): return "".format(self.nomebreveente) class Servizio(db.Model): """Servizio di un ente (Anagrafe, Ufficio Scuola, Sindaco).""" __tablename__ = "servizi" sid = db.Column(db.Integer, primary_key=True) eid = db.Column(db.Integer, db.ForeignKey('enti.eid')) nomeservizio = db.Column(db.String) locazione = db.Column(db.String) impiegati = db.relationship("Impiegato", backref='servizio', lazy='dynamic') def __init__(self, eid, nomeservizio, locazione): self.eid = eid self.nomeservizio = nomeservizio self.locazione = locazione def __repr__(self): return "".format(self.nomeservizio) class Impiegato(db.Model): """Impiegato in uno dei servizi.""" __tablename__ = "impiegati" iid = db.Column(db.Integer, primary_key=True) sid = db.Column(db.Integer, db.ForeignKey('servizi.sid')) nomeimpiegato = db.Column(db.String) username = db.Column(db.String, unique=True) passwd = db.Column(db.String) dispositivi = db.relationship("Accesso", backref='impiegato', lazy='dynamic') def __init__(self, sid, nomeimpiegato, username, passwd): self.sid = sid self.nomeimpiegato = nomeimpiegato self.username = username self.passwd = passwd def __repr__(self): return "".format(self.nome) class Dispositivo(db.Model): """Dispositivo gestito dal CED registrato nell'inventario.""" __tablename__ = "dispositivi" did = db.Column(db.Integer, primary_key=True) accessi = db.relationship("Accesso", backref='dispositivo', lazy='dynamic') tipo = db.Column(db.String) marca = db.Column(db.String) modello = db.Column(db.String) inv_ced = db.Column(db.String) inv_ente = db.Column(db.String) fornitore = db.Column(db.String) seriale = db.Column(db.String) ip = db.Column(db.String) nid = db.Column(db.Integer, db.ForeignKey('reti.nid')) rete = db.relationship("Rete", backref='dispositivi') def __init__(self, tipo, marca, modello, inv_ced, inv_ente, fornitore, nid, seriale, ip): self.tipo = tipo self.marca = marca self.modello = modello self.inv_ced = inv_ced self.inv_ente = inv_ente self.fornitore = fornitore self.nid = nid self.seriale = seriale self.ip=ip def __repr__(self): return "".format(self.inv_ced) class Accesso(db.Model): """Tabella di associazione tra dispositivi e impiegati.""" __tablename__ = "assoc_accessi" iid = db.Column(db.Integer, db.ForeignKey('impiegati.iid'), primary_key=True) did = db.Column(db.Integer, db.ForeignKey('dispositivi.did'), primary_key=True) def __init__(self, iid, did): self.iid = iid self.did = did def __repr__(self): return "".format(self.iid, self.did) class Rete(db.Model): """Configurazione di rete di uno o più computer.""" __tablename__ = "reti" nid = db.Column(db.Integer, primary_key=True) nome = db.Column(db.String) network_ip = db.Column(db.String, unique=True, nullable=False) subnet = db.Column(db.Integer, nullable=False) primary_dns = db.Column(db.String) secondary_dns = db.Column(db.String) def __init__(self, nome, network_ip, subnet, primary_dns, secondary_dns): self.nome = nome self.network_ip = network_ip self.subnet = subnet self.primary_dns = primary_dns self.secondary_dns = secondary_dns def __repr__(self): return "".format(self.nid, self.nome) class FakeAccesso: """Hackerino usato nel caso in cui non ci sia nessun impiegato assegnato a un dispositivo. Viva il duck typing!""" def __init__(self, dispositivo): self.did = dispositivo.did self.iid = None self.dispositivo = dispositivo def __getitem__(self, key): if key == 0: return self.dispositivo # Funzioni del sito def login(username, password): """Controlla se l'username e la password di un utente del sito sono corrette.""" user = User.query.filter_by(username=username).first() return user is not None and bcrypt.checkpw(bytes(password, encoding="utf-8"), user.passwd) def subnet_to_string(integer): """Converte una subnet mask in numero in una stringa""" still_int = (0xFFFFFFFF << (32 - integer)) & 0xFFFFFFFF return f"{still_int >> 24}.{(still_int >> 16) & 0xFF}.{(still_int >> 8)}.{still_int & 0xFF}" # Sito @app.route('/') def page_home(): """Pagina principale del sito: se non sei loggato reindirizza alla pagina del login, se sei loggato effettua il logout e dopo reindirizza al login""" if 'username' not in session: return abort(403) else: return redirect(url_for('page_dashboard')) @app.route('/login', methods=['GET', 'POST']) def page_login(): """Pagina di login: accetta richieste GET per la visualizzazione della pagina e richieste POST con form data per il login""" if request.method == 'GET': goldfish = url_for("static", filename="goldfish.png") return render_template("login.htm", goldfish=goldfish) else: if login(request.form['username'], request.form['password']): session['username'] = request.form['username'] return redirect(url_for('page_dashboard')) else: return render_template('error.htm', error="Username o password non validi.") @app.route('/logout') def page_logout(): """Pagina di logout: slogga l'utente se visitata""" if 'username' in session: session.pop('username') return redirect(url_for('page_login')) @app.route('/dashboard') def page_dashboard(): """Dashboard del sito: Conteggia i servizi e visualizza la navbar Sì, è un po' inutile.""" enti = Ente.query.all() conteggioservizi = dict() goldfish = url_for("static", filename="goldfish.png") for ente in enti: conteggioservizi[ente.nomeente] = Servizio.query.join(Ente).filter_by(eid=ente.eid).count() conteggioutenti = dict() for ente in enti: conteggioutenti[ente.nomeente] = Impiegato.query.join(Servizio).join(Ente).filter_by(eid=ente.eid).count() return render_template("dashboard.htm", type="main", user=session["username"], conteggioutenti=conteggioutenti, conteggioservizi=conteggioservizi, goldfish=goldfish) @app.route('/ente_add', methods=['GET', 'POST']) def page_ente_add(): """Pagina di creazione nuovo ente: come tutte le altre pagine di creazione del sito, accetta GET per visualizzare la pagina e POST con form data per l'aggiunta effettiva.""" if 'username' not in session: return abort(403) if request.method == 'GET': return render_template("ente/add.htm", type="ente", user=session["username"]) else: nuovoent = Ente(request.form['nomeente'], request.form['nomebreveente']) db.session.add(nuovoent) db.session.commit() return redirect(url_for('page_ente_list')) @app.route('/ente_del/') def page_ente_del(eid): """Pagina di cancellazione ente: accetta richieste GET per cancellare l'ente specificato.""" if 'username' not in session: return abort(403) ente = Ente.query.get_or_404(eid) servizi = Servizio.query.filter_by(eid=ente.eid).all() for serv in servizi: impiegati = Impiegato.query.filter_by(sid=serv.sid).all() for imp in impiegati: db.session.delete(imp) db.session.delete(serv) db.session.delete(ente) db.session.commit() return redirect(url_for('page_ente_list')) @app.route('/ente_list') def page_ente_list(): """Pagina di elenco degli enti disponibili sul sito.""" if 'username' not in session: return abort(403) enti = Ente.query.all() return render_template("ente/list.htm", enti=enti, type="ente", user=session["username"]) @app.route('/ente_show/', methods=['GET', 'POST']) def page_ente_show(eid): if 'username' not in session: return abort(403) if request.method == "GET": ente = Ente.query.get_or_404(eid) return render_template("ente/show.htm", ente=ente, user=session["username"]) else: ente = Ente.query.get_or_404(eid) ente.nomeente = request.form["nomeente"] ente.nomebreveente = request.form["nomebreveente"] db.session.commit() return redirect(url_for('page_ente_list')) @app.route('/serv_add', methods=['GET', 'POST']) def page_serv_add(): """Pagina di creazione nuovo servizio: come tutte le altre pagine di creazione del sito, accetta GET per visualizzare la pagina e POST con form data per l'aggiunta effettiva.""" if 'username' not in session: return abort(403) if request.method == 'GET': enti = Ente.query.all() return render_template("servizio/add.htm", enti=enti, type="serv", user=session["username"]) else: nuovoserv = Servizio(request.form['eid'], request.form['nomeservizio'], request.form['locazione']) db.session.add(nuovoserv) db.session.commit() return redirect(url_for('page_serv_list')) @app.route('/serv_del/') def page_serv_del(sid): """Pagina di cancellazione servizio: accetta richieste GET per cancellare il servizio specificato.""" if 'username' not in session: return abort(403) serv = Servizio.query.get_or_404(sid) impiegati = Impiegato.query.filter_by(sid=serv.sid).all() for imp in impiegati: db.session.delete(imp) db.session.delete(serv) db.session.commit() return redirect(url_for('page_serv_list')) @app.route('/serv_list') def page_serv_list(): """Pagina di elenco dei servizi registrati sul sito.""" if 'username' not in session: return abort(403) serv = Servizio.query.join(Ente).all() return render_template("servizio/list.htm", serv=serv, type="serv", user=session["username"]) @app.route('/serv_list/') def page_serv_list_plus(eid): """Pagina di elenco dei servizi registrati sul sito, filtrati per ente.""" if 'username' not in session: return abort(403) serv = Servizio.query.join(Ente).filter_by(eid=eid).all() return render_template("servizio/list.htm", serv=serv, type="serv", user=session["username"]) @app.route('/serv_show/', methods=['GET', 'POST']) def page_serv_show(sid): if 'username' not in session: return abort(403) if request.method == "GET": serv = Servizio.query.get_or_404(sid) enti = Ente.query.all() return render_template("servizio/show.htm", serv=serv, enti=enti, user=session["username"]) else: serv = Servizio.query.get_or_404(sid) serv.eid = request.form["eid"] serv.nomeservizio = request.form["nomeservizio"] serv.locazione = request.form["locazione"] db.session.commit() return redirect(url_for('page_serv_list')) @app.route('/imp_add', methods=['GET', 'POST']) def page_imp_add(): """Pagina di creazione nuovo impiegato: come tutte le altre pagine di creazione del sito, accetta GET per visualizzare la pagina e POST con form data per l'aggiunta effettiva.""" if 'username' not in session: return abort(403) if request.method == 'GET': servizi = Servizio.query.join(Ente).all() return render_template("impiegato/add.htm", servizi=servizi, type="imp", user=session["username"]) else: nuovoimp = Impiegato(request.form['sid'], request.form['nomeimpiegato'], request.form['username'], request.form['passwd'],) db.session.add(nuovoimp) db.session.commit() return redirect(url_for('page_imp_list')) @app.route('/imp_del/') def page_imp_del(iid): if 'username' not in session: return abort(403) imp = Impiegato.query.get_or_404(iid) db.session.delete(imp) db.session.commit() return redirect(url_for('page_imp_list')) @app.route('/imp_list') def page_imp_list(): """Pagina di elenco degli impiegati registrati nell'inventario.""" if 'username' not in session: return abort(403) impiegati = Impiegato.query.join(Servizio).join(Ente).all() return render_template("impiegato/list.htm", impiegati=impiegati, type="imp", user=session["username"]) @app.route('/imp_list/') def page_imp_list_plus(sid): """Pagina di elenco degli impiegati registrati nell'inventario, filtrati per servizio.""" if 'username' not in session: return abort(403) impiegati = Impiegato.query.join(Servizio).filter_by(sid=sid).join(Ente).all() return render_template("impiegato/list.htm", impiegati=impiegati, user=session["username"]) @app.route('/imp_show/', methods=['GET', 'POST']) def page_imp_show(iid): """Pagina di cancellazione impiegato: accetta richieste GET per cancellare l'impiegato specificato.""" if 'username' not in session: return abort(403) if request.method == "GET": imp = Impiegato.query.get_or_404(iid) servizi = Servizio.query.all() return render_template("impiegato/show.htm", imp=imp, servizi=servizi, user=session["username"]) else: imp = Impiegato.query.get_or_404(iid) imp.sid = request.form["sid"] imp.nomeimpiegato = request.form["nomeimpiegato"] imp.username = request.form["username"] imp.passwd = request.form["passwd"] db.session.commit() return redirect(url_for('page_imp_list')) @app.route('/imp_details/') def page_imp_details(iid): """Pagina dei dettagli di un impiegato""" if 'username' not in session: return abort(403) impiegato = Impiegato.query.filter_by(iid=iid).first_or_404() return render_template("impiegato/details.htm", imp=impiegato, type="imp", user=session["username"]) @app.route('/disp_add', methods=['GET', 'POST']) def page_disp_add(): """Pagina di creazione nuovo dispositivo: come tutte le altre pagine di creazione del sito, accetta GET per visualizzare la pagina e POST con form data per l'aggiunta effettiva.""" if 'username' not in session: return abort(403) if request.method == 'GET': serial = request.args.get("scanned_barcode") opzioni = ["Centralino", "Dispositivo generico di rete", "Marcatempo", "PC", "Portatile", "POS", "Router", "Server", "Stampante di rete", "Switch", "Telefono IP", "Monitor", "Scanner", "Stampante locale"] reti = Rete.query.all() impiegati = Impiegato.query.all() return render_template("dispositivo/add.htm", impiegati=impiegati, opzioni=opzioni, reti=reti, type="dev", user=session["username"], serial=serial) else: if request.form["inv_ced"]: try: int(request.form["inv_ced"]) except ValueError: return render_template("error.htm", error="Il campo Inventario CED deve contenere un numero.") if request.form["inv_ente"]: try: int(request.form["inv_ente"]) except ValueError: return render_template("error.htm", error="Il campo Inventario ente deve contenere un numero.") nuovodisp = Dispositivo(request.form['tipo'], request.form['marca'], request.form['modello'], request.form['inv_ced'], request.form['inv_ente'], request.form['fornitore'], request.form['rete'], request.form['seriale'], request.form['ip']) db.session.add(nuovodisp) db.session.commit() # Trova tutti gli utenti, edizione sporco hack in html users = list() while True: # Trova tutti gli utenti esistenti userstring = 'utente{}'.format(len(users)) if userstring in request.form: users.append(request.form[userstring]) else: break for user in users: nuovologin = Accesso(int(user), nuovodisp.did) db.session.add(nuovologin) db.session.commit() return redirect(url_for('page_disp_list')) @app.route('/disp_del/') def page_disp_del(did): """Pagina di cancellazione dispositivo: accetta richieste GET per cancellare il dispositivo specificato.""" if 'username' not in session: return abort(403) disp = Dispositivo.query.filter_by(did=did).join(Accesso).first_or_404() for accesso in disp.accessi: db.session.delete(accesso) db.session.delete(disp) db.session.commit() return redirect(url_for('page_disp_list')) @app.route('/disp_list') def page_disp_list(): """Pagina di elenco dei dispositivi registrati nell'inventario.""" if 'username' not in session: return abort(403) accessi = list() dispositivi = Dispositivo.query.all() for dispositivo in dispositivi: accesso = Accesso.query.join(Dispositivo).filter_by(did=dispositivo.did).join(Impiegato).all() if not accesso: # oh dio mio a cosa stavo pensando viva il duck typing accessi.append([FakeAccesso(dispositivo)]) else: accessi.append(accesso) return render_template("dispositivo/list.htm", accessi=accessi, type="disp", user=session["username"]) @app.route('/disp_details/') def page_disp_details(did): """Pagina di dettagli di un dispositivo, contenente anche gli utenti che vi hanno accesso.""" if 'username' not in session: return abort(403) disp = Dispositivo.query.filter_by(did=did).first_or_404() accessi = Accesso.query.filter_by(did=did).all() return render_template("dispositivo/details.htm", disp=disp, accessi=accessi, type="disp", user=session["username"]) @app.route('/disp_show/', methods=['GET', 'POST']) def page_disp_show(did): if 'username' not in session: return abort(403) if request.method == 'GET': disp = Dispositivo.query.filter_by(did=did).first_or_404() accessi = Accesso.query.filter_by(did=did).all() impiegati = Impiegato.query.all() opzioni = ["Centralino", "Dispositivo generico di rete", "Marcatempo", "PC", "Portatile", "POS", "Router", "Server", "Stampante di rete", "Switch", "Telefono IP", "Monitor", "Scanner", "Stampante locale"] reti = Rete.query.all() return render_template("dispositivo/show.htm", dispositivo=disp, accessi=accessi, impiegati=impiegati, type="disp", user=session["username"], opzioni=opzioni, reti=reti) else: if request.form["inv_ced"]: try: int(request.form["inv_ced"]) except ValueError: return render_template("error.htm", error="Il campo Inventario CED deve contenere un numero.") if request.form["inv_ente"]: try: int(request.form["inv_ente"]) except ValueError: return render_template("error.htm", error="Il campo Inventario ente deve contenere un numero.") disp = Dispositivo.query.get_or_404(did) accessi = Accesso.query.filter_by(did=did).all() disp.tipo = request.form['tipo'] disp.marca = request.form['marca'] disp.modello = request.form['modello'] disp.inv_ced = request.form['inv_ced'] disp.inv_ente = request.form['inv_ente'] disp.fornitore = request.form['fornitore'] disp.nid = int(request.form['rete']) disp.ip = request.form['ip'] # Trova tutti gli utenti, edizione sporco hack in html users = list() while True: # Trova tutti gli utenti esistenti userstring = 'utente{}'.format(len(users)) if userstring in request.form: users.append(request.form[userstring]) else: break for accesso in accessi: db.session.delete(accesso) for user in users: nuovologin = Accesso(int(user), disp.did) db.session.add(nuovologin) db.session.commit() return redirect(url_for('page_disp_list')) @app.route('/net_add', methods=['GET', 'POST']) def page_net_add(): """Pagina di creazione nuova rete: come tutte le altre pagine di creazione del sito, accetta GET per visualizzare la pagina e POST con form data per l'aggiunta effettiva.""" if 'username' not in session: return abort(403) if request.method == 'GET': return render_template("net/add.htm", type="net", user=session["username"]) else: nuovonet = Rete(nome=request.form["nome"], network_ip=request.form["network_ip"], subnet=request.form["subnet"], primary_dns=request.form["primary_dns"], secondary_dns=request.form["secondary_dns"]) db.session.add(nuovonet) db.session.commit() return redirect(url_for('page_net_list')) @app.route('/net_del/') def page_net_del(nid): """Pagina di cancellazione rete: accetta richieste GET per cancellare la rete specificata.""" if 'username' not in session: return abort(403) rete = Rete.query.get_or_404(nid) db.session.delete(rete) db.session.commit() return redirect(url_for('page_net_list')) @app.route('/net_list') def page_net_list(): if 'username' not in session: return abort(403) reti = Rete.query.all() return render_template("net/list.htm", reti=reti, type="net", user=session["username"]) @app.route('/net_details/') def page_net_details(nid): if 'username' not in session: return abort(403) net = Rete.query.filter_by(nid=nid).first_or_404() dispositivi = Dispositivo.query.join(Rete).filter_by(nid=nid).all() subnet = subnet_to_string(net.subnet) return render_template("net/details.htm", net=net, subnet=subnet, dispositivi=dispositivi, type="net", user=session["username"]) @app.route('/user_list') def page_user_list(): """Pagina di elenco degli utenti che possono connettersi al sito. Le password sono hashate.""" if 'username' not in session: return abort(403) utenti = User.query.all() return render_template("user/list.htm", utenti=utenti, type="user", user=session["username"]) @app.route('/user_del/') def page_user_del(uid): """Pagina di cancellazione impiegato: accetta richieste GET per cancellare l'utente specificato.""" if 'username' not in session: return abort(403) utente = User.query.get_or_404(uid) db.session.delete(utente) db.session.commit() return redirect(url_for('page_user_list')) @app.route('/user_add', methods=['GET', 'POST']) def page_user_add(): """Pagina di creazione nuovo utente del sito: come tutte le altre pagine di creazione del sito, accetta GET per visualizzare la pagina e POST con form data per l'aggiunta effettiva. Le password vengono hashate con bcrypt.""" if 'username' not in session: return abort(403) if request.method == 'GET': return render_template("user/add.htm", type="user", user=session["username"]) else: p = bytes(request.form["passwd"], encoding="utf-8") cenere = bcrypt.hashpw(p, bcrypt.gensalt()) nuovo = User(request.form['username'], cenere) db.session.add(nuovo) db.session.commit() return redirect(url_for('page_user_list')) @app.route('/query', methods=['GET', 'POST']) def page_query(): """Pagina delle query manuali: in GET visualizza la pagina per fare una query, mentre in POST visualizza i risultati.""" if 'username' not in session: return abort(403) if request.method == 'GET': return render_template("query.htm", user=session["username"], type="query") else: try: result = db.engine.execute("SELECT" + request.form["query"] + ";") except Exception as e: return render_template("query.htm", query=request.form["query"], error=repr(e), user=session["username"], type="query") return render_template("query.htm", query=request.form["query"], result=result, user=session["username"], type="query") @app.route('/smecds', methods=['GET']) def page_smecds(): """Pagina che visualizza i credits del sito""" if 'username' not in session: return abort(403) if request.method == 'GET': return render_template("smecds.htm", type="main", user=session["username"]) @app.errorhandler(403) def page_403(_): return render_template('403.htm', user=session["username"]) @app.errorhandler(404) def page_404(_): return render_template('404.htm', user=session["username"]) @app.errorhandler(500) def page_500(e): return render_template('500.htm', e=e, user=session["username"]) if __name__ == "__main__": # Se non esiste il database, crealo e inizializzalo! if not os.path.isfile("data.db"): db.create_all() try: # L'utente predefinito è "stagista" "smecds". nuovapassword = bcrypt.hashpw(b"smecds", bcrypt.gensalt()) nuovouser = User('stagista', nuovapassword) db.session.add(nuovouser) # Crea una rete nulla da utilizzare quando non ci sono altre reti disponibili retenulla = Rete(nome="Sconosciuta", network_ip="0.0.0.0", subnet=0, primary_dns="0.0.0.0", secondary_dns="0.0.0.0") db.session.add(retenulla) db.session.commit() except IntegrityError: # Se queste operazioni sono già state compiute in precedenza, annullale db.session.rollback() # Esegui il sito in modalità debug app.run(debug=True)