Redis Server Implementation
unknown
python
3 years ago
2.5 kB
13
Indexable
import os
import logging
from flask import Flask, request, jsonify
from flask_restful import Api, Resource
from redis_helper import RedisHelper
from dotenv import load_dotenv
# Load variables from a local .env file into the environment. This runs before
# RedisHelper() is constructed below; presumably the helper reads its
# connection settings from the environment -- TODO confirm in redis_helper.
load_dotenv()
# Flask application plus the flask_restful Api wrapper on which the resource
# classes in this module are registered.
app = Flask(__name__)
api = Api(app)
# Single module-level helper instance shared by every resource handler.
redis_helper = RedisHelper()
# Configure root logging at INFO; `logger` is used by the handlers to record
# exceptions with tracebacks.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class AddAgent(Resource):
    """REST resource that registers a new agent through ``redis_helper``."""

    # Keys that must be present in the JSON body of a POST request.
    _REQUIRED_FIELDS = ("agent_id", "name", "description", "endpoint", "tools")

    def post(self):
        """Register the agent described by the JSON request body.

        The body must be a JSON object containing ``agent_id``, ``name``,
        ``description``, ``endpoint`` and ``tools``.

        Returns:
            ``{"status": "SUCCESS", "agent_id": ...}`` on success (HTTP 200).
            ``({"status": "ERROR", "message": ...}, 400)`` when the body is
            not a JSON object or a required field is missing.
            ``({"status": "ERROR", "message": ...}, 500)`` when
            ``redis_helper.add_agent`` raises.
        """
        # silent=True yields None for a missing/invalid JSON body instead of
        # raising, so malformed requests get the same error shape as others.
        payload = request.get_json(silent=True)
        if not isinstance(payload, dict):
            return {"status": "ERROR",
                    "message": "Request body must be a JSON object"}, 400

        missing = [field for field in self._REQUIRED_FIELDS
                   if field not in payload]
        if missing:
            message = "Missing required field(s): " + ", ".join(missing)
            logger.warning("add_agent rejected: %s", message)
            return {"status": "ERROR", "message": message}, 400

        agent_id = payload["agent_id"]
        try:
            redis_helper.add_agent(
                agent_id,
                payload["name"],
                payload["description"],
                payload["endpoint"],
                payload["tools"],
            )
        except Exception as e:
            # Boundary handler: log the traceback and report the failure to
            # the client rather than letting the request crash.
            logger.exception(e)
            return {"status": "ERROR", "message": str(e)}, 500
        return {"status": "SUCCESS", "agent_id": agent_id}
class UpdateTools(Resource):
    """REST resource that replaces an agent's tools through ``redis_helper``."""

    # Keys that must be present in the JSON body of a PUT request.
    _REQUIRED_FIELDS = ("agent_id", "tools")

    def put(self):
        """Update the tools of the agent named in the JSON request body.

        The body must be a JSON object containing ``agent_id`` and ``tools``.

        Returns:
            ``{"status": "SUCCESS", "agent_id": ...}`` on success (HTTP 200).
            ``({"status": "ERROR", "message": ...}, 400)`` when the body is
            not a JSON object or a required field is missing.
            ``({"status": "ERROR", "message": ...}, 500)`` when
            ``redis_helper.update_tools`` raises.
        """
        # silent=True yields None for a missing/invalid JSON body instead of
        # raising, so malformed requests get the same error shape as others.
        payload = request.get_json(silent=True)
        if not isinstance(payload, dict):
            return {"status": "ERROR",
                    "message": "Request body must be a JSON object"}, 400

        missing = [field for field in self._REQUIRED_FIELDS
                   if field not in payload]
        if missing:
            message = "Missing required field(s): " + ", ".join(missing)
            logger.warning("update_tools rejected: %s", message)
            return {"status": "ERROR", "message": message}, 400

        agent_id = payload["agent_id"]
        try:
            redis_helper.update_tools(agent_id, payload["tools"])
        except Exception as e:
            # Boundary handler: log the traceback and report the failure to
            # the client rather than letting the request crash.
            logger.exception(e)
            return {"status": "ERROR", "message": str(e)}, 500
        return {"status": "SUCCESS", "agent_id": agent_id}
class RemoveAgent(Resource):
    """REST resource that removes an agent through ``redis_helper``."""

    def delete(self, agent_id):
        """Remove the agent identified by the ``agent_id`` URL segment.

        Args:
            agent_id: Identifier taken from the request path.

        Returns:
            ``{"status": "SUCCESS", "agent_id": ...}`` on success (HTTP 200).
            ``({"status": "ERROR", "message": ...}, 500)`` when
            ``redis_helper.remove_agent`` raises.
        """
        try:
            redis_helper.remove_agent(agent_id)
        except Exception as e:
            # Boundary handler: log the traceback and signal the failure with
            # a non-2xx status so clients do not mistake it for success.
            logger.exception(e)
            return {"status": "ERROR", "message": str(e)}, 500
        return {"status": "SUCCESS", "agent_id": agent_id}
class SearchAgents(Resource):
    """REST resource that fetches one agent by id or searches agents."""

    def get(self, agent_id=None, search_query=None):
        """Return a single agent, or the agents matching a search query.

        Args:
            agent_id: Agent identifier from the request path. When truthy,
                a lookup by id is performed and ``search_query`` is ignored.
            search_query: Search text from the request path. When it is not
                supplied, the ``search_query`` query-string parameter is
                used instead (``/search_agents?search_query=...``).

        Returns:
            The value returned by ``redis_helper.get_agent_by_id`` or
            ``redis_helper.search_agents_by_tool_description`` on success.
            ``({"status": "ERROR", "message": "Agent not found"}, 404)`` when
            the id lookup returns a falsy value.
            ``({"status": "ERROR", "message": ...}, 500)`` when the helper
            raises.

        NOTE(review): the routes ``/search_agents/<string:agent_id>`` and
        ``/search_agents/<string:search_query>`` have the same URL shape, so
        a path segment appears to always bind to ``agent_id`` -- confirm. The
        query-string fallback gives clients a reachable way to search.
        """
        try:
            if agent_id:
                agent_data = redis_helper.get_agent_by_id(agent_id)
                if agent_data:
                    return agent_data
                return {"status": "ERROR", "message": "Agent not found"}, 404

            if search_query is None:
                # Stays None when the parameter is absent, so the helper is
                # called exactly as before for a bare /search_agents request.
                search_query = request.args.get("search_query")
            return redis_helper.search_agents_by_tool_description(search_query)
        except Exception as e:
            # Boundary handler: log the traceback and report the failure to
            # the client rather than letting the request crash.
            logger.exception(e)
            return {"status": "ERROR", "message": str(e)}, 500
# Route table: bind each resource class to its URL(s).
api.add_resource(AddAgent, '/add_agent')
api.add_resource(UpdateTools, '/update_tools')
api.add_resource(RemoveAgent, '/remove_agent/<string:agent_id>')
# NOTE(review): the first and third SearchAgents URLs have the same shape
# ('/search_agents/<string:...>'), so the search_query variant looks
# unreachable -- confirm and consider giving it a distinct path.
api.add_resource(SearchAgents, '/search_agents/<string:agent_id>', '/search_agents', '/search_agents/<string:search_query>')

if __name__ == '__main__':
    # The Werkzeug debugger lets anyone who can reach the server execute
    # arbitrary code, so debug mode is opt-in via FLASK_DEBUG (which may be
    # set in .env) instead of being hard-coded on.
    debug_enabled = os.environ.get("FLASK_DEBUG", "").strip().lower() in (
        "1", "true", "yes", "on")
    app.run(debug=debug_enabled)
Editor is loading...