NEW LAMBDA_EDGE VALIDATE TOKEN
unknown
plain_text
a year ago
7.5 kB
2
Indexable
Never
# NEW LAMBDA EDGE VALIDATE TOKEN VERSION
"""Lambda@Edge handler that gates CloudFront requests behind Cognito sign-in.

Flow:
  * ``/parseauth?code=...``  -> swap the authorization code for tokens at the
    Cognito ``/oauth2/token`` endpoint and store them in a ``token`` cookie.
  * any other request        -> read the ``token`` cookie, verify the access
    token (refreshing it once if it has expired) and either pass the request
    through or redirect the browser to the Cognito sign-in page.

Settings are read from ``os.environ``; when a setting is missing they are
loaded from the S3 JSON document by ``retrieve_credentials``.

Third-party packages (``boto3``, ``botocore``, ``jose``) are imported inside
the functions that need them, so code paths that never touch AWS APIs or JWT
verification do not depend on them.
"""
import base64
import hashlib
import hmac
import http.client
import http.cookies as cookies
import json
import os
from urllib.parse import parse_qs, quote, urlencode

_SETTINGS_BUCKET = "s3bucketchecksslvupham"
_SETTINGS_KEY = "data_serve_lambda_edge.json"
_COGNITO_REGION = "us-east-1"
_USER_POOL_DOMAIN_PREFIX = "stylcoggoogle"
# Username the original code hard-coded into the refresh SECRET_HASH; kept
# only as a fallback for callers that do not pass a username.
_LEGACY_REFRESH_USERNAME = "Google_117727834150775985850"
_TOKEN_COOKIE = "token"
_HTTP_TIMEOUT_SECONDS = 5


def retrieve_credentials():
    """Load the Cognito/CloudFront settings from S3 into ``os.environ``."""
    import boto3

    s3_client = boto3.client("s3")
    response = s3_client.get_object(Bucket=_SETTINGS_BUCKET, Key=_SETTINGS_KEY)
    meta_data = json.loads(response['Body'].read().decode("utf-8"))

    os.environ['CLIENT_ID'] = meta_data['cognito_user_pool_client_id']['value']
    os.environ['CLIENT_SECRET'] = meta_data['cognito_user_pool_client_secret']['value']
    os.environ['USER_POOL_ID'] = meta_data['cognito_user_pool_id']['value']
    os.environ['USER_POOL_DOMAIN_URL'] = (
        _USER_POOL_DOMAIN_PREFIX + meta_data['cognito_user_pool_domain_url']['value']
    )
    os.environ['COGNITO_IDP'] = f"cognito-idp.{_COGNITO_REGION}.amazonaws.com"
    os.environ['CLOUDFRONT_DOMAIN'] = meta_data['cloudfront_domain_name']['value']


def _setting(name):
    """Return setting *name*, loading the settings from S3 on first miss.

    Raises:
        KeyError: if the setting is still missing after loading.
    """
    if name not in os.environ:
        retrieve_credentials()
    return os.environ[name]


def encode_base64(A, B):
    """Return the base64 text of ``"A:B"`` (UTF-8)."""
    return base64.b64encode(f"{A}:{B}".encode('utf-8')).decode('utf-8')


def decode_base64(encode_token):
    """Decode base64 text back to a UTF-8 string.

    Raises:
        ValueError: on invalid base64 or non-UTF-8 payload
            (``binascii.Error`` and ``UnicodeDecodeError`` both derive
            from it).
    """
    return base64.b64decode(encode_token.encode('utf-8')).decode('utf-8')


def generate_secret_hash(A, B):
    """Return base64(HMAC-SHA256(CLIENT_SECRET, A + B)).

    Called below with the username as ``A`` and the client id as ``B``.
    """
    client_secret = _setting('CLIENT_SECRET')
    digest = hmac.new(
        client_secret.encode('utf-8'),
        (A + B).encode('utf-8'),
        hashlib.sha256,
    ).digest()
    return base64.b64encode(digest).decode()


def _request_json(host, method, path, body=None, headers=None):
    """Perform one HTTPS request and return ``(status, parsed JSON body)``.

    The connection is always closed. The body is parsed with ``json.loads``;
    the previous ``eval`` executed whatever text the remote end returned.

    Raises:
        OSError / http.client.HTTPException: on transport failure.
        ValueError: if the body is not valid JSON.
    """
    conn = http.client.HTTPSConnection(host, timeout=_HTTP_TIMEOUT_SECONDS)
    try:
        conn.request(method, path, body, headers or {})
        response = conn.getresponse()
        return response.status, json.loads(response.read().decode('utf-8'))
    finally:
        conn.close()


def _error_response(status, description, message):
    """Build a small uncached plain-text CloudFront response."""
    return {
        'status': status,
        'statusDescription': description,
        'headers': {
            'content-type': [{'key': 'Content-Type', 'value': 'text/plain'}],
            'cache-control': [{'key': 'Cache-Control', 'value': 'no-cache'}],
        },
        'body': message,
    }


def exchange_code_for_token(request):
    """Handle ``/parseauth``: swap the authorization code for tokens.

    Args:
        request: the CloudFront request dict; its ``querystring`` must carry
            a ``code`` parameter.

    Returns:
        A 302 response to ``/index.html`` that sets the ``token`` cookie on
        success; a 400 response when no code is present, 401 when the token
        endpoint rejects the code, 502 when the endpoint cannot be reached.
    """
    # parse_qs copes with extra parameters (state, error, ...) and decodes
    # the value, so urlencode below does not double-encode it.
    code = parse_qs(request.get('querystring', '')).get('code', [''])[0]
    if not code:
        print("PARSEAUTH CALLED WITHOUT AN AUTHORIZATION CODE")
        return _error_response('400', 'Bad Request', 'Missing authorization code.')

    client_id = _setting('CLIENT_ID')
    client_secret = _setting('CLIENT_SECRET')
    cloudfront_domain = _setting('CLOUDFRONT_DOMAIN')
    user_pool_domain_url = _setting('USER_POOL_DOMAIN_URL')

    headers = {
        'content-type': 'application/x-www-form-urlencoded',
        'authorization': 'Basic ' + encode_base64(client_id, client_secret),
    }
    data = {
        'grant_type': 'authorization_code',
        'redirect_uri': f'https://{cloudfront_domain}/parseauth',
        'code': code,
    }

    try:
        status, response_data = _request_json(
            user_pool_domain_url, "POST", "/oauth2/token", urlencode(data), headers
        )
    except (OSError, http.client.HTTPException, ValueError) as e:
        print("TOKEN ENDPOINT REQUEST FAILED : ", e)
        return _error_response('502', 'Bad Gateway', 'Token endpoint unavailable.')

    required = ('access_token', 'refresh_token', 'expires_in')
    if (
        status != 200
        or not isinstance(response_data, dict)
        or any(field not in response_data for field in required)
    ):
        reason = response_data.get('error') if isinstance(response_data, dict) else None
        print("TOKEN ENDPOINT REJECTED THE CODE : ", status, reason)
        return _error_response('401', 'Unauthorized', 'Authorization code was rejected.')

    my_cookie = cookies.SimpleCookie()
    my_cookie[_TOKEN_COOKIE] = encode_base64(
        response_data['access_token'], response_data['refresh_token']
    )
    my_cookie[_TOKEN_COOKIE]['max-age'] = response_data['expires_in']
    my_cookie[_TOKEN_COOKIE]['path'] = '/'
    my_cookie[_TOKEN_COOKIE]['secure'] = True
    # NOTE(review): HttpOnly is not set, as in the original; enable it if no
    # client-side script needs to read this cookie.
    cookie_value = my_cookie.output(header='')

    # The cookie carries live tokens, so it is deliberately not logged.
    print("SET COOKIES SUCCESS, REDIRECTING TO /index.html")
    return {
        'status': '302',
        'statusDescription': 'Found',
        'headers': {
            'location': [{'key': 'Location', 'value': '/index.html'}],
            'set-cookie': [{'key': 'Set-Cookie', 'value': cookie_value}],
            'cache-control': [{'key': 'Cache-Control', 'value': 'no-cache'}],
        },
    }


def get_key(access_token):
    """Return the JWKS entry whose ``kid`` matches the token header.

    Returns ``None`` when the user pool publishes no key with that id.

    Raises:
        jose.exceptions.JWTError: if the token header cannot be parsed.
        KeyError: if the header has no ``kid``.
        OSError / http.client.HTTPException / ValueError: if the JWKS
            document cannot be fetched or parsed.
    """
    from jose import jwt

    # Parse the header first so malformed tokens fail before any network call.
    header_kid = jwt.get_unverified_header(access_token)['kid']

    jwks_url = f"/{_setting('USER_POOL_ID')}/.well-known/jwks.json"
    _, jwks_data = _request_json(_setting('COGNITO_IDP'), "GET", jwks_url)

    for item in jwks_data.get('keys', []):
        if item.get('kid') == header_kid:
            return item
    return None


def refresh_access_token(refresh_token, username=None):
    """Obtain a new access token through ``REFRESH_TOKEN_AUTH``.

    Args:
        refresh_token: the Cognito refresh token.
        username: user name used for the SECRET_HASH. Defaults to the value
            the original code hard-coded, for backward compatibility.

    Raises:
        botocore.exceptions.ClientError: if Cognito rejects the request.
    """
    import boto3

    client_id = _setting('CLIENT_ID')
    # Lambda@Edge may execute in any region; pin the client to the region
    # the user pool endpoint (COGNITO_IDP) lives in.
    cognito_client = boto3.client("cognito-idp", region_name=_COGNITO_REGION)
    response = cognito_client.initiate_auth(
        AuthFlow='REFRESH_TOKEN_AUTH',
        AuthParameters={
            'REFRESH_TOKEN': refresh_token,
            'SECRET_HASH': generate_secret_hash(
                username or _LEGACY_REFRESH_USERNAME, client_id
            ),
        },
        ClientId=client_id,
    )
    return response['AuthenticationResult']['AccessToken']


# ------------------------------- VALIDATION METHOD ------------------------------- #
def verify_jwt_token(access_token, refresh_token):
    """Return True when the access token is valid for this app client.

    Signature, expiry and issuer are verified against the user pool's JWKS.
    An expired token is refreshed at most once and the new token is verified
    again. Any validation, network or refresh failure yields False.

    NOTE(review): a refreshed access token is only used for this check; it
    is not written back to the browser cookie.
    """
    if not access_token:
        print("No cookie found")
        return False

    from jose import jwt
    from jose.exceptions import ExpiredSignatureError, JOSEError

    client_id = _setting('CLIENT_ID')
    issuer = f"https://{_setting('COGNITO_IDP')}/{_setting('USER_POOL_ID')}"

    refresh_used = False
    while True:
        try:
            key = get_key(access_token)
            if key is None:
                print("No signing key matches the token")
                return False
            claims = jwt.decode(
                access_token,
                key,
                # Pin the algorithm so the token header cannot choose it.
                algorithms=['RS256'],
                # The issuer must be passed as an argument; python-jose
                # ignores an "iss" entry inside ``options``.
                issuer=issuer,
                options={
                    "verify_iss": True,
                    "verify_signature": True,
                    "verify_exp": True,
                },
            )
            return (
                claims.get('client_id') == client_id
                and claims.get('token_use') == 'access'
            )
        except ExpiredSignatureError as e:
            print("Access token - ", e)
            if refresh_used or not refresh_token:
                return False
            refresh_used = True

            from botocore.exceptions import BotoCoreError, ClientError
            try:
                # Expiry is checked after the signature, so these claims
                # come from an authentic (merely expired) token.
                username = jwt.get_unverified_claims(access_token).get('username')
                access_token = refresh_access_token(refresh_token, username)
            except (BotoCoreError, ClientError, JOSEError, KeyError) as refresh_error:
                print("Refresh token - ", refresh_error)
                return False
        except (JOSEError, KeyError, ValueError, OSError,
                http.client.HTTPException) as e:
            print("Token validation failed - ", e)
            return False


def _read_token_cookie(request):
    """Return the raw ``token`` cookie value from the request, or None.

    Every ``Cookie`` header entry and every cookie pair is scanned. The
    value is split on the first ``=`` only, so base64 padding survives, and
    the double quotes ``SimpleCookie`` adds around such values are removed.
    """
    for header in request.get('headers', {}).get('cookie', []):
        for pair in header.get('value', '').split(';'):
            name, _, value = pair.strip().partition('=')
            if name == _TOKEN_COOKIE:
                return value.strip('"')
    return None


def _tokens_from_request(request):
    """Return ``(access_token, refresh_token)`` or ``(None, None)``."""
    raw = _read_token_cookie(request)
    if not raw:
        return None, None
    try:
        decoded = decode_base64(raw)
    except ValueError:
        # Tampered or truncated cookie: treat as not signed in.
        return None, None
    access_token, separator, refresh_token = decoded.partition(":")
    if not separator:
        return None, None
    return access_token or None, refresh_token or None


def lambda_handler(event, context):
    """CloudFront request entry point.

    Returns the original request when the caller holds a valid token, the
    token-exchange response for ``/parseauth``, or a 302 redirect to the
    Cognito sign-in page otherwise.
    """
    request = event['Records'][0]['cf']['request']
    # Only the URI is logged: the full event contains the token cookie.
    print("REQUEST URI FROM CLOUDFRONT : ", request.get('uri'))

    if request['uri'].startswith('/parseauth'):
        return exchange_code_for_token(request)

    access_token, refresh_token = _tokens_from_request(request)
    if verify_jwt_token(access_token, refresh_token):
        print("SUCCESSFUL VALIDATION !!!")
        return request

    print("FAIL VALIDATION, UNAUTHORIZE ACCESS !!!! REDIRECT TO SIGN-IN PAGE.")
    CLIENT_ID = _setting('CLIENT_ID')
    CLOUDFRONT_DOMAIN = _setting('CLOUDFRONT_DOMAIN')
    redirect_uri = quote(f"https://{CLOUDFRONT_DOMAIN}/parseauth", safe="")
    response = {
        'status': '302',
        'statusDescription': 'Found',
        'headers': {
            'location': [{
                'key': 'Location',
                'value': f'https://stylcoggoogle.auth.us-east-1.amazoncognito.com/oauth2/authorize?client_id={CLIENT_ID}&response_type=code&scope=aws.cognito.signin.user.admin+email+openid+profile&redirect_uri={redirect_uri}'
            }]
        }
    }
    print("REDIRECT RESPONSE TO COGNITO SIGN-IN : ", response)
    return response