Outbound Agent

 avatar
unknown
python
2 years ago
2.5 kB
6
Indexable
import uuid
import json
import time
from threading import Thread
from queue import Queue

class RedisRegistry:
    def __init__(self, host='localhost', port=6379, db=0):
        self.redis = redis.Redis(host=host, port=port, db=db)

    def register_agent(self, agent_info):
        serialized_agent_info = pickle.dumps(agent_info)
        self.redis.set(agent_info["agent_id"], serialized_agent_info)
        print("Agent registered:", agent_info)

    def send_message(self, message):
        print("Message sent:", message)

class OutBoundAgent:
    def __init__(self, agent_id=None):
        self.agent_id = agent_id or str(uuid.uuid4())
        self.capabilities = {}
        self.registry = DummyRegistry()
        self.message_queue = Queue()

    def register_capability(self, name, description, plugin):
        self.capabilities[name] = {
            "description": description,
            "plugin": plugin
        }

    def register_agent(self):
        agent_info = {
            "agent_id": self.agent_id,
            "capabilities": list(self.capabilities.keys())
        }
        self.registry.register_agent(agent_info)

    def listen_for_messages(self):
        while True:
            if not self.message_queue.empty():
                message = self.message_queue.get()
                self.process_message(message)
            time.sleep(1)

    def process_message(self, message):
        message_type = message.get("message_type")
        
        if message_type == "capabilities_query":
            self.handle_capabilities_query(message)

    def handle_capabilities_query(self, message):
        sender_agent_id = message["sender_agent_id"]
        capabilities_response = {
            "sender_agent_id": self.agent_id,
            "receiver_agent_id": sender_agent_id,
            "message_type": "capabilities_response",
            "capabilities": self.capabilities
        }
        self.registry.send_message(capabilities_response)

def main():
    agent = OutBoundAgent()

    # Example capability plugin
    def example_plugin():
        return "This is an example plugin."

    agent.register_capability("example_capability", "An example capability.", example_plugin)
    agent.register_agent()

    # Start listening for messages
    message_listener_thread = Thread(target=agent.listen_for_messages)
    message_listener_thread.start()
    message_listener_thread.join()

if __name__ == "__main__":
    main()