Redis Server Implementation
unknown
python
a year ago
2.5 kB
3
Indexable
Never
"""REST API that stores, updates, removes and looks up agents via RedisHelper.

Every response body keeps the ``{"status": ..., ...}`` envelope. Failures are
additionally signalled through the HTTP status code (400 for bad input, 404
for an unknown agent, 500 for backend errors) so clients and proxies do not
mistake them for successes.
"""

import logging
import os

from dotenv import load_dotenv
from flask import Flask, request, jsonify
from flask_restful import Api, Resource

from redis_helper import RedisHelper

# Load .env before RedisHelper() is constructed, as in the original ordering.
load_dotenv()

app = Flask(__name__)
api = Api(app)
redis_helper = RedisHelper()

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def _error(message, status_code):
    """Return the standard error envelope paired with an HTTP status code."""
    return {"status": "ERROR", "message": message}, status_code


def _read_fields(*names):
    """Extract required fields from the JSON request body.

    Returns ``(values, None)`` with the values in the order of *names*, or
    ``(None, error_response)`` when the body is not a JSON object or a field
    is missing.
    """
    # silent=True yields None instead of raising on a missing/invalid body,
    # so the caller gets one uniform 400 response.
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        return None, _error("Request body must be a JSON object", 400)

    missing = [name for name in names if name not in payload]
    if missing:
        return None, _error(
            "Missing required field(s): " + ", ".join(missing), 400
        )
    return [payload[name] for name in names], None


class AddAgent(Resource):
    """POST /add_agent: register an agent through ``redis_helper.add_agent``."""

    def post(self):
        """Validate the JSON body and store the agent.

        Expects ``agent_id``, ``name``, ``description``, ``endpoint`` and
        ``tools`` in the body; returns the SUCCESS envelope with the id.
        """
        fields, error = _read_fields(
            "agent_id", "name", "description", "endpoint", "tools"
        )
        if error is not None:
            return error

        agent_id = fields[0]
        try:
            # Positional order matches the original call:
            # (agent_id, name, description, endpoint, tools).
            redis_helper.add_agent(*fields)
        except Exception as exc:  # request boundary: log and report
            logger.exception("Failed to add agent %s", agent_id)
            return _error(str(exc), 500)
        return {"status": "SUCCESS", "agent_id": agent_id}


class UpdateTools(Resource):
    """PUT /update_tools: replace an agent's tools via ``redis_helper.update_tools``."""

    def put(self):
        """Validate the JSON body (``agent_id``, ``tools``) and apply the update."""
        fields, error = _read_fields("agent_id", "tools")
        if error is not None:
            return error

        agent_id, tools = fields
        try:
            redis_helper.update_tools(agent_id, tools)
        except Exception as exc:  # request boundary: log and report
            logger.exception("Failed to update tools for agent %s", agent_id)
            return _error(str(exc), 500)
        return {"status": "SUCCESS", "agent_id": agent_id}


class RemoveAgent(Resource):
    """DELETE /remove_agent/<agent_id>: delete via ``redis_helper.remove_agent``."""

    def delete(self, agent_id):
        """Remove the agent identified by the URL path segment."""
        try:
            redis_helper.remove_agent(agent_id)
        except Exception as exc:  # request boundary: log and report
            logger.exception("Failed to remove agent %s", agent_id)
            return _error(str(exc), 500)
        return {"status": "SUCCESS", "agent_id": agent_id}


class SearchAgents(Resource):
    """GET /search_agents[/<value>]: fetch one agent or search by tool description."""

    def get(self, agent_id=None, search_query=None):
        """Return the agent for *agent_id*, otherwise run a description search.

        When no ``search_query`` arrives through the URL path, the
        ``?search_query=`` query-string parameter is used instead. If neither
        is given, ``None`` is passed to the helper, as before.
        """
        try:
            if agent_id:
                agent_data = redis_helper.get_agent_by_id(agent_id)
                if agent_data:
                    return agent_data
                return _error("Agent not found", 404)

            if search_query is None:
                # The path-based search route has the same URL shape as the
                # agent_id route, so offer an unambiguous way to pass a query.
                search_query = request.args.get("search_query")
            return redis_helper.search_agents_by_tool_description(search_query)
        except Exception as exc:  # request boundary: log and report
            logger.exception("Agent lookup/search failed")
            return _error(str(exc), 500)


api.add_resource(AddAgent, '/add_agent')
api.add_resource(UpdateTools, '/update_tools')
api.add_resource(RemoveAgent, '/remove_agent/<string:agent_id>')
# NOTE(review): the two '<string:...>' rules below are indistinguishable by
# URL shape; a path segment is presumably always bound to agent_id, leaving
# the search_query path rule shadowed. Confirm against the router. Kept
# registered for backward compatibility; prefer /search_agents?search_query=...
api.add_resource(
    SearchAgents,
    '/search_agents/<string:agent_id>',
    '/search_agents',
    '/search_agents/<string:search_query>',
)


if __name__ == '__main__':
    # The interactive debugger allows arbitrary code execution, so it must be
    # opted into explicitly (FLASK_DEBUG=1, e.g. in .env) rather than always on.
    debug_enabled = os.environ.get("FLASK_DEBUG", "").strip().lower() in {
        "1",
        "true",
        "yes",
        "on",
    }
    app.run(debug=debug_enabled)