# FLASK APPLICATION
# unknown
# plain_text
# 2 years ago
# 11 kB
# 8
# Indexable
from flask import Flask, request, jsonify, render_template, redirect
from authlib.integrations.flask_oauth2 import AuthorizationServer
from authlib.oauth2.rfc6749 import grants
import time
import os
import logging
from crypto import *
import json
import base64
from authlib.jose import jwt
from datetime import datetime, timezone, timedelta
# Configure logging: root logger at DEBUG so every message is captured,
# each record prefixed with a second-resolution timestamp and its level.
_LOG_SETTINGS = {
    'datefmt': '%Y-%m-%d %H:%M:%S',
    'format': '%(asctime)s %(levelname)s: %(message)s',
    'level': logging.DEBUG,
}
logging.basicConfig(**_LOG_SETTINGS)

# Let Authlib operate over plain HTTP (the endpoints advertised by this
# server use http:// URLs).
os.environ.update({'AUTHLIB_INSECURE_TRANSPORT': '1'})
# The Flask application object every route below is registered on.
app = Flask(__name__)
# NOTE(review): the secret key is hard-coded in source; acceptable only for a
# throwaway test server.
app.config.update(
    SECRET_KEY='Z76TOUIG8YPOJEKHLVLWH8W1UDSXVEX1',
    OAUTH2_REFRESH_TOKEN_GENERATOR=True,
)

# In-memory stores, lost on restart:
#   tokens: access-token string        -> token record
#   codes:  authorization-code string  -> code record
tokens, codes = {}, {}

# Re-bind the key material under explicit module-level names.
# NOTE(review): both names presumably arrive via `from crypto import *` at the
# top of the file - confirm against the crypto module.
private_key = private_key
public_key = public_key
## For debugging purposes
# Dump both keys to the working directory at import time. The context
# managers guarantee each file handle is closed even if the write fails.
# NOTE(review): this persists the *private* signing key in clear text on disk;
# remove before using this server for anything but local debugging.
with open('private.txt', 'wb') as f:
    f.write(private_key)
with open('public.txt', 'wb') as f:
    f.write(public_key)
# Mock user directory, keyed by login username (the /authorize handler looks
# users up by the submitted `username` form field and compares `password`).
# Besides `password`, each record carries OpenID Connect style claims.
# NOTE(review): passwords are stored in clear text - test data only.
users = {
    'user1': {
        'password': 'password1',
        "sub": "user1",
        "username": "user1",
        "name": "Diana Krall",
        "given_name": "Diana",
        "family_name": "Krall",
        "nickname": "Dina",
        "email": "diana@example.org",
        # OIDC defines email_verified as a JSON boolean; the string 'False'
        # used previously is truthy and would serialise as "False".
        "email_verified": False,
        "phone_number": "+46 90 7865000",
        # NOTE(review): the standard claim name is lower-case "profile";
        # key left untouched in case something depends on this spelling.
        "Profile": 'Something',
        "address": {
            "street_address": "Umeå Universitet",
            "locality": "Umeå",
            "postal_code": "SE-90187",
            "country": "Sweden"
        }
    }
}
# Building blocks of the discovery document served at
# /.well-known/openid-configuration.
_RESPONSE_TYPES_SUPPORTED = [
    "code",
    "token",
    "id_token",
    "code token",
    "code id_token",
    "token id_token",
    "code token id_token",
]
_SCOPES_SUPPORTED = [
    "openid",
    "profile",
    "email",
    "address",
    "phone",
    "offline_access",
]
_TOKEN_ENDPOINT_AUTH_METHODS = [
    "client_secret_basic",
    "client_secret_post",
]
_CLAIMS_SUPPORTED = [
    "sub",
    "name",
    "preferred_username",
    "given_name",
    "family_name",
    "email",
    "email_verified",
]

# The discovery document itself; returned verbatim (as JSON) by the
# openid-configuration route, and its "issuer" is reused as the ID token `iss`.
openid_configuration = {
    "issuer": "http://REDACTED/",
    "authorization_endpoint": "http://REDACTED/authorize",
    "token_endpoint": "http://REDACTED/token",
    "userinfo_endpoint": "http://REDACTED/userinfo",
    "jwks_uri": "http://REDACTED/.well-known/jwks.json",
    "response_types_supported": _RESPONSE_TYPES_SUPPORTED,
    "subject_types_supported": ["public"],
    "id_token_signing_alg_values_supported": ["RS256"],
    "scopes_supported": _SCOPES_SUPPORTED,
    "token_endpoint_auth_methods_supported": _TOKEN_ENDPOINT_AUTH_METHODS,
    "claims_supported": _CLAIMS_SUPPORTED,
}
# Mock client class
class MockClient:
    """In-memory stand-in for a registered OAuth2 client.

    Instances are returned by ``query_client`` and handed to the Authlib
    authorization server, which calls the ``check_*`` / ``get_*`` methods
    below. Every ``check_*`` method returns a real boolean verdict.
    """

    # Client authentication methods this client may use at an endpoint.
    _ALLOWED_AUTH_METHODS = frozenset(
        ['client_secret_basic', 'client_secret_post', 'none']
    )
    # Grant types this client may request at the token endpoint.
    _ALLOWED_GRANT_TYPES = frozenset(
        ['authorization_code', 'refresh_token', 'password']
    )

    def __init__(self, client_id, client_secret):
        """Create a client with fixed response types and one redirect URI."""
        self.client_id = client_id
        self.client_secret = client_secret
        self.response_types = ["code",
                               "token",
                               "id_token",
                               "code token",
                               "code id_token",
                               "token id_token",
                               "code token id_token"]
        self.redirect_uris = ['https://REDACTED.COM/sso-authenticate/REDACTED']

    def check_response_type(self, response_type):
        """Return True if the client supports the given response type."""
        return response_type in self.response_types

    def check_redirect_uri(self, redirect_uri):
        """Return True if *redirect_uri* is one of the registered URIs."""
        return redirect_uri in self.redirect_uris

    def check_client_secret(self, client_secret):
        """Return True only if *client_secret* equals this client's secret.

        Previously this returned the submitted value itself, so any non-empty
        secret was accepted. The comparison is constant-time.
        """
        import hmac  # local import: keeps the block self-contained

        if not isinstance(client_secret, str) or not isinstance(self.client_secret, str):
            return False
        # Compare as bytes: compare_digest rejects non-ASCII str arguments.
        return hmac.compare_digest(
            self.client_secret.encode('utf-8'),
            client_secret.encode('utf-8'),
        )

    def check_endpoint_auth_method(self, method, endpoint):
        """Return True if *method* is an allowed client auth method.

        *endpoint* is accepted for interface compatibility; the same set of
        methods is allowed at every endpoint.
        """
        return method in self._ALLOWED_AUTH_METHODS

    def check_grant_type(self, grant_type):
        """Return True if the client may use *grant_type*."""
        return grant_type in self._ALLOWED_GRANT_TYPES

    def get_allowed_scope(self, scope):
        """Return the scope the client may be granted (the request, unchanged)."""
        return scope
def query_client(client_id):
    """Look up the OAuth2 client registered under *client_id*.

    Only the hard-coded ``'test'`` client exists. Returns a fresh
    ``MockClient`` for it and ``None`` (after logging a warning) otherwise.
    """
    if client_id != 'test':
        app.logger.warning(f"No client found for client_id: {client_id}")
        return None

    client = MockClient(client_secret='Z76TOUIG8YPOJEKHLVLWH8W1UDSXVEX1', client_id='test')
    app.logger.debug(f"Found client: {client}")
    return client
def save_token(token, request):
    """Record *token* in the in-memory ``tokens`` store.

    The token dict is stored as-is under its ``access_token`` string;
    *request* is accepted but not used.
    """
    access_token = token['access_token']
    tokens[access_token] = token
    app.logger.debug("Token saved successfully")


# Authorization server wired to the client lookup and token store above.
authorization = AuthorizationServer(
    app,
    save_token=save_token,
    query_client=query_client,
)
class AuthorizationCode:
    """Value object describing one issued authorization code.

    ``expires_in`` is compared directly against ``time.time()``, i.e. it is
    treated as an absolute Unix timestamp rather than a duration.
    """

    def __init__(self, code, client_id, redirect_uri, scope, user_info, expires_in):
        self.code, self.client_id = code, client_id
        self.redirect_uri, self.scope = redirect_uri, scope
        self.user_info, self.expires_in = user_info, expires_in

    def get_redirect_uri(self):
        """Return the redirect URI the code was issued for."""
        return self.redirect_uri

    def get_scope(self):
        """Return the scope recorded with the code."""
        return self.scope

    def get_user(self):
        """Return the user information recorded with the code."""
        return self.user_info

    def is_expired(self):
        """Return True once the current time is past ``expires_in``."""
        return self.expires_in < time.time()
class AuthorizationCodeGrant(grants.AuthorizationCodeGrant):
    """Authorization-code grant backed by the in-memory ``codes`` dict."""

    def authenticate_user(self, authorization_code):
        """Return the user the code was issued to, or ``None``.

        The user comes from the authorization code object itself (previously
        the fixed ``users['user1']`` record was returned for every code).
        """
        app.logger.debug(f"Authenticating user with authorization code: {authorization_code.code}")
        # Extract user info from the authorization code object
        user_info = authorization_code.get_user()
        if user_info:
            # Log only the subject: the record also holds the password.
            app.logger.debug(f"User authenticated successfully: {user_info.get('sub')}")
            return user_info
        app.logger.warning(f"Authentication failed for code: {authorization_code.code}")
        return None

    def save_authorization_code(self, code, request):
        """Store *code* with the client, redirect URI, scope and user.

        The user record is looked up from the submitted ``username`` form
        field; an unknown username stores an empty dict.
        """
        username = request.form.get('username')
        user_data = users.get(username, {})
        # Keep the password out of the log.
        loggable = {key: value for key, value in user_data.items() if key != 'password'}
        app.logger.debug(f"CURRENT USER DATA: {loggable}")
        codes[code] = {
            'client_id': request.client_id,
            'redirect_uri': request.redirect_uri,
            'scope': request.scope,
            'user': user_data,
            # Absolute expiry timestamp (now + 36000 s).
            # NOTE(review): 10 hours is long for an authorization code - confirm.
            'expires_in': int(time.time()) + 36000
        }

    def query_authorization_code(self, code, client):
        """Return the stored code for *client*, or ``None``.

        ``None`` is returned when the code is unknown, was issued to another
        client, or has expired.
        """
        code_data = codes.get(code)
        if not code_data or code_data['client_id'] != client.client_id:
            return None
        authorization_code = AuthorizationCode(
            code,
            code_data['client_id'],
            code_data['redirect_uri'],
            code_data['scope'],
            code_data['user'],
            code_data['expires_in']
        )
        # Expired codes must not be redeemable.
        if authorization_code.is_expired():
            app.logger.warning(f"Authorization code expired: {code}")
            return None
        return authorization_code

    def delete_authorization_code(self, authorization_code):
        """Remove a redeemed code from the ``codes`` dict.

        NOTE(review): ``codes`` is keyed by the code *string*, while Authlib
        appears to pass the object returned by ``query_authorization_code``
        here, so this pop likely never matches - confirm. Left unchanged
        because the /token handler in this file also looks the code up in
        ``codes``; coordinate any change with it.
        """
        # For testing, remove the code from the dictionary
        # In a real application, this would involve a database operation
        codes.pop(authorization_code, None)


authorization.register_grant(AuthorizationCodeGrant)
@app.route('/.well-known/openid-configuration')
def openid_config():
    """Serve the OpenID Connect discovery document as JSON."""
    discovery_document = jsonify(openid_configuration)
    return discovery_document
@app.route('/.well-known/jwks.json')
def jwks():
    """Serve the module-level ``jwk`` object as JSON.

    NOTE(review): ``jwk`` is not defined in this file; presumably it comes
    from ``from crypto import *`` - confirm.
    """
    key_set_response = jsonify(jwk)
    return key_set_response
@app.route('/authorize', methods=['GET', 'POST'])
def authorize():
    """Authorization endpoint: show the login form, then issue a code.

    GET renders ``authorize.html``. POST checks the submitted ``username`` /
    ``password`` against ``users`` and, on success, returns the authorization
    response built by the authorization server.

    Returns:
        The rendered form, the authorization response, a 401 for bad
        credentials, or a 500 if anything raises while handling the POST.
    """
    app.logger.debug("Authorize endpoint called")
    if request.method == 'GET':
        return render_template('authorize.html', request=request)
    try:
        # Mask the password so credentials never reach the log.
        loggable_form = {
            key: ('***' if key == 'password' else value)
            for key, value in request.form.items()
        }
        app.logger.debug(f"POST data: {loggable_form}")
        user = request.form.get('username')
        password = request.form.get('password')
        if user in users and users[user]['password'] == password:
            app.logger.debug(f"User authenticated: {user}")
            return authorization.create_authorization_response(grant_user=user)
        app.logger.warning(f"Invalid username or password for user: {user}")
        return "Invalid username or password", 401
    except Exception as e:
        # Top-level boundary: log with traceback, hide details from the caller.
        app.logger.error(f"Error in /authorize: {e}", exc_info=True)
        return "An internal server error occurred", 500
# Lifetimes of the artefacts issued by the token endpoint.
_ID_TOKEN_LIFETIME = timedelta(minutes=30)
_ACCESS_TOKEN_LIFETIME_SECONDS = 3600  # 1 hour


@app.route('/token', methods=['POST'])
def issue_token():
    """Token endpoint: exchange an authorization code for tokens.

    Delegates validation and access-token creation to the authorization
    server, then adds a signed ID token for the user bound to the code.

    Returns:
        On success a JSON body with ``access_token``, ``token_type``,
        ``expires_in`` (seconds), ``id_token`` and, if issued,
        ``refresh_token``. Error responses from the authorization server are
        passed through unchanged; a 400 is returned if the code is unknown.
    """
    # Look the code up before the grant runs, so this handler does not depend
    # on the code still being stored after it has been redeemed.
    auth_code = request.form.get('code')
    code_obj = codes.get(auth_code)

    response = authorization.create_token_response()
    if response.status_code != 200:
        return response

    if not code_obj:
        app.logger.error("Authorization code not found")
        return "Authorization code not found", 400

    token_data = response.get_json()
    user_record = code_obj['user']

    # Current UTC time (timezone-aware; datetime.utcnow() is deprecated)
    current_utc = datetime.now(timezone.utc)
    issued_at = int(current_utc.timestamp())

    # Generate ID Token
    id_token_payload = {
        "iss": openid_configuration["issuer"],
        "sub": user_record["sub"],
        "aud": code_obj["client_id"],
        "exp": int((current_utc + _ID_TOKEN_LIFETIME).timestamp()),
        "iat": issued_at,
        "auth_time": issued_at,
        # Additional claims like nonce can be added here
    }
    # Sign ID Token
    signed_id_token = jwt.encode({'alg': 'RS256', 'kid': 'test'}, id_token_payload, private_key)
    app.logger.warning(f'Signed ID TOKEN: {signed_id_token}')
    # Ensure it's a string (the encoder may hand back bytes)
    if isinstance(signed_id_token, bytes):
        id_token = signed_id_token.decode('utf-8')
    else:
        id_token = signed_id_token

    # The token store is read by /userinfo, which looks the user up in `users`
    # by name and compares 'expires_in' with time.time(). So store the
    # username and an *absolute* expiry timestamp here; the relative lifetime
    # only goes into the HTTP response.
    tokens[token_data['access_token']] = {
        'user': user_record.get('username', user_record['sub']),
        'scope': token_data.get('scope'),
        'expires_in': issued_at + _ACCESS_TOKEN_LIFETIME_SECONDS
    }

    # Authorization codes are single-use: consume it once tokens are issued.
    codes.pop(auth_code, None)

    # Construct the response
    token_response = {
        "access_token": token_data['access_token'],
        "token_type": "Bearer",
        "expires_in": _ACCESS_TOKEN_LIFETIME_SECONDS,
        "id_token": id_token
    }
    # If there's a refresh token or other data, include it too
    if 'refresh_token' in token_data:
        token_response['refresh_token'] = token_data['refresh_token']

    # Return the constructed response; token responses must not be cached.
    app.logger.debug(f"Token Response: {token_response}")
    http_response = jsonify(token_response)
    http_response.headers['Cache-Control'] = 'no-store'
    http_response.headers['Pragma'] = 'no-cache'
    return http_response, 200
# Claims released by each OpenID Connect scope ("sub" is always returned).
_USERINFO_SCOPE_CLAIMS = {
    'profile': (
        'name', 'family_name', 'given_name', 'middle_name', 'nickname',
        'preferred_username', 'profile', 'picture', 'website', 'gender',
        'birthdate', 'zoneinfo', 'locale', 'updated_at',
    ),
    'email': ('email', 'email_verified'),
    'address': ('address',),
    'phone': ('phone_number', 'phone_number_verified'),
}


@app.route('/userinfo')
def userinfo():
    """UserInfo endpoint: return the claims the access token's scopes allow.

    The token is taken from the ``Authorization`` header and looked up in
    ``tokens``. Its ``expires_in`` value is compared with ``time.time()``,
    i.e. treated as an absolute Unix timestamp. The stored ``user`` may be
    either a user record (dict) or a username to look up in ``users``.

    Returns:
        JSON with ``sub`` plus the claims of every granted scope that the
        user record actually has (the password is never included), or a 401
        ``invalid_token`` error.
    """
    unauthorized = (jsonify({'error': 'invalid_token'}), 401)

    auth_header = request.headers.get('Authorization')
    if not auth_header:
        app.logger.warning("No authorization header found")
        return unauthorized

    # "<scheme> <token>"; a header without a token part is simply invalid
    # instead of raising IndexError.
    header_parts = auth_header.split()
    token_str = header_parts[1] if len(header_parts) > 1 else None
    token_data = tokens.get(token_str) if token_str else None
    if not token_data or token_data.get('expires_in', 0) <= time.time():
        app.logger.warning("Invalid or expired token")
        return unauthorized

    user = token_data.get('user')
    user_record = user if isinstance(user, dict) else users.get(user, {})
    if 'sub' not in user_record:
        # No identifiable user behind the token: treat the token as invalid.
        app.logger.warning("Invalid or expired token")
        return unauthorized

    # 'scope' may be stored as None, hence the `or ''`.
    granted_scopes = set((token_data.get('scope') or '').split())
    claims = {'sub': user_record['sub']}
    for scope_name, claim_names in _USERINFO_SCOPE_CLAIMS.items():
        if scope_name in granted_scopes:
            claims.update(
                {name: user_record[name] for name in claim_names if name in user_record}
            )
    return jsonify(claims)
# Script entry point: serve on every network interface, port 80.
if __name__ == '__main__':
    app.run(port=80, host='0.0.0.0')
# Editor is loading...
# Leave a Comment