Untitled
unknown
plain_text
5 months ago
8.3 kB
3
Indexable
import sqlite3 import sys import bcrypt # import bcrypt import os from utility.encryption import Encryption class Database: database_name = os.path.join("db", "um.db") _instance = None ENCRYPTED_COLUMNS = { "members": [0, 1, 5, 6 ,7 ,8 ,9 ,10], "logs": [2, 3, 4], "users": [1] } @classmethod def get_instance(cls, _logger): if cls._instance is None or _logger is not None: cls._instance = cls(_logger) cls._instance.create_database() return cls._instance def __init__(self, _logger): self.conn = None self._logger = _logger self._encryption = Encryption() self._ensure_db_folder_exists() def _ensure_db_folder_exists(self): """ Ensure the db folder exists. """ try: if not os.path.exists("db"): os.makedirs("db") except Exception as e: print(f"Failed to create db folder: {e}") def connect_database(self): """ Connect to the SQLite database. """ try: self.conn = sqlite3.connect(self.database_name) return self.conn.cursor() except Exception as e: print(f"Failed to connect to database: {e}") sys.exit(1) def close_connection(self): """ Close the database connection. """ if self.conn: self.conn.close() def create_database(self): """ Create the users table. """ cursor = None try: cursor = self.connect_database() cursor.execute(''' CREATE TABLE IF NOT EXISTS users ( id INTEGER PRIMARY KEY AUTOINCREMENT, username TEXT NOT NULL UNIQUE, password_hash TEXT NOT NULL, role TEXT NOT NULL, profile BLOB ) ''') cursor.execute(''' CREATE TABLE IF NOT EXISTS members ( id INTEGER PRIMARY KEY AUTOINCREMENT, first_name TEXT NOT NULL, last_name TEXT NOT NULL, age INTEGER NOT NULL, gender TEXT NOT NULL, weight REAL NOT NULL, street_name BLOB NOT NULL, house_number BLOB NOT NULL, zipcode BLOB NOT NULL, city TEXT NOT NULL, email TEXT NOT NULL, phone TEXT NOT NULL, reg_date TEXT NOT NULL, member_id TEXT UNIQUE NOT NULL ) ''') cursor.execute(''' CREATE TABLE IF NOT EXISTS logs ( id INTEGER PRIMARY KEY AUTOINCREMENT, date TEXT, time TEXT, username TEXT, activity TEXT, additional_info TEXT, suspicious BOOLEAN, viewed BOOLEAN ) ''') self.conn.commit() except Exception as e: self._logger.log_activities("System", "create_database", f"SQL Error: {e}", "Yes") finally: self.close_connection() def select_statement(self, query, params=(), table_name=None): """ Execute a SELECT query and return the result. """ result = None try: cursor = self.connect_database() cursor.execute(query, params) result = cursor.fetchone() if result and table_name in self.ENCRYPTED_COLUMNS: encrypt_indexes = self.ENCRYPTED_COLUMNS[table_name] result = [Encryption.decrypt_data(result[i]) if i in encrypt_indexes else result[i] for i in range(len(result))] except Exception as e: self._logger.log_activities("System", "select_statement", f"SQL Error: {e}", "Yes") finally: self.close_connection() return result def select_all(self, query, params=(), encrypt_indexes=[]): """ Execute a SELECT query and return all results. """ cursor = None results = None try: cursor = self.connect_database() cursor.execute(query, params) results = cursor.fetchall() except Exception as e: self._logger.log_activities("System", "select_all", f"SQL Error: {e}", "Yes") finally: self.close_connection() return results def insert_statement(self, query, params=(), table_name=None): """ Execute an INSERT query. """ cursor = None try: cursor = self.connect_database() if table_name.lower() in self.ENCRYPTED_COLUMNS: encrypt_indexes = self.ENCRYPTED_COLUMNS[table_name.lower()] params = [self._encryption.encrypt_data(param) if i in encrypt_indexes and isinstance(param, str) else param for i, param in enumerate(params)] cursor.execute(query, params) self.conn.commit() except Exception as e: self._logger.log_activities("System", "insert_statement", f"SQL Error: {e}", "Yes") print(e) pass finally: self.close_connection() def authenticate_user(self, username, password): """ Authenticate a user. """ cursor = None try: cursor = self.connect_database() cursor.execute("SELECT * FROM users WHERE username = ?", (username,)) user = cursor.fetchone() if user and bcrypt.checkpw(password.encode('utf-8'), user[2]): return user[0], user[1], user[2], user[3] return None except Exception as e: print(e) finally: self.close_connection() def get_profile(self, profile): """ Get the profile of a user. """ cursor = None try: cursor = self.connect_database() cursor.execute("SELECT * FROM users WHERE id = ?", (profile,)) user = cursor.fetchone() return user except Exception as e: pass return None finally: self.close_connection() def check_if_table_exists(self, table_name): """ Check if a table exists. """ cursor = None try: cursor = self.connect_database() cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name=?", (table_name,)) return cursor.fetchone() is not None except Exception as e: self._logger.log_activities("System", "check_if_table_exists", f"SQL Error: {e}", "Yes") return False finally: self.close_connection() def update_statement(self, query, params=(), table_name=None): """ Execute an UPDATE query. """ cursor = None try: cursor = self.connect_database() if table_name in self.ENCRYPTED_COLUMNS: encrypt_indexes = self.ENCRYPTED_COLUMNS[table_name] params = [self._encryption.encrypt_data(param) if i in encrypt_indexes and isinstance(param, str) else param for i, param in enumerate(params)] cursor.execute(query, params) self.conn.commit() except Exception as e: self._logger.log_activities("System", "update_statement", f"SQL Error: {e}", "Yes") finally: self.close_connection() def delete_statement(self, query, params=()): """ Execute a DELETE query. """ cursor = None try: cursor = self.connect_database() cursor.execute(query, params) self.conn.commit() except Exception as e: self._logger.log_activities("System", "delete_statement", f"SQL Error: {e}", "Yes") finally: self.close_connection()
Editor is loading...
Leave a Comment