Lambda_edge_refresh_token

mail@pastecode.io avatar
unknown
plain_text
a year ago
9.8 kB
2
Indexable
Never
from urllib.parse import urlencode, quote
import http.cookies as cookies
from jose import jwt
from jose.exceptions import *
import http.client 
import hashlib 
import base64
import boto3
import json
import hmac 
import os 

def retrieve_credentials() :
    s3_client = boto3.client("s3")
    response = s3_client.get_object(Bucket = "s3bucketchecksslvupham", Key = "data_serve_lambda_edge.json")

    return response['Body'].read().decode("utf-8")

def set_env_variables(data) :
    data = json.loads(data)

    os.environ['CLIENT_ID']              = data['cognito_user_pool_client_id']['value']
    os.environ['CLIENT_SECRET']          = data['cognito_user_pool_client_secret']['value']
    
    os.environ['USER_POOL_ID']           = data['cognito_user_pool_id']['value']
    os.environ['USER_POOL_DOMAIN_URL']   = "stylcoggoogle" + data['cognito_user_pool_domain_url']['value']
    os.environ['COGNITO_IDP']            = "cognito-idp.us-east-1.amazonaws.com"

    os.environ['CLOUDFRONT_DOMAIN']      = data['cloudfront_domain_name']['value']

def read_data() :
    file_path = os.path.join(f"/tmp", "data_serve_lambda_edge.json")
    while (True) :
        try :
            with open(file_path, "r") as file : 
                data = file.read()            
            
            print("READ TEMPORARY CREDETIALS")
            set_env_variables(data)
            break
        
        except :
            print("RETRIEVE CREDENTIALS FROM S3")
            with open(file_path, "w") as file : 
                file.write(retrieve_credentials())  

def encode_base64(A, B) :
    credentials = f"{A}:{B}"
    credentials_bytes = credentials.encode('utf-8')
    return  base64.b64encode(credentials_bytes).decode('utf-8')

def decode_base64(encode_token) :
    encode_token = encode_token.encode('utf-8')
    return base64.b64decode(encode_token).decode('utf-8')

def generate_secret_hash(A, B) :
    CLIENT_SECRET = os.getenv('CLIENT_SECRET')
    
    secret_hash = hmac.new(CLIENT_SECRET.encode('utf-8'), (A + B).encode('utf-8'), hashlib.sha256).digest()
    return base64.b64encode(secret_hash).decode()

def get_username(access_token) :
    response = jwt.decode(access_token, get_key(access_token))
    return response['username']


def exchange_code_for_token(request) : 
    CLIENT_ID               = os.getenv('CLIENT_ID')
    CLIENT_SECRET           = os.getenv('CLIENT_SECRET')
    CLOUDFRONT_DOMAIN       = os.getenv('CLOUDFRONT_DOMAIN')
    USER_POOL_DOMAIN_URL    = os.getenv('USER_POOL_DOMAIN_URL')

    credentials_base64 = encode_base64(CLIENT_ID, CLIENT_SECRET)

    headers = {
        'content-type': 'application/x-www-form-urlencoded',
        'authorization': 'Basic ' + credentials_base64
    }


    code = request['querystring'].split("=")[1]
    data = {
        'grant_type': 'authorization_code',
        'redirect_uri': f'https://{CLOUDFRONT_DOMAIN}/parseauth',
        'code': code
    }

    conn = http.client.HTTPSConnection(USER_POOL_DOMAIN_URL)
    conn.request("POST", "/oauth2/token", urlencode(data), headers)

    response = conn.getresponse()
    response_data = eval(response.read().decode('utf-8'))
    
    print("RESPONSE_DATA OF TOKENS : ", response_data)
    access_token = response_data['access_token']
    refresh_token = response_data['refresh_token']
    username = get_username(access_token)

    my_cookie = cookies.SimpleCookie()
    my_cookie['token'] = f"{encode_base64(access_token, refresh_token)}:{username}"
    my_cookie['token']['max-age'] = 2592000
    my_cookie['token']['path'] = '/'
    my_cookie['token']['secure'] = True

    cookie_value = my_cookie.output(header = '')
    print("MY COOKIE VALUE RETRIEVE FROM /oauth2/token ENDPOINT : ", cookie_value)
    
    final_response = {
        'status': '302',
        'statusDescription': 'Found',
        'headers': {
            'location': [{
                'key': 'Location',
                'value': '/'
            }],

            'set-cookie' : [{
                'key': 'Set-Cookie',
                'value' : cookie_value
            }],

            'cache-control': [{ 
                'key': 'Cache-Control',
                'value': 'no-cache'
            }]
        }
    }

    print("SET COOKIES SUCCESS, RETURN FINAL_RESPONSE ", final_response)
    return final_response


def get_key(access_token) :
    USER_POOL_ID    = os.getenv('USER_POOL_ID')
    COGNITO_IDP     = os.getenv('COGNITO_IDP') 
             
    jwks_url = f"/{USER_POOL_ID}/.well-known/jwks.json"
    
    conn = http.client.HTTPSConnection(COGNITO_IDP)
    conn.request("GET", jwks_url)
    
    response = conn.getresponse().read().decode('utf-8')
    jwks_data = eval(response)

    header = jwt.get_unverified_header(access_token)
    header_kid = header['kid']

    for item in jwks_data['keys']:
        if (item['kid'] == header_kid) : return item

def refresh_access_token(refresh_token) :
    CLIENT_ID = os.getenv('CLIENT_ID')
    USERNAME  = os.getenv('USERNAME')
    USER_POOL_ID    = os.getenv('USER_POOL_ID')
    
    cognito_client = boto3.client("cognito-idp", region_name = "us-east-1")
    
    response = cognito_client.admin_initiate_auth(
        AuthFlow = 'REFRESH_TOKEN_AUTH',
        AuthParameters = {
            'REFRESH_TOKEN' : refresh_token,
            'SECRET_HASH' : generate_secret_hash(USERNAME, CLIENT_ID)
        },
        UserPoolId = USER_POOL_ID,
        ClientId = CLIENT_ID
    )

    return response['AuthenticationResult']['AccessToken']

            

# ------------------------------- VALIDATION METHOD ------------------------------- #

def verify_jwt_token(access_token, refresh_token) :
    CLIENT_ID       = os.getenv('CLIENT_ID')
    COGNITO_IDP     = os.getenv('COGNITO_IDP')          
    USER_POOL_ID    = os.getenv('USER_POOL_ID')
   
    try : 
        key = get_key(access_token)
        response = jwt.decode(
            access_token, key,
            options = {
                "verify_iss": True,
                "iss" : f"https://{COGNITO_IDP}/{USER_POOL_ID}",
                "verify_signature": True,
                "verify_exp": True
            }
        )
        
        if (response['client_id'] == CLIENT_ID) : return access_token
        return -1 
    
    except ExpiredSignatureError as e: 
        print("Access token - ", e)
        access_token = refresh_access_token(refresh_token)
        return access_token
        
    except :
        print("NO COOKIE FOUND")
        return -1

def lambda_handler(event, context) :
    read_data()

    CLIENT_ID               = os.getenv('CLIENT_ID')
    CLOUDFRONT_DOMAIN       = os.getenv('CLOUDFRONT_DOMAIN')
    
    print("EVENT FROM CLOUDFRONT : ", event)
    
    request = event['Records'][0]['cf']['request']

    if (request['uri'].startswith('/parseauth')) :
        return exchange_code_for_token(request)

    cookie = None
    new_access_token = None 

    if 'cookie' in request['headers'] : 
        cookie = request['headers']['cookie'][0]['value'].split("=")[1]
        try : 
            token, username = cookie.split(":")
            os.environ['USERNAME'] = username
        except :
            token = cookie
        
        access_token, refresh_token = decode_base64(token).split(":")
        print("COOKIE VALUE : ", cookie)
        
        new_access_token = verify_jwt_token(access_token, refresh_token)
        print("ACCESS TOKEN DERIVE FROM VERIFICATION : ", access_token)

    if (cookie == None or new_access_token == -1) :
        print("FAIL VALIDATION, UNAUTHORIZE ACCESS !!!! REDIRECT TO SIGN-IN PAGE.")
        
        redirect_uri = quote(f"https://{CLOUDFRONT_DOMAIN}/parseauth", safe = "")
        response = {
            'status': '302',
            'statusDescription': 'Found',
            'headers': {
                'location': [{
                    'key': 'Location',
                    'value': f'https://stylcoggoogle.auth.us-east-1.amazoncognito.com/oauth2/authorize?client_id={CLIENT_ID}&response_type=code&scope=aws.cognito.signin.user.admin+email+openid+profile&redirect_uri={redirect_uri}' 
                    
                }]
            }
        }

        print("REDIRECT RESPONSE TO COGNITO SIGN-IN : ", response)
        return response

    else :
        if (new_access_token != access_token) : 
            my_cookie = cookies.SimpleCookie()
            my_cookie['token'] = f"{encode_base64(new_access_token, refresh_token)}:{username}"
            my_cookie['token']['max-age'] = 2592000
            my_cookie['token']['path'] = '/'
            my_cookie['token']['secure'] = True

            cookie_value = my_cookie.output(header = '')

            final_response = {
                'status': '302',
                'statusDescription': 'Found',
                'headers': {
                    'location': [{
                        'key': 'Location',
                        'value': '/'
                    }],

                    'set-cookie' : [{
                        'key': 'Set-Cookie',
                        'value' : cookie_value
                    }],

                    'cache-control': [{ 
                        'key': 'Cache-Control',
                        'value': 'no-cache'
                    }]
                }
            }

            print("SUCCESSFUL VALIDATION FINAL RESPONSE : ", final_response)
            return final_response

        
        else : 
            print("SUCCESSFUL VALIDATION ORIGIN : ", request)
            return request