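# Paste-site metadata (not part of the program):
#   title: NEW LAMBDA_EDGE VALIDATE TOKEN
#   author: unknown
#   syntax: plain_text
#   posted: 2 years ago
#   size: 7.5 kB
#   views: 14
#   Indexable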
# NEW LAMBDA EDGE VALIDATE TOKEN VERSION
from urllib.parse import urlencode, quote
import http.cookies as cookies
from jose import jwt
from jose.exceptions import *
import http.client
import hashlib
import base64
import boto3
import json
import hmac
import os
def retrieve_credentials():
    """Load the Cognito/CloudFront settings from S3 into ``os.environ``.

    Reads the JSON object ``data_serve_lambda_edge.json`` from the bucket
    ``s3bucketchecksslvupham`` and copies the ``value`` field of selected
    entries into the environment variables the rest of this module reads.
    """
    body = boto3.client("s3").get_object(
        Bucket="s3bucketchecksslvupham",
        Key="data_serve_lambda_edge.json",
    )['Body']
    settings = json.loads(body.read().decode("utf-8"))

    def setting(name):
        # Every entry in the JSON document wraps its payload in a 'value' field.
        return settings[name]['value']

    env = os.environ
    env['CLIENT_ID'] = setting('cognito_user_pool_client_id')
    env['CLIENT_SECRET'] = setting('cognito_user_pool_client_secret')
    env['USER_POOL_ID'] = setting('cognito_user_pool_id')
    # The stored domain value is a suffix; the pool's domain prefix is prepended.
    env['USER_POOL_DOMAIN_URL'] = "stylcoggoogle" + setting('cognito_user_pool_domain_url')
    env['COGNITO_IDP'] = "cognito-idp.us-east-1.amazonaws.com"
    env['CLOUDFRONT_DOMAIN'] = setting('cloudfront_domain_name')
def encode_base64(A, B):
    """Return the Base64 text of ``"A:B"`` (the two values joined by a colon)."""
    pair = "{}:{}".format(A, B)
    encoded = base64.b64encode(pair.encode('utf-8'))
    return encoded.decode('utf-8')
def decode_base64(encode_token):
    """Decode a Base64 string and return the result as UTF-8 text."""
    raw_bytes = base64.b64decode(encode_token.encode('utf-8'))
    return raw_bytes.decode('utf-8')
def generate_secret_hash(A, B):
    """Return Base64(HMAC-SHA256(key=CLIENT_SECRET, msg=A + B)).

    Args:
        A: First half of the message (the caller in this file passes a
            username).
        B: Second half of the message (the caller in this file passes the
            app client id).

    Returns:
        The Base64-encoded HMAC digest as a ``str``.

    Raises:
        KeyError: If ``CLIENT_SECRET`` is not set in the environment.
    """
    # The `os` module has no `get` attribute; environment variables live in
    # `os.environ`. Indexing (rather than `.get`) gives a clear KeyError when
    # the secret has not been loaded instead of failing later on `None`.
    client_secret = os.environ['CLIENT_SECRET']
    digest = hmac.new(
        client_secret.encode('utf-8'),
        (A + B).encode('utf-8'),
        hashlib.sha256,
    ).digest()
    return base64.b64encode(digest).decode()
def exchange_code_for_token(request):
    """Exchange the OAuth2 authorization code for tokens and set the cookie.

    Reads the ``code`` query parameter from the CloudFront request, posts it
    to the user pool's ``/oauth2/token`` endpoint, and returns a 302 response
    to ``/index.html`` that sets a ``token`` cookie holding
    ``base64(access_token:refresh_token)``.

    Args:
        request: The CloudFront request dict (``event['Records'][0]['cf']['request']``).

    Returns:
        A CloudFront response dict: a 302 redirect carrying the cookie on
        success, a 400 response when no ``code`` parameter is present, or a
        401 response when the token endpoint does not return tokens.

    Raises:
        KeyError: If the required settings are missing from the environment.
    """
    # Local import so this block does not depend on the file's import header.
    from urllib.parse import parse_qs

    # `os.get` does not exist; settings are read from `os.environ`.
    client_id = os.environ['CLIENT_ID']
    client_secret = os.environ['CLIENT_SECRET']
    cloudfront_domain = os.environ['CLOUDFRONT_DOMAIN']
    user_pool_domain_url = os.environ['USER_POOL_DOMAIN_URL']

    # Parse the query string properly: splitting on "=" picks up trailing
    # parameters (e.g. "code=abc&state=xyz" -> "abc&state") and fails with an
    # IndexError when the query string is empty.
    code_values = parse_qs(request.get('querystring') or '').get('code')
    if not code_values:
        print("NO AUTHORIZATION CODE IN QUERY STRING")
        return {
            'status': '400',
            'statusDescription': 'Bad Request',
            'body': 'Missing authorization code'
        }

    headers = {
        'content-type': 'application/x-www-form-urlencoded',
        'authorization': 'Basic ' + encode_base64(client_id, client_secret)
    }
    data = {
        'grant_type': 'authorization_code',
        'redirect_uri': f'https://{cloudfront_domain}/parseauth',
        'code': code_values[0]
    }

    conn = http.client.HTTPSConnection(user_pool_domain_url)
    try:
        conn.request("POST", "/oauth2/token", urlencode(data), headers)
        token_response = conn.getresponse()
        status = token_response.status
        payload = token_response.read().decode('utf-8')
    finally:
        # Always release the socket, even when the request fails.
        conn.close()

    # The body comes from the network: parse it as JSON, never eval() it.
    try:
        response_data = json.loads(payload)
    except ValueError:
        response_data = {}

    required = ('access_token', 'refresh_token', 'expires_in')
    if status != 200 or not all(name in response_data for name in required):
        print("TOKEN ENDPOINT DID NOT RETURN TOKENS, HTTP STATUS : ", status)
        return {
            'status': '401',
            'statusDescription': 'Unauthorized',
            'body': 'Unable to exchange authorization code'
        }

    token_base64 = encode_base64(response_data['access_token'], response_data['refresh_token'])
    my_cookie = cookies.SimpleCookie()
    my_cookie['token'] = token_base64
    my_cookie['token']['max-age'] = response_data['expires_in']
    my_cookie['token']['path'] = '/'
    my_cookie['token']['secure'] = True
    cookie_value = my_cookie.output(header='')

    final_response = {
        'status': '302',
        'statusDescription': 'Found',
        'headers': {
            'location': [{
                'key': 'Location',
                'value': '/index.html'
            }],
            'set-cookie': [{
                'key': 'Set-Cookie',
                'value': cookie_value
            }],
            'cache-control': [{
                'key': 'Cache-Control',
                'value': 'no-cache'
            }]
        }
    }
    # The cookie carries live access/refresh tokens, so it is deliberately
    # not written to the logs.
    print("SET COOKIES SUCCESS, RETURN REDIRECT TO /index.html")
    return final_response
def get_key(access_token):
    """Fetch the user pool's JWKS and return the key that signed the token.

    Args:
        access_token: The JWT whose unverified header supplies the ``kid``.

    Returns:
        The JWK dict whose ``kid`` matches the token header, or ``None`` when
        the key set contains no such key.

    Raises:
        KeyError: If the required settings are missing from the environment
            or the token header / key set lacks an expected field.
    """
    # `os.get` does not exist; settings are read from `os.environ`.
    user_pool_id = os.environ['USER_POOL_ID']
    cognito_idp = os.environ['COGNITO_IDP']

    # Read the header first so a malformed token fails before the network call.
    header_kid = jwt.get_unverified_header(access_token)['kid']

    conn = http.client.HTTPSConnection(cognito_idp)
    try:
        conn.request("GET", f"/{user_pool_id}/.well-known/jwks.json")
        body = conn.getresponse().read().decode('utf-8')
    finally:
        conn.close()

    # The body comes from the network: parse it as JSON, never eval() it.
    jwks_data = json.loads(body)
    for item in jwks_data['keys']:
        if item['kid'] == header_kid:
            return item
    return None
def refresh_access_token(refresh_token, username="Google_117727834150775985850"):
    """Obtain a new access token from Cognito using a refresh token.

    Args:
        refresh_token: The refresh token issued alongside the access token.
        username: The username mixed into the secret hash. Defaults to the
            value that was previously hard-coded here, so existing callers
            behave as before.
            NOTE(review): that default identifies a single user; callers
            serving other users presumably need to pass the real username.

    Returns:
        The new access token string.

    Raises:
        KeyError: If ``CLIENT_ID`` is missing from the environment or the
            response carries no ``AuthenticationResult``.
    """
    # `os.get` does not exist; settings are read from `os.environ`.
    client_id = os.environ['CLIENT_ID']
    cognito_client = boto3.client("cognito-idp")
    response = cognito_client.initiate_auth(
        AuthFlow='REFRESH_TOKEN_AUTH',
        AuthParameters={
            'REFRESH_TOKEN': refresh_token,
            'SECRET_HASH': generate_secret_hash(username, client_id)
        },
        ClientId=client_id
    )
    return response['AuthenticationResult']['AccessToken']
# ------------------------------- VALIDATION METHOD ------------------------------- #
def verify_jwt_token(access_token, refresh_token):
    """Validate an access token, refreshing it once if it has expired.

    The token's signature, expiry and issuer are verified, and its
    ``client_id`` claim must equal the configured app client id.

    Args:
        access_token: The JWT access token, or ``None`` when no cookie was sent.
        refresh_token: The refresh token used when the access token is expired.

    Returns:
        ``True`` when the (possibly refreshed) token is valid for this client,
        ``False`` otherwise. Validation errors never propagate: any failure
        is logged and treated as "not authorized".

    Raises:
        KeyError: If the required settings are missing from the environment.
    """
    if not access_token:
        print("No cookie found")
        return False

    # `os.get` does not exist; settings are read from `os.environ`.
    client_id = os.environ['CLIENT_ID']
    issuer = f"https://{os.environ['COGNITO_IDP']}/{os.environ['USER_POOL_ID']}"

    already_refreshed = False
    while True:
        try:
            key = get_key(access_token)
            if key is None:
                print("No signing key matches the token header")
                return False
            claims = jwt.decode(
                access_token, key,
                # Pin the algorithm instead of trusting the token header.
                algorithms=['RS256'],
                # The expected issuer must be passed as `issuer=`; an "iss"
                # entry inside `options` is not a recognised option and left
                # the issuer unchecked.
                issuer=issuer,
                options={
                    "verify_iss": True,
                    "verify_signature": True,
                    "verify_exp": True
                }
            )
            return claims.get('client_id') == client_id
        except ExpiredSignatureError as e:
            print("Access token - ", e)
            # Refresh at most once so a persistently expired token cannot
            # loop forever.
            if already_refreshed or not refresh_token:
                return False
            already_refreshed = True
            try:
                access_token = refresh_access_token(refresh_token)
            except Exception as refresh_error:
                # An exception raised inside this handler would otherwise
                # escape and crash the request.
                print("Refresh token - ", refresh_error)
                return False
        except Exception as e:
            # Fail closed on any other validation/network error, but log the
            # real cause rather than a misleading fixed message.
            print("Token validation failed - ", e)
            return False
def lambda_handler(event, context):
    """Lambda@Edge entry point: gate CloudFront requests behind Cognito sign-in.

    Requests to ``/parseauth`` are handed to ``exchange_code_for_token``.
    For every other request the ``token`` cookie is decoded and validated:
    a valid token lets the request through unchanged, anything else is
    redirected to the Cognito hosted sign-in page.

    Args:
        event: The CloudFront event; the request is read from
            ``event['Records'][0]['cf']['request']``.
        context: The Lambda context object (unused).

    Returns:
        The original request dict when the token is valid, otherwise a
        CloudFront response dict (302 redirect, or whatever
        ``exchange_code_for_token`` returns).
    """
    # Nothing else in this module populates the settings, so load them on the
    # first invocation of each execution environment.
    if 'CLIENT_ID' not in os.environ:
        retrieve_credentials()

    # `os.get` does not exist; settings are read from `os.environ`.
    client_id = os.environ['CLIENT_ID']
    cloudfront_domain = os.environ['CLOUDFRONT_DOMAIN']

    request = event['Records'][0]['cf']['request']
    # Log only the URI: the full event contains the token cookie.
    print("REQUEST URI FROM CLOUDFRONT : ", request['uri'])
    if request['uri'].startswith('/parseauth'):
        return exchange_code_for_token(request)

    access_token = None
    refresh_token = None
    # Parse the Cookie header(s) properly. Splitting on "=" breaks on Base64
    # padding, on quoted values, and whenever another cookie precedes ours.
    for cookie_header in request['headers'].get('cookie', []):
        jar = cookies.SimpleCookie()
        try:
            jar.load(cookie_header['value'])
        except cookies.CookieError:
            continue
        if 'token' not in jar:
            continue
        try:
            access_token, refresh_token = decode_base64(jar['token'].value).split(":", 1)
        except ValueError:
            # Covers bad Base64, bad UTF-8 and a missing ":" separator; treat
            # an undecodable cookie as "not signed in" instead of crashing.
            access_token = refresh_token = None
            print("TOKEN COOKIE COULD NOT BE DECODED")
        break

    if verify_jwt_token(access_token, refresh_token):
        print("SUCCESSFUL VALIDATION !!!")
        return request

    print("FAIL VALIDATION, UNAUTHORIZE ACCESS !!!! REDIRECT TO SIGN-IN PAGE.")
    redirect_uri = quote(f"https://{cloudfront_domain}/parseauth", safe="")
    response = {
        'status': '302',
        'statusDescription': 'Found',
        'headers': {
            'location': [{
                'key': 'Location',
                'value': f'https://stylcoggoogle.auth.us-east-1.amazoncognito.com/oauth2/authorize?client_id={client_id}&response_type=code&scope=aws.cognito.signin.user.admin+email+openid+profile&redirect_uri={redirect_uri}'
            }]
        }
    }
    print("REDIRECT RESPONSE TO COGNITO SIGN-IN : ", response)
    return response