Untitled

 avatar
unknown
plain_text
5 months ago
8.3 kB
3
Indexable
import sqlite3
import sys

import bcrypt
# import bcrypt
import os
from utility.encryption import Encryption


class Database:
    database_name = os.path.join("db", "um.db")
    _instance = None

    ENCRYPTED_COLUMNS = {
        "members": [0, 1, 5, 6 ,7 ,8 ,9 ,10],
        "logs": [2, 3, 4],
        "users": [1]
    }

    @classmethod
    def get_instance(cls, _logger):
        if cls._instance is None or _logger is not None:
            cls._instance = cls(_logger)
            cls._instance.create_database()
        return cls._instance

    def __init__(self, _logger):
        self.conn = None
        self._logger = _logger
        self._encryption = Encryption()
        self._ensure_db_folder_exists()


    def _ensure_db_folder_exists(self):
        """
        Ensure the db folder exists.
        """
        try:
            if not os.path.exists("db"):
                os.makedirs("db")
        except Exception as e:
            print(f"Failed to create db folder: {e}")

    def connect_database(self):
        """
        Connect to the SQLite database.
        """
        try:
            self.conn = sqlite3.connect(self.database_name)
            return self.conn.cursor()
        except Exception as e:
            print(f"Failed to connect to database: {e}")
            sys.exit(1)

    def close_connection(self):
        """
        Close the database connection.
        """
        if self.conn:
            self.conn.close()

    def create_database(self):
        """
        Create the users table.
        """
        cursor = None
        try:
            cursor = self.connect_database()
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS users (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    username TEXT NOT NULL UNIQUE,
                    password_hash TEXT NOT NULL,
                    role TEXT NOT NULL,
                    profile BLOB
                )
            ''')
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS members (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    first_name TEXT NOT NULL,
                    last_name TEXT NOT NULL,
                    age INTEGER NOT NULL,
                    gender TEXT NOT NULL,
                    weight REAL NOT NULL,
                    street_name BLOB NOT NULL,
                    house_number BLOB NOT NULL,
                    zipcode BLOB NOT NULL,
                    city TEXT NOT NULL,
                    email TEXT NOT NULL,
                    phone TEXT NOT NULL,
                    reg_date TEXT NOT NULL,
                    member_id TEXT UNIQUE NOT NULL
                )
            ''')
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS logs (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    date TEXT,
                    time TEXT,
                    username TEXT,
                    activity TEXT,
                    additional_info TEXT,
                    suspicious BOOLEAN,
                    viewed BOOLEAN
                )
            ''')
            self.conn.commit()
        except Exception as e:
            self._logger.log_activities("System", "create_database", f"SQL Error: {e}", "Yes")
        finally:
            self.close_connection()

    def select_statement(self, query, params=(), table_name=None):
        """
        Execute a SELECT query and return the result.
        """
        result = None
        try:
            cursor = self.connect_database()
            cursor.execute(query, params)
            result = cursor.fetchone()
            if result and table_name in self.ENCRYPTED_COLUMNS:
                encrypt_indexes = self.ENCRYPTED_COLUMNS[table_name]
                result = [Encryption.decrypt_data(result[i]) if i in encrypt_indexes else result[i] for i in
                          range(len(result))]
        except Exception as e:
            self._logger.log_activities("System", "select_statement", f"SQL Error: {e}", "Yes")
        finally:
            self.close_connection()
        return result

    def select_all(self, query, params=(), encrypt_indexes=[]):
        """
        Execute a SELECT query and return all results.
        """
        cursor = None
        results = None
        try:
            cursor = self.connect_database()
            cursor.execute(query, params)
            results = cursor.fetchall()
        except Exception as e:
            self._logger.log_activities("System", "select_all", f"SQL Error: {e}", "Yes")
        finally:
            self.close_connection()
        return results

    def insert_statement(self, query, params=(), table_name=None):
        """
        Execute an INSERT query.
        """
        cursor = None
        try:
            cursor = self.connect_database()
            if table_name.lower() in self.ENCRYPTED_COLUMNS:
                encrypt_indexes = self.ENCRYPTED_COLUMNS[table_name.lower()]
                params = [self._encryption.encrypt_data(param) if i in encrypt_indexes and isinstance(param, str)
                          else
                          param for i, param in enumerate(params)]
            cursor.execute(query, params)
            self.conn.commit()
        except Exception as e:
            self._logger.log_activities("System", "insert_statement", f"SQL Error: {e}", "Yes")
            print(e)
            pass
        finally:
            self.close_connection()

    def authenticate_user(self, username, password):
        """
        Authenticate a user.
        """
        cursor = None
        try:
            cursor = self.connect_database()
            cursor.execute("SELECT * FROM users WHERE username = ?", (username,))
            user = cursor.fetchone()
            if user and bcrypt.checkpw(password.encode('utf-8'), user[2]):
                return user[0], user[1], user[2], user[3]
            return None
        except Exception as e:
            print(e)
        finally:
            self.close_connection()

    def get_profile(self, profile):
        """
        Get the profile of a user.
        """
        cursor = None
        try:
            cursor = self.connect_database()
            cursor.execute("SELECT * FROM users WHERE id = ?", (profile,))
            user = cursor.fetchone()
            return user
        except Exception as e:
            pass
            return None
        finally:
            self.close_connection()

    def check_if_table_exists(self, table_name):
        """
        Check if a table exists.
        """
        cursor = None
        try:
            cursor = self.connect_database()
            cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name=?", (table_name,))
            return cursor.fetchone() is not None
        except Exception as e:
            self._logger.log_activities("System", "check_if_table_exists", f"SQL Error: {e}", "Yes")
            return False
        finally:
            self.close_connection()

    def update_statement(self, query, params=(), table_name=None):
        """
        Execute an UPDATE query.
        """
        cursor = None
        try:
            cursor = self.connect_database()
            if table_name in self.ENCRYPTED_COLUMNS:
                encrypt_indexes = self.ENCRYPTED_COLUMNS[table_name]
                params = [self._encryption.encrypt_data(param) if i in encrypt_indexes and isinstance(param, str)
                          else
                          param for i, param in enumerate(params)]
            cursor.execute(query, params)
            self.conn.commit()
        except Exception as e:
            self._logger.log_activities("System", "update_statement", f"SQL Error: {e}", "Yes")
        finally:
            self.close_connection()

    def delete_statement(self, query, params=()):
        """
        Execute a DELETE query.
        """
        cursor = None
        try:
            cursor = self.connect_database()
            cursor.execute(query, params)
            self.conn.commit()
        except Exception as e:
            self._logger.log_activities("System", "delete_statement", f"SQL Error: {e}", "Yes")
        finally:
            self.close_connection()
Editor is loading...
Leave a Comment