Untitled
unknown
plain_text
3 years ago
12 kB
3
Indexable
from flask import Flask, render_template, request, redirect, url_for, flash, session from flask_sqlalchemy import SQLAlchemy from sqlalchemy import func from datetime import datetime from flask_login import UserMixin, login_user, LoginManager, login_required, logout_user, current_user from sqlalchemy.exc import IntegrityError # initialising the app app = Flask(__name__) app.secret_key = "Secret Key" # SqlAlchemy Database Configuration With SQLite and MYSQL app.config["SQLALCHEMY_DATABASE_URI"] = "sqlite:///users_and_investments.db" #app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql://root:password@localhost/crud' app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False # create db instance db = SQLAlchemy(app) # Creating model table for our User database class User(db.Model, UserMixin): User_ID = db.Column(db.Integer, primary_key=True) First_Name = db.Column(db.String(255)) Last_Name = db.Column(db.String(255)) Role = db.Column(db.Enum('Investor', 'Trader')) Active = db.Column(db.Enum('Y', 'N')) Email_ID = db.Column(db.String(255), unique=True) Mobile_Number = db.Column(db.String(255)) Password = db.Column(db.String(255)) Creation_Date = db.Column( db.DateTime, nullable=False, default=datetime.utcnow) Created_By = db.Column(db.String(255)) Modified_Date = db.Column( db.DateTime, server_default=func.now(), onupdate=func.current_timestamp()) Modified_By = db.Column(db.String(255)) def get_id(self): return (self.User_ID) # Creating model table for our User database class Investment(db.Model): Investment_ID = db.Column(db.Integer, primary_key=True) Investment_Type = db.Column(db.String(255)) Investment_Name = db.Column(db.String(255)) Company_Type = db.Column(db.Enum('Large-Cap', 'Mid-Cap', 'Small-Cap')) Quantity = db.Column(db.Integer) Market_Price = db.Column(db.Numeric(19, 4)) Currency = db.Column(db.String(50)) Active = db.Column(db.String(50)) Creation_Date = db.Column( db.DateTime, nullable=False, default=datetime.utcnow) Created_By = db.Column(db.String(255)) Modified_Date = db.Column( db.DateTime, server_default=func.now(), onupdate=func.current_timestamp()) Modified_By = db.Column(db.String(255)) def get_id(self): return (self.Investment_ID) # Error Handling @app.errorhandler(400) def handle_400_error(_error): """Return a http 400 error to client""" flash("Misunderstood") return redirect(url_for('Index')) @app.errorhandler(401) def handle_401_error(_error): """Return a http 401 error to client""" flash("Unauthorised") return redirect(url_for('Index')) @app.errorhandler(404) def handle_404_error(_error): """Return a http 404 error to client""" flash("Not Found") return redirect(url_for('Index')) @app.errorhandler(500) def handle_500_error(_error): """Return a http 500 error to client""" flash("Server Error") return redirect(url_for('Index')) @app.before_request def make_session_permanent(): session.permanent = False # Flask_Login Manager login_manager = LoginManager() login_manager.init_app(app) login_manager.login_view = 'login' @login_manager.user_loader def load_user(User_ID): return User.query.get_or_404(User_ID) # Login Page @app.route('/login', methods=['GET', 'POST']) def login(): # Checks if the table is empty and lets you create one user to login if User.query.first(): if request.method == 'POST': Email_ID = request.form['Email_ID'] Password = request.form['Password'] Email_check = User.query.filter_by(Email_ID=Email_ID).first() session["Email_check"] = Email_check.Email_ID if Email_check: Password_check = User.query.filter_by( Password=Password).first() if Password_check: login_user(Email_check) session['user'] = Email_check.User_ID flash("Welcome " + Email_check.First_Name + " " + Email_check.Last_Name) return redirect(url_for('Index')) else: flash("Wrong Password") else: flash("Email ID doesn't exist") return render_template("login.html") return render_template("index.html") # Logout @app.route('/logout', methods=['GET', 'POST']) @login_required def logout(): logout_user() flash("You Have Been Logged Out") return redirect(url_for('login')) ##FOR USERS## # This is the index route where we query all the Users @app.route('/') @app.route('/users') @login_required def Index(): all_data = User.query.all() logged_email = session.get("Email_check", None) print(session.get("Email_check", None)) return render_template("index.html", users=all_data, email = logged_email) # Gets a specific user using their user ID @app.route("/id", methods=["GET", "POST"]) @login_required def get_user_id(): if request.method == 'POST': User_ID = request.form['User_ID'] user = User.query.get(User_ID) if user: return render_template("search.html", users=user) flash("User ID not found") return redirect(url_for('Index')) # Gets a specific user using their user type @app.route("/type", methods=['GET', "POST"]) @login_required def get_user_role(): if request.method == 'POST': Role = request.form['Role'] user = User.query.filter_by(Role=Role).all() if user: return render_template("types.html", users=user) flash("User with given Role not found") return redirect(url_for('Index')) # Inseting data to the database using HTML forms @app.route('/insert', methods=['POST']) def insert(): if request.method == 'POST': First_Name = request.form['First_Name'] Last_Name = request.form['Last_Name'] Role = request.form['Role'] Active = request.form['Active'] Email_ID = request.form['Email_ID'] Mobile_Number = request.form['Mobile_Number'] Password = request.form['Password'] Created_By = request.form['Created_By'] new_user = User(First_Name=First_Name, Last_Name=Last_Name, Role=Role, Active=Active, Email_ID=Email_ID, Mobile_Number=Mobile_Number, Password=Password, Created_By=Created_By) try: db.session.add(new_user) db.session.commit() flash("User Inserted Successfully") return redirect(url_for('Index')) # To stop users from entering duplicate email id except IntegrityError: db.session.rollback() flash("Email ID already in use") return redirect(url_for('Index')) # Udpating user information @app.route('/update', methods=['GET', 'POST']) def update(): if request.method == 'POST': my_data = User.query.get(request.form.get('User_ID')) my_data.First_Name = request.form['First_Name'] my_data.Last_Name = request.form['Last_Name'] my_data.Role = request.form['Role'] my_data.Active = request.form['Active'] my_data.Email_ID = request.form['Email_ID'] my_data.Mobile_Number = request.form['Mobile_Number'] my_data.Password = request.form['Password'] db.session.commit() flash("User Updated Successfully") return redirect(url_for('Index')) # Deleting a user @app.route('/delete/<User_ID>/', methods=['GET', 'POST']) def delete(User_ID): my_data = User.query.get(User_ID) if session.get("Email_check", None) == my_data.Email_ID: flash('User cannot be deleted as it is logged in') return redirect(url_for('Index')) else: db.session.delete(my_data) db.session.commit() flash("User Deleted Successfully") return redirect(url_for('Index')) ##FOR Investments## # Invesments home page @app.route('/invesment') @login_required def investment_home(): all_data = Investment.query.all() return render_template("investment_home.html", investments=all_data) # Gets a specific investment using it ID @app.route("/investment_search", methods=["GET", "POST"]) @login_required def get_investment_id(): if request.method == 'POST': Investment_ID = request.form['Investment_ID'] investment = Investment.query.get(Investment_ID) if investment: return render_template("investment_search.html", investments=investment) flash("Investment ID not found") return redirect(url_for('investment_home')) # Gets a specific investment using its type @app.route("/investment_type", methods=['GET', "POST"]) @login_required def get_investment_type(): if request.method == 'POST': Investment_Type = request.form['Investment_Type'] investment = Investment.query.filter_by( Investment_Type=Investment_Type).all() if investment: return render_template("investment_types.html", investments=investment) flash("Investment with given type not found") return redirect(url_for('investment_home')) # Inseting investment data using HTML forms @app.route('/investment_insert', methods=['POST']) def investment_insert(): if request.method == 'POST': Investment_Type = request.form['Investment_Type'] Company_Type = request.form['Company_Type'] Investment_Name = request.form['Investment_Name'] Quantity = request.form['Quantity'] Market_Price = request.form['Market_Price'] Currency = request.form['Currency'] Active = request.form['Active'] Created_By = request.form['Created_By'] new_investment = Investment(Investment_Type=Investment_Type, Company_Type=Company_Type, Investment_Name=Investment_Name, Quantity=Quantity, Market_Price=Market_Price, Currency=Currency, Active=Active, Created_By=Created_By) db.session.add(new_investment) db.session.commit() flash("Investment Inserted Successfully") return redirect(url_for('investment_home')) # Updating investments @app.route('/investment_update', methods=['GET', 'POST']) def investment_update(): if request.method == 'POST': my_data = Investment.query.get(request.form.get('Investment_ID')) my_data.Quantity = request.form['Quantity'] my_data.Market_Price = request.form['Market_Price'] my_data.Active = request.form['Active'] db.session.commit() flash("Investment Updated Successfully") return redirect(url_for('investment_home')) # Deleting investments @app.route('/investment_delete/<Investment_ID>/', methods=['GET', 'POST']) def investment_delete(Investment_ID): my_data = Investment.query.get(Investment_ID) db.session.delete(my_data) db.session.commit() flash("Investment Deleted Successfully") return redirect(url_for('investment_home')) if __name__ == "__main__": app.run(debug=True)
Editor is loading...