Untitled
unknown
plain_text
a year ago
16 kB
11
Indexable
import os
from src.dependables import logging_config
logger = logging_config.logger(__name__)
from authlib.integrations.starlette_client import OAuth, OAuthError
from fastapi.responses import RedirectResponse
from alembic import command
from alembic.config import Config
from fastapi import APIRouter, Depends, HTTPException, Request
from sqlalchemy import select
from src.dependables.token_cache import remove_token_from_cache, clear_token_cache
from src.database import dbutils
from src.database.models.Metadata import MetadataModel
from src.dependables import verify_admin_token
from src.dependables import db
from urllib import parse
from sqlalchemy import create_engine, text
from . import api, app, auth
import traceback
# import logging
# from httpx import AsyncClient
# import httpx
# logging.basicConfig(level=logging.DEBUG)
# httpx_logger = logging.getLogger("httpx")
# httpx_logger.setLevel(logging.DEBUG)
router = APIRouter()
authorize_url=os.environ.get('AUTHORIZATION_ENDPOINT')
access_token_url=os.environ.get('ACCESS_TOKEN_ENDPOINT')
jwks_uri=os.environ.get('JWKS_ID_TOKEN_ENDPOINT')
scope=os.environ.get('OIDC_SCOPE')
client_id=os.environ.get('OIDC_CLIENT_ID')
client_secret=os.environ.get('OIDC_CLIENT_SECRET')
redirect_uri=os.environ.get("AUTHCALLBACK_REDIRECT_URI")
#### Login Flow Start ####
class UnauthenticatedError(HTTPException):
def __init__(self) -> None:
super().__init__(status_code=401, detail="You are not authenticated.")
oauth = OAuth()
# async def create_custom_client():
# async with httpx.AsyncClient() as client:
# return client
# custom_client = create_custom_client()
oauth.register( # this allows us to call oauth.discord later on
'pingfed',
authorize_url=authorize_url,
access_token_url=access_token_url,
jwks_uri=jwks_uri,
scope=scope,
client_id=client_id,
client_secret=client_secret,
# client_kwargs={'client': custom_client},
)
# define the endpoints for the OAuth2 flow
@router.get('/login')
async def get_authorization_code(request: Request):
# Clear Session Before Login start
request.session.clear()
"""OAuth2 flow, step 1: have the user log into Discord to obtain an authorization code grant
"""
return await oauth.pingfed.authorize_redirect(request, redirect_uri)
# Change the path to /authCallback later on and give the PCR to ping fed accordingly
# @router.get('/AuthPlayground/authorization_code/callback', name='authCallback')
@router.get('/authcallback', name='authCallback')
async def authCallback(request: Request):
"""OAuth2 flow, step 2: exchange the authorization code for access token
"""
print(f"authorize_url: {authorize_url}")
print(f"access_token_url: {access_token_url}")
print(f"jwks_uri: {jwks_uri}")
print(f"scope: {scope}")
print(f"client_id: {client_id}")
print(f"client_secret: {client_secret}")
# exchange auth code for token
try:
# async with httpx.AsyncClient() as client:
# request_data = {
# "code": request.query_params.get("code"),
# "grant_type": "authorization_code",
# "redirect_uri": str(request.url_for("authCallback")),
# "client_id": client_id,
# "client_secret": client_secret,
# }
# print(f"Request Data: {request_data}")
# print(f"Request URL: {access_token_url}")
# response = await client.post(access_token_url, data=request_data)
token = await oauth.pingfed.authorize_access_token(request)
access_token = token.get("access_token")
id_token = token.get("id_token")
refresh_token = token.get("refresh_token")
# print(f"Response Status Code: {response.status_code}")
# print(f"Response Data: {response.text}")
# token = oauth.pingfed.parse_access_token_response(response)
except OAuthError as e:
print(f"OAuthError: {e}")
traceback.print_exc()
raise UnauthenticatedError() from e
except Exception as e:
print(f"Exception: {e}")
traceback.print_exc()
raise HTTPException(status_code=500, detail="Internal Server Error") from e
# Bellow is required if you want to store the token in the session for later use in
# other requests (e.g. to make authenticated requests to other APIs outside application)
# or to refresh the token when it expires from application itself.
# request.session["access_token"] = token.get("access_token")
# request.session["refresh_token"] = token.get("refresh_token")
# Following is cookie approach is NOT working when frontend is in different domain
# headers = {
# "Authorization": f"Bearer {token.get('access_token')}",
# "ID-Token": f"Bearer {token.get('id_token')}",
# "Refresh-Token": f"Bearer {token.get('refresh_token')}",
# "Location": f"{os.environ['FRONTEND_URL']}"
# }
redirect_url = (
f"{os.environ['FRONTEND_URL']}?"
f"access_token={access_token}&"
f"id_token={id_token}&"
f"refresh_token={refresh_token}"
)
return RedirectResponse(url=redirect_url, status_code=307)
# # response.set_cookie(key='access_token', value=token.get('access_token'), secure=True)
# response.set_cookie(key='access_token', value=token.get('access_token'), samesite="None", secure=True)
# # response.set_cookie(key='id_token', value=token.get('id_token'), secure=True)
# response.set_cookie(key='id_token', value=token.get('id_token'), samesite="None", secure=True)
# # response.set_cookie(key='refresh_token', value=token.get('refresh_token'), secure=True)
# return response
## IF you want to set as cookies to frontend URL Which is in different domain
# print("Token is: ", token.get("access_token"))
# redirect_response = RedirectResponse(url=os.environ['FRONTEND_URL'])
# redirect_response.set_cookie(
# key="access_token",
# value=token.get("access_token"),
# httponly=True,
# secure=True,
# samesite="None",
# )
# redirect_response.set_cookie(
# key="id_token",
# value=token.get("id_token"),
# httponly=True,
# secure=True,
# samesite="None",
# )
# return redirect_response
#### Login Flow End ####
# @router.get("/delete", dependencies=[Depends(authDependable)])
# def delete(request: Request):
# if request.state.auth.role != "admin":
# raise HTTPException(status_code=403)
# os.remove(os.environ.get("DATABASE_URL"))
@router.get("/logout")
async def logout(request: Request):
# response = RedirectResponse(url=os.environ['FRONTEND_URL'])
# response.delete_cookie("access_token")
# response.delete_cookie("id_token")
# response.delete_cookie("refresh_token")
# return response
authorization: str = request.headers.get("Authorization")
if not authorization:
return {"message": "You are not logged in"}
if not authorization.startswith("Bearer "):
return {"message": "Invalid Authorization Scheme"}
access_token = authorization[len("Bearer "):] # Extracting the token stripping the Bearer
remove_token_from_cache(access_token)
id_token: str = request.headers.get("ID-Token")
if id_token and id_token.startswith("Bearer "):
id_token = id_token[len("Bearer "):]
remove_token_from_cache(id_token)
return {"message": "You are sucessfully logged out"}
@router.get("/upgrade", dependencies=[Depends(db), Depends(verify_admin_token)])
def upgrade(request: Request):
# if not request.state.is_admin:
# raise HTTPException(status_code=403)
# if not os.path.exists(os.environ.get("DATABASE_BACKUP_FOLDER_URL")):
# os.makedirs(os.environ.get("DATABASE_BACKUP_FOLDER_URL"))
# shutil.copy2(
# os.environ.get("DATABASE_URL"),
# f"{os.environ.get('DATABASE_BACKUP_FOLDER_URL')}{os.environ.get('MODE')}-{datetime.datetime.now().strftime('%d-%m-%Y-%H:%M:%S')}.db",
# )
SQL_SERVER = os.environ.get('SQLSERVER')
SQL_DATABASE = os.environ.get('SQLDATABASE')
alembic_cfg = Config()
alembic_cfg.set_main_option("script_location", "src/database/migrations")
if os.environ.get("ENV") != "local":
connecting_string = f"Driver={{ODBC Driver 18 for SQL Server}};Server=tcp:{SQL_SERVER},1433;Database={SQL_DATABASE};Encrypt=yes;TrustServerCertificate=no;Connection Timeout=30;Authentication=ActiveDirectoryMSI"
params = parse.quote_plus(connecting_string).replace("%", "%%")
alembic_cfg.set_main_option(
"sqlalchemy.url",
"mssql+pyodbc:///?odbc_connect=%s" % params,
)
else:
alembic_cfg.set_main_option(
"sqlalchemy.url",
f"mssql+pyodbc://{SQL_SERVER}:{SQL_DATABASE}@MSSQL?TrustServerCertificate=yes",
)
try:
dbutils.lock_db(request.state.db)
command.upgrade(alembic_cfg, "head")
# if there is no metadata entry, create one to handle the lock db and updated data datetimes
metadata = request.state.db.scalar(select(MetadataModel))
if metadata is None:
new_metadata = MetadataModel(is_db_locked=False)
request.state.db.add(new_metadata)
request.state.db.commit()
except Exception as e:
raise e
finally:
dbutils.unlock_db(request.state.db)
# @router.get("/upgrade/{version}", dependencies=[Depends(authDependable)])
# def upgrade_version(request: Request, version: str):
# if not request.state.is_admin:
# raise HTTPException(status_code=403)
# if not os.path.exists(os.environ.get("DATABASE_BACKUP_FOLDER_URL")):
# os.makedirs(os.environ.get("DATABASE_BACKUP_FOLDER_URL"))
# shutil.copy2(
# os.environ.get("DATABASE_URL"),
# f"{os.environ.get('DATABASE_BACKUP_FOLDER_URL')}{os.environ.get('MODE')}-{datetime.datetime.now().strftime('%d-%m-%Y-%H:%M:%S')}.db",
# )
# alembic_cfg = Config()
# alembic_cfg.set_main_option("script_location", "src/database/migrations")
# alembic_cfg.set_main_option(
# "sqlalchemy.url", f"sqlite:///{os.environ.get('DATABASE_URL')}"
# )
# command.upgrade(alembic_cfg, version)
# @router.get("/downgrade/{version}", dependencies=[Depends(authDependable)])
# def downgrade_version(request: Request, version: str):
# if not request.state.is_admin:
# raise HTTPException(status_code=403)
# alembic_cfg = Config()
# alembic_cfg.set_main_option("script_location", "src/database/migrations")
# alembic_cfg.set_main_option(
# "sqlalchemy.url", f"sqlite:///{os.environ.get('DATABASE_URL')}"
# )
# command.downgrade(alembic_cfg, version)
# @router.get("/current", dependencies=[Depends(authDependable)])
# def current_version(request: Request):
# if not request.state.is_admin:
# raise HTTPException(status_code=403)
# alembic_cfg = Config()
# alembic_cfg.set_main_option("script_location", "src/database/migrations")
# alembic_cfg.set_main_option(
# "sqlalchemy.url", f"sqlite:///{os.environ.get('DATABASE_URL')}"
# )
# return command.current(alembic_cfg)
# @router.get("/list-backups", dependencies=[Depends(authDependable)])
# def list_backups(request: Request):
# if not request.state.is_admin:
# raise HTTPException(status_code=403)
# backups = os.listdir(os.environ.get("DATABASE_BACKUP_FOLDER_URL"))
# return [backup for backup in backups]
from sqlalchemy.orm import sessionmaker
@router.get("/vars", dependencies=[Depends(db), Depends(verify_admin_token)])
def vars(request: Request):
# if not request.state.is_admin:
# raise HTTPException(status_code=403)
if os.environ.get("ENV") != 'local':
connecting_string = f"Driver={{ODBC Driver 18 for SQL Server}};Server=tcp:{os.environ.get('SQLSERVER')},1433;Database={os.environ.get('SQLDATABASE')};Encrypt=yes;TrustServerCertificate=no;Connection Timeout=30;Authentication=ActiveDirectoryMSI"
params = parse.quote_plus(connecting_string)
engine = create_engine("mssql+pyodbc:///?odbc_connect=%s" % params).execution_options(schema_translate_map={None: os.environ.get('DATABASE_SCHEMA')})
else:
engine = create_engine( f"mssql+pyodbc://{os.environ.get('SQLSERVER')}:{os.environ.get('SQLDATABASE')}@MSSQL?TrustServerCertificate=yes").execution_options(schema_translate_map={None: os.environ.get('DATABASE_SCHEMA')})
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
if not engine: print("--------- Connection wasn't created")
with engine.connect() as con:
print("---------- Connection has been created\n---------Table Query")
result = con.execute(text(f"SELECT STRING_AGG(TABLE_NAME, ', ') AS tables_in_schema FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = 'app_schema'"))
row = result.fetchone()
print(f"-----------Tables in app_schema: \n{row}-------")
result = con.execute(text(f"SELECT STRING_AGG(COLUMN_NAME, ', ') AS users_columns FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA = 'app_schema' AND TABLE_NAME = 'users'"))
row = result.fetchone()
print(f"-----------Columns in Users Table in app_schema: \n{row}-------")
result = con.execute(text(f"SELECT count(*) AS users_count FROM app_schema.users"))
row = result.fetchone()
print(f"-----------Row count in Users Table in app_schema: \n{row}-------")
result = con.execute(text(f"SELECT MAX(user_id) FROM app_schema.users"))
row = result.fetchone()
print(f"-----------A User ID from Users: \n{row}-------")
result = con.execute(text(f"SELECT count(*) AS metrics_count FROM app_schema.metrics"))
row = result.fetchone()
print(f"-----------Row count in Metrics Table in app_schema: \n{row}-------")
result = con.execute(text(f"SELECT MAX(eb_sales_unit) FROM app_schema.metrics"))
row = result.fetchone()
print(f"-----------Max eb_sales_unit from Metrics: \n{row}-------")
result = con.execute(text(f"SELECT count(*) AS users2_count FROM app_schema.users2"))
row = result.fetchone()
print(f"-----------Row count in Users2 Table in app_schema: \n{row}-------")
result = con.execute(text(f"SELECT MAX(user_id) FROM app_schema.users2"))
row = result.fetchone()
print(f"-----------A User ID from Users2: \n{row}-------")
# All the users with admin role
result = con.execute(text(f"SELECT STRING_AGG(first_name, ',') as Admins FROM app_schema.users WHERE role = 'admin'"))
row = result.fetchone()
print(f"-----------Admins from User Table: \n{row}-------")
# All the users with powerUser role
result = con.execute(text(f"SELECT STRING_AGG(first_name, ',') as PowerUsers FROM app_schema.users WHERE role = 'powerUser'"))
row = result.fetchone()
print(f"-----------powerUsers from User Table: \n{row}-------")
# envvars = {
# "Seeder_Schema": os.environ.get("Seeder_schema"),
# #"Database_Schema": os.environ.get("DATABASE_SCHEMA"),
# }
# return envvars
@router.get("/cache", dependencies=[Depends(db), Depends(verify_admin_token)])
def clear_cache(request: Request):
clear_token_cache()
logger.info("Cache Cleared")
router.include_router(api.router)
router.include_router(app.router)
router.include_router(auth.router)
#router.include_router(cron.router)
Editor is loading...
Leave a Comment