OSNAP
unknown
python
2 years ago
6.6 kB
8
Indexable
import os
import json
import enum
import time
import uuid
from typing import Callable, Dict, Optional, Union

import requests
from cryptography.exceptions import InvalidSignature
from cryptography.hazmat.primitives import serialization, hashes
from cryptography.hazmat.primitives.asymmetric import padding, rsa
from cryptography.hazmat.primitives.serialization import (
    Encoding,
    NoEncryption,
    PrivateFormat,
    PublicFormat,
)

# Upper bound, in seconds, for every HTTP call to the registry. Without a
# timeout ``requests`` waits indefinitely on an unresponsive server.
REGISTRY_TIMEOUT_SECONDS = 10


class Scope(enum.Enum):
    """Agent scope; the enum value is what gets sent to the registry."""

    PUBLIC = "public"
    PRIVATE = "private"


class SignatureUtil:
    """Static helpers for RSA key generation, signing and verification.

    Signatures use PKCS#1 v1.5 padding with SHA-256. Keys are exchanged as
    PEM-encoded ``bytes``; signed data is passed as ``str`` and UTF-8 encoded
    before signing/verifying.
    """

    @staticmethod
    def generate_key_pair():
        """Create a fresh 2048-bit RSA key pair.

        Returns:
            A ``(private_pem, public_pem)`` tuple of PEM-encoded ``bytes``.
            The private key is PKCS8 and unencrypted; the public key is
            SubjectPublicKeyInfo.
        """
        private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
        private_pem = private_key.private_bytes(
            Encoding.PEM, PrivateFormat.PKCS8, NoEncryption()
        )
        public_pem = private_key.public_key().public_bytes(
            Encoding.PEM, PublicFormat.SubjectPublicKeyInfo
        )
        return private_pem, public_pem

    @staticmethod
    def sign_data(private_key_pem, data):
        """Sign ``data`` (a ``str``) with the PEM private key; return the signature bytes."""
        private_key = serialization.load_pem_private_key(private_key_pem, None)
        return private_key.sign(data.encode(), padding.PKCS1v15(), hashes.SHA256())

    @staticmethod
    def verify_signature(public_key_pem, data, signature):
        """Return ``True`` if ``signature`` is valid for ``data`` under the PEM public key.

        An unsigned message (``signature is None``), a mismatching signature,
        or a malformed signature value all yield ``False`` rather than raising.
        Errors while loading the public key itself still propagate.
        """
        public_key = serialization.load_pem_public_key(public_key_pem)
        if signature is None:
            # Message was never signed; nothing to verify.
            return False
        try:
            public_key.verify(
                signature, data.encode(), padding.PKCS1v15(), hashes.SHA256()
            )
        except (InvalidSignature, TypeError, ValueError):
            return False
        return True


class OSNAPRequest:
    """A request message; ``payload`` is the JSON string that gets signed."""

    def __init__(self, requester_id: str, request_type: str,
                 task_name: Optional[str] = None, priority: int = 0,
                 request_metadata: Optional[Dict] = None):
        self.requester_id = requester_id
        self.request_type = request_type
        self.task_name = task_name
        self.priority = priority
        self.request_metadata = request_metadata or {}
        self.timestamp = time.time()
        # Serialized once at construction; later attribute changes are not
        # reflected in the payload.
        self.payload = json.dumps({
            "requester_id": self.requester_id,
            "request_type": self.request_type,
            "task_name": self.task_name,
            "priority": self.priority,
            "request_metadata": self.request_metadata,
            "timestamp": self.timestamp,
        })
        self.signature = None


class OSNAPResponse:
    """A response message whose ``payload`` is the JSON dump of the given dict."""

    def __init__(self, payload: Dict):
        self.payload = json.dumps(payload)
        self.signature = None


class OSNAPError:
    """An error message whose ``payload`` is ``{"error": message}`` as JSON."""

    def __init__(self, message: str):
        self.payload = json.dumps({"error": message})
        self.signature = None


class OSNAPAgent:
    """An agent with its own RSA key pair that talks to a registry and to other agents.

    ``handlers`` maps names to callables. The names read by this class are
    ``'get_tools'``, ``'request_validation'``, ``'response_validation'``,
    ``'info'``, and one entry per supported task name.
    """

    def __init__(self, name: str, description: str, scope: Scope,
                 info_endpoint: str, registry_url: str,
                 handlers: Dict[str, Callable]):
        self.id = str(uuid.uuid4())
        self.name = name
        self.description = description
        self.scope = scope
        self.info_endpoint = info_endpoint
        self.registry_url = registry_url
        self.handlers = handlers
        # Generate a key pair
        self.private_key_pem, self.public_key_pem = SignatureUtil.generate_key_pair()

    def _post_to_registry(self, path: str, data: Dict,
                          success_message: str, failure_message: str) -> None:
        """POST ``data`` as JSON to ``{registry_url}/{path}`` and print the outcome."""
        response = requests.post(
            f"{self.registry_url}/{path}",
            json=data,
            timeout=REGISTRY_TIMEOUT_SECONDS,
        )
        if response.status_code == 200:
            print(success_message)
        else:
            print(failure_message, response.text)

    def register(self) -> None:
        """Register this agent (id, metadata and public key) with the registry."""
        data = {
            "agent_id": self.id,
            "name": self.name,
            "description": self.description,
            "scope": self.scope.value,
            "info_endpoint": self.info_endpoint,
            # PEM is ASCII text; bytes are not JSON serializable.
            "public_key": self.public_key_pem.decode("ascii"),
        }
        self._post_to_registry(
            "register", data,
            "Agent registered successfully.",
            "Failed to register agent:",
        )

    def unregister(self) -> None:
        """Remove this agent from the registry."""
        data = {
            "agent_id": self.id,
        }
        self._post_to_registry(
            "unregister", data,
            "Agent unregistered successfully.",
            "Failed to unregister agent:",
        )

    def update_tools_on_registry(self) -> None:
        """Send the result of the ``'get_tools'`` handler to the registry."""
        data = {
            "agent_id": self.id,
            "tools": self.handlers['get_tools'](),
        }
        self._post_to_registry(
            "update_tools", data,
            "Agent tools updated successfully.",
            "Failed to update agent tools:",
        )

    def send_request_to_agent(self, destination_agent: 'OSNAPAgent',
                              request: OSNAPRequest) -> Union[OSNAPResponse, OSNAPError]:
        """Validate ``request``, hand it to ``destination_agent``, and validate the reply.

        The request is checked against this agent's own public key, because
        ``create_osnap_request`` signs requests with this agent's private key.
        The reply is checked against the destination agent's public key.

        Returns:
            The destination's response when both checks pass, otherwise an
            unsigned ``OSNAPError`` describing which check failed.
        """
        request_ok = self.handlers['request_validation'](request) and \
            SignatureUtil.verify_signature(
                self.public_key_pem, request.payload, request.signature)
        if not request_ok:
            return OSNAPError("Invalid request signature")

        response = destination_agent.process_request(request)

        response_ok = self.handlers['response_validation'](response) and \
            SignatureUtil.verify_signature(
                destination_agent.public_key_pem, response.payload, response.signature)
        if not response_ok:
            return OSNAPError("Invalid response signature")
        return response

    def process_request(self, request: OSNAPRequest) -> Union[OSNAPResponse, OSNAPError]:
        """Dispatch ``request`` to the matching handler.

        ``"info"`` requests go to the ``'info'`` handler; ``"task"`` requests
        go to the handler registered under ``request.task_name``. A missing
        handler or an unknown request type yields an unsigned ``OSNAPError``.
        """
        if request.request_type == "info":
            handler = self.handlers.get('info')
            if handler:
                return handler(request)
            return OSNAPError("Info handler not found")
        if request.request_type == "task":
            handler = self.handlers.get(request.task_name)
            if handler:
                return handler(request)
            return OSNAPError("Task handler not found")
        return OSNAPError("Invalid request type")

    def create_osnap_request(self, request_type: str, task_name: Optional[str] = None,
                             priority: int = 0,
                             request_metadata: Optional[Dict] = None) -> OSNAPRequest:
        """Build an ``OSNAPRequest`` from this agent and sign its payload."""
        request = OSNAPRequest(
            requester_id=self.id,
            request_type=request_type,
            task_name=task_name,
            priority=priority,
            request_metadata=request_metadata,
        )
        request.signature = SignatureUtil.sign_data(self.private_key_pem, request.payload)
        return request

    def create_osnap_response(self, payload: Dict) -> OSNAPResponse:
        """Build an ``OSNAPResponse`` for ``payload`` and sign it with this agent's key."""
        response = OSNAPResponse(payload=payload)
        response.signature = SignatureUtil.sign_data(self.private_key_pem, response.payload)
        return response

    def create_osnap_error(self, message: str) -> OSNAPError:
        """Build an ``OSNAPError`` for ``message`` and sign it with this agent's key."""
        error = OSNAPError(message)
        error.signature = SignatureUtil.sign_data(self.private_key_pem, error.payload)
        return error
Editor is loading...