325 lines
12 KiB
Python
325 lines
12 KiB
Python
from flask import Flask, render_template, request, redirect, url_for, flash, session
|
|
import logging
|
|
from datetime import datetime
|
|
from werkzeug.security import check_password_hash, generate_password_hash
|
|
from db import get_db_connection, fetch_users, fetch_orders, fetch_roles, fetch_repairs, fetch_employees
|
|
from auth import encrypt_password, check_password
|
|
import random
|
|
|
|
app = Flask(__name__)
|
|
app.secret_key = 'aezakmi'
|
|
|
|
# ----------------- Logging part not related to project requirements -----------------
|
|
# Set up custom logging
|
|
logging.basicConfig(level=logging.DEBUG)
|
|
|
|
# Remove werkzeug logs by setting its logger to a higher level (e.g., ERROR)
|
|
werkzeug_logger = logging.getLogger('werkzeug')
|
|
werkzeug_logger.setLevel(logging.ERROR)
|
|
|
|
# Log client IP before each request
|
|
@app.before_request
|
|
def log_client_ip():
|
|
client_ip = request.headers.get('X-Forwarded-For', request.remote_addr)
|
|
client_ip = client_ip.split(',')[0] # Get the first IP if it's a forwarded request
|
|
request.client_ip = client_ip # Store the client IP in the request context
|
|
|
|
# Override werkzeug's default logging to show client IP in the access log
|
|
@app.after_request
|
|
def log_request(response):
|
|
client_ip = request.headers.get('X-Forwarded-For', request.remote_addr)
|
|
client_ip = client_ip.split(',')[0] # Get the first IP if it's a forwarded request
|
|
app.logger.info(f"{client_ip} - - [{request.date}] \"{request.method} {request.full_path} {request.environ.get('SERVER_PROTOCOL')}\" {response.status_code}")
|
|
return response
|
|
# ----------------- End of logging part -----------------
|
|
|
|
# Routes
|
|
@app.route('/logout')
|
|
def logout():
|
|
session.pop('logged_in', None)
|
|
session.pop('role_id', None)
|
|
session.pop('username', None)
|
|
return '''
|
|
<script>
|
|
alert('Úspěšně odhlášen.');
|
|
window.location.href = '/';
|
|
</script>
|
|
'''
|
|
|
|
@app.route('/login', methods=['GET', 'POST'])
|
|
def login():
|
|
if session.get('logged_in'):
|
|
flash('Již jste přihlášen.', 'info')
|
|
if session.get('role_id') == 1:
|
|
return redirect(url_for('administrator'))
|
|
else:
|
|
return redirect(url_for('home'))
|
|
|
|
if request.method == 'POST':
|
|
username = request.form['username']
|
|
password = request.form['password']
|
|
|
|
conn = get_db_connection()
|
|
user = conn.execute('SELECT * FROM Zamestnanci WHERE username = ?', (username,)).fetchone()
|
|
conn.close()
|
|
|
|
if user is None or not check_password(user['heslo'], password):
|
|
flash('Nesprávné uživatelské jméno nebo heslo.', 'error')
|
|
else:
|
|
session['logged_in'] = True
|
|
session['role_id'] = user['Role_ID']
|
|
session['username'] = user['Username'] # Store username in session
|
|
flash('Úspěšně přihlášen.', 'success')
|
|
if user['Role_ID'] == 1:
|
|
return redirect(url_for('administrator'))
|
|
else:
|
|
return redirect(url_for('home'))
|
|
|
|
return render_template('login.html')
|
|
|
|
@app.route('/')
|
|
def home():
|
|
greeting = "Hello, Python!"
|
|
current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
|
|
return render_template('home.html', greeting=greeting, current_time=current_time)
|
|
|
|
@app.route('/about')
|
|
def about():
|
|
return render_template('about.html')
|
|
|
|
@app.route('/administrator')
|
|
def administrator():
|
|
if not session.get('logged_in') or session.get('role_id') != 1:
|
|
flash('Nemáte oprávnění k přístupu na tuto stránku.', 'error')
|
|
return redirect(url_for('login'))
|
|
users = fetch_users(session.get('role_id'))
|
|
orders = fetch_orders()
|
|
roles = fetch_roles()
|
|
return render_template('administrator.html', users=users, orders=orders, roles=roles)
|
|
|
|
@app.route('/create_user', methods=['GET', 'POST'])
|
|
def create_user():
|
|
if not session.get('logged_in') or session.get('role_id') != 1:
|
|
flash('Nemáte oprávnění k přístupu na tuto stránku.', 'error')
|
|
return redirect(url_for('login'))
|
|
|
|
if request.method == 'POST':
|
|
jmeno = request.form['jmeno']
|
|
prijmeni = request.form['prijmeni']
|
|
email = request.form['email']
|
|
role = request.form['role']
|
|
username = request.form['username']
|
|
heslo = encrypt_password(request.form['heslo'])
|
|
|
|
conn = get_db_connection()
|
|
existing_user = conn.execute('SELECT * FROM Zamestnanci WHERE Username = ?', (username,)).fetchone()
|
|
|
|
if existing_user:
|
|
conn.close()
|
|
return 'exists'
|
|
|
|
try:
|
|
conn.execute('INSERT INTO Zamestnanci (Jmeno, Prijmeni, Email, Role_ID, Username, Heslo) VALUES (?, ?, ?, ?, ?, ?)',
|
|
(jmeno, prijmeni, email, role, username, heslo))
|
|
conn.commit()
|
|
return 'success'
|
|
except sqlite3.Error as e:
|
|
return f'error: {e}'
|
|
finally:
|
|
conn.close()
|
|
|
|
return render_template('create_user.html')
|
|
|
|
@app.route('/edit_user/<int:user_id>', methods=['GET', 'POST'])
|
|
def edit_user(user_id):
|
|
if not session.get('logged_in') or session.get('role_id') != 1:
|
|
flash('Nemáte oprávnění k přístupu na tuto stránku.', 'error')
|
|
return redirect(url_for('login'))
|
|
|
|
conn = get_db_connection()
|
|
user = conn.execute('SELECT * FROM Zamestnanci WHERE ID_Uzivatele = ?', (user_id,)).fetchone()
|
|
|
|
if request.method == 'POST':
|
|
jmeno = request.form['jmeno']
|
|
prijmeni = request.form['prijmeni']
|
|
email = request.form['email']
|
|
role = request.form['role']
|
|
username = request.form['username']
|
|
heslo = request.form['heslo']
|
|
|
|
if heslo:
|
|
heslo = encrypt_password(heslo)
|
|
conn.execute('UPDATE Zamestnanci SET Jmeno = ?, Prijmeni = ?, Email = ?, Role_ID = ?, Username = ?, Heslo = ? WHERE ID_Uzivatele = ?',
|
|
(jmeno, prijmeni, email, role, username, heslo, user_id))
|
|
else:
|
|
conn.execute('UPDATE Zamestnanci SET Jmeno = ?, Prijmeni = ?, Email = ?, Role_ID = ?, Username = ? WHERE ID_Uzivatele = ?',
|
|
(jmeno, prijmeni, email, role, username, user_id))
|
|
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
flash('Uživatel byl úspěšně aktualizován.')
|
|
return redirect(url_for('administrator'))
|
|
|
|
conn.close()
|
|
return render_template('edit_user.html', user=user)
|
|
|
|
@app.route('/delete_user/<int:user_id>', methods=['POST'])
|
|
def delete_user(user_id):
|
|
if not session.get('logged_in') or session.get('role_id') != 1:
|
|
flash('Nemáte oprávnění k přístupu na tuto stránku.', 'error')
|
|
return redirect(url_for('login'))
|
|
|
|
conn = get_db_connection()
|
|
user = conn.execute('SELECT * FROM Zamestnanci WHERE ID_Uzivatele = ?', (user_id,)).fetchone()
|
|
|
|
if user and user['Role_ID'] > session.get('role_id'):
|
|
conn.execute('DELETE FROM Zamestnanci WHERE ID_Uzivatele = ?', (user_id,))
|
|
conn.commit()
|
|
flash('Uživatel byl úspěšně smazán.')
|
|
else:
|
|
flash('Nemáte oprávnění smazat tohoto uživatele.', 'error')
|
|
|
|
conn.close()
|
|
return redirect(url_for('administrator'))
|
|
|
|
@app.route('/edit_order/<int:order_id>', methods=['GET', 'POST'])
|
|
def edit_order(order_id):
|
|
if not session.get('logged_in') or session.get('role_id') != 1:
|
|
flash('Nemáte oprávnění k přístupu na tuto stránku.', 'error')
|
|
return redirect(url_for('login'))
|
|
|
|
conn = get_db_connection()
|
|
order = conn.execute('SELECT * FROM Objednavky WHERE ID_Objednavky = ?', (order_id,)).fetchone()
|
|
|
|
if request.method == 'POST':
|
|
stav = request.form['stav']
|
|
id_zamestnance = request.form['id_zamestnance']
|
|
popis = request.form['popis']
|
|
datum_konce = request.form['datum_konce']
|
|
|
|
conn.execute('UPDATE Objednavky SET Stav = ?, ID_Zamestnance = ?, Popis = ?, Datum_Konce = ? WHERE ID_Objednavky = ?',
|
|
(stav, id_zamestnance, popis, datum_konce, order_id))
|
|
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
flash('Objednávka byla úspěšně aktualizována.')
|
|
return redirect(url_for('administrator'))
|
|
|
|
conn.close()
|
|
return render_template('edit_order.html', order=order)
|
|
|
|
@app.route('/repairs')
|
|
def repairs():
|
|
if not session.get('logged_in'):
|
|
flash('Nemáte oprávnění k přístupu na tuto stránku.', 'error')
|
|
return redirect(url_for('login'))
|
|
repairs = fetch_repairs()
|
|
return render_template('repairs.html', repairs=repairs)
|
|
|
|
@app.route('/create_repair', methods=['GET', 'POST'])
|
|
def create_repair():
|
|
if not session.get('logged_in') or session.get('role_id') != 1:
|
|
flash('Nemáte oprávnění k přístupu na tuto stránku.', 'error')
|
|
return redirect(url_for('login'))
|
|
|
|
if request.method == 'POST':
|
|
id_zamestnance = request.form['id_zamestnance']
|
|
nazev = request.form['nazev']
|
|
popis = request.form['popis']
|
|
|
|
conn = get_db_connection()
|
|
try:
|
|
conn.execute('INSERT INTO Opravy (ID_Zamestnance, Nazev, Popis) VALUES (?, ?, ?)',
|
|
(id_zamestnance, nazev, popis))
|
|
conn.commit()
|
|
flash('Nová oprava byla úspěšně vytvořena.', 'success')
|
|
return redirect(url_for('repairs'))
|
|
except sqlite3.Error as e:
|
|
flash(f'Chyba při vytváření opravy: {e}', 'error')
|
|
finally:
|
|
conn.close()
|
|
|
|
employees = fetch_employees()
|
|
return render_template('create_repair.html', employees=employees)
|
|
|
|
@app.route('/edit_repair/<int:repair_id>', methods=['GET', 'POST'])
|
|
def edit_repair(repair_id):
|
|
if not session.get('logged_in') or session.get('role_id') != 1:
|
|
flash('Nemáte oprávnění k přístupu na tuto stránku.', 'error')
|
|
return redirect(url_for('login'))
|
|
|
|
conn = get_db_connection()
|
|
repair = conn.execute('SELECT * FROM Opravy WHERE ID_Opravy = ?', (repair_id,)).fetchone()
|
|
|
|
if request.method == 'POST':
|
|
id_zamestnance = request.form['id_zamestnance']
|
|
nazev = request.form['nazev']
|
|
popis = request.form['popis']
|
|
|
|
conn.execute('UPDATE Opravy SET ID_Zamestnance = ?, Nazev = ?, Popis = ? WHERE ID_Opravy = ?',
|
|
(id_zamestnance, nazev, popis, repair_id))
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
flash('Oprava byla úspěšně aktualizována.')
|
|
return redirect(url_for('repairs'))
|
|
|
|
employees = fetch_employees()
|
|
conn.close()
|
|
return render_template('edit_repair.html', repair=repair, employees=employees)
|
|
|
|
@app.route('/delete_repair/<int:repair_id>', methods=['POST'])
|
|
def delete_repair(repair_id):
|
|
if not session.get('logged_in') or session.get('role_id') != 1:
|
|
flash('Nemáte oprávnění k přístupu na tuto stránku.', 'error')
|
|
return redirect(url_for('login'))
|
|
|
|
conn = get_db_connection()
|
|
conn.execute('DELETE FROM Opravy WHERE ID_Opravy = ?', (repair_id,))
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
flash('Oprava byla úspěšně smazána.')
|
|
return redirect(url_for('repairs'))
|
|
|
|
@app.route('/create_reservation', methods=['POST'])
|
|
def create_reservation():
|
|
full_name = request.form['fullName']
|
|
email = request.form['email']
|
|
date = request.form['date']
|
|
description = request.form['description']
|
|
|
|
# Convert date to DD.MM.YYYY format
|
|
formatted_date = datetime.strptime(date, '%Y-%m-%d').strftime('%d.%m.%Y')
|
|
|
|
conn = get_db_connection()
|
|
try:
|
|
# Fetch a random user with role_id 2
|
|
user = conn.execute('SELECT ID_Uzivatele FROM Zamestnanci WHERE Role_ID = 2 ORDER BY RANDOM() LIMIT 1').fetchone()
|
|
if user:
|
|
user_id = user['ID_Uzivatele']
|
|
else:
|
|
user_id = 1 # Fallback to a default user ID if no user with role_id 2 is found
|
|
|
|
conn.execute('INSERT INTO Objednavky (Stav, ID_Zamestnance, Popis, ID_Vozidla, Datum_Zacatku, Cena) VALUES (?, ?, ?, ?, ?, ?)',
|
|
('Nová', user_id, description, 1, formatted_date, 0.0)) # Example values for ID_Vozidla
|
|
conn.commit()
|
|
flash('Rezervace byla úspěšně vytvořena.', 'success')
|
|
except sqlite3.Error as e:
|
|
flash(f'Chyba při vytváření rezervace: {e}', 'error')
|
|
finally:
|
|
conn.close()
|
|
|
|
return redirect(url_for('home'))
|
|
|
|
# Always redirect back home
|
|
@app.errorhandler(404)
|
|
def default_page(e):
|
|
return redirect(url_for('home'))
|
|
|
|
# Run the app
|
|
if __name__ == "__main__":
|
|
app.run(debug=True, host="0.0.0.0", port=5005)
|