FLASK APPLICATION
unknown
plain_text
a year ago
11 kB
5
Indexable
"""Mock OpenID Connect provider built on Flask and Authlib.

Serves a discovery document, a JWKS document, an authorization endpoint with
a username/password form, a token endpoint that issues an access token plus
an RS256-signed ID token, and a UserInfo endpoint.  All state (users, codes,
tokens) lives in process memory; this is a test fixture, not a production
identity provider.
"""

import base64
import hmac
import json
import logging
import os
import time
from datetime import datetime, timezone, timedelta

from authlib.integrations.flask_oauth2 import AuthorizationServer
from authlib.jose import jwt
from authlib.oauth2.rfc6749 import grants
from flask import Flask, request, jsonify, render_template, redirect

# Provides ``private_key``, ``public_key`` and ``jwk``, which this module uses.
from crypto import *  # noqa: F401,F403

# Configure logging
logging.basicConfig(level=logging.DEBUG,  # Set to DEBUG to capture all messages
                    format='%(asctime)s %(levelname)s: %(message)s',
                    datefmt='%Y-%m-%d %H:%M:%S')

# Lets Authlib run over plain HTTP; the issuer URLs below are http://.
os.environ['AUTHLIB_INSECURE_TRANSPORT'] = '1'

app = Flask(__name__)
# NOTE(review): hard-coded secret, identical to the mock client secret below.
# Acceptable for a local fixture only.
app.config['SECRET_KEY'] = 'Z76TOUIG8YPOJEKHLVLWH8W1UDSXVEX1'
app.config['OAUTH2_REFRESH_TOKEN_GENERATOR'] = True

# Lifetimes, in seconds unless noted.
AUTH_CODE_LIFETIME_SECONDS = 36000
ACCESS_TOKEN_LIFETIME_SECONDS = 3600
ID_TOKEN_LIFETIME = timedelta(minutes=30)

# Claims released by the UserInfo endpoint for each granted scope
# (OpenID Connect Core 1.0, section 5.4).  ``sub`` is always released.
SCOPE_CLAIMS = {
    'profile': (
        'name', 'family_name', 'given_name', 'middle_name', 'nickname',
        'preferred_username', 'profile', 'picture', 'website', 'gender',
        'birthdate', 'zoneinfo', 'locale', 'updated_at',
    ),
    'email': ('email', 'email_verified'),
    'address': ('address',),
    'phone': ('phone_number', 'phone_number_verified'),
}

# In-memory stores: access token string -> token record, and
# authorization code string -> code record.
tokens = {}
codes = {}

## For debugging purposes
# NOTE(review): this writes the signing key to the working directory on every
# import.  Kept because it is existing behaviour; remove outside of debugging.
with open('private.txt', 'wb') as key_file:
    key_file.write(private_key)
with open('public.txt', 'wb') as key_file:
    key_file.write(public_key)

users = {
    'user1': {
        'password': 'password1',
        "sub": "user1",
        "username": "user1",
        "name": "Diana Krall",
        "given_name": "Diana",
        "family_name": "Krall",
        "nickname": "Dina",
        "email": "diana@example.org",
        # Boolean, not the string 'False' (which is truthy for consumers).
        "email_verified": False,
        "phone_number": "+46 90 7865000",
        "Profile": 'Something',
        "address": {
            "street_address": "Umeå Universitet",
            "locality": "Umeå",
            "postal_code": "SE-90187",
            "country": "Sweden"
        }
    }
}

openid_configuration = {
    "issuer": "http://REDACTED/",
    "authorization_endpoint": "http://REDACTED/authorize",
    "token_endpoint": "http://REDACTED/token",
    "userinfo_endpoint": "http://REDACTED/userinfo",
    "jwks_uri": "http://REDACTED/.well-known/jwks.json",
    "response_types_supported": [
        "code",
        "token",
        "id_token",
        "code token",
        "code id_token",
        "token id_token",
        "code token id_token"
    ],
    "subject_types_supported": ["public"],
    "id_token_signing_alg_values_supported": ["RS256"],
    "scopes_supported": [
        "openid",
        "profile",
        "email",
        "address",
        "phone",
        "offline_access"
    ],
    "token_endpoint_auth_methods_supported": [
        "client_secret_basic",
        "client_secret_post"
    ],
    "claims_supported": [
        "sub",
        "name",
        "preferred_username",
        "given_name",
        "family_name",
        "email",
        "email_verified"
    ],
}


def _constant_time_equals(expected, supplied):
    """Return True when both values are equal strings, compared in constant time.

    Anything that is not a ``str`` (for example a missing form field, which
    arrives as ``None``) compares unequal instead of raising.
    """
    if not isinstance(expected, str) or not isinstance(supplied, str):
        return False
    return hmac.compare_digest(expected.encode('utf-8'), supplied.encode('utf-8'))


# Mock client class
class MockClient:
    """The single hard-coded OAuth client known to this server."""

    ALLOWED_AUTH_METHODS = ('client_secret_basic', 'client_secret_post', 'none')
    ALLOWED_GRANT_TYPES = ('authorization_code', 'refresh_token', 'password')

    def __init__(self, client_id, client_secret):
        self.client_id = client_id
        self.client_secret = client_secret
        self.response_types = ["code", "token", "id_token", "code token",
                               "code id_token", "token id_token",
                               "code token id_token"]
        self.redirect_uris = ['https://REDACTED.COM/sso-authenticate/REDACTED']

    def get_client_id(self):
        """Return the client identifier."""
        return self.client_id

    def get_default_redirect_uri(self):
        """Return the redirect URI to use when the request supplies none."""
        return self.redirect_uris[0]

    def check_response_type(self, response_type):
        """Return True when the client may use ``response_type``."""
        return response_type in self.response_types

    def check_redirect_uri(self, redirect_uri):
        """Return True when ``redirect_uri`` is registered for the client."""
        return redirect_uri in self.redirect_uris

    def check_client_secret(self, client_secret):
        """Return True only when ``client_secret`` matches the stored secret.

        The previous implementation returned the supplied value itself, so
        every non-empty secret was accepted.
        """
        return _constant_time_equals(self.client_secret, client_secret)

    def check_endpoint_auth_method(self, method, endpoint):
        """Return True when ``method`` is an accepted client auth method.

        ``endpoint`` is not consulted: the same methods apply everywhere.
        """
        return method in self.ALLOWED_AUTH_METHODS

    def check_grant_type(self, grant_type):
        """Return True when the client may use ``grant_type``."""
        return grant_type in self.ALLOWED_GRANT_TYPES

    def get_allowed_scope(self, scope):
        """Return the requested scope unchanged (no scope restriction)."""
        return scope


def query_client(client_id):
    """Return the mock client for ``client_id`` 'test', otherwise None."""
    if client_id == 'test':
        client = MockClient(client_id='test',
                            client_secret='Z76TOUIG8YPOJEKHLVLWH8W1UDSXVEX1')
        app.logger.debug(f"Found client: {client}")
        return client
    app.logger.warning(f"No client found for client_id: {client_id}")
    return None


def save_token(token, request):
    """Store the token dict produced by Authlib, keyed by its access token."""
    tokens[token['access_token']] = token
    app.logger.debug("Token saved successfully")


# Must be created after the app.config entries above are set, because the
# app is passed to the constructor.
authorization = AuthorizationServer(app, query_client=query_client,
                                    save_token=save_token)


class AuthorizationCode:
    """An issued authorization code together with the data bound to it.

    ``expires_in`` holds an absolute Unix timestamp (see ``is_expired``),
    despite its name.
    """

    def __init__(self, code, client_id, redirect_uri, scope, user_info, expires_in):
        self.code = code
        self.client_id = client_id
        self.redirect_uri = redirect_uri
        self.scope = scope
        self.user_info = user_info
        self.expires_in = expires_in

    def get_redirect_uri(self):
        """Return the redirect URI the code was issued for."""
        return self.redirect_uri

    def get_scope(self):
        """Return the scope granted with the code."""
        return self.scope

    def get_user(self):
        """Return the user record captured when the code was issued."""
        return self.user_info

    def is_expired(self):
        """Return True once the current time is past ``expires_in``."""
        return time.time() > self.expires_in


class AuthorizationCodeGrant(grants.AuthorizationCodeGrant):
    """Authorization-code grant backed by the in-memory ``codes`` store."""

    def authenticate_user(self, authorization_code):
        """Return the user the code was issued to, or None when there is none.

        The previous implementation always returned ``users['user1']``
        regardless of which code was presented.
        """
        app.logger.debug(f"Authenticating user with authorization code: {authorization_code.code}")
        user_info = authorization_code.get_user()
        if user_info:
            app.logger.debug(f"User authenticated successfully: {user_info}")
            return user_info
        app.logger.warning(f"Authentication failed for code: {authorization_code.code}")
        return None

    def save_authorization_code(self, code, request):
        """Record ``code`` with the client, redirect URI, scope and user."""
        username = request.form.get('username')
        user_data = users.get(username, {})
        app.logger.debug(f"CURRENT USER DATA: {user_data}")
        codes[code] = {
            'client_id': request.client_id,
            'redirect_uri': request.redirect_uri,
            'scope': request.scope,
            'user': user_data,
            # Absolute expiry timestamp, not a duration.
            'expires_in': int(time.time()) + AUTH_CODE_LIFETIME_SECONDS
        }

    def query_authorization_code(self, code, client):
        """Return the stored code for ``client``, or None.

        None is returned when the code is unknown, belongs to another
        client, or has expired; an expired code is also removed.
        """
        code_data = codes.get(code)
        if not code_data or code_data['client_id'] != client.client_id:
            return None
        authorization_code = AuthorizationCode(
            code,
            code_data['client_id'],
            code_data['redirect_uri'],
            code_data['scope'],
            code_data['user'],
            code_data['expires_in']
        )
        if authorization_code.is_expired():
            codes.pop(code, None)
            return None
        return authorization_code

    def delete_authorization_code(self, authorization_code):
        """Remove a used code so that it cannot be exchanged again.

        ``codes`` is keyed by the code string, so the lookup must use
        ``authorization_code.code``; popping the object itself never matched.
        """
        # For testing, remove the code from the dictionary
        # In a real application, this would involve a database operation
        codes.pop(authorization_code.code, None)


authorization.register_grant(AuthorizationCodeGrant)


@app.route('/.well-known/openid-configuration')
def openid_config():
    """Serve the OpenID Connect discovery document."""
    return jsonify(openid_configuration)


@app.route('/.well-known/jwks.json')
def jwks():
    """Serve the JWKS document exported by the ``crypto`` module."""
    return jsonify(jwk)


@app.route('/authorize', methods=['GET', 'POST'])
def authorize():
    """Show the login form (GET) or authenticate and issue a code (POST)."""
    app.logger.debug("Authorize endpoint called")
    if request.method == 'GET':
        return render_template('authorize.html', request=request)
    try:
        user = request.form.get('username')
        # Only the username is logged: the form also carries the password.
        app.logger.debug(f"Login attempt for user: {user}")
        record = users.get(user)
        if record is not None and _constant_time_equals(
                record['password'], request.form.get('password')):
            app.logger.debug(f"User authenticated: {user}")
            return authorization.create_authorization_response(grant_user=user)
        app.logger.warning(f"Invalid username or password for user: {user}")
        return "Invalid username or password", 401
    except Exception as e:
        # Endpoint boundary: log with traceback and return a generic 500.
        app.logger.error(f"Error in /authorize: {e}", exc_info=True)
        return "An internal server error occurred", 500


@app.route('/token', methods=['POST'])
def issue_token():
    """Exchange an authorization code for an access token and an ID token.

    Authlib validates the request and produces the access token; on success
    this handler adds a signed ID token and records the access token with an
    absolute expiry for the UserInfo endpoint.  Authlib error responses are
    returned unchanged.
    """
    # Look the code up before delegating: a successful exchange deletes the
    # code through delete_authorization_code, taking its user data with it.
    code_obj = codes.get(request.form.get('code'))

    response = authorization.create_token_response()
    if response.status_code != 200:
        return response

    if not code_obj:
        app.logger.error("Authorization code not found")
        return "Authorization code not found", 400

    token_data = response.get_json()

    # Current UTC time
    current_utc = datetime.now(timezone.utc)
    issued_at = int(current_utc.timestamp())
    id_token_expires_at = current_utc + ID_TOKEN_LIFETIME

    # Generate ID Token
    id_token_payload = {
        "iss": openid_configuration["issuer"],
        "sub": code_obj["user"]["sub"],
        "aud": code_obj["client_id"],
        "exp": int(id_token_expires_at.timestamp()),
        "iat": issued_at,
        "auth_time": issued_at,
        # Additional claims like nonce can be added here
    }

    # Sign ID Token
    signed_id_token = jwt.encode({'alg': 'RS256', 'kid': 'test'},
                                 id_token_payload, private_key)
    app.logger.debug(f'Signed ID TOKEN: {signed_id_token}')
    id_token = signed_id_token.decode('utf-8')  # Ensure it's a string

    access_token_expires_in = ACCESS_TOKEN_LIFETIME_SECONDS
    tokens[token_data['access_token']] = {
        'user': code_obj['user'],
        'scope': token_data.get('scope'),
        'expires_in': access_token_expires_in,
        # Absolute expiry; this is what /userinfo compares with the clock.
        'expires_at': issued_at + access_token_expires_in,
    }

    # Construct the response
    token_response = {
        "access_token": token_data['access_token'],
        "token_type": "Bearer",
        "expires_in": access_token_expires_in,
        "id_token": id_token
    }

    # If there's a refresh token or other data, include it too
    if 'refresh_token' in token_data:
        token_response['refresh_token'] = token_data['refresh_token']

    # Return the constructed response
    app.logger.debug(f"Token Response: {token_response}")
    reply = jsonify(token_response)
    # Token responses must not be cached (RFC 6749, section 5.1).
    reply.headers['Cache-Control'] = 'no-store'
    reply.headers['Pragma'] = 'no-cache'
    return reply, 200


def _claims_for_scope(user, scope):
    """Return the subset of ``user`` that ``scope`` entitles the caller to.

    ``scope`` is a space-separated string or None.  ``sub`` is always
    included when present; fields outside SCOPE_CLAIMS, such as the
    password, are never returned.
    """
    allowed = {'sub'}
    for scope_name in (scope or '').split():
        allowed.update(SCOPE_CLAIMS.get(scope_name, ()))
    return {claim: value for claim, value in user.items() if claim in allowed}


def _invalid_token_response():
    """Build the 401 reply used for a missing, unknown or expired token."""
    reply = jsonify({'error': 'invalid_token'})
    reply.status_code = 401
    reply.headers['WWW-Authenticate'] = 'Bearer error="invalid_token"'
    return reply


@app.route('/userinfo')
def userinfo():
    """Return the claims of the user that the bearer token was issued to."""
    auth_header = request.headers.get('Authorization')
    if not auth_header:
        app.logger.warning("No authorization header found")
        return _invalid_token_response()

    # partition() never raises, unlike indexing the result of split().
    scheme, _, token_str = auth_header.partition(' ')
    token_data = tokens.get(token_str.strip()) if scheme.lower() == 'bearer' else None

    # Records without 'expires_at' (written by save_token only) are rejected.
    if not token_data or token_data.get('expires_at', 0) <= time.time():
        app.logger.warning("Invalid or expired token")
        return _invalid_token_response()

    user = token_data.get('user') or {}
    return jsonify(_claims_for_scope(user, token_data.get('scope')))


if __name__ == '__main__':
    app.run(host='0.0.0.0', port=80)
Editor is loading...
Leave a Comment