Untitled
unknown
plain_text
3 years ago
12 kB
7
Indexable
from flask import Flask, render_template, request, redirect, url_for, flash, session
from flask_sqlalchemy import SQLAlchemy
from sqlalchemy import func
from datetime import datetime
from flask_login import UserMixin, login_user, LoginManager, login_required, logout_user, current_user
from sqlalchemy.exc import IntegrityError
# initialising the app
app = Flask(__name__)
app.secret_key = "Secret Key"
# SqlAlchemy Database Configuration With SQLite and MYSQL
app.config["SQLALCHEMY_DATABASE_URI"] = "sqlite:///users_and_investments.db"
#app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql://root:password@localhost/crud'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
# create db instance
db = SQLAlchemy(app)
# Creating model table for our User database
class User(db.Model, UserMixin):
User_ID = db.Column(db.Integer, primary_key=True)
First_Name = db.Column(db.String(255))
Last_Name = db.Column(db.String(255))
Role = db.Column(db.Enum('Investor', 'Trader'))
Active = db.Column(db.Enum('Y', 'N'))
Email_ID = db.Column(db.String(255), unique=True)
Mobile_Number = db.Column(db.String(255))
Password = db.Column(db.String(255))
Creation_Date = db.Column(
db.DateTime, nullable=False, default=datetime.utcnow)
Created_By = db.Column(db.String(255))
Modified_Date = db.Column(
db.DateTime, server_default=func.now(), onupdate=func.current_timestamp())
Modified_By = db.Column(db.String(255))
def get_id(self):
return (self.User_ID)
# Creating model table for our User database
class Investment(db.Model):
Investment_ID = db.Column(db.Integer, primary_key=True)
Investment_Type = db.Column(db.String(255))
Investment_Name = db.Column(db.String(255))
Company_Type = db.Column(db.Enum('Large-Cap', 'Mid-Cap', 'Small-Cap'))
Quantity = db.Column(db.Integer)
Market_Price = db.Column(db.Numeric(19, 4))
Currency = db.Column(db.String(50))
Active = db.Column(db.String(50))
Creation_Date = db.Column(
db.DateTime, nullable=False, default=datetime.utcnow)
Created_By = db.Column(db.String(255))
Modified_Date = db.Column(
db.DateTime, server_default=func.now(), onupdate=func.current_timestamp())
Modified_By = db.Column(db.String(255))
def get_id(self):
return (self.Investment_ID)
# Error Handling
@app.errorhandler(400)
def handle_400_error(_error):
"""Return a http 400 error to client"""
flash("Misunderstood")
return redirect(url_for('Index'))
@app.errorhandler(401)
def handle_401_error(_error):
"""Return a http 401 error to client"""
flash("Unauthorised")
return redirect(url_for('Index'))
@app.errorhandler(404)
def handle_404_error(_error):
"""Return a http 404 error to client"""
flash("Not Found")
return redirect(url_for('Index'))
@app.errorhandler(500)
def handle_500_error(_error):
"""Return a http 500 error to client"""
flash("Server Error")
return redirect(url_for('Index'))
@app.before_request
def make_session_permanent():
session.permanent = False
# Flask_Login Manager
login_manager = LoginManager()
login_manager.init_app(app)
login_manager.login_view = 'login'
@login_manager.user_loader
def load_user(User_ID):
return User.query.get_or_404(User_ID)
# Login Page
@app.route('/login', methods=['GET', 'POST'])
def login():
# Checks if the table is empty and lets you create one user to login
if User.query.first():
if request.method == 'POST':
Email_ID = request.form['Email_ID']
Password = request.form['Password']
Email_check = User.query.filter_by(Email_ID=Email_ID).first()
session["Email_check"] = Email_check.Email_ID
if Email_check:
Password_check = User.query.filter_by(
Password=Password).first()
if Password_check:
login_user(Email_check)
session['user'] = Email_check.User_ID
flash("Welcome " + Email_check.First_Name +
" " + Email_check.Last_Name)
return redirect(url_for('Index'))
else:
flash("Wrong Password")
else:
flash("Email ID doesn't exist")
return render_template("login.html")
return render_template("index.html")
# Logout
@app.route('/logout', methods=['GET', 'POST'])
@login_required
def logout():
logout_user()
flash("You Have Been Logged Out")
return redirect(url_for('login'))
##FOR USERS##
# This is the index route where we query all the Users
@app.route('/')
@app.route('/users')
@login_required
def Index():
all_data = User.query.all()
logged_email = session.get("Email_check", None)
print(session.get("Email_check", None))
return render_template("index.html", users=all_data, email = logged_email)
# Gets a specific user using their user ID
@app.route("/id", methods=["GET", "POST"])
@login_required
def get_user_id():
if request.method == 'POST':
User_ID = request.form['User_ID']
user = User.query.get(User_ID)
if user:
return render_template("search.html", users=user)
flash("User ID not found")
return redirect(url_for('Index'))
# Gets a specific user using their user type
@app.route("/type", methods=['GET', "POST"])
@login_required
def get_user_role():
if request.method == 'POST':
Role = request.form['Role']
user = User.query.filter_by(Role=Role).all()
if user:
return render_template("types.html", users=user)
flash("User with given Role not found")
return redirect(url_for('Index'))
# Inseting data to the database using HTML forms
@app.route('/insert', methods=['POST'])
def insert():
if request.method == 'POST':
First_Name = request.form['First_Name']
Last_Name = request.form['Last_Name']
Role = request.form['Role']
Active = request.form['Active']
Email_ID = request.form['Email_ID']
Mobile_Number = request.form['Mobile_Number']
Password = request.form['Password']
Created_By = request.form['Created_By']
new_user = User(First_Name=First_Name, Last_Name=Last_Name, Role=Role, Active=Active,
Email_ID=Email_ID, Mobile_Number=Mobile_Number, Password=Password, Created_By=Created_By)
try:
db.session.add(new_user)
db.session.commit()
flash("User Inserted Successfully")
return redirect(url_for('Index'))
# To stop users from entering duplicate email id
except IntegrityError:
db.session.rollback()
flash("Email ID already in use")
return redirect(url_for('Index'))
# Udpating user information
@app.route('/update', methods=['GET', 'POST'])
def update():
if request.method == 'POST':
my_data = User.query.get(request.form.get('User_ID'))
my_data.First_Name = request.form['First_Name']
my_data.Last_Name = request.form['Last_Name']
my_data.Role = request.form['Role']
my_data.Active = request.form['Active']
my_data.Email_ID = request.form['Email_ID']
my_data.Mobile_Number = request.form['Mobile_Number']
my_data.Password = request.form['Password']
db.session.commit()
flash("User Updated Successfully")
return redirect(url_for('Index'))
# Deleting a user
@app.route('/delete/<User_ID>/', methods=['GET', 'POST'])
def delete(User_ID):
my_data = User.query.get(User_ID)
if session.get("Email_check", None) == my_data.Email_ID:
flash('User cannot be deleted as it is logged in')
return redirect(url_for('Index'))
else:
db.session.delete(my_data)
db.session.commit()
flash("User Deleted Successfully")
return redirect(url_for('Index'))
##FOR Investments##
# Invesments home page
@app.route('/invesment')
@login_required
def investment_home():
all_data = Investment.query.all()
return render_template("investment_home.html", investments=all_data)
# Gets a specific investment using it ID
@app.route("/investment_search", methods=["GET", "POST"])
@login_required
def get_investment_id():
if request.method == 'POST':
Investment_ID = request.form['Investment_ID']
investment = Investment.query.get(Investment_ID)
if investment:
return render_template("investment_search.html", investments=investment)
flash("Investment ID not found")
return redirect(url_for('investment_home'))
# Gets a specific investment using its type
@app.route("/investment_type", methods=['GET', "POST"])
@login_required
def get_investment_type():
if request.method == 'POST':
Investment_Type = request.form['Investment_Type']
investment = Investment.query.filter_by(
Investment_Type=Investment_Type).all()
if investment:
return render_template("investment_types.html", investments=investment)
flash("Investment with given type not found")
return redirect(url_for('investment_home'))
# Inseting investment data using HTML forms
@app.route('/investment_insert', methods=['POST'])
def investment_insert():
if request.method == 'POST':
Investment_Type = request.form['Investment_Type']
Company_Type = request.form['Company_Type']
Investment_Name = request.form['Investment_Name']
Quantity = request.form['Quantity']
Market_Price = request.form['Market_Price']
Currency = request.form['Currency']
Active = request.form['Active']
Created_By = request.form['Created_By']
new_investment = Investment(Investment_Type=Investment_Type, Company_Type=Company_Type, Investment_Name=Investment_Name,
Quantity=Quantity, Market_Price=Market_Price, Currency=Currency, Active=Active, Created_By=Created_By)
db.session.add(new_investment)
db.session.commit()
flash("Investment Inserted Successfully")
return redirect(url_for('investment_home'))
# Updating investments
@app.route('/investment_update', methods=['GET', 'POST'])
def investment_update():
if request.method == 'POST':
my_data = Investment.query.get(request.form.get('Investment_ID'))
my_data.Quantity = request.form['Quantity']
my_data.Market_Price = request.form['Market_Price']
my_data.Active = request.form['Active']
db.session.commit()
flash("Investment Updated Successfully")
return redirect(url_for('investment_home'))
# Deleting investments
@app.route('/investment_delete/<Investment_ID>/', methods=['GET', 'POST'])
def investment_delete(Investment_ID):
my_data = Investment.query.get(Investment_ID)
db.session.delete(my_data)
db.session.commit()
flash("Investment Deleted Successfully")
return redirect(url_for('investment_home'))
if __name__ == "__main__":
app.run(debug=True)
Editor is loading...