Untitled
unknown
plain_text
a year ago
8.3 kB
7
Indexable
import sqlite3
import sys
import bcrypt
# import bcrypt
import os
from utility.encryption import Encryption
class Database:
    """SQLite access layer for the ``users``, ``members`` and ``logs`` tables.

    Each public query method opens its own connection, runs its statement(s)
    and closes the connection again.  Failures are not raised to the caller;
    they are reported through the injected logger's ``log_activities`` method
    and the method returns its "nothing" value (``None`` or ``False``).
    """

    database_name = os.path.join("db", "um.db")
    _instance = None

    # Positions whose string values are encrypted before INSERT/UPDATE and
    # decrypted after a single-row SELECT, keyed by lower-case table name.
    # NOTE(review): the same positions are applied to statement parameters
    # (writes) and to result columns (reads), so they only line up when the
    # SELECT column list mirrors the parameter order -- confirm per caller.
    ENCRYPTED_COLUMNS = {
        "members": [0, 1, 5, 6, 7, 8, 9, 10],
        "logs": [2, 3, 4],
        "users": [1],
    }

    @classmethod
    def get_instance(cls, _logger):
        """
        Return the shared instance, building it when needed.

        A new instance is built (and the tables are created) when none exists
        yet or whenever a logger is passed; pass ``None`` to get the current
        instance back unchanged.
        """
        if cls._instance is None or _logger is not None:
            cls._instance = cls(_logger)
            cls._instance.create_database()
        return cls._instance

    def __init__(self, _logger):
        """
        Store the logger, create the encryption helper and the db folder.
        """
        self.conn = None
        self._logger = _logger
        self._encryption = Encryption()
        self._ensure_db_folder_exists()

    def _ensure_db_folder_exists(self):
        """
        Ensure the folder that holds the database file exists.
        """
        folder = os.path.dirname(self.database_name)
        if not folder:
            # Database file lives in the working directory; nothing to create.
            return
        try:
            # exist_ok avoids the check-then-create race of exists()+makedirs().
            os.makedirs(folder, exist_ok=True)
        except OSError as e:
            print(f"Failed to create db folder: {e}")

    def connect_database(self):
        """
        Connect to the SQLite database.

        Stores the connection on ``self.conn`` and returns a new cursor.
        Terminates the process with exit status 1 when the database cannot
        be opened.
        """
        try:
            self.conn = sqlite3.connect(self.database_name)
            return self.conn.cursor()
        except sqlite3.Error as e:
            print(f"Failed to connect to database: {e}")
            sys.exit(1)

    def close_connection(self):
        """
        Close the database connection, if one is open.
        """
        if self.conn is not None:
            self.conn.close()
            # Drop the closed handle so it cannot be reused by accident.
            self.conn = None

    def _log_error(self, action, error):
        """
        Report a failure of ``action`` through the logger.

        Falls back to stderr when no logger was supplied, so that error
        reporting itself cannot raise ``AttributeError``.
        """
        message = f"SQL Error: {error}"
        if self._logger is None:
            print(f"{action}: {message}", file=sys.stderr)
            return
        self._logger.log_activities("System", action, message, "Yes")

    def _with_cursor(self, action, operation, commit=False):
        """
        Run ``operation(cursor)`` on a fresh connection and always close it.

        Returns ``(True, value)`` with the operation's return value on
        success and ``(False, None)`` on failure.  The failure is logged
        only after the connection has been closed, so a logger that itself
        goes through this class does not replace ``self.conn`` while the
        failed connection is still open.
        """
        value = None
        failure = None
        try:
            cursor = self.connect_database()
            value = operation(cursor)
            if commit:
                self.conn.commit()
        except Exception as exc:  # boundary: report instead of raising
            failure = exc
        finally:
            self.close_connection()
        if failure is not None:
            self._log_error(action, failure)
            return False, None
        return True, value

    def _encrypted_indexes(self, table_name):
        """
        Return the encrypted positions for ``table_name`` (case-insensitive).

        Returns an empty tuple for ``None`` or an unknown table.
        """
        if not isinstance(table_name, str):
            return ()
        return self.ENCRYPTED_COLUMNS.get(table_name.lower(), ())

    def _encrypt_params(self, params, table_name):
        """
        Return ``params`` with the table's encrypted string positions encrypted.

        Non-string values and positions outside the table's list are passed
        through untouched; ``params`` is returned as-is for unknown tables.
        """
        indexes = self._encrypted_indexes(table_name)
        if not indexes:
            return params
        return [
            self._encryption.encrypt_data(value)
            if position in indexes and isinstance(value, str)
            else value
            for position, value in enumerate(params)
        ]

    def create_database(self):
        """
        Create the users, members and logs tables if they do not exist.
        """
        statements = (
            '''
            CREATE TABLE IF NOT EXISTS users (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                username TEXT NOT NULL UNIQUE,
                password_hash TEXT NOT NULL,
                role TEXT NOT NULL,
                profile BLOB
            )
            ''',
            '''
            CREATE TABLE IF NOT EXISTS members (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                first_name TEXT NOT NULL,
                last_name TEXT NOT NULL,
                age INTEGER NOT NULL,
                gender TEXT NOT NULL,
                weight REAL NOT NULL,
                street_name BLOB NOT NULL,
                house_number BLOB NOT NULL,
                zipcode BLOB NOT NULL,
                city TEXT NOT NULL,
                email TEXT NOT NULL,
                phone TEXT NOT NULL,
                reg_date TEXT NOT NULL,
                member_id TEXT UNIQUE NOT NULL
            )
            ''',
            '''
            CREATE TABLE IF NOT EXISTS logs (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                date TEXT,
                time TEXT,
                username TEXT,
                activity TEXT,
                additional_info TEXT,
                suspicious BOOLEAN,
                viewed BOOLEAN
            )
            ''',
        )

        def operation(cursor):
            for statement in statements:
                cursor.execute(statement)

        self._with_cursor("create_database", operation, commit=True)

    def select_statement(self, query, params=(), table_name=None):
        """
        Execute a SELECT query and return the first row.

        Returns ``None`` when there is no row or the query fails.  When
        ``table_name`` has encrypted positions the row is returned as a list
        with those positions decrypted; otherwise the raw tuple is returned.
        If decryption itself fails the error is logged and the raw tuple is
        returned.
        """
        def operation(cursor):
            cursor.execute(query, params)
            return cursor.fetchone()

        _, row = self._with_cursor("select_statement", operation)
        indexes = self._encrypted_indexes(table_name)
        if not row or not indexes:
            return row
        try:
            return [
                self._encryption.decrypt_data(value) if position in indexes else value
                for position, value in enumerate(row)
            ]
        except Exception as exc:  # keep the fetched row usable for the caller
            self._log_error("select_statement", exc)
            return row

    def select_all(self, query, params=(), encrypt_indexes=()):
        """
        Execute a SELECT query and return all rows, or ``None`` on failure.

        Rows are returned exactly as stored; no decryption is applied.
        """
        # NOTE(review): ``encrypt_indexes`` is accepted but has never been
        # applied here -- confirm whether callers decrypt the rows themselves
        # before wiring it up.
        def operation(cursor):
            cursor.execute(query, params)
            return cursor.fetchall()

        _, rows = self._with_cursor("select_all", operation)
        return rows

    def insert_statement(self, query, params=(), table_name=None):
        """
        Execute an INSERT query and commit it.

        String parameters at the encrypted positions of ``table_name`` are
        encrypted first; with no (or an unknown) table name the parameters
        are used unchanged.
        """
        def operation(cursor):
            cursor.execute(query, self._encrypt_params(params, table_name))

        self._with_cursor("insert_statement", operation, commit=True)

    def authenticate_user(self, username, password):
        """
        Authenticate a user.

        Returns ``(id, username, password_hash, role)`` when ``password``
        matches the stored bcrypt hash, otherwise ``None`` (also on error).
        """
        def operation(cursor):
            cursor.execute(
                "SELECT id, username, password_hash, role FROM users WHERE username = ?",
                (username,),
            )
            return cursor.fetchone()

        _, user = self._with_cursor("authenticate_user", operation)
        if not user:
            return None
        try:
            stored_hash = user[2]
            # password_hash is a TEXT column, so the hash may come back as
            # str; bcrypt.checkpw only accepts bytes.
            if isinstance(stored_hash, str):
                stored_hash = stored_hash.encode('utf-8')
            if bcrypt.checkpw(password.encode('utf-8'), stored_hash):
                return user[0], user[1], user[2], user[3]
        except Exception as exc:  # malformed hash / non-string password
            self._log_error("authenticate_user", exc)
        return None

    def get_profile(self, profile):
        """
        Return the full ``users`` row whose id is ``profile``.

        Returns ``None`` when no such user exists or the query fails.
        """
        def operation(cursor):
            cursor.execute("SELECT * FROM users WHERE id = ?", (profile,))
            return cursor.fetchone()

        _, user = self._with_cursor("get_profile", operation)
        return user

    def check_if_table_exists(self, table_name):
        """
        Return ``True`` when a table named ``table_name`` exists.

        Returns ``False`` when it does not exist or the lookup fails.
        """
        def operation(cursor):
            cursor.execute(
                "SELECT name FROM sqlite_master WHERE type='table' AND name=?",
                (table_name,),
            )
            return cursor.fetchone() is not None

        succeeded, exists = self._with_cursor("check_if_table_exists", operation)
        return bool(succeeded and exists)

    def update_statement(self, query, params=(), table_name=None):
        """
        Execute an UPDATE query and commit it.

        String parameters at the encrypted positions of ``table_name`` are
        encrypted first, exactly as for ``insert_statement``.
        """
        def operation(cursor):
            cursor.execute(query, self._encrypt_params(params, table_name))

        self._with_cursor("update_statement", operation, commit=True)

    def delete_statement(self, query, params=()):
        """
        Execute a DELETE query and commit it.
        """
        def operation(cursor):
            cursor.execute(query, params)

        self._with_cursor("delete_statement", operation, commit=True)
Editor is loading...
Leave a Comment