import logging
import os
import sqlite3
from datetime import datetime

from flask import Flask, render_template, request, redirect, url_for, flash, session, jsonify

from auth import encrypt_password, check_password
# NOTE: `add_product_stock` is also the name of a view function defined later
# in this file, which rebinds the module-level name at import time.
from db import (
    get_db_connection,
    fetch_users,
    fetch_orders,
    fetch_roles,
    fetch_repairs,
    fetch_employees,
    fetch_products,
    update_product,
    add_product_stock,
)

app = Flask(__name__)
# The session-signing key can be supplied through the SECRET_KEY environment
# variable; the literal is only a fallback so existing deployments keep working.
# NOTE(review): a key committed to source control should be rotated.
app.secret_key = os.environ.get('SECRET_KEY') or 'e462a50b0401f495cfb5c67c3871fe4b'

# ----------------- Logging part not related to project requirements -----------------

# Set up custom logging
logging.basicConfig(level=logging.DEBUG)

# Remove werkzeug logs by setting its logger to a higher level (e.g., ERROR)
werkzeug_logger = logging.getLogger('werkzeug')
werkzeug_logger.setLevel(logging.ERROR)


def _resolve_client_ip():
    """Return the best-guess client IP for the current request.

    When the direct peer is 127.0.0.1 the forwarding headers are consulted
    (``CF-Connecting-IP`` first, then ``X-Forwarded-For``); otherwise the
    peer address is used as-is. Only the first entry of a comma-separated
    list is returned. An unknown peer address yields an empty string.
    """
    remote_addr = request.remote_addr
    if remote_addr == '127.0.0.1':
        client_ip = request.headers.get(
            'CF-Connecting-IP',
            request.headers.get('X-Forwarded-For', remote_addr),
        )
    else:
        client_ip = remote_addr
    # Get the first IP if it's a forwarded request; tolerate a missing address.
    return (client_ip or '').split(',')[0].strip()


# Log client IP before each request
@app.before_request
def log_client_ip():
    """Resolve the client IP and store it on the request as ``client_ip``."""
    request.client_ip = _resolve_client_ip()


# Override werkzeug's default logging to show client IP in the access log
@app.after_request
def log_request(response):
    """Write one access-log line for the finished request and pass the response through.

    Args:
        response: the response object about to be sent.

    Returns:
        The same ``response`` object, unmodified.
    """
    client_ip = getattr(request, 'client_ip', None) or _resolve_client_ip()
    # Use the server-side time: ``request.date`` is the client's Date header,
    # which is normally absent and therefore logged as "None".
    timestamp = datetime.now().strftime('%d/%b/%Y %H:%M:%S')
    app.logger.info(
        '%s - - [%s] "%s %s %s" %s',
        client_ip,
        timestamp,
        request.method,
        request.full_path,
        request.environ.get('SERVER_PROTOCOL'),
        response.status_code,
    )
    return response

# ----------------- End of logging part -----------------


# Routes
@app.route('/logout')
def logout():
    """Remove all login-related keys from the session."""
    for key in ('logged_in', 'role_id', 'username', 'user_id'):
        session.pop(key, None)
    # NOTE(review): the response body is a single space; it looks like markup
    # may have been lost here -- confirm the intended response.
    return ''' '''
# Flash text shown whenever a route rejects the current session.
_ACCESS_DENIED_MESSAGE = 'Nemáte oprávnění k přístupu na tuto stránku.'

# Endpoint each role is sent to after login; any other role goes to 'home'.
_ROLE_LANDING_ENDPOINTS = {1: 'administrator', 2: 'managers', 3: 'repairs'}

_INSERT_USED_PRODUCT_SQL = (
    'INSERT INTO Pouzite_Produkty (ID_Opravy, ID_Produktu, Pocet_Produktu) VALUES (?, ?, ?)'
)


def _authorize(*allowed_roles):
    """Return ``None`` if the session may proceed, else a redirect to the login page.

    The session passes when it is flagged ``logged_in`` and its ``role_id``
    is one of ``allowed_roles``. On rejection an error message is flashed.
    """
    if session.get('logged_in') and session.get('role_id') in allowed_roles:
        return None
    flash(_ACCESS_DENIED_MESSAGE, 'error')
    return redirect(url_for('login'))


def _redirect_to_landing(role_id):
    """Redirect to the landing page that belongs to ``role_id``."""
    return redirect(url_for(_ROLE_LANDING_ENDPOINTS.get(role_id, 'home')))


def _read_products(id_field, quantity_field):
    """Pair up the submitted product IDs and quantities from the current form."""
    return [
        {'id': pid, 'quantity': qty}
        for pid, qty in zip(request.form.getlist(id_field), request.form.getlist(quantity_field))
    ]


def _insert_used_products(conn, repair_id, products):
    """Insert every product with a positive quantity for ``repair_id``.

    Returns the list of ``(product_id, quantity)`` pairs that were inserted.
    Raises ``ValueError`` if a quantity is not an integer string, and lets
    ``sqlite3.Error`` propagate; the caller owns the transaction.
    """
    inserted = []
    for product in products:
        product_id = product['id']
        quantity = product['quantity']
        app.logger.debug(f"Inserting product {product_id} with quantity {quantity} for repair {repair_id}")
        if quantity and int(quantity) > 0:
            conn.execute(_INSERT_USED_PRODUCT_SQL, (repair_id, product_id, quantity))
            inserted.append((product_id, quantity))
    return inserted


@app.route('/login', methods=['GET', 'POST'])
def login():
    """Show the login form (GET) or authenticate the submitted credentials (POST).

    An already logged-in session is redirected to its role's landing page.
    """
    if session.get('logged_in'):
        flash('Již jste přihlášen.', 'info')
        return _redirect_to_landing(session.get('role_id'))

    if request.method == 'POST':
        username = request.form['username']
        password = request.form['password']

        conn = get_db_connection()
        try:
            user = conn.execute('SELECT * FROM Zamestnanci WHERE username = ?', (username,)).fetchone()
        finally:
            conn.close()

        if user is None or not check_password(user['heslo'], password):
            # NOTE(review): the response body is a single space; it looks like
            # markup may have been lost here -- confirm the intended response.
            return ''' '''

        session['logged_in'] = True
        session['role_id'] = user['Role_ID']
        session['username'] = user['Username']
        session['user_id'] = user['ID_Uzivatele']
        flash('Úspěšně přihlášen.', 'success')
        return _redirect_to_landing(user['Role_ID'])

    return render_template('login.html')


@app.route('/')
def home():
    """Render the home page with a greeting and the current server time."""
    greeting = "Hello, Python!"
    current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    return render_template('home.html', greeting=greeting, current_time=current_time)


@app.route('/about')
def about():
    """Render the static about page."""
    return render_template('about.html')


@app.route('/administrator')
def administrator():
    """Render the administrator dashboard (role 1 only)."""
    denied = _authorize(1)
    if denied is not None:
        return denied
    users = fetch_users(session.get('role_id'))
    orders = fetch_orders()
    roles = fetch_roles()
    products = fetch_products()
    return render_template('administrator.html', users=users, orders=orders, roles=roles, products=products)


@app.route('/managers')
def managers():
    """Render the managers dashboard (roles 1 and 2)."""
    denied = _authorize(1, 2)
    if denied is not None:
        return denied
    users = fetch_users(session.get('role_id'))
    orders = fetch_orders()
    roles = fetch_roles()
    return render_template('managers.html', users=users, orders=orders, roles=roles)


@app.route('/create_user', methods=['GET', 'POST'])
def create_user():
    """Show the user-creation form (GET) or create an employee (POST).

    On POST the response body is a plain string: ``'exists'`` when the
    username is taken, ``'success'`` after the insert, or ``'error: ...'``
    when the database reports an error.
    """
    denied = _authorize(1, 2)
    if denied is not None:
        return denied

    if request.method == 'POST':
        jmeno = request.form['jmeno']
        prijmeni = request.form['prijmeni']
        email = request.form['email']
        role = request.form['role']
        username = request.form['username']
        heslo = encrypt_password(request.form['heslo'])

        conn = get_db_connection()
        try:
            existing_user = conn.execute(
                'SELECT * FROM Zamestnanci WHERE Username = ?', (username,)
            ).fetchone()
            if existing_user:
                return 'exists'
            conn.execute(
                'INSERT INTO Zamestnanci (Jmeno, Prijmeni, Email, Role_ID, Username, Heslo) VALUES (?, ?, ?, ?, ?, ?)',
                (jmeno, prijmeni, email, role, username, heslo),
            )
            conn.commit()
            return 'success'
        except sqlite3.Error as e:
            return f'error: {e}'
        finally:
            conn.close()

    roles = fetch_roles()
    return render_template('create_user.html', roles=roles)


# The URL must carry the ID: the view takes ``user_id`` as a required argument.
@app.route('/edit_user/<int:user_id>', methods=['GET', 'POST'])
def edit_user(user_id):
    """Show the edit form for one employee (GET) or save the changes (POST).

    The password column is only rewritten when a non-empty ``heslo`` field
    is submitted.
    """
    denied = _authorize(1, 2)
    if denied is not None:
        return denied

    # NOTE(review): unlike delete_user, no check compares the target's role
    # with the editor's role -- confirm whether that is intended.
    conn = get_db_connection()
    try:
        user = conn.execute('SELECT * FROM Zamestnanci WHERE ID_Uzivatele = ?', (user_id,)).fetchone()

        if request.method == 'POST':
            jmeno = request.form['jmeno']
            prijmeni = request.form['prijmeni']
            email = request.form['email']
            role = request.form['role']
            username = request.form['username']
            heslo = request.form['heslo']

            if heslo:
                heslo = encrypt_password(heslo)
                conn.execute(
                    'UPDATE Zamestnanci SET Jmeno = ?, Prijmeni = ?, Email = ?, Role_ID = ?, Username = ?, Heslo = ? WHERE ID_Uzivatele = ?',
                    (jmeno, prijmeni, email, role, username, heslo, user_id),
                )
            else:
                conn.execute(
                    'UPDATE Zamestnanci SET Jmeno = ?, Prijmeni = ?, Email = ?, Role_ID = ?, Username = ? WHERE ID_Uzivatele = ?',
                    (jmeno, prijmeni, email, role, username, user_id),
                )
            conn.commit()
            flash('Uživatel byl úspěšně aktualizován.')
            return redirect(url_for('administrator'))
    finally:
        conn.close()

    return render_template('edit_user.html', user=user)


@app.route('/delete_user/<int:user_id>', methods=['POST'])
def delete_user(user_id):
    """Delete an employee whose role ID is strictly greater than the caller's."""
    denied = _authorize(1, 2)
    if denied is not None:
        return denied

    conn = get_db_connection()
    try:
        user = conn.execute('SELECT * FROM Zamestnanci WHERE ID_Uzivatele = ?', (user_id,)).fetchone()
        if user and user['Role_ID'] > session.get('role_id'):
            conn.execute('DELETE FROM Zamestnanci WHERE ID_Uzivatele = ?', (user_id,))
            conn.commit()
            flash('Uživatel byl úspěšně smazán.')
        else:
            flash('Nemáte oprávnění smazat tohoto uživatele.', 'error')
    finally:
        conn.close()
    return redirect(url_for('administrator'))


@app.route('/edit_order/<int:order_id>', methods=['GET', 'POST'])
def edit_order(order_id):
    """Show the edit form for one order (GET) or save the changes (POST)."""
    denied = _authorize(1, 2, 3)
    if denied is not None:
        return denied

    conn = get_db_connection()
    try:
        order = conn.execute('SELECT * FROM Objednavky WHERE ID_Objednavky = ?', (order_id,)).fetchone()

        if request.method == 'POST':
            stav = request.form['stav']
            id_zamestnance = request.form['id_zamestnance']
            popis = request.form['popis']
            datum_konce = request.form['datum_konce']
            conn.execute(
                'UPDATE Objednavky SET Stav = ?, ID_Zamestnance = ?, Popis = ?, Datum_Konce = ? WHERE ID_Objednavky = ?',
                (stav, id_zamestnance, popis, datum_konce, order_id),
            )
            conn.commit()
            flash('Objednávka byla úspěšně aktualizována.')
            return redirect(url_for('administrator'))
    finally:
        conn.close()

    # The user list is only needed to render the form.
    users = fetch_users(session.get('role_id'))
    return render_template('edit_order.html', order=order, users=users)


@app.route('/delete_order/<int:order_id>', methods=['POST'])
def delete_order(order_id):
    """Delete one order and return to the administrator dashboard."""
    denied = _authorize(1, 2, 3)
    if denied is not None:
        return denied

    conn = get_db_connection()
    try:
        conn.execute('DELETE FROM Objednavky WHERE ID_Objednavky = ?', (order_id,))
        conn.commit()
    finally:
        conn.close()
    flash('Objednávka byla úspěšně smazána.')
    return redirect(url_for('administrator'))


@app.route('/repairs')
def repairs():
    """Render the repairs overview (roles 1 and 3)."""
    denied = _authorize(1, 3)
    if denied is not None:
        return denied
    repairs = fetch_repairs()
    orders = fetch_orders()
    users = fetch_users(session.get('role_id'))
    return render_template('repairs.html', repairs=repairs, orders=orders, users=users)


@app.route('/create_repair', methods=['GET', 'POST'])
def create_repair():
    """Show the repair form (GET) or create a repair with its used products (POST).

    The repair row and its product rows are written in one transaction. On
    failure the transaction is rolled back, the error is flashed and the
    form is rendered again.
    """
    # Same guard as the other repair routes; the previous condition mixed
    # `and`/`or` without parentheses.
    denied = _authorize(1, 3)
    if denied is not None:
        return denied

    if request.method == 'POST':
        id_zamestnance = request.form['id_zamestnance']
        nazev = request.form['nazev']
        popis = request.form['popis']
        # NOTE(review): only the `products[0][...]` fields are read here, while
        # edit_repair reads `products[][...]` -- confirm against the templates.
        products = _read_products('products[0][id]', 'products[0][quantity]')

        app.logger.debug(f"Received id_zamestnance: {id_zamestnance}")
        app.logger.debug(f"Received nazev: {nazev}")
        app.logger.debug(f"Received popis: {popis}")
        app.logger.debug(f"Received products: {products}")

        conn = get_db_connection()
        try:
            conn.execute('BEGIN')
            sql_insert_repair = 'INSERT INTO Opravy (ID_Zamestnance, Nazev, Popis) VALUES (?, ?, ?)'
            app.logger.debug(f"Executing SQL: {sql_insert_repair} with values ({id_zamestnance}, {nazev}, {popis})")
            conn.execute(sql_insert_repair, (id_zamestnance, nazev, popis))
            repair_id = conn.execute('SELECT last_insert_rowid()').fetchone()[0]

            inserted = _insert_used_products(conn, repair_id, products)
            conn.commit()

            session['debug'] = f"Inserted Repair ID: {repair_id}\n" + "".join(
                f"Inserted Product ID: {product_id}, Quantity: {quantity}\n"
                for product_id, quantity in inserted
            )
            flash('Nová oprava byla úspěšně vytvořena.', 'success')
            return redirect(url_for('repairs'))
        except (sqlite3.Error, ValueError) as e:
            # ValueError covers a non-numeric quantity in the submitted form.
            conn.rollback()
            app.logger.error(f"Error creating repair: {e}")
            flash(f'Chyba při vytváření opravy: {e}', 'error')
        finally:
            conn.close()

    employees = fetch_employees()
    products = fetch_products()
    return render_template('create_repair.html', employees=employees, products=products)


@app.route('/edit_repair/<int:repair_id>', methods=['GET', 'POST'])
def edit_repair(repair_id):
    """Show the edit form for one repair (GET) or save the changes (POST).

    On POST the repair row is updated and its used-product rows are replaced
    in one transaction; the request always ends with a redirect to the
    repairs overview. An unknown ``repair_id`` is reported and redirected.
    """
    denied = _authorize(1, 3)
    if denied is not None:
        return denied

    conn = get_db_connection()
    try:
        repair_row = conn.execute('SELECT * FROM Opravy WHERE ID_Opravy = ?', (repair_id,)).fetchone()
        if repair_row is None:
            # dict(None) would raise; report the missing repair instead.
            flash('Oprava nebyla nalezena.', 'error')
            return redirect(url_for('repairs'))
        repair = dict(repair_row)

        if request.method == 'POST':
            id_zamestnance = request.form['id_zamestnance']
            nazev = request.form['nazev']
            popis = request.form['popis']
            products = _read_products('products[][id]', 'products[][quantity]')

            try:
                conn.execute('BEGIN')
                conn.execute(
                    'UPDATE Opravy SET ID_Zamestnance = ?, Nazev = ?, Popis = ? WHERE ID_Opravy = ?',
                    (id_zamestnance, nazev, popis, repair_id),
                )
                conn.execute('DELETE FROM Pouzite_Produkty WHERE ID_Opravy = ?', (repair_id,))
                _insert_used_products(conn, repair_id, products)
                conn.commit()
                flash('Oprava byla úspěšně aktualizována.')
            except (sqlite3.Error, ValueError) as e:
                # ValueError covers a non-numeric quantity in the submitted form.
                conn.rollback()
                app.logger.error(f"Error updating repair: {e}")
                flash(f'Chyba při aktualizaci opravy: {e}', 'error')
            return redirect(url_for('repairs'))

        employees = fetch_employees()
        products = fetch_products()
        repair['products'] = [
            dict(row)
            for row in conn.execute(
                '''
                SELECT Produkty.ID_Produktu, Produkty.Nazev, Pouzite_Produkty.Pocet_Produktu
                FROM Pouzite_Produkty
                JOIN Produkty ON Pouzite_Produkty.ID_Produktu = Produkty.ID_Produktu
                WHERE Pouzite_Produkty.ID_Opravy = ?
                ''',
                (repair_id,),
            ).fetchall()
        ]
    finally:
        conn.close()

    return render_template('edit_repair.html', repair=repair, employees=employees, products=products)


@app.route('/delete_repair/<int:repair_id>', methods=['POST'])
def delete_repair(repair_id):
    """Delete one repair and return to the repairs overview."""
    denied = _authorize(1, 2, 3)
    if denied is not None:
        return denied

    conn = get_db_connection()
    try:
        conn.execute('DELETE FROM Opravy WHERE ID_Opravy = ?', (repair_id,))
        conn.commit()
    finally:
        conn.close()
    flash('Oprava byla úspěšně smazána.')
    return redirect(url_for('repairs'))


@app.route('/create_reservation', methods=['POST'])
def create_reservation():
    """Create a new order from the reservation form.

    The order is assigned to a randomly chosen employee with role 3, or to
    employee ID 1 when there is none. No login is required by this route.
    """
    # Not used by the INSERT below; still read so the form fields stay required.
    full_name = request.form['fullName']
    email = request.form['email']
    datum_konce = request.form['datum_konce']
    description = request.form['description']

    conn = get_db_connection()
    try:
        # Fetch a random user with role_id 3
        user = conn.execute(
            'SELECT ID_Uzivatele FROM Zamestnanci WHERE Role_ID = 3 ORDER BY RANDOM() LIMIT 1'
        ).fetchone()
        # Fallback to a default user ID if no user with role_id 3 is found
        user_id = user['ID_Uzivatele'] if user else 1

        conn.execute(
            'INSERT INTO Objednavky (Stav, ID_Zamestnance, Popis, ID_Vozidla, Datum_Zacatku, Datum_Konce, Cena) VALUES (?, ?, ?, ?, ?, ?, ?)',
            # Use the current date for Datum_Zacatku
            ('Nová', user_id, description, 1, datetime.now().strftime('%Y-%m-%d'), datum_konce, 0.0),
        )
        conn.commit()
        flash('Rezervace byla úspěšně vytvořena.', 'success')
    except sqlite3.Error as e:
        flash(f'Chyba při vytváření rezervace: {e}', 'error')
    finally:
        conn.close()

    # NOTE(review): the response body is a single space; it looks like markup
    # may have been lost here -- confirm the intended response.
    return ''' '''


@app.route('/edit_product/<int:product_id>', methods=['GET', 'POST'])
def edit_product(product_id):
    """Show the edit form for one product (GET) or save the changes (POST)."""
    denied = _authorize(1, 2)
    if denied is not None:
        return denied

    # Close the connection before calling the db helper so that it is not
    # leaked on the POST path.
    conn = get_db_connection()
    try:
        product = conn.execute('SELECT * FROM Produkty WHERE ID_Produktu = ?', (product_id,)).fetchone()
    finally:
        conn.close()

    if request.method == 'POST':
        nazev = request.form['nazev']
        popis = request.form['popis']
        momentalni_zasoba = request.form['momentalni_zasoba']
        minimalni_zasoba = request.form['minimalni_zasoba']
        update_product(product_id, nazev, popis, momentalni_zasoba, minimalni_zasoba)
        flash('Produkt byl úspěšně aktualizován.')
        return redirect(url_for('administrator'))

    return render_template('edit_product.html', product=product)


@app.route('/add_product_stock/<int:product_id>', methods=['GET', 'POST'])
def add_product_stock(product_id):
    """Show the add-stock form for one product (GET) or add the quantity (POST)."""
    # This view rebinds the module-level name `add_product_stock`, hiding the
    # db helper imported at the top of the file. Import the helper under an
    # alias so the call below reaches the database function, not this view.
    from db import add_product_stock as add_stock_in_db

    denied = _authorize(1, 2)
    if denied is not None:
        return denied

    conn = get_db_connection()
    try:
        product = conn.execute('SELECT * FROM Produkty WHERE ID_Produktu = ?', (product_id,)).fetchone()
    finally:
        conn.close()

    if request.method == 'POST':
        quantity = request.form['quantity']
        add_stock_in_db(product_id, quantity)
        flash('Zásoba byla úspěšně přidána.')
        return redirect(url_for('administrator'))

    return render_template('add_product_stock.html', product=product)


@app.route('/create_product', methods=['GET', 'POST'])
def create_product():
    """Show the product form (GET) or insert a new product (POST).

    When the insert fails the error is flashed and the form is rendered again.
    """
    denied = _authorize(1, 2)
    if denied is not None:
        return denied

    if request.method == 'POST':
        nazev = request.form['nazev']
        popis = request.form['popis']
        momentalni_zasoba = request.form['momentalni_zasoba']
        minimalni_zasoba = request.form['minimalni_zasoba']

        conn = get_db_connection()
        try:
            conn.execute(
                'INSERT INTO Produkty (Nazev, Popis, Momentalni_Zasoba, Minimalni_Zasoba) VALUES (?, ?, ?, ?)',
                (nazev, popis, momentalni_zasoba, minimalni_zasoba),
            )
            conn.commit()
            flash('Nový produkt byl úspěšně přidán.', 'success')
            return redirect(url_for('administrator'))
        except sqlite3.Error as e:
            flash(f'Chyba při přidávání produktu: {e}', 'error')
        finally:
            conn.close()

    return render_template('create_product.html')


@app.route('/statistics')
def statistics():
    """Render the statistics page with the number of repairs per employee."""
    denied = _authorize(1, 2)
    if denied is not None:
        return denied

    conn = get_db_connection()
    try:
        rows = conn.execute(
            '''
            SELECT Zamestnanci.Jmeno || ' ' || Zamestnanci.Prijmeni AS employee,
                   COUNT(Opravy.ID_Opravy) AS count
            FROM Opravy
            JOIN Zamestnanci ON Opravy.ID_Zamestnance = Zamestnanci.ID_Uzivatele
            GROUP BY Zamestnanci.ID_Uzivatele
            '''
        ).fetchall()
    finally:
        conn.close()

    repairs_data = [dict(employee=row['employee'], count=row['count']) for row in rows]
    return render_template('statistics.html', repairs_data=repairs_data)


@app.route('/repairs_by_date')
def repairs_by_date():
    """Return, as JSON, the number of orders per start date within a range.

    The range comes from the ``start`` and ``end`` query parameters.
    """
    # NOTE(review): this endpoint has no session/role check, unlike
    # /statistics -- confirm whether it is meant to be public.
    start_date = request.args.get('start')
    end_date = request.args.get('end')

    conn = get_db_connection()
    try:
        rows = conn.execute(
            '''
            SELECT DATE(Datum_Zacatku) AS date, COUNT(*) AS count
            FROM Objednavky
            WHERE Datum_Zacatku BETWEEN ? AND ?
            GROUP BY DATE(Datum_Zacatku)
            ''',
            (start_date, end_date),
        ).fetchall()
    finally:
        conn.close()

    repairs_data = [dict(date=row['date'], count=row['count']) for row in rows]
    return jsonify(repairs_data)


# Always redirect back home
@app.errorhandler(404)
def default_page(e):
    """Send every unknown URL to the home page instead of showing a 404."""
    return redirect(url_for('home'))


# Run the app
if __name__ == "__main__":
    # NOTE(review): debug=True together with host="0.0.0.0" exposes the
    # interactive debugger on every network interface -- do not use this
    # configuration outside local development.
    app.run(debug=True, host="0.0.0.0", port=5005)