Outbound Agent
unknown
python
7 months ago
2.5 kB
5
Indexable
Never
import uuid import json import time from threading import Thread from queue import Queue class RedisRegistry: def __init__(self, host='localhost', port=6379, db=0): self.redis = redis.Redis(host=host, port=port, db=db) def register_agent(self, agent_info): serialized_agent_info = pickle.dumps(agent_info) self.redis.set(agent_info["agent_id"], serialized_agent_info) print("Agent registered:", agent_info) def send_message(self, message): print("Message sent:", message) class OutBoundAgent: def __init__(self, agent_id=None): self.agent_id = agent_id or str(uuid.uuid4()) self.capabilities = {} self.registry = DummyRegistry() self.message_queue = Queue() def register_capability(self, name, description, plugin): self.capabilities[name] = { "description": description, "plugin": plugin } def register_agent(self): agent_info = { "agent_id": self.agent_id, "capabilities": list(self.capabilities.keys()) } self.registry.register_agent(agent_info) def listen_for_messages(self): while True: if not self.message_queue.empty(): message = self.message_queue.get() self.process_message(message) time.sleep(1) def process_message(self, message): message_type = message.get("message_type") if message_type == "capabilities_query": self.handle_capabilities_query(message) def handle_capabilities_query(self, message): sender_agent_id = message["sender_agent_id"] capabilities_response = { "sender_agent_id": self.agent_id, "receiver_agent_id": sender_agent_id, "message_type": "capabilities_response", "capabilities": self.capabilities } self.registry.send_message(capabilities_response) def main(): agent = OutBoundAgent() # Example capability plugin def example_plugin(): return "This is an example plugin." agent.register_capability("example_capability", "An example capability.", example_plugin) agent.register_agent() # Start listening for messages message_listener_thread = Thread(target=agent.listen_for_messages) message_listener_thread.start() message_listener_thread.join() if __name__ == "__main__": main()