216 lines
7.9 KiB
Python
216 lines
7.9 KiB
Python
import logging
import os
import secrets
import sqlite3
from datetime import datetime

from flask import Flask, render_template, request, redirect, url_for, flash, session
from werkzeug.security import check_password_hash, generate_password_hash
|
|
|
|
app = Flask(__name__)

# Session-signing key. Flask sessions are signed cookies, and this app keeps
# 'logged_in' and 'role_id' in the session, so a key committed to source
# control would let anyone forge an administrator session. Read the key from
# the environment instead. The fallback is a random per-process key: sessions
# then do not survive a restart and are not shared between worker processes,
# but they are never signed with a publicly known value.
app.secret_key = os.environ.get('FLASK_SECRET_KEY') or secrets.token_hex(32)

# ----------------- Logging part not related to project requirements -----------------
# Set up custom logging
logging.basicConfig(level=logging.DEBUG)

# Remove werkzeug logs by setting its logger to a higher level (e.g., ERROR)
werkzeug_logger = logging.getLogger('werkzeug')
werkzeug_logger.setLevel(logging.ERROR)
|
|
|
|
# Log client IP before each request
|
|
@app.before_request
def log_client_ip():
    """Resolve the client's IP address and attach it to the request object.

    The first entry of the ``X-Forwarded-For`` header is used when the header
    is present and non-empty; otherwise ``request.remote_addr`` is used. The
    result is stored as ``request.client_ip`` (an empty string when neither
    source yields a value).
    """
    # remote_addr may be None when the server does not supply a client
    # address, and the header may be present but empty; fall back to '' so
    # that split() is never called on None.
    forwarded = request.headers.get('X-Forwarded-For') or request.remote_addr or ''
    # The header has the form "client, proxy1, proxy2": keep the first entry
    # and drop the whitespace that follows the comma separators.
    # NOTE(review): this header is client-supplied unless a trusted proxy
    # overwrites it, so the value should not be used for access decisions.
    request.client_ip = forwarded.split(',')[0].strip()
|
|
|
|
# Override werkzeug's default logging to show client IP in the access log
|
|
@app.after_request
def log_request(response):
    """Write an access-log style line for the request that just finished.

    Args:
        response: the response object Flask is about to send.

    Returns:
        The same ``response`` object, unmodified.
    """
    # Same resolution as the before_request hook; guarded against a missing
    # remote address so that logging can never break the response.
    forwarded = request.headers.get('X-Forwarded-For') or request.remote_addr or ''
    client_ip = forwarded.split(',')[0].strip()

    # request.date is the *request's* Date header, which clients normally do
    # not send, so it rendered as "None". Log the server's current time.
    timestamp = datetime.now().strftime('%d/%b/%Y %H:%M:%S')

    app.logger.info(
        '%s - - [%s] "%s %s %s" %s',
        client_ip,
        timestamp,
        request.method,
        request.full_path,
        request.environ.get('SERVER_PROTOCOL'),
        response.status_code,
    )
    return response
|
|
# ----------------- End of logging part -----------------
|
|
# Database connection
|
|
def get_db_connection():
    """Open a new connection to the application's SQLite database.

    Returns:
        sqlite3.Connection: a connection whose ``row_factory`` is
        ``sqlite3.Row``, so result rows can be indexed by column name.
        The caller is responsible for closing it.
    """
    # NOTE(review): the path is relative to the working directory and points
    # inside a "static" folder; if that is Flask's static folder the database
    # file would be downloadable over HTTP - confirm and relocate if so.
    connection = sqlite3.connect('./static/db/db.sqlite')
    connection.row_factory = sqlite3.Row
    return connection
|
|
|
|
# Function to convert row to dictionary
|
|
def dict_from_row(row):
    """Return a plain ``dict`` copy of a mapping-like row.

    Args:
        row: an object exposing ``keys()`` and item access by key, such as
            ``sqlite3.Row``.

    Returns:
        dict: every key of ``row`` mapped to ``row[key]``.
    """
    # dict() on an object with keys() walks those keys and reads row[key].
    return dict(row)
|
|
|
|
# Function to fetch users from database
|
|
def fetch_users(role_id):
    """Return every employee whose role ID is at least ``role_id``.

    Args:
        role_id: inclusive lower bound compared against the ``Role_ID``
            column of the ``Zamestnanci`` table.

    Returns:
        list[dict]: one dictionary per matching row, keyed by column name.
    """
    conn = get_db_connection()
    try:
        rows = conn.execute(
            'SELECT * FROM Zamestnanci WHERE Role_ID >= ?', (role_id,)
        ).fetchall()
    finally:
        # Close even when the query raises so the connection is not leaked.
        conn.close()

    users = [dict_from_row(row) for row in rows]
    # Log only the count: the rows carry the Heslo (password hash) column,
    # which must not end up in log files.
    app.logger.debug("Fetched %d users", len(users))
    return users
|
|
|
|
# Function to encrypt password
|
|
def encrypt_password(password):
    """Return the value to store in the database for a plain-text password.

    Despite the name this produces a hash (via werkzeug's
    ``generate_password_hash``), not reversible encryption.
    """
    password_hash = generate_password_hash(password)
    return password_hash
|
|
|
|
# Function to check password
|
|
def check_password(stored_password, provided_password):
    """Check a plain-text password against a stored password hash.

    Args:
        stored_password: the hash read from the database; may be ``None`` or
            empty when the account has no password set.
        provided_password: the plain-text password supplied by the user.

    Returns:
        bool: ``True`` only when both values are present and the password
        matches the hash.
    """
    # A missing hash or missing password can never match; answer False
    # rather than letting the hash check raise on a non-string value.
    if not stored_password or provided_password is None:
        return False
    return check_password_hash(stored_password, provided_password)
|
|
|
|
# Routes
|
|
@app.route('/logout')
def logout():
    """Drop the login-related session keys and bounce the browser to '/'.

    Returns:
        str: a small inline script that shows a confirmation alert and then
        navigates to the site root.
    """
    # Remove each login marker; a key that is absent is simply ignored.
    for session_key in ('logged_in', 'role_id', 'username'):
        session.pop(session_key, None)

    return '''
        <script>
            alert('Úspěšně odhlášen.');
            window.location.href = '/';
        </script>
    '''
|
|
|
|
@app.route('/login', methods=['GET', 'POST'])
def login():
    """Show the login form and authenticate submitted credentials.

    An already logged-in visitor is redirected straight to their landing
    page. On POST the username is looked up in ``Zamestnanci`` and the
    password is checked against the stored hash; on success the session is
    populated with ``logged_in``, ``role_id`` and ``username``.

    Returns:
        A redirect to ``administrator`` (role ID 1) or ``home`` (any other
        role) after a successful or pre-existing login, otherwise the
        rendered ``login.html`` template.
    """
    if session.get('logged_in'):
        flash('Již jste přihlášen.', 'info')
        landing = 'administrator' if session.get('role_id') == 1 else 'home'
        return redirect(url_for(landing))

    if request.method == 'POST':
        # .get() so that a request missing a field is treated as a failed
        # login instead of aborting with a 400 error.
        username = request.form.get('username', '')
        password = request.form.get('password', '')

        conn = get_db_connection()
        try:
            user = conn.execute(
                'SELECT * FROM Zamestnanci WHERE Username = ?', (username,)
            ).fetchone()
        finally:
            # Release the connection even if the lookup raises.
            conn.close()

        # An account without a stored hash can never authenticate; checking
        # here avoids passing None to the hash comparison.
        stored_hash = user['Heslo'] if user is not None else None
        if not stored_hash or not check_password(stored_hash, password):
            flash('Nesprávné uživatelské jméno nebo heslo.', 'error')
        else:
            session['logged_in'] = True
            session['role_id'] = user['Role_ID']
            session['username'] = user['Username']  # Store username in session
            flash('Úspěšně přihlášen.', 'success')
            landing = 'administrator' if user['Role_ID'] == 1 else 'home'
            return redirect(url_for(landing))

    return render_template('login.html')
|
|
|
|
@app.route('/')
def home():
    """Render the home page with a fixed greeting and the current local time."""
    now = datetime.now()
    return render_template(
        'home.html',
        greeting="Hello, Python!",
        current_time=now.strftime('%Y-%m-%d %H:%M:%S'),
    )
|
|
|
|
@app.route('/about')
def about():
    """Render the ``about.html`` template."""
    page = render_template('about.html')
    return page
|
|
|
|
@app.route('/administrator')
def administrator():
    """Render the administrator page with the list of users.

    Only a logged-in session whose ``role_id`` is 1 may view the page; any
    other visitor gets an error flash message and is redirected to the login
    page.
    """
    role_id = session.get('role_id')
    if session.get('logged_in') and role_id == 1:
        # Users are selected relative to the viewer's own role ID.
        return render_template('administrator.html', users=fetch_users(role_id))

    flash('Nemáte oprávnění k přístupu na tuto stránku.', 'error')
    return redirect(url_for('login'))
|
|
|
|
@app.route('/create_user', methods=['GET', 'POST'])
def create_user():
    """Show the user-creation form and insert a new employee on POST.

    Restricted to logged-in sessions with ``role_id`` 1.

    Returns:
        On GET, the rendered ``create_user.html`` template. On POST, one of
        the plain-text bodies ``'exists'`` (username already taken),
        ``'success'``, or ``'error: <message>'`` when the database reports
        an error. Unauthorized visitors are redirected to the login page.
    """
    if not session.get('logged_in') or session.get('role_id') != 1:
        flash('Nemáte oprávnění k přístupu na tuto stránku.', 'error')
        return redirect(url_for('login'))

    if request.method != 'POST':
        return render_template('create_user.html')

    jmeno = request.form['jmeno']
    prijmeni = request.form['prijmeni']
    email = request.form['email']
    role = request.form['role']
    username = request.form['username']
    raw_password = request.form['heslo']

    conn = get_db_connection()
    try:
        # The duplicate check is inside the try block so the connection is
        # closed by `finally` on every path, including a failing SELECT.
        existing_user = conn.execute(
            'SELECT 1 FROM Zamestnanci WHERE Username = ?', (username,)
        ).fetchone()
        if existing_user:
            return 'exists'

        # Hash only once we know the insert will be attempted.
        heslo = encrypt_password(raw_password)
        conn.execute(
            'INSERT INTO Zamestnanci (Jmeno, Prijmeni, Email, Role_ID, Username, Heslo) VALUES (?, ?, ?, ?, ?, ?)',
            (jmeno, prijmeni, email, role, username, heslo),
        )
        conn.commit()
        return 'success'
    except sqlite3.Error as e:
        conn.rollback()
        app.logger.exception('Failed to create user %r', username)
        # Response format kept as-is for the existing client code.
        return f'error: {e}'
    finally:
        conn.close()
|
|
|
|
@app.route('/edit_user/<int:user_id>', methods=['GET', 'POST'])
def edit_user(user_id):
    """Show the edit form for one employee and apply the changes on POST.

    Restricted to logged-in sessions with ``role_id`` 1.

    Args:
        user_id: value of ``ID_Uzivatele`` identifying the row to edit.

    Returns:
        On GET, the rendered ``edit_user.html`` template for the user. On a
        successful POST, a redirect to the administrator page. A request for
        an ID that does not exist is redirected to the administrator page
        with an error message; unauthorized visitors go to the login page.
    """
    if not session.get('logged_in') or session.get('role_id') != 1:
        flash('Nemáte oprávnění k přístupu na tuto stránku.', 'error')
        return redirect(url_for('login'))

    conn = get_db_connection()
    try:
        user = conn.execute(
            'SELECT * FROM Zamestnanci WHERE ID_Uzivatele = ?', (user_id,)
        ).fetchone()

        # Without this guard an unknown ID rendered the form with user=None
        # and a POST reported success although no row was updated.
        if user is None:
            flash('Uživatel nebyl nalezen.', 'error')
            return redirect(url_for('administrator'))

        if request.method == 'POST':
            jmeno = request.form['jmeno']
            prijmeni = request.form['prijmeni']
            email = request.form['email']
            role = request.form['role']
            username = request.form['username']
            heslo = request.form['heslo']

            if heslo:
                # A new password was entered: store its hash.
                conn.execute(
                    'UPDATE Zamestnanci SET Jmeno = ?, Prijmeni = ?, Email = ?, Role_ID = ?, Username = ?, Heslo = ? WHERE ID_Uzivatele = ?',
                    (jmeno, prijmeni, email, role, username, encrypt_password(heslo), user_id),
                )
            else:
                # Empty password field: leave the stored hash untouched.
                conn.execute(
                    'UPDATE Zamestnanci SET Jmeno = ?, Prijmeni = ?, Email = ?, Role_ID = ?, Username = ? WHERE ID_Uzivatele = ?',
                    (jmeno, prijmeni, email, role, username, user_id),
                )

            conn.commit()
            flash('Uživatel byl úspěšně aktualizován.')
            return redirect(url_for('administrator'))
    finally:
        # Runs on every return above and on errors, so nothing is leaked.
        conn.close()

    return render_template('edit_user.html', user=user)
|
|
|
|
@app.route('/delete_user/<int:user_id>', methods=['POST'])
def delete_user(user_id):
    """Delete one employee, if the current administrator is allowed to.

    Restricted to logged-in sessions with ``role_id`` 1. The target row is
    deleted only when it exists and its ``Role_ID`` is strictly greater than
    the session's ``role_id``.

    Args:
        user_id: value of ``ID_Uzivatele`` identifying the row to delete.

    Returns:
        A redirect to the administrator page (with a success or error flash
        message), or to the login page for unauthorized visitors.
    """
    if not session.get('logged_in') or session.get('role_id') != 1:
        flash('Nemáte oprávnění k přístupu na tuto stránku.', 'error')
        return redirect(url_for('login'))

    conn = get_db_connection()
    try:
        user = conn.execute(
            'SELECT * FROM Zamestnanci WHERE ID_Uzivatele = ?', (user_id,)
        ).fetchone()

        if user and user['Role_ID'] > session.get('role_id'):
            conn.execute('DELETE FROM Zamestnanci WHERE ID_Uzivatele = ?', (user_id,))
            conn.commit()
            flash('Uživatel byl úspěšně smazán.')
        else:
            flash('Nemáte oprávnění smazat tohoto uživatele.', 'error')
    except sqlite3.Error:
        # Undo any partial work, then let the error surface as before.
        conn.rollback()
        raise
    finally:
        # Close on every path so a failing query does not leak the connection.
        conn.close()

    return redirect(url_for('administrator'))
|
|
|
|
# Always redirect back home
|
|
@app.errorhandler(404)
def default_page(e):
    """Handle a 404 error by redirecting the visitor to the home page.

    Args:
        e: the error passed in by Flask; not used.
    """
    home_url = url_for('home')
    return redirect(home_url)
|
|
|
|
# Run the app
|
|
if __name__ == "__main__":
    # The server listens on all interfaces (0.0.0.0). Debug mode enables
    # Werkzeug's interactive debugger, which lets a client run arbitrary
    # Python code, so it must not be on by default here. Enable it explicitly
    # for local development with FLASK_DEBUG=1.
    debug_mode = os.environ.get('FLASK_DEBUG', '').strip().lower() in ('1', 'true', 'yes', 'on')
    app.run(debug=debug_mode, host="0.0.0.0", port=5005)
|