FLASK APPLICATION

 avatar
unknown
plain_text
a year ago
11 kB
5
Indexable
from flask import Flask, request, jsonify, render_template, redirect
from authlib.integrations.flask_oauth2 import AuthorizationServer
from authlib.oauth2.rfc6749 import grants
import time
import os
import logging
from crypto import *
import json
import base64
from authlib.jose import jwt
from datetime import datetime, timezone, timedelta

# Logging setup: DEBUG level so that every message is captured.
logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s %(levelname)s: %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S',
)

# Set before the app and authorization server are created.
os.environ['AUTHLIB_INSECURE_TRANSPORT'] = '1'

app = Flask(__name__)
# NOTE(review): the secret key is hard-coded in source; fine for a local test
# server only.
app.config.update(
    SECRET_KEY='Z76TOUIG8YPOJEKHLVLWH8W1UDSXVEX1',
    OAUTH2_REFRESH_TOKEN_GENERATOR=True,
)

# In-memory stores: issued access tokens and pending authorization codes.
tokens = {}
codes = {}



# `private_key` and `public_key` are not defined in this file; they arrive
# through `from crypto import *`.
#
# For debugging purposes: dump both keys to files in the working directory.
# NOTE(security): this writes the private signing key to disk unencrypted;
# remove before using this anywhere but a throwaway test setup.
# `with` guarantees the handles are closed even if a write fails.
with open('private.txt', 'wb') as key_file:
    key_file.write(private_key)
with open('public.txt', 'wb') as key_file:
    key_file.write(public_key)

# In-memory user directory, keyed by login name.
# Each record holds the login password plus the profile fields for that user.
# NOTE(security): passwords are stored in plain text; test data only.
users = {
    'user1': {
        'password': 'password1',
        "sub": "user1",
        "username": "user1",
        "name": "Diana Krall",
        "given_name": "Diana",
        "family_name": "Krall",
        "nickname": "Dina",
        "email": "diana@example.org",
        # A real boolean: the string 'False' is truthy and would be emitted
        # as a JSON string instead of `false`.
        "email_verified": False,
        "phone_number": "+46 90 7865000",
        # NOTE(review): the standard OIDC claim name is lowercase "profile";
        # confirm whether anything reads this capitalised key.
        "Profile": 'Something',
        "address": {
            "street_address": "Umeå Universitet",
            "locality": "Umeå",
            "postal_code": "SE-90187",
            "country": "Sweden",
        },
    },
}

# Discovery document served at /.well-known/openid-configuration.
# Every endpoint URL is derived from the same base address.
_BASE_URL = "http://REDACTED"

openid_configuration = {
    "issuer": f"{_BASE_URL}/",
    "authorization_endpoint": f"{_BASE_URL}/authorize",
    "token_endpoint": f"{_BASE_URL}/token",
    "userinfo_endpoint": f"{_BASE_URL}/userinfo",
    "jwks_uri": f"{_BASE_URL}/.well-known/jwks.json",
    "response_types_supported": [
        "code", "token", "id_token",
        "code token", "code id_token", "token id_token",
        "code token id_token",
    ],
    "subject_types_supported": ["public"],
    "id_token_signing_alg_values_supported": ["RS256"],
    "scopes_supported": [
        "openid", "profile", "email", "address", "phone", "offline_access",
    ],
    "token_endpoint_auth_methods_supported": [
        "client_secret_basic", "client_secret_post",
    ],
    "claims_supported": [
        "sub", "name", "preferred_username", "given_name", "family_name",
        "email", "email_verified",
    ],
}


# Mock client class
class MockClient:
    """In-memory OAuth2 client used in place of a database-backed client.

    The ``check_*`` methods return booleans. ``get_client_id`` and
    ``get_default_redirect_uri`` are included because Authlib's client
    interface (``ClientMixin``) defines them alongside the checks.
    """

    # Client authentication methods this client may use.
    ALLOWED_AUTH_METHODS = ('client_secret_basic', 'client_secret_post', 'none')
    # Grant types this client may request.
    ALLOWED_GRANT_TYPES = ('authorization_code', 'refresh_token', 'password')

    def __init__(self, client_id, client_secret):
        self.client_id = client_id
        self.client_secret = client_secret
        self.response_types = [
            "code",
            "token",
            "id_token",
            "code token",
            "code id_token",
            "token id_token",
            "code token id_token",
        ]
        self.redirect_uris = ['https://REDACTED.COM/sso-authenticate/REDACTED']

    def get_client_id(self):
        """Return this client's identifier."""
        return self.client_id

    def get_default_redirect_uri(self):
        """Return the first registered redirect URI, or None if there is none."""
        return self.redirect_uris[0] if self.redirect_uris else None

    def check_response_type(self, response_type):
        """Return True if the client supports the given response type."""
        return response_type in self.response_types

    def check_redirect_uri(self, redirect_uri):
        """Return True if *redirect_uri* is one of the registered URIs."""
        return redirect_uri in self.redirect_uris

    def check_client_secret(self, client_secret):
        """Return True only if *client_secret* equals this client's secret.

        The previous implementation returned the presented value itself, so
        every non-empty secret was accepted.
        """
        import hmac  # local import keeps the file's import block untouched

        if not isinstance(client_secret, str) or not isinstance(self.client_secret, str):
            return False
        # Compare as bytes: compare_digest rejects non-ASCII str arguments.
        return hmac.compare_digest(
            self.client_secret.encode('utf-8'),
            client_secret.encode('utf-8'),
        )

    def check_endpoint_auth_method(self, method, endpoint):
        """Return True if *method* is an allowed authentication method.

        *endpoint* is accepted for interface compatibility; the same methods
        are allowed at every endpoint.
        """
        return method in self.ALLOWED_AUTH_METHODS

    def check_grant_type(self, grant_type):
        """Return True if the client may use *grant_type*."""
        return grant_type in self.ALLOWED_GRANT_TYPES

    def get_allowed_scope(self, scope):
        """Return the scope unchanged: every requested scope is allowed."""
        return scope


def query_client(client_id):
    """Look up a client by id.

    Only the id 'test' is known; it yields a fresh MockClient. Any other id
    is logged as a warning and gives None.
    """
    if client_id != 'test':
        app.logger.warning(f"No client found for client_id: {client_id}")
        return None

    found = MockClient(client_id='test', client_secret='Z76TOUIG8YPOJEKHLVLWH8W1UDSXVEX1')
    app.logger.debug(f"Found client: {found}")
    return found

def save_token(token, request):
    """Store *token* in the in-memory `tokens` dict under its access token.

    *request* is accepted but not used.
    """
    access_token = token['access_token']
    tokens[access_token] = token
    app.logger.debug("Token saved successfully")

# Authorization server bound to the Flask app; clients are resolved through
# query_client and issued tokens are handed to save_token.
authorization = AuthorizationServer(app, query_client=query_client, save_token=save_token)

class AuthorizationCode:
    """Plain record describing one issued authorization code."""

    def __init__(self, code, client_id, redirect_uri, scope, user_info, expires_in):
        # `expires_in` is compared against time.time() in is_expired(), i.e.
        # it is treated as an absolute timestamp.
        self.code, self.client_id = code, client_id
        self.redirect_uri, self.scope = redirect_uri, scope
        self.user_info, self.expires_in = user_info, expires_in

    def get_redirect_uri(self):
        """Return the redirect URI stored with the code."""
        return self.redirect_uri

    def get_scope(self):
        """Return the scope stored with the code."""
        return self.scope

    def get_user(self):
        """Return the user info stored with the code."""
        return self.user_info

    def is_expired(self):
        """Return True once the current time is past `expires_in`."""
        return self.expires_in < time.time()


class AuthorizationCodeGrant(grants.AuthorizationCodeGrant):
    """Authorization-code grant backed by the in-memory `codes` dict."""

    def authenticate_user(self, authorization_code):
        """Return the user bound to *authorization_code*, or None.

        The user is taken from the code object itself rather than a fixed
        entry of `users`, so a code only ever yields the user it was issued
        for.
        """
        app.logger.debug(f"Authenticating user with authorization code: {authorization_code.code}")
        user_info = authorization_code.get_user()
        if user_info:
            # Log the subject only: the full record contains the password.
            app.logger.debug(f"User authenticated successfully: {user_info.get('sub')}")
            return user_info
        app.logger.warning(f"Authentication failed for code: {authorization_code.code}")
        return None

    def save_authorization_code(self, code, request):
        """Record *code* with the client, redirect URI, scope and user."""
        username = request.form.get('username')
        user_data = users.get(username, {})
        # Log the subject only: the full record contains the password.
        app.logger.debug(f"CURRENT USER: {user_data.get('sub')}")
        codes[code] = {
            'client_id': request.client_id,
            'redirect_uri': request.redirect_uri,
            'scope': request.scope,
            'user': user_data,
            # Absolute expiry timestamp, 36000 seconds (10 hours) from now.
            # NOTE(review): RFC 6749 recommends at most 10 minutes for codes.
            'expires_in': int(time.time()) + 36000,
        }

    def query_authorization_code(self, code, client):
        """Return the stored code for *client*, or None.

        None is returned when the code is unknown, belongs to a different
        client, or has expired.
        """
        code_data = codes.get(code)
        if not code_data or code_data['client_id'] != client.client_id:
            return None
        auth_code = AuthorizationCode(
            code,
            code_data['client_id'],
            code_data['redirect_uri'],
            code_data['scope'],
            code_data['user'],
            code_data['expires_in'],
        )
        # Expired codes must not be redeemable.
        if auth_code.is_expired():
            return None
        return auth_code

    def delete_authorization_code(self, authorization_code):
        # For testing, remove the code from the dictionary
        # In a real application, this would involve a database operation
        # NOTE(review): `codes` is keyed by the code string, while this method
        # receives the AuthorizationCode object, so this pop never matches and
        # the entry stays in `codes`. Before changing it to pop
        # `authorization_code.code`, check that no caller (e.g. the /token
        # handler) still needs the stored entry after the grant completes.
        codes.pop(authorization_code, None)


# Enable the authorization-code grant class on the authorization server.
authorization.register_grant(AuthorizationCodeGrant)

@app.route('/.well-known/openid-configuration')
def openid_config():
    """Serve the `openid_configuration` dict as a JSON response."""
    discovery_document = jsonify(openid_configuration)
    return discovery_document


@app.route('/.well-known/jwks.json')
def jwks():
    """Serve the module-level `jwk` object as a JSON response."""
    key_set = jsonify(jwk)
    return key_set

@app.route('/authorize', methods=['GET', 'POST'])
def authorize():
    """Authorization endpoint.

    GET renders the login form (`authorize.html`). POST checks the submitted
    username/password against `users` and, on success, lets the authorization
    server build the authorization response for that user.

    Returns:
        The rendered form, the authorization response, a 401 for bad
        credentials, or a 500 if anything raises while handling the POST.
    """
    app.logger.debug("Authorize endpoint called")
    if request.method == 'GET':
        return render_template('authorize.html', request=request)
    try:
        # Never write the submitted password to the log.
        loggable_form = {k: v for k, v in request.form.items() if k != 'password'}
        app.logger.debug(f"POST data: {loggable_form}")

        user = request.form.get('username')
        password = request.form.get('password')
        account = users.get(user)
        if account is None or password is None or account['password'] != password:
            app.logger.warning(f"Invalid username or password for user: {user}")
            return "Invalid username or password", 401

        app.logger.debug(f"User authenticated: {user}")
        return authorization.create_authorization_response(grant_user=user)
    except Exception as e:
        # Top-level boundary for this endpoint: log with traceback, answer 500.
        app.logger.error(f"Error in /authorize: {e}", exc_info=True)
        return "An internal server error occurred", 500




@app.route('/token', methods=['POST'])
def issue_token():
    """Token endpoint: issue an access token plus a signed ID token.

    The authorization server produces the base token response. On success
    this handler signs an ID token for the user bound to the authorization
    code, records the access token in `tokens`, and returns a response with
    `access_token`, `token_type`, `expires_in`, `id_token` and, if present,
    `refresh_token`.

    Returns:
        The server's own response when it is not a 200; a 400 when no usable
        authorization code or user is available; otherwise the JSON token
        response with status 200.
    """
    access_token_lifetime = 3600  # seconds (1 hour)
    id_token_lifetime = timedelta(minutes=30)

    # Look the code up BEFORE the grant runs, so the ID token can still be
    # built even if the grant removes the stored entry while handling it.
    auth_code = request.form.get('code')
    code_obj = codes.get(auth_code)

    response = authorization.create_token_response()
    if response.status_code != 200:
        return response

    if not code_obj:
        app.logger.error("Authorization code not found")
        return "Authorization code not found", 400

    token_data = response.json
    access_token = token_data['access_token']

    user = code_obj['user']
    subject = user.get('sub') if user else None
    if not subject:
        # No user behind the code: withdraw the token saved by the server
        # instead of failing with a KeyError further down.
        tokens.pop(access_token, None)
        app.logger.error("Authorization code has no associated user")
        return jsonify({'error': 'invalid_grant'}), 400

    # Timezone-aware "now" (datetime.utcnow() is deprecated).
    current_utc = datetime.now(timezone.utc)
    issued_at = int(current_utc.timestamp())

    # Generate ID Token
    id_token_payload = {
        "iss": openid_configuration["issuer"],
        "sub": subject,
        "aud": code_obj["client_id"],
        "exp": int((current_utc + id_token_lifetime).timestamp()),
        "iat": issued_at,
        "auth_time": issued_at,
        # Additional claims like nonce can be added here
    }

    # Sign ID Token and normalise it to str once.
    signed_id_token = jwt.encode({'alg': 'RS256', 'kid': 'test'}, id_token_payload, private_key)
    if isinstance(signed_id_token, bytes):
        signed_id_token = signed_id_token.decode('utf-8')
    app.logger.debug(f'Signed ID TOKEN: {signed_id_token}')

    # The rest of this file compares the stored 'expires_in' with time.time(),
    # so it must hold the ABSOLUTE expiry timestamp, not the relative lifetime.
    tokens[access_token] = {
        'user': user,
        'scope': token_data.get('scope'),
        'expires_in': issued_at + access_token_lifetime,
    }

    # An authorization code is single-use: drop it once it has been redeemed.
    codes.pop(auth_code, None)

    # Construct the response; here 'expires_in' is the relative lifetime.
    token_response = {
        "access_token": access_token,
        "token_type": "Bearer",
        "expires_in": access_token_lifetime,
        "id_token": signed_id_token,
    }

    # If there's a refresh token, include it too
    if 'refresh_token' in token_data:
        token_response['refresh_token'] = token_data['refresh_token']

    app.logger.debug(f"Token Response: {token_response}")
    json_response = jsonify(token_response)
    # Token responses must not be cached (RFC 6749 section 5.1).
    json_response.headers['Cache-Control'] = 'no-store'
    json_response.headers['Pragma'] = 'no-cache'
    return json_response, 200


@app.route('/userinfo')
def userinfo():
    """UserInfo endpoint: return the claims allowed by the token's scopes.

    The access token is read from the Authorization header and looked up in
    `tokens`. `sub` is always returned; further claims are included only for
    the scopes recorded with the token. The password is never returned.

    Returns:
        A JSON object of claims, or a 401 `invalid_token` error when the
        header is missing or malformed, the token is unknown or expired, or
        no user is attached to it.
    """
    # Scope -> claims mapping (OpenID Connect Core, section 5.4).
    scope_claims = {
        'profile': ('name', 'given_name', 'family_name', 'nickname', 'preferred_username'),
        'email': ('email', 'email_verified'),
        'address': ('address',),
        'phone': ('phone_number', 'phone_number_verified'),
    }
    unauthorized = (jsonify({'error': 'invalid_token'}), 401)

    auth_header = request.headers.get('Authorization')
    if not auth_header:
        app.logger.warning("No authorization header found")
        return unauthorized

    # "<scheme> <token>"; a header without a token part is rejected instead
    # of raising IndexError.
    header_parts = auth_header.split(None, 1)
    if len(header_parts) != 2:
        app.logger.warning("Invalid or expired token")
        return unauthorized
    token_str = header_parts[1].strip()

    token_data = tokens.get(token_str)
    # The stored 'expires_in' is compared with the clock, i.e. it is read as
    # an absolute expiry timestamp.
    if not token_data or token_data.get('expires_in', 0) <= time.time():
        app.logger.warning("Invalid or expired token")
        return unauthorized

    # The token store keeps the user record itself; a plain username is also
    # tolerated and resolved through `users`.
    user = token_data.get('user')
    if isinstance(user, str):
        user = users.get(user)
    if not isinstance(user, dict) or 'sub' not in user:
        app.logger.warning("Invalid or expired token")
        return unauthorized

    # 'scope' may be stored as None, so fall back to an empty string.
    granted_scopes = set((token_data.get('scope') or '').split())
    claims = {'sub': user['sub']}
    for scope_name, claim_names in scope_claims.items():
        if scope_name in granted_scopes:
            claims.update({name: user[name] for name in claim_names if name in user})
    return jsonify(claims)




if __name__ == '__main__':
    # When run as a script, listen on every interface on port 80.
    app.run(host='0.0.0.0', port=80)
Editor is loading...
Leave a Comment